package com.xt.dsp.job;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerContext;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.util.StringUtils;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xt.dsp.bean.TaskCacheBean;
import com.xt.dsp.jdbc.JdbcTemplateUtils;
import com.xt.dsp.model.DataSourceBean;
import com.xt.dsp.model.JobBean;
import com.xt.dsp.model.TaskBean;
import com.xt.dsp.model.TaskSqlBean;
import com.xt.dsp.service.CacheBeanService;
import com.xt.dsp.service.DataSourceService;
import com.xt.dsp.service.JobService;
import com.xt.dsp.service.TaskService;
import com.xt.dsp.service.TaskSqlService;

/**
 * Quartz job that runs the SQL synchronisation tasks of one configured job.
 *
 * <p>
 * For every SQL task the job first replays any cached result sets, then runs
 * the task's query against the source connection and writes the rows to the
 * target connection. Rows that cannot be written because an exception escapes
 * the write path are stored through {@link CacheBeanService}.
 *
 * @author yuanxd
 */
public class SqlSynJob implements Job {
    /** Logger. */
    private static final Logger log = LoggerFactory.getLogger(SqlSynJob.class);
    /** Task service. */
    private TaskService taskService;
    /** SQL extraction task service. */
    private TaskSqlService taskSqlService;
    /** Data source service. */
    private DataSourceService dataSourceService;
    /** The job currently being executed. */
    private JobBean job;
    /** Task cache service. */
    private CacheBeanService cacheBeanService;
    /** Job service. */
    private JobService jobService;

    /**
     * Looks up all collaborating services from the Spring application context.
     *
     * @param appCtx
     *            the application context stored in the scheduler context
     */
    private void initServices(ApplicationContext appCtx) {
        this.taskService = appCtx.getBean(TaskService.class);
        this.taskSqlService = appCtx.getBean(TaskSqlService.class);
        this.dataSourceService = appCtx.getBean(DataSourceService.class);
        this.cacheBeanService = appCtx.getBean(CacheBeanService.class);
        this.jobService = appCtx.getBean(JobService.class);
    }

    /**
     * Loads the job named by the {@link JobService#PARAM_JOB} entry of the job
     * data map and runs each of its tasks whose type is
     * {@link TaskBean#TYPE_SQL}.
     *
     * @param context
     *            the Quartz execution context
     * @throws JobExecutionException
     *             declared by the {@link Job} contract
     * @throws IllegalArgumentException
     *             if the job cannot be found or has an empty code
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        long start = System.currentTimeMillis();
        SchedulerContext schCtx;
        try {
            schCtx = context.getScheduler().getContext();
        } catch (SchedulerException e) {
            log.error("get scheduler error:{}", e.getMessage(), e);
            return;
        }
        initServices((ApplicationContext) schCtx.get("applicationContext"));
        String jobId = context.getJobDetail().getJobDataMap().getString(JobService.PARAM_JOB);
        job = jobService.findOne(jobId);
        if (null == job) {
            throw new IllegalArgumentException("未找到执行任务信息!");
        }
        if (StringUtils.isEmpty(job.getCode())) {
            throw new IllegalArgumentException("job code不可为空!");
        }
        List<TaskBean> tasks = taskService.selectByJobCode(job.getCode());
        if (tasks != null) {
            for (TaskBean t : tasks) {
                // SQL synchronisation task
                if (TaskBean.TYPE_SQL.equals(t.getType())) {
                    doTaskSqlBeans(t);
                }
            }
        }
        log.info("任务执行完成,耗时:{}", System.currentTimeMillis() - start);
    }

    /**
     * Processes one SQL synchronisation task: replays its cache, then runs
     * every SQL definition attached to the task.
     *
     * @param t
     *            the task to process
     */
    private void doTaskSqlBeans(TaskBean t) {
        // Replay cached data first.
        executeCache(t);
        List<TaskSqlBean> taskSqlBeans = taskSqlService.selectByTaskId(t.getId());
        if (taskSqlBeans == null || taskSqlBeans.isEmpty()) {
            return;
        }
        for (TaskSqlBean ts : taskSqlBeans) {
            JSONObject result = getSqlResultJsonObject(ts);
            if (null == result || result.getJSONArray("data").size() == 0) {
                continue;
            }
            if (TaskBean.CACHE_USE.equals(t.getCacheUse())) {
                // The task is in cache mode: store the new rows instead of
                // writing them to the target.
                saveCache(t, ts, result);
            } else {
                runTaskSqlWithJdbcTemplate(t, ts, result);
            }
        }
    }

