瀏覽代碼

git-svn-id: https://192.168.57.71/svn/jsgkj@311 931142cf-59ea-a443-aa0e-51397b428577

xt_yuanxd 9 年之前
父節點
當前提交
baa450538e

+ 5 - 1
xtdsp/trunk/pom.xml

@@ -123,7 +123,11 @@
 			<artifactId>quartz</artifactId>
 			<version>2.2.3</version>
 		</dependency>
-
+		<dependency>
+			<groupId>com.yuanxd.tools</groupId>
+			<artifactId>x-tools-io</artifactId>
+			<version>1.0-SNAPSHOT</version>
+		</dependency>
 	</dependencies>
 	<build>
 		<!-- 插件配置 -->

+ 36 - 0
xtdsp/trunk/src/main/java/com/xt/dsp/controller/TaskCtl.java

@@ -0,0 +1,36 @@
+package com.xt.dsp.controller;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+import com.xt.dsp.model.TaskBean;
+import com.xt.dsp.service.TaskService;
+import com.xt.dsp.service.TaskSqlService;
+import com.yuanxd.tools.io.http.JsonResult;
+
+@Controller
+@RequestMapping("task")
+public class TaskCtl {
+	@Autowired
+	private TaskService taskService;
+	@Autowired
+	private TaskSqlService taskSqlService;
+
+	@RequestMapping("run/{code}")
+	@ResponseBody
+	public JsonResult runTask(@PathVariable String code, String condition) {
+		TaskBean task = taskService.selectByCode(code);
+		JsonResult result = new JsonResult();
+		if (TaskBean.TYPE_SQL.equals(task.getType())) {
+			int res = taskSqlService.runTask(task, condition);
+			if (res == 0) {
+				result.setSuccess(true);
+			}
+		}
+		result.setSuccess(false);
+		return result;
+	}
+}

+ 4 - 366
xtdsp/trunk/src/main/java/com/xt/dsp/job/SqlSynJob.java

@@ -1,17 +1,6 @@
 package com.xt.dsp.job;
 
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
@@ -21,25 +10,10 @@ import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;
-import org.springframework.dao.DataAccessException;
-import org.springframework.jdbc.core.ConnectionCallback;
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.core.ResultSetExtractor;
-import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
-import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
-import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 import org.springframework.util.StringUtils;
 
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.xt.dsp.bean.TaskCacheBean;
-import com.xt.dsp.jdbc.JdbcTemplateUtils;
-import com.xt.dsp.model.DataSourceBean;
 import com.xt.dsp.model.JobBean;
 import com.xt.dsp.model.TaskBean;
-import com.xt.dsp.model.TaskSqlBean;
-import com.xt.dsp.service.CacheBeanService;
-import com.xt.dsp.service.DataSourceService;
 import com.xt.dsp.service.JobService;
 import com.xt.dsp.service.TaskService;
 import com.xt.dsp.service.TaskSqlService;
@@ -55,21 +29,14 @@ public class SqlSynJob implements Job {
 	private final Logger log = LoggerFactory.getLogger(SqlSynJob.class);
 	/** 任务服务接口 */
 	private TaskService taskService;
-	/** sql抽取任务服务接口 */
+	/** SQL抽取任务服务接口 */
 	private TaskSqlService taskSqlService;
-	/** 数据源服务接口 */
-	private DataSourceService dataSourceService;
-	/** 当前执行的任务对象 */
-	private JobBean job = null;
-	/** 任务缓存服务 */
-	private CacheBeanService cacheBeanService;
+	/** 工作信息服务 */
 	private JobService jobService = null;
 
 	private void initServices(ApplicationContext appCtx) {
 		this.taskService = appCtx.getBean(TaskService.class);
 		this.taskSqlService = appCtx.getBean(TaskSqlService.class);
-		this.dataSourceService = appCtx.getBean(DataSourceService.class);
-		this.cacheBeanService = appCtx.getBean(CacheBeanService.class);
 		this.jobService = appCtx.getBean(JobService.class);
 	}
 
@@ -85,349 +52,20 @@ public class SqlSynJob implements Job {
 		}
 		initServices((ApplicationContext) schCtx.get("applicationContext"));
 		String jobId = context.getJobDetail().getJobDataMap().getString(JobService.PARAM_JOB);
-		job = jobService.findOne(jobId);
+		JobBean job = jobService.findOne(jobId);
 		if (null == job) {
 			throw new IllegalArgumentException("未找到执行任务信息!");
 		}
 		if (StringUtils.isEmpty(job.getCode())) {
 			throw new IllegalArgumentException("job code不可为空!");
 		}
-
 		List<TaskBean> tasks = taskService.selectByJobCode(job.getCode());
 		for (TaskBean t : tasks) {
 			// SQL 同步任务
 			if (TaskBean.TYPE_SQL.equals(t.getType())) {
-				doTaskSqlBeans(t);
+				taskSqlService.runTask(t, null);
 			}
 		}
 		log.info("任务执行完成,耗时:{}", System.currentTimeMillis() - start);
 	}
