瀏覽代碼

git-svn-id: https://192.168.57.71/svn/jsgkj@295 931142cf-59ea-a443-aa0e-51397b428577

xt_yuanxd 9 年之前
父節點
當前提交
499ca2399c

+ 1 - 1
xtdsp/trunk/src/main/java/com/xt/dsp/job/CommonJobListener.java

@@ -49,7 +49,7 @@ public class CommonJobListener implements JobListener {
 	@Override
 	public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
 		JobBean info = getJobInfoFromContext(context);
-		log.info("{} job was executed.", info.getJobDisplay());
+		log.info("{} job was executed,cost:{}", info.getJobDisplay(), context.getJobRunTime());
 		info.setEndTime(new Date());
 		jobService.save(info);
 	}

+ 121 - 87
xtdsp/trunk/src/main/java/com/xt/dsp/job/SqlSynJob.java

@@ -91,7 +91,7 @@ public class SqlSynJob implements Job {
 			e1.printStackTrace();
 		} finally {
 			long cost = System.currentTimeMillis() - start;
-			log.debug("任务执行完成,耗时:{}", cost);
+			log.info("任务执行完成,耗时:{}", cost);
 		}
 	}
 
@@ -115,10 +115,7 @@ public class SqlSynJob implements Job {
 			if (TaskBean.CACHE_USE.equals(t.getCacheUse())) {
 				saveCache(t, ts, result);
 			} else {
-				int rest = runTaskSqlWithJdbcTemplate(t, ts, result);
-				if (rest != 0) {
-
-				}
+				runTaskSqlWithJdbcTemplate(t, ts, result);
 			}
 		}
 	}