    /**
     * Replays the cached result sets of a task in sorted order and then marks
     * the task as {@link TaskBean#CACHE_UNUSE}.
     *
     * @param t
     *            the task whose cache folder is read
     * @return 0 when the cache was empty or fully replayed, -1 when a cached
     *         entry refers to a SQL task that no longer exists
     */
    private int executeCache(TaskBean t) {
        long start = System.currentTimeMillis();
        List<TaskCacheBean> taskDatas = cacheBeanService.readCache(t.getCacheFolder());
        // No cached data.
        if (taskDatas == null || taskDatas.isEmpty()) {
            t.setCacheUse(TaskBean.CACHE_UNUSE);
            taskService.updateByPrimaryKey(t);
            return 0;
        }
        Collections.sort(taskDatas);
        // SQL task definitions already loaded during this replay, by id.
        Map<String, TaskSqlBean> tsbs = new HashMap<>();
        for (TaskCacheBean tc : taskDatas) {
            String tsbid = tc.getTaskId();
            TaskSqlBean bean = tsbs.get(tsbid);
            if (null == bean) {
                bean = taskSqlService.selectByPrimaryKey(tsbid);
                if (null == bean) {
                    return -1;
                }
                tsbs.put(tsbid, bean);
            }
            runTaskSqlWithJdbcTemplate(t, bean, JSONObject.parseObject((String) tc.getCacheData()));
        }
        t.setCacheUse(TaskBean.CACHE_UNUSE);
        taskService.updateByPrimaryKey(t);
        log.info("execute cache cost:{}", System.currentTimeMillis() - start);
        return 0;
    }

    /**
     * Stores a query result in the task's cache folder and marks the task as
     * {@link TaskBean#CACHE_USE}.
     *
     * @param t
     *            the owning task
     * @param ts
     *            the SQL task that produced the result
     * @param result
     *            the query result to store
     */
    private void saveCache(TaskBean t, TaskSqlBean ts, JSONObject result) {
        long start = System.currentTimeMillis();
        String fileName = ts.getId() + "-" + System.currentTimeMillis();
        TaskCacheBean cacheBean = new TaskCacheBean();
        cacheBean.setAvalidCacheFolder(t.getCacheFolder());
        cacheBean.setCacheData(result);
        cacheBean.setFileName(fileName);
        // Write exactly once; the same bean was previously written three times.
        cacheBeanService.writeCache(cacheBean);
        t.setCacheUse(TaskBean.CACHE_USE);
        taskService.updateByPrimaryKey(t);
        log.info("Task[{}] saveCache cost:{}", ts.getId(), System.currentTimeMillis() - start);
    }

