Browse Source

git-svn-id: https://192.168.57.71/svn/jsgkj@307 931142cf-59ea-a443-aa0e-51397b428577

xt_yuanxd 9 năm trước cách đây
mục cha
commit
546864f3c7

+ 49 - 9
xtdsp/trunk/src/main/java/com/xt/dsp/job/SqlSynJob.java

@@ -27,6 +27,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.ResultSetExtractor;
 import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 import org.springframework.util.StringUtils;
 
 import com.alibaba.fastjson.JSONArray;
@@ -180,7 +181,7 @@ public class SqlSynJob implements Job {
 		cacheBeanService.writeCache(cacheBean);
 		t.setCacheUse(TaskBean.CACHE_USE);
 		taskService.updateByPrimaryKey(t);
-		log.info("saveCache cost:{}", System.currentTimeMillis() - start);
+		log.info("Task[] saveCache cost:{}", ts.getId(), System.currentTimeMillis() - start);
 	}
 
 	/**
@@ -224,8 +225,8 @@ public class SqlSynJob implements Job {
 				return result;
 			}
 		});
-		log.debug("result:{}", result);
-		log.info("getSqlResultJsonObject cost:{}", (System.currentTimeMillis() - start));
+		log.debug("Task[{}] result:{}", taskSqlBean.getId(), result);
+		log.info("Task[{}] getSqlResultJsonObject cost:{}", taskSqlBean.getId(), (System.currentTimeMillis() - start));
 		return result;
 	}
 
@@ -308,7 +309,8 @@ public class SqlSynJob implements Job {
 		taskSqlBean.setUpdateSql(updateSql.toString());
 		taskSqlBean.setRefreshSql(TaskSqlBean.REFRESH_SQL_FALSE);
 		taskSqlService.updateByPrimaryKey(taskSqlBean);
-		log.info("do sql task prepare finished, cost:{}", (System.currentTimeMillis() - start));
+		log.info("Task[]do {} sql task prepare finished, cost:{}", taskSqlBean.getId(),
+				(System.currentTimeMillis() - start));
 	}
 
 	public static String listToString(List<String> stringList) {
@@ -348,9 +350,10 @@ public class SqlSynJob implements Job {
 				prepare(result.getString("columns"), taskSqlBean, template);
 			}
 			JSONArray data = (JSONArray) result.get("data");
-			return doCommit(data, taskSqlBean, template);
+			return TaskSqlBean.MODE_ALL.equals(taskSqlBean.getMode()) ? doAllCommit(data, taskSqlBean, template)
+					: doOneCommit(data, taskSqlBean, template);
 		} catch (Exception e) {
-			log.error("执行插入失败:{}", e.getMessage());
+			log.error("Task[{}] 执行插入失败:{}", taskSqlBean.getId(), e.getMessage());
 			e.printStackTrace();
 			saveCache(t, taskSqlBean, result);
 			return -1;
@@ -365,7 +368,7 @@ public class SqlSynJob implements Job {
 	 * @param updateSql
 	 * @return
 	 */
-	private int doCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
+	private int doOneCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
 		long start = System.currentTimeMillis();
 		String insertSql = taskSqlBean.getInsertSql();
 		String updateSql = taskSqlBean.getUpdateSql();
@@ -383,11 +386,48 @@ public class SqlSynJob implements Job {
 					nameJdbcTemplate.update(insertSql, param);
 				}
 			}
-			log.info("{} task update {} records", taskSqlBean.getId(), data.size());
+			log.info("Task[{}] update {} records", taskSqlBean.getId(), data.size());
 		} catch (Exception e) {
 			return -1;
 		}
-		log.info("doCommit cost:{}", System.currentTimeMillis() - start);
+		log.info("Task[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
+		return 0;
+	}
+
+	/**
+	 * 处理事务提交
+	 * 
+	 * @param data
+	 * @param insertSql
+	 * @param updateSql
+	 * @return
+	 */
+	private int doAllCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
+		long start = System.currentTimeMillis();
+		String insertSql = taskSqlBean.getInsertSql();
+		List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
+		try {
+			NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
+			List<SqlParameterSource> params = new ArrayList<>();
+			for (int i = 0; i < data.size(); i++) {
+				Map<String, Object> param = new HashMap<>();
+				JSONObject jo = data.getJSONObject(i);
+				for (String col : tableColumns) {
+					param.put(col, jo.get(col));
+				}
+				params.add(new MapSqlParameterSource(param));
+			}
+			SqlParameterSource[] sqlParams = new SqlParameterSource[params.size()];
+			String deleteAllSql = "DELETE FROM " + taskSqlBean.getTargetTable();
+			Map<String, ?> paramMap = new HashMap<>();
+			nameJdbcTemplate.update(deleteAllSql, paramMap);
+			log.info("Task[{}] delete all:{}", taskSqlBean.getId(), deleteAllSql);
+			nameJdbcTemplate.batchUpdate(insertSql, params.toArray(sqlParams));
+			log.info("Task[{}] update {} records", taskSqlBean.getId(), data.size());
+		} catch (Exception e) {
+			return -1;
+		}
+		log.info("Task[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
 		return 0;
 	}
 }