@@ -130,9 +127,10 @@ public class SqlSynJob implements Job {
 	 * @return
 	 */
 	private int executeCache(TaskBean t) {
+		long start = System.currentTimeMillis();
 		List<TaskCacheBean> taskDatas = cacheBeanService.readCache(t.getCacheFolder());
 		// 无缓存数据
-		if (taskDatas.size() == 0) {
+		if (taskDatas == null || taskDatas.size() == 0) {
 			t.setCacheUse(TaskBean.CACHE_UNUSE);
 			taskService.updateByPrimaryKey(t);
 			return 0;
@@ -158,10 +156,13 @@ public class SqlSynJob implements Job {
 		}
 		t.setCacheUse(TaskBean.CACHE_UNUSE);
 		taskService.updateByPrimaryKey(t);
+		long cost = System.currentTimeMillis() - start;
+		log.info("execute cache cost:" + cost);
 		return 0;
 	}
 
 	private void saveCache(TaskBean t, TaskSqlBean ts, JSONObject result) {
+		long start = System.currentTimeMillis();
 		String fileName = ts.getId() + "-" + System.currentTimeMillis();
 		TaskCacheBean cacheBean = new TaskCacheBean();
 		cacheBean.setAvalidCacheFolder(t.getCacheFolder());
@@ -172,6 +173,7 @@ public class SqlSynJob implements Job {
 		cacheBeanService.writeCache(cacheBean);
 		t.setCacheUse(TaskBean.CACHE_USE);
 		taskService.updateByPrimaryKey(t);
+		log.info("saveCache cost:{}", System.currentTimeMillis() - start);
 	}
 
 	/**
@@ -181,10 +183,11 @@ public class SqlSynJob implements Job {
 	 * @return
 	 */
 	private JSONObject getSqlResultJsonObject(TaskSqlBean taskSqlBean) {
+		long start = System.currentTimeMillis();
 		String dsid = taskSqlBean.getSrcConn();
 		DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
 		JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
-		String sql = taskSqlBean.getSql();
+		String sql = taskSqlBean.getQuerySql();
 		JSONObject result = template.query(sql, new ResultSetExtractor<JSONObject>() {
 			@Override
 			public JSONObject extractData(ResultSet rs) throws SQLException, DataAccessException {
@@ -215,9 +218,109 @@ public class SqlSynJob implements Job {
 			}
 		});
 		log.debug("result:{}", result);
+		log.info("getSqlResultJsonObject cost:{}", (System.currentTimeMillis() - start));
 		return result;
 	}
 
+	private void prepare(String srcCols, final TaskSqlBean taskSqlBean, JdbcTemplate template) {
+		long start = System.currentTimeMillis();
+		final Collection<String> srcColumnsCollection = Arrays.asList(srcCols.split(";"));
+		Map<String, List<String>> targetTableColumns = template
+				.execute(new ConnectionCallback<Map<String, List<String>>>() {
+					@Override
+					public Map<String, List<String>> doInConnection(Connection con)
+							throws SQLException, DataAccessException {
+						Map<String, List<String>> tColumns = new HashMap<String, List<String>>();
+						DatabaseMetaData dmd = con.getMetaData();
+						ResultSet rs = dmd.getColumns(null, dmd.getUserName(),
+								taskSqlBean.getTargetTable().toUpperCase(), null);
+						List<String> columns = new ArrayList<>();
+						while (rs.next()) {
+							String column = rs.getString("COLUMN_NAME").toUpperCase();
+							// 只保留对应的列
+							if (srcColumnsCollection.contains(column)) {
+								columns.add(column);
+							}
+						}
+						tColumns.put("columns", columns);
+						try {
+							rs.close();
+						} catch (Exception e) {
+
+						}
+						rs = dmd.getPrimaryKeys(null, dmd.getUserName(), taskSqlBean.getTargetTable().toUpperCase());
+						List<String> keys = new ArrayList<>();
+						while (rs.next()) {
+							String key = rs.getString(4);
+							if (srcColumnsCollection.contains(key)) {
+								keys.add(key);
+							}
+						}
+						tColumns.put("keys", keys);
+						try {
+							rs.close();
+						} catch (Exception e) {
+
+						}
+						return tColumns;
+					}
+				});
+		List<String> tableColumns = targetTableColumns.get("columns");
+		taskSqlBean.setColumns(StringUtils.collectionToDelimitedString(tableColumns, ","));
+		List<String> keys = targetTableColumns.get("keys");
+		StringBuilder insertSql = new StringBuilder();
+		StringBuilder insertParaSql = new StringBuilder();
+		StringBuilder updateSql = new StringBuilder();
+		insertSql.append("INSERT INTO ").append(taskSqlBean.getTargetTable()).append("(");
+		updateSql.append("UPDATE ").append(taskSqlBean.getTargetTable()).append(" SET ");
+		for (int i = 0; i < tableColumns.size(); i++) {
+			insertSql.append(tableColumns.get(i)).append(",");
+			insertParaSql.append(":").append(tableColumns.get(i)).append(",");
+			if (!keys.contains(tableColumns.get(i))) {
+				updateSql.append(tableColumns.get(i)).append("=").append(":").append(tableColumns.get(i)).append(",");
+			}
+		}
+		insertSql.deleteCharAt(insertSql.length() - 1);
+		insertParaSql.deleteCharAt(insertParaSql.length() - 1);
+		updateSql.deleteCharAt(updateSql.length() - 1);
+		updateSql.append(" WHERE ");
+		if (keys.size() > 0) {
+			for (int i = 0; i < keys.size(); i++) {
+				if (i != 0) {
+					updateSql.append(" AND ");
+				}
+				updateSql.append(keys.get(i)).append(" = :").append(keys.get(i));
+			}
+		} else {
+			updateSql = null;
+		}
+
+		insertSql.append(")VALUES(").append(insertParaSql);
+		insertSql.append(")");
+		taskSqlBean.setInsertSql(insertSql.toString());
+		taskSqlBean.setUpdateSql(updateSql.toString());
+		taskSqlBean.setRefreshSql(TaskSqlBean.REFRESH_SQL_FALSE);
+		taskSqlService.updateByPrimaryKey(taskSqlBean);
+		log.info("do sql task prepare finished, cost:{}", (System.currentTimeMillis() - start));
+	}
+
+	public static String listToString(List<String> stringList) {
+		if (stringList == null) {
+			return null;
+		}
+		StringBuilder result = new StringBuilder();
+		boolean flag = false;
+		for (String string : stringList) {
+			if (flag) {
+				result.append(",");
+			} else {
+				flag = true;
+			}
+			result.append(string);
+		}
+		return result.toString();
+	}
+
 	/**
 	 * 结果同步到目标源
 	 * 
@@ -228,93 +331,20 @@ public class SqlSynJob implements Job {
 	private int runTaskSqlWithJdbcTemplate(final TaskBean t, final TaskSqlBean taskSqlBean, JSONObject result) {
 		try {
 			// 此处应在插入或更新失败时回滚,并把查询结果以文件方式保存,留待以后再执行此任务;
-			String[] srcColumns = result.getString("columns").split(";");
 			String dsid = taskSqlBean.getTargetConn();
 			DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
 			if (null == dsb) {
 				throw new RuntimeException("数据源获取失败:" + dsid);
 			}
 			JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
-			final Collection<String> srcColumnsCollection = Arrays.asList(srcColumns);
-			Map<String, List<String>> targetTableColumns = template
-					.execute(new ConnectionCallback<Map<String, List<String>>>() {
-						@Override
-						public Map<String, List<String>> doInConnection(Connection con)
-								throws SQLException, DataAccessException {
-							Map<String, List<String>> tColumns = new HashMap<String, List<String>>();
-							DatabaseMetaData dmd = con.getMetaData();
-							ResultSet rs = dmd.getColumns(null, dmd.getUserName(),
-									taskSqlBean.getTargetTable().toUpperCase(), null);
-							List<String> columns = new ArrayList<>();
-							while (rs.next()) {
-								String column = rs.getString("COLUMN_NAME").toUpperCase();
-								// 只保留对应的列
-								if (srcColumnsCollection.contains(column)) {
-									columns.add(column);
-								}
-							}
-							tColumns.put("columns", columns);
-							try {
-								rs.close();
-							} catch (Exception e) {
-
-							}
-							rs = dmd.getPrimaryKeys(null, dmd.getUserName(),
-									taskSqlBean.getTargetTable().toUpperCase());
-							List<String> keys = new ArrayList<>();
-							while (rs.next()) {
-								String key = rs.getString(4);
-								if (srcColumnsCollection.contains(key)) {
-									keys.add(key);
-								}
-							}
-							tColumns.put("keys", keys);
-							try {
-								rs.close();
-							} catch (Exception e) {
-
-							}
-							return tColumns;
-						}
-					});
-			List<String> tableColumns = targetTableColumns.get("columns");
-			List<String> keys = targetTableColumns.get("keys");
-			StringBuilder insertSql = new StringBuilder();
-			StringBuilder insertParaSql = new StringBuilder();
-			StringBuilder updateSql = new StringBuilder();
-			insertSql.append("insert into ").append(taskSqlBean.getTargetTable()).append("(");
-			updateSql.append("update ").append(taskSqlBean.getTargetTable()).append(" set ");
-			for (int i = 0; i < tableColumns.size(); i++) {
-				insertSql.append(tableColumns.get(i)).append(",");
-				insertParaSql.append(":").append(tableColumns.get(i)).append(",");
-				if (!keys.contains(tableColumns.get(i))) {
-					updateSql.append(tableColumns.get(i)).append("=").append(":").append(tableColumns.get(i))
-							.append(",");
-				}
+			if (TaskSqlBean.REFRESH_SQL_TRUE.equals(taskSqlBean.getRefreshSql())) {
+				prepare(result.getString("columns"), taskSqlBean, template);
 			}
-			insertSql.deleteCharAt(insertSql.length() - 1);
-			insertParaSql.deleteCharAt(insertParaSql.length() - 1);
-			updateSql.deleteCharAt(updateSql.length() - 1);
-			updateSql.append(" where ");
-			if (keys.size() > 0) {
-				for (int i = 0; i < keys.size(); i++) {
-					if (i != 0) {
-						updateSql.append(" and ");
-					}
-					updateSql.append(keys.get(i)).append(" = :").append(keys.get(i));
-				}
-			} else {
-				updateSql = null;
-			}
-
-			insertSql.append(")VALUES(").append(insertParaSql);
-			insertSql.append(")");
-			log.debug("insertSql:{}", insertSql.toString());
-			log.debug("updateSql:{}", updateSql.toString());
 			JSONArray data = (JSONArray) result.get("data");
-			return doCommit(data, insertSql.toString(), updateSql.toString(), tableColumns, template);
+			return doCommit(data, taskSqlBean, template);
 		} catch (Exception e) {
 			log.error("执行插入失败:{}", e.getMessage());
+			e.printStackTrace();
 			saveCache(t, taskSqlBean, result);
 			return -1;
 		}
@@ -328,8 +358,11 @@ public class SqlSynJob implements Job {
 	 * @param updateSql
 	 * @return
 	 */
-	private int doCommit(JSONArray data, String insertSql, String updateSql, List<String> tableColumns,
-			JdbcTemplate template) {
+	private int doCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
+		long start = System.currentTimeMillis();
+		String insertSql = taskSqlBean.getInsertSql();
+		String updateSql = taskSqlBean.getUpdateSql();
+		List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
 		try {
 			NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
 			for (int i = 0; i < data.size(); i++) {
@@ -343,10 +376,11 @@ public class SqlSynJob implements Job {
 					nameJdbcTemplate.update(insertSql, param);
 				}
 			}
+			log.info("{} task update {} records", taskSqlBean.getId(), data.size());
 		} catch (Exception e) {
 			return -1;
 		}
-
+		log.info("doCommit cost:{}", System.currentTimeMillis() - start);
 		return 0;
 	}
 }

+ 43 - 5
xtdsp/trunk/src/main/java/com/xt/dsp/model/TaskSqlBean.java

@@ -1,6 +1,8 @@
 package com.xt.dsp.model;
 
 public class TaskSqlBean {
+	public static final String REFRESH_SQL_TRUE = "1";
+	public static final String REFRESH_SQL_FALSE = "0";
 	private String id;
 
 	private String taskId;
@@ -9,7 +11,43 @@ public class TaskSqlBean {
 
 	private String targetConn;
 
-	private String sql;
+	private String querySql;
+	private String updateSql;
+	private String insertSql;
+	private String refreshSql;
+	private String columns;
+
+	public String getColumns() {
+		return columns;
+	}
+
+	public void setColumns(String columns) {
+		this.columns = columns;
+	}
+
+	public String getRefreshSql() {
+		return refreshSql;
+	}
+
+	public void setRefreshSql(String refreshSql) {
+		this.refreshSql = refreshSql;
+	}
+
+	public String getUpdateSql() {
+		return updateSql;
+	}
+
+	public void setUpdateSql(String updateSql) {
+		this.updateSql = updateSql;
+	}
+
+	public String getInsertSql() {
+		return insertSql;
+	}
+
+	public void setInsertSql(String insertSql) {
+		this.insertSql = insertSql;
+	}
 
 	private String targetTable;
 
@@ -45,12 +83,12 @@ public class TaskSqlBean {
 		this.targetConn = targetConn == null ? null : targetConn.trim();
 	}
 
-	public String getSql() {
-		return sql;
+	public String getQuerySql() {
+		return querySql;
 	}
 
-	public void setSql(String sql) {
-		this.sql = sql == null ? null : sql.trim();
+	public void setQuerySql(String querySql) {
+		this.querySql = querySql == null ? null : querySql.trim();
 	}
 
 	public String getTargetTable() {

+ 2 - 0
xtdsp/trunk/src/main/java/com/xt/dsp/service/TaskSqlService.java

@@ -8,4 +8,6 @@ public interface TaskSqlService {
 	List<TaskSqlBean> selectByTaskId(String taskId);
 
 	TaskSqlBean selectByPrimaryKey(String id);
+
+	public int updateByPrimaryKey(TaskSqlBean record);
 }

+ 2 - 1
xtdsp/trunk/src/main/java/com/xt/dsp/serviceImpl/JobInfoServiceImpl.java

@@ -156,7 +156,8 @@ public class JobInfoServiceImpl implements JobService {
 				tb.withSchedule(simpleSchedule().withIntervalInSeconds(job.getInterval()).repeatForever());
 			}
 		} else {
-			tb.withSchedule(cronSchedule(job.getCronExpression()));
+			// tb.startNow();
+			tb.startNow().withSchedule(cronSchedule(job.getCronExpression()));
 		}
 		return tb.build();
 	}

+ 4 - 0
xtdsp/trunk/src/main/java/com/xt/dsp/serviceImpl/TaskSqlServiceImpl.java

@@ -28,4 +28,8 @@ public class TaskSqlServiceImpl implements TaskSqlService {
 	public TaskSqlBean selectByPrimaryKey(String id) {
 		return mapper.selectByPrimaryKey(id);
 	}
+
+	public int updateByPrimaryKey(TaskSqlBean record) {
+		return mapper.updateByPrimaryKey(record);
+	}
 }