    /**
     * Runs the task's query on its source connection and converts the result
     * set to JSON.
     *
     * @param taskSqlBean
     *            the SQL task to query for
     * @return an object with {@code "columns"} (upper-cased column names joined
     *         by {@code ;}) and {@code "data"} (one JSON object per row), or
     *         {@code null} when the query returns no rows
     */
    private JSONObject getSqlResultJsonObject(TaskSqlBean taskSqlBean) {
        long start = System.currentTimeMillis();
        String dsid = taskSqlBean.getSrcConn();
        DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
        JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
        String sql = taskSqlBean.getQuerySql();
        JSONObject result = template.query(sql, new ResultSetExtractor<JSONObject>() {
            @Override
            public JSONObject extractData(ResultSet rs) throws SQLException, DataAccessException {
                ResultSetMetaData rsmd = rs.getMetaData();
                // Resolve the column names once instead of re-splitting a
                // joined string for every row.
                String[] columnNames = new String[rsmd.getColumnCount()];
                for (int i = 0; i < columnNames.length; i++) {
                    columnNames[i] = rsmd.getColumnName(i + 1).toUpperCase();
                }
                JSONArray rows = new JSONArray();
                while (rs.next()) {
                    JSONObject row = new JSONObject();
                    for (String column : columnNames) {
                        row.put(column, rs.getObject(column));
                    }
                    rows.add(row);
                }
                if (rows.size() == 0) {
                    return null;
                }
                JSONObject extracted = new JSONObject();
                extracted.put("columns", StringUtils.arrayToDelimitedString(columnNames, ";"));
                extracted.put("data", rows);
                return extracted;
            }
        });
        log.debug("Task[{}] result:{}", taskSqlBean.getId(), result);
        log.info("Task[{}] getSqlResultJsonObject cost:{}", taskSqlBean.getId(),
                (System.currentTimeMillis() - start));
        return result;
    }

    /**
     * Builds the INSERT and UPDATE statements for the target table from the
     * target's metadata, restricted to the columns present in the source
     * result, and persists them on the SQL task.
     *
     * <p>
     * No UPDATE statement is stored when the target table exposes no matching
     * primary-key column, or when every matched column is a key column.
     *
     * @param srcCols
     *            source column names joined by {@code ;}
     * @param taskSqlBean
     *            the SQL task to update
     * @param template
     *            template bound to the target connection
     * @throws IllegalStateException
     *             if no target column matches a source column
     */
    private void prepare(String srcCols, final TaskSqlBean taskSqlBean, JdbcTemplate template) {
        long start = System.currentTimeMillis();
        final Collection<String> srcColumnsCollection = Arrays.asList(srcCols.split(";"));
        Map<String, List<String>> targetTableColumns = template
                .execute(new ConnectionCallback<Map<String, List<String>>>() {
                    @Override
                    public Map<String, List<String>> doInConnection(Connection con)
                            throws SQLException, DataAccessException {
                        Map<String, List<String>> tColumns = new HashMap<>();
                        DatabaseMetaData dmd = con.getMetaData();
                        String schema = dmd.getUserName();
                        String table = taskSqlBean.getTargetTable().toUpperCase();

                        List<String> columns = new ArrayList<>();
                        ResultSet rs = dmd.getColumns(null, schema, table, null);
                        try {
                            while (rs.next()) {
                                String column = rs.getString("COLUMN_NAME").toUpperCase();
                                // Keep only the columns the source provides.
                                if (srcColumnsCollection.contains(column)) {
                                    columns.add(column);
                                }
                            }
                        } finally {
                            closeQuietly(rs);
                        }
                        tColumns.put("columns", columns);

                        List<String> keys = new ArrayList<>();
                        rs = dmd.getPrimaryKeys(null, schema, table);
                        try {
                            while (rs.next()) {
                                // Column 4 is COLUMN_NAME. Upper-case it so it
                                // compares equal to the upper-cased source and
                                // target column names.
                                String key = rs.getString(4);
                                if (key != null) {
                                    key = key.toUpperCase();
                                }
                                if (srcColumnsCollection.contains(key)) {
                                    keys.add(key);
                                }
                            }
                        } finally {
                            closeQuietly(rs);
                        }
                        tColumns.put("keys", keys);
                        return tColumns;
                    }
                });
        List<String> tableColumns = targetTableColumns.get("columns");
        List<String> keys = targetTableColumns.get("keys");
        if (tableColumns.isEmpty()) {
            // Fail before persisting unusable SQL on the task.
            throw new IllegalStateException("no column of target table " + taskSqlBean.getTargetTable()
                    + " matches the source columns: " + srcCols);
        }
        taskSqlBean.setColumns(StringUtils.collectionToDelimitedString(tableColumns, ","));

        StringBuilder insertCols = new StringBuilder();
        StringBuilder insertParams = new StringBuilder();
        StringBuilder setClause = new StringBuilder();
        for (String column : tableColumns) {
            if (insertCols.length() > 0) {
                insertCols.append(",");
                insertParams.append(",");
            }
            insertCols.append(column);
            insertParams.append(":").append(column);
            if (!keys.contains(column)) {
                if (setClause.length() > 0) {
                    setClause.append(",");
                }
                setClause.append(column).append("=").append(":").append(column);
            }
        }
        String insertSql = "INSERT INTO " + taskSqlBean.getTargetTable() + "(" + insertCols + ")VALUES("
                + insertParams + ")";

        // An UPDATE needs both something to set and a key to match on.
        String updateSql = null;
        if (!keys.isEmpty() && setClause.length() > 0) {
            StringBuilder whereClause = new StringBuilder();
            for (String key : keys) {
                if (whereClause.length() > 0) {
                    whereClause.append(" AND ");
                }
                whereClause.append(key).append(" = :").append(key);
            }
            updateSql = "UPDATE " + taskSqlBean.getTargetTable() + " SET " + setClause + " WHERE " + whereClause;
        }

        taskSqlBean.setInsertSql(insertSql);
        taskSqlBean.setUpdateSql(updateSql);
        taskSqlBean.setRefreshSql(TaskSqlBean.REFRESH_SQL_FALSE);
        taskSqlService.updateByPrimaryKey(taskSqlBean);
        log.info("Task[{}] do sql task prepare finished, cost:{}", taskSqlBean.getId(),
                (System.currentTimeMillis() - start));
    }