+ 48 - 40
xtdsp/trunk/src/main/java/com/xt/dsp/model/TaskSqlBean.java

@@ -3,99 +3,107 @@ package com.xt.dsp.model;
 public class TaskSqlBean {
 	public static final String REFRESH_SQL_TRUE = "1";
 	public static final String REFRESH_SQL_FALSE = "0";
+	public static final String MODE_ONE = "1";
+	public static final String MODE_ALL = "2";
 	private String id;
-
 	private String taskId;
-
 	private String srcConn;
-
 	private String targetConn;
-
 	private String querySql;
 	private String updateSql;
 	private String insertSql;
 	private String refreshSql;
 	private String columns;
+	/** 数据同步方式;1:逐条更新、2:全部删除后插入 */
+	private String mode;
+
+	private String targetTable;
 
 	public String getColumns() {
 		return columns;
 	}
 
-	public void setColumns(String columns) {
-		this.columns = columns;
+	public String getId() {
+		return id;
+	}
+
+	public String getInsertSql() {
+		return insertSql;
+	}
+
+	public String getMode() {
+		return mode;
+	}
+
+	public String getQuerySql() {
+		return querySql;
 	}
 
 	public String getRefreshSql() {
 		return refreshSql;
 	}
 
-	public void setRefreshSql(String refreshSql) {
-		this.refreshSql = refreshSql;
+	public String getSrcConn() {
+		return srcConn;
 	}
 
-	public String getUpdateSql() {
-		return updateSql;
+	public String getTargetConn() {
+		return targetConn;
 	}
 
-	public void setUpdateSql(String updateSql) {
-		this.updateSql = updateSql;
+	public String getTargetTable() {
+		return targetTable;
 	}
 
-	public String getInsertSql() {
-		return insertSql;
+	public String getTaskId() {
+		return taskId;
 	}
 
-	public void setInsertSql(String insertSql) {
-		this.insertSql = insertSql;
+	public String getUpdateSql() {
+		return updateSql;
 	}
 
-	private String targetTable;
-
-	public String getId() {
-		return id;
+	public void setColumns(String columns) {
+		this.columns = columns;
 	}
 
 	public void setId(String id) {
 		this.id = id == null ? null : id.trim();
 	}
 
-	public String getTaskId() {
-		return taskId;
+	public void setInsertSql(String insertSql) {
+		this.insertSql = insertSql;
 	}
 
-	public void setTaskId(String taskId) {
-		this.taskId = taskId == null ? null : taskId.trim();
+	public void setMode(String mode) {
+		this.mode = mode;
 	}
 
-	public String getSrcConn() {
-		return srcConn;
+	public void setQuerySql(String querySql) {
+		this.querySql = querySql == null ? null : querySql.trim();
 	}
 
-	public void setSrcConn(String srcConn) {
-		this.srcConn = srcConn == null ? null : srcConn.trim();
+	public void setRefreshSql(String refreshSql) {
+		this.refreshSql = refreshSql;
 	}
 
-	public String getTargetConn() {
-		return targetConn;
+	public void setSrcConn(String srcConn) {
+		this.srcConn = srcConn == null ? null : srcConn.trim();
 	}
 
 	public void setTargetConn(String targetConn) {
 		this.targetConn = targetConn == null ? null : targetConn.trim();
 	}
 
-	public String getQuerySql() {
-		return querySql;
-	}
-
-	public void setQuerySql(String querySql) {
-		this.querySql = querySql == null ? null : querySql.trim();
+	public void setTargetTable(String targetTable) {
+		this.targetTable = targetTable == null ? null : targetTable.trim();
 	}
 
-	public String getTargetTable() {
-		return targetTable;
+	public void setTaskId(String taskId) {
+		this.taskId = taskId == null ? null : taskId.trim();
 	}
 
-	public void setTargetTable(String targetTable) {
-		this.targetTable = targetTable == null ? null : targetTable.trim();
+	public void setUpdateSql(String updateSql) {
+		this.updateSql = updateSql;
 	}
 }