Bläddra i källkod

git-svn-id: https://192.168.57.71/svn/jsgkj@290 931142cf-59ea-a443-aa0e-51397b428577

xt_yuanxd 9 år sedan
förälder
incheckning
417bde8533

+ 52 - 16
xtdsp/trunk/src/main/java/com/xt/dsp/bean/TaskCacheBean.java

@@ -12,28 +12,43 @@ import org.springframework.util.StringUtils;
  * @author yuanxd
  *
  */
-public class TaskCacheBean {
+public class TaskCacheBean implements Comparable<TaskCacheBean> {
 	private final Logger log = LoggerFactory.getLogger(TaskCacheBean.class);
 	private String cacheFolder;
 	private String subFolder;
+	private Object cacheData;
+	private String fileName;
+	private long timeMillis;
+	private long num = 0;
+	private String taskId;
 
-	public String getSubFolder() {
-		return subFolder;
+	@Override
+	public int compareTo(TaskCacheBean o) {
+		int t = Long.compare(this.timeMillis, o.timeMillis);
+		if (t == 0) {
+			t = Long.compare(this.num, o.num);
+		}
+		return t;
 	}
 
-	public void setSubFolder(String subFolder) {
-		this.subFolder = subFolder;
+	public Object getCacheData() {
+		return cacheData;
 	}
 
-	private Object cacheData;
-	private String fileName;
+	public String getCacheFolder() {
+		return cacheFolder;
+	}
 
 	public String getFileName() {
 		return fileName;
 	}
 
-	public void setFileName(String fileName) {
-		this.fileName = fileName;
+	public String getSubFolder() {
+		return subFolder;
+	}
+
+	public String getTaskId() {
+		return taskId;
 	}
 
 	/**
@@ -66,26 +81,47 @@ public class TaskCacheBean {
 			if (f.isDirectory() && f.canRead() && f.canWrite()) {
 				this.cacheFolder = "";
 			} else {
-				log.error("设置缓存目录异常:{}", folder);
+				log.error("设置缓存目录{}异常,将使用默认目录缓存!", folder);
 				return -1;
 			}
 		}
 		return 0;
 	}
 
-	public String getCacheFolder() {
-		return cacheFolder;
+	public void setCacheData(Object cacheData) {
+		this.cacheData = cacheData;
 	}
 
 	public void setCacheFolder(String cacheFolder) {
 		this.cacheFolder = cacheFolder;
 	}
 
-	public Object getCacheData() {
-		return cacheData;
+	public void setFileName(String fileName) {
+		this.fileName = fileName;
+		if (fileName != null && fileName.length() > 0) {
+			String[] props = fileName.split("-");
+			if (props.length > 0) {
+				this.taskId = props[0];
+			}
+			if (props.length > 1) {
+				this.timeMillis = Long.parseLong(props[1]);
+			}
+			if (props.length > 2) {
+				this.num = Long.parseLong(props[2]);
+			}
+		}
 	}
 
-	public void setCacheData(Object cacheData) {
-		this.cacheData = cacheData;
+	public void setSubFolder(String subFolder) {
+		this.subFolder = subFolder;
+	}
+
+	public void setTaskId(String taskId) {
+		this.taskId = taskId;
+	}
+
+	@Override
+	public String toString() {
+		return "TaskCacheBean [fileName=" + fileName + "]";
 	}
 }

+ 106 - 57
xtdsp/trunk/src/main/java/com/xt/dsp/job/SqlSynJob.java

@@ -8,6 +8,7 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -19,7 +20,6 @@ import org.quartz.SchedulerContext;
 import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.ConnectionCallback;
@@ -32,7 +32,6 @@ import org.springframework.util.StringUtils;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.xt.dsp.bean.TaskCacheBean;
-import com.xt.dsp.jdbc.JDBCTools;
 import com.xt.dsp.jdbc.JdbcTemplateUtils;
 import com.xt.dsp.model.DataSourceBean;
 import com.xt.dsp.model.JobBean;
@@ -44,16 +43,29 @@ import com.xt.dsp.service.JobService;
 import com.xt.dsp.service.TaskService;
 import com.xt.dsp.service.TaskSqlService;
 
+/**
+ * 工作调度控制
+ * 
+ * @author yuanxd
+ *
+ */
 public class SqlSynJob implements Job {
+	/** 日志记录 */
 	private final Logger log = LoggerFactory.getLogger(SqlSynJob.class);
+	/** 任务服务接口 */
 	private TaskService taskService;
+	/** sql抽取任务服务接口 */
 	private TaskSqlService taskSqlService;
+	/** 数据源服务接口 */
 	private DataSourceService dataSourceService;
+	/** 当前执行的任务对象 */
 	private JobBean job = null;
+	/** 任务缓存服务 */
 	private CacheBeanService cacheBeanService;
 
 	@Override
 	public void execute(JobExecutionContext context) throws JobExecutionException {
+		long start = System.currentTimeMillis();
 		job = (JobBean) context.getJobDetail().getJobDataMap().get(JobService.PARAM_JOB);
 		if (null == job) {
 			throw new IllegalArgumentException("未找到执行任务信息!");
@@ -61,14 +73,9 @@ public class SqlSynJob implements Job {
 		if (StringUtils.isEmpty(job.getCode())) {
 			throw new IllegalArgumentException("job code不可为空!");
 		}
-		// 获取JobExecutionContext中的service对象
 		try {
-			// 获取JobExecutionContext中的service对象
 			SchedulerContext schCtx = context.getScheduler().getContext();
-			// 获取Spring中的上下文
 			ApplicationContext appCtx = (ApplicationContext) schCtx.get("applicationContext");
-			// User u = securityMgr.userService().findByUname("admin");
-			// System.err.println(u.getId());
 			this.taskService = appCtx.getBean(TaskService.class);
 			this.taskSqlService = appCtx.getBean(TaskSqlService.class);
 			this.dataSourceService = appCtx.getBean(DataSourceService.class);
@@ -82,33 +89,97 @@ public class SqlSynJob implements Job {
 			}
 		} catch (SchedulerException e1) {
 			e1.printStackTrace();
+		} finally {
+			long cost = System.currentTimeMillis() - start;
+			log.debug("任务执行完成,耗时:{}", cost);
 		}
 	}
 
 	/**
-	 * sql同步任务处理
+	 * SQL同步任务处理
 	 * 
 	 * @param taskSqlBeans
 	 */
 	private void doTaskSqlBeans(TaskBean t) {
+		// 执行缓存
+		executeCache(t);
 		List<TaskSqlBean> taskSqlBeans = taskSqlService.selectByTaskId(t.getId());
 		if (taskSqlBeans == null || taskSqlBeans.size() == 0) {
 			return;
 		}
 		for (TaskSqlBean ts : taskSqlBeans) {
 			JSONObject result = getSqlResultJsonObject(ts);
-			int rest = runTaskSqlWithJdbcTemplate(ts, result);
-			if (rest != 0) {
-				String fileName = ts.getId() + "-" + System.currentTimeMillis();
-				TaskCacheBean cacheBean = new TaskCacheBean();
-				cacheBean.setAvalidCacheFolder(t.getCacheFolder());
-				cacheBean.setCacheData(result);
-				cacheBean.setFileName(fileName);
-				cacheBeanService.writeCache(cacheBean);
+			if (null == result || result.getJSONArray("data").size() == 0) {
+				continue;
+			}
+			if (TaskBean.CACHE_USE.equals(t.getCacheUse())) {
+				saveCache(t, ts, result);
+			} else {
+				int rest = runTaskSqlWithJdbcTemplate(t, ts, result);
+				if (rest != 0) {
+
+				}
 			}
 		}
 	}
 
+	/**
+	 * 执行缓存,执行成功返回0,否则返回其他
+	 * 
+	 * @param t
+	 * @return
+	 */
+	private int executeCache(TaskBean t) {
+		List<TaskCacheBean> taskDatas = cacheBeanService.readCache(t.getCacheFolder());
+		// 无缓存数据
+		if (taskDatas.size() == 0) {
+			t.setCacheUse(TaskBean.CACHE_UNUSE);
+			taskService.updateByPrimaryKey(t);
+			return 0;
+		}
+		Collections.sort(taskDatas);
+		Map<String, TaskSqlBean> tsbs = new HashMap<>();
+		for (TaskCacheBean tc : taskDatas) {
+			String tsbid = tc.getTaskId();
+			TaskSqlBean bean = null;
+			if (tsbs.containsKey(tsbid)) {
+				bean = tsbs.get(tsbid);
+			} else {
+				bean = taskSqlService.selectByPrimaryKey(tsbid);
+				if (null == bean) {
+					return -1;
+				}
+				tsbs.put(tsbid, bean);
+			}
+			if (null == bean) {
+				return -1;
+			}
+			runTaskSqlWithJdbcTemplate(t, bean, JSONObject.parseObject((String) tc.getCacheData()));
+		}
+		t.setCacheUse(TaskBean.CACHE_UNUSE);
+		taskService.updateByPrimaryKey(t);
+		return 0;
+	}
+
+	private void saveCache(TaskBean t, TaskSqlBean ts, JSONObject result) {
+		String fileName = ts.getId() + "-" + System.currentTimeMillis();
+		TaskCacheBean cacheBean = new TaskCacheBean();
+		cacheBean.setAvalidCacheFolder(t.getCacheFolder());
+		cacheBean.setCacheData(result);
+		cacheBean.setFileName(fileName);
+		cacheBeanService.writeCache(cacheBean);
+		cacheBeanService.writeCache(cacheBean);
+		cacheBeanService.writeCache(cacheBean);
+		t.setCacheUse(TaskBean.CACHE_USE);
+		taskService.updateByPrimaryKey(t);
+	}
+
+	/**
+	 * 获取sql检索结果
+	 * 
+	 * @param taskSqlBean
+	 * @return
+	 */
 	private JSONObject getSqlResultJsonObject(TaskSqlBean taskSqlBean) {
 		String dsid = taskSqlBean.getSrcConn();
 		DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
@@ -135,21 +206,34 @@ public class SqlSynJob implements Job {
 					}
 					results.add(resultJson);
 				}
+				if (results.size() == 0) {
+					return null;
+				}
 				result.put("columns", columns.toString());
 				result.put("data", results);
 				return result;
 			}
 		});
-		log.info("result:{}", result.toJSONString());
+		log.debug("result:{}", result);
 		return result;
 	}
 
-	private int runTaskSqlWithJdbcTemplate(final TaskSqlBean taskSqlBean, JSONObject result) {
+	/**
+	 * 结果同步到目标源
+	 * 
+	 * @param taskSqlBean
+	 * @param result
+	 * @return
+	 */
+	private int runTaskSqlWithJdbcTemplate(final TaskBean t, final TaskSqlBean taskSqlBean, JSONObject result) {
 		try {
 			// 此处应在插入或更新失败时回滚,并把查询结果以文件方式保存,留待以后再执行此任务;
 			String[] srcColumns = result.getString("columns").split(";");
 			String dsid = taskSqlBean.getTargetConn();
 			DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
+			if (null == dsb) {
+				throw new RuntimeException("数据源获取失败:" + dsid);
+			}
 			JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
 			final Collection<String> srcColumnsCollection = Arrays.asList(srcColumns);
 			Map<String, List<String>> targetTableColumns = template
@@ -225,11 +309,13 @@ public class SqlSynJob implements Job {
 
 			insertSql.append(")VALUES(").append(insertParaSql);
 			insertSql.append(")");
-			System.err.println(insertSql.toString());
-			System.err.println(updateSql.toString());
+			log.debug("insertSql:{}", insertSql.toString());
+			log.debug("updateSql:{}", updateSql.toString());
 			JSONArray data = (JSONArray) result.get("data");
 			return doCommit(data, insertSql.toString(), updateSql.toString(), tableColumns, template);
 		} catch (Exception e) {
+			log.error("执行插入失败:{}", e.getMessage());
+			saveCache(t, taskSqlBean, result);
 			return -1;
 		}
 	}
@@ -253,7 +339,6 @@ public class SqlSynJob implements Job {
 					param.put(col, jo.get(col));
 				}
 				int count = nameJdbcTemplate.update(updateSql, new MapSqlParameterSource(param));
-				System.err.println("count:" + count);
 				if (count == 0) {
 					nameJdbcTemplate.update(insertSql, param);
 				}
@@ -264,40 +349,4 @@ public class SqlSynJob implements Job {
 
 		return 0;
 	}
-
-	/**
-	 * 执行单个同步任务
-	 * 
-	 * @param taskSqlBean
-	 */
-	private void runTaskSql(TaskSqlBean taskSqlBean) {
-		JSONObject srcJson = JSONObject.parseObject(taskSqlBean.getSrcConn());
-		JSONObject targetJson = JSONObject.parseObject(taskSqlBean.getTargetConn());
-		String sql = taskSqlBean.getSql();
-		JDBCTools jdbc = new JDBCTools(srcJson);
-		JSONObject result = new JSONObject();
-		try {
-			ResultSet rs = jdbc.getResultSet(sql);
-			ResultSetMetaData rsmd = rs.getMetaData();
-			List<String> columns = new ArrayList<>();
-			for (int i = 0; i < rsmd.getColumnCount(); i++) {
-				columns.add(rsmd.getColumnName(i + 1));
-			}
-			JSONArray results = new JSONArray();
-			while (rs.next()) {
-				JSONObject resultJson = new JSONObject();
-				for (String column : columns) {
-					resultJson.put(column, rs.getObject(column));
-				}
-				results.add(resultJson);
-			}
-			result.put("columns", columns);
-			result.put("data", results);
-		} catch (SQLException e) {
-			e.printStackTrace();
-		} finally {
-			jdbc.releaseDB();
-		}
-		log.info("result:{}", result.toJSONString());
-	}
 }

+ 14 - 1
xtdsp/trunk/src/main/java/com/xt/dsp/model/TaskBean.java

@@ -9,6 +9,10 @@ package com.xt.dsp.model;
 public class TaskBean {
 	/** SQL->TABLE任务 */
 	public static final String TYPE_SQL = "1";
+	/** 使用系统缓存 */
+	public static final String CACHE_USE = "1";
+	/** 未使用系统缓存 */
+	public static final String CACHE_UNUSE = "0";
 
 	private String id;
 
@@ -19,14 +23,19 @@ public class TaskBean {
 	private String jobCode;
 
 	private String type;
-
 	/** 文件缓存目录 */
 	private String cacheFolder;
+	/** 缓存是否在使用中 */
+	private String cacheUse;
 
 	public String getCacheFolder() {
 		return cacheFolder;
 	}
 
+	public String getCacheUse() {
+		return cacheUse;
+	}
+
 	public String getCode() {
 		return code;
 	}
@@ -51,6 +60,10 @@ public class TaskBean {
 		this.cacheFolder = cacheFolder;
 	}
 
+	public void setCacheUse(String cacheUse) {
+		this.cacheUse = cacheUse;
+	}
+
 	public void setCode(String code) {
 		this.code = code == null ? null : code.trim();
 	}

+ 3 - 1
xtdsp/trunk/src/main/java/com/xt/dsp/service/CacheBeanService.java

@@ -1,9 +1,11 @@
 package com.xt.dsp.service;
 
+import java.util.List;
+
 import com.xt.dsp.bean.TaskCacheBean;
 
 public interface CacheBeanService {
 	public int writeCache(TaskCacheBean taskCacheBean);
 
-	public TaskCacheBean readCache(TaskCacheBean taskCacheBean);
+	public List<TaskCacheBean> readCache(String folder);
 }

+ 7 - 0
xtdsp/trunk/src/main/java/com/xt/dsp/service/TaskService.java

@@ -6,4 +6,11 @@ import com.xt.dsp.model.TaskBean;
 
 public interface TaskService {
 	List<TaskBean> selectByJobCode(String code);
+
+	/**
+	 * 更新
+	 * @param record
+	 * @return
+	 */
+	public int updateByPrimaryKey(TaskBean record);
 }

+ 2 - 0
xtdsp/trunk/src/main/java/com/xt/dsp/service/TaskSqlService.java

@@ -6,4 +6,6 @@ import com.xt.dsp.model.TaskSqlBean;
 
 public interface TaskSqlService {
 	List<TaskSqlBean> selectByTaskId(String taskId);
+
+	TaskSqlBean selectByPrimaryKey(String id);
 }

+ 46 - 23
xtdsp/trunk/src/main/java/com/xt/dsp/serviceImpl/CacheBeanServiceImpl.java

@@ -1,13 +1,19 @@
 package com.xt.dsp.serviceImpl;
 
 import java.io.File;
+import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
+import org.springframework.util.FileCopyUtils;
 
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import com.xt.dsp.bean.TaskCacheBean;
 import com.xt.dsp.service.CacheBeanService;
 
@@ -15,6 +21,43 @@ import com.xt.dsp.service.CacheBeanService;
 public class CacheBeanServiceImpl implements CacheBeanService {
 	private final Logger log = LoggerFactory.getLogger(CacheBeanServiceImpl.class);
 
+	private String getTaskCacheBeanData(TaskCacheBean taskCacheBean) {
+		Object data = taskCacheBean.getCacheData();
+		if (data == null) {
+			return "";
+		}
+		if (data instanceof String) {
+			return (String) data;
+		}
+		if (data instanceof JSONObject) {
+			JSONObject dataJson = (JSONObject) data;
+			return dataJson.toJSONString();
+		}
+		return String.valueOf(data);
+	}
+
+	@Override
+	public List<TaskCacheBean> readCache(String cacheFolder) {
+		File folder = new File(cacheFolder);
+		if (!folder.exists()) {
+			return null;
+		}
+		List<TaskCacheBean> taskCacheBeanList = new ArrayList<>();
+		for (File f : folder.listFiles()) {
+			TaskCacheBean tc = new TaskCacheBean();
+			tc.setFileName(f.getName());
+			try {
+				String data = FileCopyUtils.copyToString(new FileReader(f));
+				tc.setCacheData(data);
+				taskCacheBeanList.add(tc);
+				f.delete();
+			} catch (IOException e) {
+				log.error("file read error:{}", e.getMessage());
+			}
+		}
+		return taskCacheBeanList;
+	}
+
 	@Override
 	public int writeCache(TaskCacheBean taskCacheBean) {
 		// String fileName = job.getId() + "-" + ts.getTaskId() + "-" +
@@ -22,39 +65,19 @@ public class CacheBeanServiceImpl implements CacheBeanService {
 		String cacheFolder = taskCacheBean.getCacheFolder() == null ? "" : taskCacheBean.getCacheFolder();
 		File folder = new File(cacheFolder);
 		String fileName = taskCacheBean.getFileName();
-		File f = new File(folder, fileName);
 		int count = 0;
+		File f = new File(folder, fileName + "-" + (count));
 		while (f.exists()) {
 			f = new File(folder, fileName + "-" + (++count));
 		}
 		if (count > 0) {
 			f = new File(folder, fileName + "-" + count);
 		}
-		FileWriter fw = null;
 		try {
-			log.info("write to file:{}", f.getAbsolutePath());
-			fw = new FileWriter(f);
-			fw.write(getTaskCacheBeanData(taskCacheBean));
+			FileCopyUtils.copy(getTaskCacheBeanData(taskCacheBean), new FileWriter(f));
 		} catch (IOException e) {
-			log.error("缓存{}保存失败:{}", taskCacheBean.getFileName(), e.getMessage());
-			return -1;
-		} finally {
-			if (null != fw) {
-				try {
-					fw.close();
-				} catch (IOException e) {
-				}
-			}
+			log.error("缓存写入失败:{}", e.getMessage());
 		}
 		return 0;
 	}
-
-	private String getTaskCacheBeanData(TaskCacheBean taskCacheBean) {
-		return "";
-	}
-
-	@Override
-	public TaskCacheBean readCache(TaskCacheBean taskCacheBean) {
-		return null;
-	}
 }

+ 11 - 0
xtdsp/trunk/src/main/java/com/xt/dsp/serviceImpl/TaskServiceImpl.java

@@ -19,4 +19,15 @@ public class TaskServiceImpl implements TaskService {
 		return mapper.selectByJobCode(code);
 	}
 
+	/**
+	 * 根据主键更新记录
+	 * 
+	 * @param record
+	 *            T
+	 * @return
+	 */
+	@Override
+	public int updateByPrimaryKey(TaskBean record) {
+		return mapper.updateByPrimaryKey(record);
+	}
 }

+ 11 - 0
xtdsp/trunk/src/main/java/com/xt/dsp/serviceImpl/TaskSqlServiceImpl.java

@@ -17,4 +17,15 @@ public class TaskSqlServiceImpl implements TaskSqlService {
 	public List<TaskSqlBean> selectByTaskId(String taskId) {
 		return mapper.selectByTaskId(taskId);
 	}
+
+	/**
+	 * 根据主键查找记录
+	 * 
+	 * @param id
+	 *            String
+	 * @return T
+	 */
+	public TaskSqlBean selectByPrimaryKey(String id) {
+		return mapper.selectByPrimaryKey(id);
+	}
 }