    /**
     * Closes a result set, logging rather than propagating any failure.
     *
     * @param rs
     *            the result set to close, may be {@code null}
     */
    private static void closeQuietly(ResultSet rs) {
        if (rs == null) {
            return;
        }
        try {
            rs.close();
        } catch (Exception ignored) {
            // Best effort: a failed close must not abort SQL preparation.
            log.debug("failed to close ResultSet: {}", ignored.getMessage());
        }
    }

    /**
     * Joins the strings with commas.
     *
     * @param stringList
     *            the strings to join, may be {@code null}
     * @return the joined string, or {@code null} when the list is {@code null}
     */
    public static String listToString(List<String> stringList) {
        if (stringList == null) {
            return null;
        }
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (String item : stringList) {
            if (!first) {
                joined.append(",");
            }
            first = false;
            joined.append(item);
        }
        return joined.toString();
    }

    /**
     * Writes a query result to the task's target connection.
     *
     * <p>
     * If an exception escapes the write path, the result is stored in the cache
     * so that it can be replayed later.
     *
     * @param t
     *            the owning task
     * @param taskSqlBean
     *            the SQL task describing target connection, table and mode
     * @param result
     *            the query result holding {@code "columns"} and {@code "data"}
     * @return 0 on success, -1 on failure
     */
    private int runTaskSqlWithJdbcTemplate(final TaskBean t, final TaskSqlBean taskSqlBean, JSONObject result) {
        try {
            // NOTE(review): the original intent was to roll back on a failed
            // insert or update and keep the rows in a file for a later run. The
            // commit methods report failure through a -1 return value, which
            // does not reach the catch block below, so such rows are not
            // cached. Confirm the desired behaviour before changing this.
            String dsid = taskSqlBean.getTargetConn();
            DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
            if (null == dsb) {
                throw new RuntimeException("数据源获取失败:" + dsid);
            }
            JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
            if (TaskSqlBean.REFRESH_SQL_TRUE.equals(taskSqlBean.getRefreshSql())) {
                prepare(result.getString("columns"), taskSqlBean, template);
            }
            JSONArray data = (JSONArray) result.get("data");
            if (TaskSqlBean.MODE_ALL.equals(taskSqlBean.getMode())) {
                return doAllCommit(data, taskSqlBean, template);
            }
            return doOneCommit(data, taskSqlBean, template);
        } catch (Exception e) {
            log.error("Task[{}] 执行插入失败:{}", taskSqlBean.getId(), e.getMessage(), e);
            saveCache(t, taskSqlBean, result);
            return -1;
        }
    }