-
-	/**
-	 * SQL同步任务处理
-	 * 
-	 * @param taskSqlBeans
-	 */
-	private void doTaskSqlBeans(TaskBean t) {
-		// 执行缓存
-		executeCache(t);
-		List<TaskSqlBean> taskSqlBeans = taskSqlService.selectByTaskId(t.getId());
-		if (taskSqlBeans == null || taskSqlBeans.size() == 0) {
-			return;
-		}
-		for (TaskSqlBean ts : taskSqlBeans) {
-			JSONObject result = getSqlResultJsonObject(ts);
-			if (null == result || result.getJSONArray("data").size() == 0) {
-				continue;
-			}
-			if (TaskBean.CACHE_USE.equals(t.getCacheUse())) {
-				saveCache(t, ts, result);
-			} else {
-				runTaskSqlWithJdbcTemplate(t, ts, result);
-			}
-		}
-	}
-
-	/**
-	 * 执行缓存,执行成功返回0,否则返回其他
-	 * 
-	 * @param t
-	 * @return
-	 */
-	private int executeCache(TaskBean t) {
-		long start = System.currentTimeMillis();
-		List<TaskCacheBean> taskDatas = cacheBeanService.readCache(t.getCacheFolder());
-		// 无缓存数据
-		if (taskDatas == null || taskDatas.size() == 0) {
-			t.setCacheUse(TaskBean.CACHE_UNUSE);
-			taskService.updateByPrimaryKey(t);
-			return 0;
-		}
-		Collections.sort(taskDatas);
-		Map<String, TaskSqlBean> tsbs = new HashMap<>();
-		for (TaskCacheBean tc : taskDatas) {
-			String tsbid = tc.getTaskId();
-			TaskSqlBean bean = null;
-			if (tsbs.containsKey(tsbid)) {
-				bean = tsbs.get(tsbid);
-			} else {
-				bean = taskSqlService.selectByPrimaryKey(tsbid);
-				if (null == bean) {
-					return -1;
-				}
-				tsbs.put(tsbid, bean);
-			}
-			if (null == bean) {
-				return -1;
-			}
-			runTaskSqlWithJdbcTemplate(t, bean, JSONObject.parseObject((String) tc.getCacheData()));
-		}
-		t.setCacheUse(TaskBean.CACHE_UNUSE);
-		taskService.updateByPrimaryKey(t);
-		long cost = System.currentTimeMillis() - start;
-		log.info("execute cache cost:" + cost);
-		return 0;
-	}
-
-	private void saveCache(TaskBean t, TaskSqlBean ts, JSONObject result) {
-		long start = System.currentTimeMillis();
-		String fileName = ts.getId() + "-" + System.currentTimeMillis();
-		TaskCacheBean cacheBean = new TaskCacheBean();
-		cacheBean.setAvalidCacheFolder(t.getCacheFolder());
-		cacheBean.setCacheData(result);
-		cacheBean.setFileName(fileName);
-		cacheBeanService.writeCache(cacheBean);
-		cacheBeanService.writeCache(cacheBean);
-		cacheBeanService.writeCache(cacheBean);
-		t.setCacheUse(TaskBean.CACHE_USE);
-		taskService.updateByPrimaryKey(t);
-		log.info("Task[] saveCache cost:{}", ts.getId(), System.currentTimeMillis() - start);
-	}
-
-	/**
-	 * 获取sql检索结果
-	 * 
-	 * @param taskSqlBean
-	 * @return
-	 */
-	private JSONObject getSqlResultJsonObject(TaskSqlBean taskSqlBean) {
-		long start = System.currentTimeMillis();
-		String dsid = taskSqlBean.getSrcConn();
-		DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
-		JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
-		String sql = taskSqlBean.getQuerySql();
-		JSONObject result = template.query(sql, new ResultSetExtractor<JSONObject>() {
-			@Override
-			public JSONObject extractData(ResultSet rs) throws SQLException, DataAccessException {
-				JSONObject result = new JSONObject();
-				ResultSetMetaData rsmd = rs.getMetaData();
-				StringBuilder columns = new StringBuilder();
-				for (int i = 0; i < rsmd.getColumnCount(); i++) {
-					columns.append(rsmd.getColumnName(i + 1).toUpperCase());
-					columns.append(";");
-				}
-				if (columns.length() > 0) {
-					columns.deleteCharAt(columns.length() - 1);
-				}
-				JSONArray results = new JSONArray();
-				while (rs.next()) {
-					JSONObject resultJson = new JSONObject();
-					for (String column : columns.toString().split(";")) {
-						resultJson.put(column, rs.getObject(column));
-					}
-					results.add(resultJson);
-				}
-				if (results.size() == 0) {
-					return null;
-				}
-				result.put("columns", columns.toString());
-				result.put("data", results);
-				return result;
-			}
-		});
-		log.debug("Task[{}] result:{}", taskSqlBean.getId(), result);
-		log.info("Task[{}] getSqlResultJsonObject cost:{}", taskSqlBean.getId(), (System.currentTimeMillis() - start));
-		return result;
-	}
-
-	private void prepare(String srcCols, final TaskSqlBean taskSqlBean, JdbcTemplate template) {
-		long start = System.currentTimeMillis();
-		final Collection<String> srcColumnsCollection = Arrays.asList(srcCols.split(";"));
-		Map<String, List<String>> targetTableColumns = template
-				.execute(new ConnectionCallback<Map<String, List<String>>>() {
-					@Override
-					public Map<String, List<String>> doInConnection(Connection con)
-							throws SQLException, DataAccessException {
-						Map<String, List<String>> tColumns = new HashMap<String, List<String>>();
-						DatabaseMetaData dmd = con.getMetaData();
-						ResultSet rs = dmd.getColumns(null, dmd.getUserName(),
-								taskSqlBean.getTargetTable().toUpperCase(), null);
-						List<String> columns = new ArrayList<>();
-						while (rs.next()) {
-							String column = rs.getString("COLUMN_NAME").toUpperCase();
-							// 只保留对应的列
-							if (srcColumnsCollection.contains(column)) {
-								columns.add(column);
-							}
-						}
-						tColumns.put("columns", columns);
-						try {
-							rs.close();
-						} catch (Exception e) {
-
-						}
-						rs = dmd.getPrimaryKeys(null, dmd.getUserName(), taskSqlBean.getTargetTable().toUpperCase());
-						List<String> keys = new ArrayList<>();
-						while (rs.next()) {
-							String key = rs.getString(4);
-							if (srcColumnsCollection.contains(key)) {
-								keys.add(key);
-							}
-						}
-						tColumns.put("keys", keys);
-						try {
-							rs.close();
-						} catch (Exception e) {
-
-						}
-						return tColumns;
-					}
-				});
-		List<String> tableColumns = targetTableColumns.get("columns");
-		taskSqlBean.setColumns(StringUtils.collectionToDelimitedString(tableColumns, ","));
-		List<String> keys = targetTableColumns.get("keys");
-		StringBuilder insertSql = new StringBuilder();
-		StringBuilder insertParaSql = new StringBuilder();
-		StringBuilder updateSql = new StringBuilder();
-		insertSql.append("INSERT INTO ").append(taskSqlBean.getTargetTable()).append("(");
-		updateSql.append("UPDATE ").append(taskSqlBean.getTargetTable()).append(" SET ");
-		for (int i = 0; i < tableColumns.size(); i++) {
-			insertSql.append(tableColumns.get(i)).append(",");
-			insertParaSql.append(":").append(tableColumns.get(i)).append(",");
-			if (!keys.contains(tableColumns.get(i))) {
-				updateSql.append(tableColumns.get(i)).append("=").append(":").append(tableColumns.get(i)).append(",");
-			}
-		}
-		insertSql.deleteCharAt(insertSql.length() - 1);
-		insertParaSql.deleteCharAt(insertParaSql.length() - 1);
-		updateSql.deleteCharAt(updateSql.length() - 1);
-		updateSql.append(" WHERE ");
-		if (keys.size() > 0) {
-			for (int i = 0; i < keys.size(); i++) {
-				if (i != 0) {
-					updateSql.append(" AND ");
-				}
-				updateSql.append(keys.get(i)).append(" = :").append(keys.get(i));
-			}
-		} else {
-			updateSql = null;
-		}
-
-		insertSql.append(")VALUES(").append(insertParaSql);
-		insertSql.append(")");
-		taskSqlBean.setInsertSql(insertSql.toString());
-		taskSqlBean.setUpdateSql(updateSql.toString());
-		taskSqlBean.setRefreshSql(TaskSqlBean.REFRESH_SQL_FALSE);
-		taskSqlService.updateByPrimaryKey(taskSqlBean);
-		log.info("Task[]do {} sql task prepare finished, cost:{}", taskSqlBean.getId(),
-				(System.currentTimeMillis() - start));
-	}
-
-	public static String listToString(List<String> stringList) {
-		if (stringList == null) {
-			return null;
-		}
-		StringBuilder result = new StringBuilder();
-		boolean flag = false;
-		for (String string : stringList) {
-			if (flag) {
-				result.append(",");
-			} else {
-				flag = true;
-			}
-			result.append(string);
-		}
-		return result.toString();
-	}
-
-	/**
-	 * 结果同步到目标源
-	 * 
-	 * @param taskSqlBean
-	 * @param result
-	 * @return
-	 */
-	private int runTaskSqlWithJdbcTemplate(final TaskBean t, final TaskSqlBean taskSqlBean, JSONObject result) {
-		try {
-			// 此处应在插入或更新失败时回滚,并把查询结果以文件方式保存,留待以后再执行此任务;
-			String dsid = taskSqlBean.getTargetConn();
-			DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
-			if (null == dsb) {
-				throw new RuntimeException("数据源获取失败:" + dsid);
-			}
-			JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
-			if (TaskSqlBean.REFRESH_SQL_TRUE.equals(taskSqlBean.getRefreshSql())) {
-				prepare(result.getString("columns"), taskSqlBean, template);
-			}
-			JSONArray data = (JSONArray) result.get("data");
-			return TaskSqlBean.MODE_ALL.equals(taskSqlBean.getMode()) ? doAllCommit(data, taskSqlBean, template)
-					: doOneCommit(data, taskSqlBean, template);
-		} catch (Exception e) {
-			log.error("Task[{}] 执行插入失败:{}", taskSqlBean.getId(), e.getMessage());
-			e.printStackTrace();
-			saveCache(t, taskSqlBean, result);
-			return -1;
-		}
-	}
-
-	/**
-	 * 处理事务提交
-	 * 
-	 * @param data
-	 * @param insertSql
-	 * @param updateSql
-	 * @return
-	 */
-	private int doOneCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
-		long start = System.currentTimeMillis();
-		String insertSql = taskSqlBean.getInsertSql();
-		String updateSql = taskSqlBean.getUpdateSql();
-		List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
-		try {
-			NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
-			for (int i = 0; i < data.size(); i++) {
-				Map<String, Object> param = new HashMap<>();
-				JSONObject jo = data.getJSONObject(i);
-				for (String col : tableColumns) {
-					param.put(col, jo.get(col));
-				}
-				int count = nameJdbcTemplate.update(updateSql, new MapSqlParameterSource(param));
-				if (count == 0) {
-					nameJdbcTemplate.update(insertSql, param);
-				}
-			}
-			log.info("Task[{}] update {} records", taskSqlBean.getId(), data.size());
-		} catch (Exception e) {
-			return -1;
-		}
-		log.info("Task[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
-		return 0;
-	}
-
-	/**
-	 * 处理事务提交
-	 * 
-	 * @param data
-	 * @param insertSql
-	 * @param updateSql
-	 * @return
-	 */
-	private int doAllCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
-		long start = System.currentTimeMillis();
-		String insertSql = taskSqlBean.getInsertSql();
-		List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
-		try {
-			NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
-			List<SqlParameterSource> params = new ArrayList<>();
-			for (int i = 0; i < data.size(); i++) {
-				Map<String, Object> param = new HashMap<>();
-				JSONObject jo = data.getJSONObject(i);
-				for (String col : tableColumns) {
-					param.put(col, jo.get(col));
-				}
-				params.add(new MapSqlParameterSource(param));
-			}
-			SqlParameterSource[] sqlParams = new SqlParameterSource[params.size()];
-			String deleteAllSql = "DELETE FROM " + taskSqlBean.getTargetTable();
-			Map<String, ?> paramMap = new HashMap<>();
-			nameJdbcTemplate.update(deleteAllSql, paramMap);
-			log.info("Task[{}] delete all:{}", taskSqlBean.getId(), deleteAllSql);
-			nameJdbcTemplate.batchUpdate(insertSql, params.toArray(sqlParams));
-			log.info("Task[{}] update {} records", taskSqlBean.getId(), data.size());
-		} catch (Exception e) {
-			return -1;
-		}
-		log.info("Task[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
-		return 0;
-	}
 }