    /**
     * Copies the values of the given columns from a JSON row into a
     * named-parameter map.
     *
     * @param row
     *            the JSON row
     * @param tableColumns
     *            the column names to copy
     * @return a map from column name to the row's value for it
     */
    private static Map<String, Object> toParamMap(JSONObject row, List<String> tableColumns) {
        Map<String, Object> param = new HashMap<>();
        for (String col : tableColumns) {
            param.put(col, row.get(col));
        }
        return param;
    }

    /**
     * Writes the rows one at a time: each row is updated, and inserted when the
     * update matched nothing. When the task has no UPDATE statement every row
     * is inserted.
     *
     * @param data
     *            the rows to write
     * @param taskSqlBean
     *            the SQL task holding the prepared statements and column list
     * @param template
     *            template bound to the target connection
     * @return 0 on success, -1 when any statement fails
     */
    private int doOneCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
        long start = System.currentTimeMillis();
        String insertSql = taskSqlBean.getInsertSql();
        String updateSql = taskSqlBean.getUpdateSql();
        List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
        boolean canUpdate = !StringUtils.isEmpty(updateSql);
        try {
            NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
            for (int i = 0; i < data.size(); i++) {
                Map<String, Object> param = toParamMap(data.getJSONObject(i), tableColumns);
                int count = canUpdate ? nameJdbcTemplate.update(updateSql, new MapSqlParameterSource(param)) : 0;
                if (count == 0) {
                    nameJdbcTemplate.update(insertSql, param);
                }
            }
            log.info("Task[{}] update {} records", taskSqlBean.getId(), data.size());
        } catch (Exception e) {
            log.error("Task[{}] doOneCommit failed:{}", taskSqlBean.getId(), e.getMessage(), e);
            return -1;
        }
        log.info("Task[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
        return 0;
    }

    /**
     * Replaces the whole target table: deletes every row, then batch-inserts
     * the given rows.
     *
     * <p>
     * NOTE(review): the delete and the batch insert are issued as separate
     * calls here with no visible transaction, so a failed insert may leave the
     * table empty. Confirm whether the data source provides transactional
     * behaviour.
     *
     * @param data
     *            the rows to insert
     * @param taskSqlBean
     *            the SQL task holding the INSERT statement and column list
     * @param template
     *            template bound to the target connection
     * @return 0 on success, -1 when any statement fails
     */
    private int doAllCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
        long start = System.currentTimeMillis();
        String insertSql = taskSqlBean.getInsertSql();
        List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
        try {
            NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
            List<SqlParameterSource> params = new ArrayList<>();
            for (int i = 0; i < data.size(); i++) {
                params.add(new MapSqlParameterSource(toParamMap(data.getJSONObject(i), tableColumns)));
            }
            String deleteAllSql = "DELETE FROM " + taskSqlBean.getTargetTable();
            nameJdbcTemplate.update(deleteAllSql, new HashMap<String, Object>());
            log.info("Task[{}] delete all:{}", taskSqlBean.getId(), deleteAllSql);
            nameJdbcTemplate.batchUpdate(insertSql, params.toArray(new SqlParameterSource[params.size()]));
            log.info("Task[{}] update {} records", taskSqlBean.getId(), data.size());
        } catch (Exception e) {
            log.error("Task[{}] doAllCommit failed:{}", taskSqlBean.getId(), e.getMessage(), e);
            return -1;
        }
        log.info("Task[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
        return 0;
    }
}