+ 2 - 0
xtdsp/trunk/src/main/java/com/xt/dsp/mappers/TaskBeanMapper.java

@@ -8,4 +8,6 @@ import com.xt.dsp.system.BaseMapper;
 public interface TaskBeanMapper extends BaseMapper<TaskBean> {
 
 	List<TaskBean> selectByJobCode(String code);
+
+	TaskBean selectByCode(String code);
 }

+ 19 - 0
xtdsp/trunk/src/main/java/com/xt/dsp/service/TaskRun.java

@@ -0,0 +1,19 @@
+package com.xt.dsp.service;
+
+import com.xt.dsp.model.TaskBean;
+
+/**
+ * 可执行任务接口
+ * 
+ * @author yuanxd
+ *
+ */
+public interface TaskRun {
+	/**
+	 * 执行任务
+	 * 
+	 * @param t
+	 * @return
+	 */
+	public int runTask(TaskBean t, String condition);
+}

+ 21 - 0
xtdsp/trunk/src/main/java/com/xt/dsp/service/TaskService.java

@@ -4,11 +4,32 @@ import java.util.List;
 
 import com.xt.dsp.model.TaskBean;
 
+/**
+ * 任务服务接口
+ * 
+ * @author yuanxd
+ *
+ */
 public interface TaskService {
+	/**
+	 * 根据工作编码查询工作下所有任务
+	 * 
+	 * @param code
+	 * @return
+	 */
 	List<TaskBean> selectByJobCode(String code);
 
 	/**
+	 * 根据任务唯一编码查询任务
+	 * 
+	 * @param code
+	 * @return
+	 */
+	TaskBean selectByCode(String code);
+
+	/**
 	 * 更新
+	 * 
 	 * @param record
 	 * @return
 	 */

+ 2 - 1
xtdsp/trunk/src/main/java/com/xt/dsp/service/TaskSqlService.java

@@ -4,7 +4,8 @@ import java.util.List;
 
 import com.xt.dsp.model.TaskSqlBean;
 
-public interface TaskSqlService {
+public interface TaskSqlService extends TaskRun {
+
 	List<TaskSqlBean> selectByTaskId(String taskId);
 
 	TaskSqlBean selectByPrimaryKey(String id);

+ 6 - 0
xtdsp/trunk/src/main/java/com/xt/dsp/serviceImpl/TaskServiceImpl.java

@@ -30,4 +30,10 @@ public class TaskServiceImpl implements TaskService {
 	public int updateByPrimaryKey(TaskBean record) {
 		return mapper.updateByPrimaryKey(record);
 	}
+
+	@Override
+	public TaskBean selectByCode(String code) {
+		return mapper.selectByCode(code);
+	}
+	
 }

+ 386 - 0
xtdsp/trunk/src/main/java/com/xt/dsp/serviceImpl/TaskSqlServiceImpl.java

@@ -1,18 +1,59 @@
 package com.xt.dsp.serviceImpl;
 
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.ResultSetExtractor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
 
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.xt.dsp.bean.TaskCacheBean;
+import com.xt.dsp.jdbc.JdbcTemplateUtils;
 import com.xt.dsp.mappers.TaskSqlBeanMapper;
+import com.xt.dsp.model.DataSourceBean;
+import com.xt.dsp.model.TaskBean;
 import com.xt.dsp.model.TaskSqlBean;
+import com.xt.dsp.service.CacheBeanService;
+import com.xt.dsp.service.DataSourceService;
+import com.xt.dsp.service.TaskService;
 import com.xt.dsp.service.TaskSqlService;
 
 @Service
 public class TaskSqlServiceImpl implements TaskSqlService {
+	/** 日志记录 */
+	private final Logger log = LoggerFactory.getLogger(TaskSqlServiceImpl.class);
 	@Autowired
 	private TaskSqlBeanMapper mapper;
+	/** sql抽取任务服务接口 */
+	@Autowired
+	private TaskService taskService;
+	/** 任务缓存服务 */
+	@Autowired
+	private CacheBeanService cacheBeanService;
+	/** 数据源服务接口 */
+	@Autowired
+	private DataSourceService dataSourceService;
 
 	public List<TaskSqlBean> selectByTaskId(String taskId) {
 		return mapper.selectByTaskId(taskId);
@@ -32,4 +73,349 @@ public class TaskSqlServiceImpl implements TaskSqlService {
 	public int updateByPrimaryKey(TaskSqlBean record) {
 		return mapper.updateByPrimaryKey(record);
 	}
+
+	/**
+	 * SQL同步任务处理
+	 * 
+	 * @param taskSqlBeans
+	 */
+	public int runTask(TaskBean t, String condition) {
+		if (!TaskBean.TYPE_SQL.equals(t.getType())) {
+			return -1;
+		}
+		long start = System.currentTimeMillis();
+		// 执行缓存
+		executeCache(t);
+		List<TaskSqlBean> taskSqlBeans = selectByTaskId(t.getId());
+		if (taskSqlBeans == null || taskSqlBeans.size() == 0) {
+			return 0;
+		}
+		for (TaskSqlBean ts : taskSqlBeans) {
+			JSONObject result = getSqlResultJsonObject(ts, condition);
+			if (null == result || result.getJSONArray("data").size() == 0) {
+				continue;
+			}
+			if (TaskBean.CACHE_USE.equals(t.getCacheUse())) {
+				saveCache(t, ts, result);
+			} else {
+				runTaskSqlWithJdbcTemplate(t, ts, result);
+			}
+		}
+		log.info("SqlTask[{}] finished,cost:{}", t.getCode(), System.currentTimeMillis() - start);
+		return 0;
+	}
+
+	/**
+	 * 执行缓存,执行成功返回0,否则返回其他
+	 * 
+	 * @param t
+	 * @return
+	 */
+	private int executeCache(TaskBean t) {
+		long start = System.currentTimeMillis();
+		List<TaskCacheBean> taskDatas = cacheBeanService.readCache(t.getCacheFolder());
+		// 无缓存数据
+		if (taskDatas == null || taskDatas.size() == 0) {
+			t.setCacheUse(TaskBean.CACHE_UNUSE);
+			taskService.updateByPrimaryKey(t);
+			return 0;
+		}
+		Collections.sort(taskDatas);
+		Map<String, TaskSqlBean> tsbs = new HashMap<>();
+		for (TaskCacheBean tc : taskDatas) {
+			String tsbid = tc.getTaskId();
+			TaskSqlBean bean = null;
+			if (tsbs.containsKey(tsbid)) {
+				bean = tsbs.get(tsbid);
+			} else {
+				bean = selectByPrimaryKey(tsbid);
+				if (null == bean) {
+					return -1;
+				}
+				tsbs.put(tsbid, bean);
+			}
+			if (null == bean) {
+				return -1;
+			}
+			runTaskSqlWithJdbcTemplate(t, bean, JSONObject.parseObject((String) tc.getCacheData()));
+		}
+		t.setCacheUse(TaskBean.CACHE_UNUSE);
+		taskService.updateByPrimaryKey(t);
+		long cost = System.currentTimeMillis() - start;
+		log.info("SqlTask[{}],execute cache cost:{}", t.getCode(), cost);
+		return 0;
+	}
+
+	/**
+	 * 获取sql检索结果
+	 * 
+	 * @param taskSqlBean
+	 * @return
+	 */
+	private JSONObject getSqlResultJsonObject(TaskSqlBean taskSqlBean, String condition) {
+		long start = System.currentTimeMillis();
+		String dsid = taskSqlBean.getSrcConn();
+		DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
+		JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
+		String sql = taskSqlBean.getQuerySql();
+		if (StringUtils.hasText(condition)) {
+			sql += " " + condition;
+		}
+		JSONObject result = template.query(sql, new ResultSetExtractor<JSONObject>() {
+			@Override
+			public JSONObject extractData(ResultSet rs) throws SQLException, DataAccessException {
+				JSONObject result = new JSONObject();
+				ResultSetMetaData rsmd = rs.getMetaData();
+				StringBuilder columns = new StringBuilder();
+				for (int i = 0; i < rsmd.getColumnCount(); i++) {
+					columns.append(rsmd.getColumnName(i + 1).toUpperCase());
+					columns.append(";");
+				}
+				if (columns.length() > 0) {
+					columns.deleteCharAt(columns.length() - 1);
+				}
+				JSONArray results = new JSONArray();
+				while (rs.next()) {
+					JSONObject resultJson = new JSONObject();
+					for (String column : columns.toString().split(";")) {
+						resultJson.put(column, rs.getObject(column));
+					}
+					results.add(resultJson);
+				}
+				if (results.size() == 0) {
+					return null;
+				}
+				result.put("columns", columns.toString());
+				result.put("data", results);
+				return result;
+			}
+		});
+		if (log.isInfoEnabled()) {
+			int size = 0;
+			if (result != null && result.getJSONArray("data") != null) {
+				size = result.getJSONArray("data").size();
+			}
+			log.info("TaskSql[{}] getSqlResultJsonObject get {} records, cost:{}", taskSqlBean.getId(), size,
+					(System.currentTimeMillis() - start));
+		}
+
+		return result;
+	}
+
+	private void prepare(String srcCols, final TaskSqlBean taskSqlBean, JdbcTemplate template) {
+		long start = System.currentTimeMillis();
+		final Collection<String> srcColumnsCollection = Arrays.asList(srcCols.split(";"));
+		Map<String, List<String>> targetTableColumns = template
+				.execute(new ConnectionCallback<Map<String, List<String>>>() {
+					@Override
+					public Map<String, List<String>> doInConnection(Connection con)
+							throws SQLException, DataAccessException {
+						Map<String, List<String>> tColumns = new HashMap<String, List<String>>();
+						DatabaseMetaData dmd = con.getMetaData();
+						ResultSet rs = dmd.getColumns(null, dmd.getUserName(),
+								taskSqlBean.getTargetTable().toUpperCase(), null);
+						List<String> columns = new ArrayList<>();
+						while (rs.next()) {
+							String column = rs.getString("COLUMN_NAME").toUpperCase();
+							// 只保留对应的列
+							if (srcColumnsCollection.contains(column)) {
+								columns.add(column);
+							}
+						}
+						tColumns.put("columns", columns);
+						try {
+							rs.close();
+						} catch (Exception e) {
+
+						}
+						rs = dmd.getPrimaryKeys(null, dmd.getUserName(), taskSqlBean.getTargetTable().toUpperCase());
+						List<String> keys = new ArrayList<>();
+						while (rs.next()) {
+							String key = rs.getString(4);
+							if (srcColumnsCollection.contains(key)) {
+								keys.add(key);
+							}
+						}
+						tColumns.put("keys", keys);
+						try {
+							rs.close();
+						} catch (Exception e) {
+
+						}
+						return tColumns;
+					}
+				});
+		List<String> tableColumns = targetTableColumns.get("columns");
+		taskSqlBean.setColumns(StringUtils.collectionToDelimitedString(tableColumns, ","));
+		List<String> keys = targetTableColumns.get("keys");
+		StringBuilder insertSql = new StringBuilder();
+		StringBuilder insertParaSql = new StringBuilder();
+		StringBuilder updateSql = new StringBuilder();
+		insertSql.append("INSERT INTO ").append(taskSqlBean.getTargetTable()).append("(");
+		updateSql.append("UPDATE ").append(taskSqlBean.getTargetTable()).append(" SET ");
+		for (int i = 0; i < tableColumns.size(); i++) {
+			insertSql.append(tableColumns.get(i)).append(",");
+			insertParaSql.append(":").append(tableColumns.get(i)).append(",");
+			if (!keys.contains(tableColumns.get(i))) {
+				updateSql.append(tableColumns.get(i)).append("=").append(":").append(tableColumns.get(i)).append(",");
+			}
+		}
+		insertSql.deleteCharAt(insertSql.length() - 1);
+		insertParaSql.deleteCharAt(insertParaSql.length() - 1);
+		updateSql.deleteCharAt(updateSql.length() - 1);
+		updateSql.append(" WHERE ");
+		if (keys.size() > 0) {
+			for (int i = 0; i < keys.size(); i++) {
+				if (i != 0) {
+					updateSql.append(" AND ");
+				}
+				updateSql.append(keys.get(i)).append(" = :").append(keys.get(i));
+			}
+		} else {
+			updateSql = null;
+		}
+
+		insertSql.append(")VALUES(").append(insertParaSql);
+		insertSql.append(")");
+		taskSqlBean.setInsertSql(insertSql.toString());
+		taskSqlBean.setUpdateSql(updateSql.toString());
+		taskSqlBean.setRefreshSql(TaskSqlBean.REFRESH_SQL_FALSE);
+		updateByPrimaryKey(taskSqlBean);
+		log.info("SqlTask[]do {} sql task prepare finished, cost:{}", taskSqlBean.getId(),
+				(System.currentTimeMillis() - start));
+	}
+
+	public static String listToString(List<String> stringList) {
+		if (stringList == null) {
+			return null;
+		}
+		StringBuilder result = new StringBuilder();
+		boolean flag = false;
+		for (String string : stringList) {
+			if (flag) {
+				result.append(",");
+			} else {
+				flag = true;
+			}
+			result.append(string);
+		}
+		return result.toString();
+	}
+
+	/**
+	 * 结果同步到目标源
+	 * 
+	 * @param taskSqlBean
+	 * @param result
+	 * @return
+	 */
+	private int runTaskSqlWithJdbcTemplate(final TaskBean t, final TaskSqlBean taskSqlBean, JSONObject result) {
+		try {
+			// 此处应在插入或更新失败时回滚,并把查询结果以文件方式保存,留待以后再执行此任务;
+			String dsid = taskSqlBean.getTargetConn();
+			DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
+			if (null == dsb) {
+				throw new RuntimeException("数据源获取失败:" + dsid);
+			}
+			JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
+			if (TaskSqlBean.REFRESH_SQL_TRUE.equals(taskSqlBean.getRefreshSql())) {
+				prepare(result.getString("columns"), taskSqlBean, template);
+			}
+			JSONArray data = (JSONArray) result.get("data");
+			return TaskSqlBean.MODE_ALL.equals(taskSqlBean.getMode()) ? doAllCommit(data, taskSqlBean, template)
+					: doOneCommit(data, taskSqlBean, template);
+		} catch (Exception e) {
+			log.error("SqlTask[{}] 执行插入失败:{}", t.getCode(), e.getMessage());
+			e.printStackTrace();
+			saveCache(t, taskSqlBean, result);
+			return -1;
+		}
+	}
+
+	/**
+	 * 处理事务提交
+	 * 
+	 * @param data
+	 * @param insertSql
+	 * @param updateSql
+	 * @return
+	 */
+	private int doOneCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
+		long start = System.currentTimeMillis();
+		String insertSql = taskSqlBean.getInsertSql();
+		String updateSql = taskSqlBean.getUpdateSql();
+		List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
+		try {
+			NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
+			for (int i = 0; i < data.size(); i++) {
+				Map<String, Object> param = new HashMap<>();
+				JSONObject jo = data.getJSONObject(i);
+				for (String col : tableColumns) {
+					param.put(col, jo.get(col));
+				}
+				int count = nameJdbcTemplate.update(updateSql, new MapSqlParameterSource(param));
+				if (count == 0) {
+					nameJdbcTemplate.update(insertSql, param);
+				}
+			}
+			log.info("SqlTask[{}] update {} records", taskSqlBean.getId(), data.size());
+		} catch (Exception e) {
+			return -1;
+		}
+		log.info("SqlTask[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
+		return 0;
+	}
+
+	/**
+	 * 处理事务提交
+	 * 
+	 * @param data
+	 * @param insertSql
+	 * @param updateSql
+	 * @return
+	 */
+	private int doAllCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
+		long start = System.currentTimeMillis();
+		String insertSql = taskSqlBean.getInsertSql();
+		List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
+		try {
+			NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
+			List<SqlParameterSource> params = new ArrayList<>();
+			for (int i = 0; i < data.size(); i++) {
+				Map<String, Object> param = new HashMap<>();
+				JSONObject jo = data.getJSONObject(i);
+				for (String col : tableColumns) {
+					param.put(col, jo.get(col));
+				}
+				params.add(new MapSqlParameterSource(param));
+			}
+			SqlParameterSource[] sqlParams = new SqlParameterSource[params.size()];
+			String deleteAllSql = "DELETE FROM " + taskSqlBean.getTargetTable();
+			Map<String, ?> paramMap = new HashMap<>();
+			nameJdbcTemplate.update(deleteAllSql, paramMap);
+			log.info("SqlTask[{}] delete all:{}", taskSqlBean.getId(), deleteAllSql);
+			nameJdbcTemplate.batchUpdate(insertSql, params.toArray(sqlParams));
+			log.info("SqlTask[{}] update {} records", taskSqlBean.getId(), data.size());
+		} catch (Exception e) {
+			return -1;
+		}
+		log.info("SqlTask[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
+		return 0;
+	}
+
+	private void saveCache(TaskBean t, TaskSqlBean ts, JSONObject result) {
+		long start = System.currentTimeMillis();
+		String fileName = ts.getId() + "-" + System.currentTimeMillis();
+		TaskCacheBean cacheBean = new TaskCacheBean();
+		cacheBean.setAvalidCacheFolder(t.getCacheFolder());
+		cacheBean.setCacheData(result);
+		cacheBean.setFileName(fileName);
+		cacheBeanService.writeCache(cacheBean);
+		cacheBeanService.writeCache(cacheBean);
+		cacheBeanService.writeCache(cacheBean);
+		t.setCacheUse(TaskBean.CACHE_USE);
+		taskService.updateByPrimaryKey(t);
+		log.info("SqlTask[] saveCache cost:{}", t.getCode(), System.currentTimeMillis() - start);
+	}
+
 }

+ 5 - 0
xtdsp/trunk/src/main/resources/com/xt/dsp/mappers/TaskBeanMapper.xml

@@ -40,6 +40,11 @@
     from DSP_TASK
     where JOB_CODE = #{jobcode,jdbcType=VARCHAR}
   </select>
+  <select id="selectByCode" resultMap="BaseResultMap" parameterType="java.lang.String" >
+    select ID, NAME, CODE, JOB_CODE, TYPE,CACHE_FOLDER,CACHE_USE
+    from DSP_TASK
+    where CODE = #{jobcode,jdbcType=VARCHAR}
+  </select>
   <select id="selectAll" resultMap="BaseResultMap" >
     select ID, NAME, CODE, JOB_CODE, TYPE,CACHE_FOLDER,CACHE_USE
     from DSP_TASK

+ 3 - 3
xtdsp/trunk/src/main/resources/log4j/log4j.xml

@@ -14,7 +14,7 @@
 		<!-- 输出自定义内容的LOG -->
 		<layout class="org.apache.log4j.PatternLayout">
 			<!-- 输出时Log内容的具体定义 -->
-			<param name="ConversionPattern" value="[%d{yyyy-MM-dd HH:mm:ss,SSS}][%c] %-5p %t %m%n" />
+			<param name="ConversionPattern" value="[%d{yyyy-MM-dd HH:mm:ss,SSS}][%c] %-5p %m%n" />
 		</layout>
 	</appender>
 
@@ -38,7 +38,7 @@
 
 		<!-- 输出时Log内容的具体定义 -->
 		<layout class="org.apache.log4j.PatternLayout">
-			<param name="ConversionPattern" value="[%d{yyyy-MM-dd HH:mm:ss,SSS}][%-20c{1}] %-5p %t %m%n" />
+			<param name="ConversionPattern" value="[%d{yyyy-MM-dd HH:mm:ss,SSS}][%-20c{1}] %-5p %m%n" />
 		</layout>
 
 		<!-- 过滤输出时Log内容,在这里,LevelMin,LevelMax都定义了DEBUG, 所以只输出DEBUG 级别LOG的数据 -->
@@ -56,7 +56,7 @@
 		<param name="MaxFileSize" value="5000KB" />
 		<param name="MaxBackupIndex" value="3" />
 		<layout class="org.apache.log4j.PatternLayout">
-			<param name="ConversionPattern" value="[%d{yyyy-MM-dd HH:mm:ss,SSS}][%-20c{1}] %-5p %t %m%n" />
+			<param name="ConversionPattern" value="[%d{yyyy-MM-dd HH:mm:ss,SSS}][%-20c{1}] %-5p %m%n" />
 		</layout>
 		<filter class="org.apache.log4j.varia.LevelRangeFilter">
 			<param name="LevelMin" value="INFO" />

+ 1 - 1
xtdsp/trunk/src/main/resources/spring/mvc.xml

@@ -6,7 +6,7 @@
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
 
     <!-- 自动扫描且只扫描@Controller -->
-    <context:component-scan base-package="com.xt.hb.data" use-default-filters="false">
+    <context:component-scan base-package="com.xt.dsp" use-default-filters="false">
         <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller" />
     </context:component-scan>