SqlSynJob.java 15 KB


  1. package com.xt.dsp.job;
  2. import java.sql.Connection;
  3. import java.sql.DatabaseMetaData;
  4. import java.sql.ResultSet;
  5. import java.sql.ResultSetMetaData;
  6. import java.sql.SQLException;
  7. import java.util.ArrayList;
  8. import java.util.Arrays;
  9. import java.util.Collection;
  10. import java.util.Collections;
  11. import java.util.HashMap;
  12. import java.util.List;
  13. import java.util.Map;
  14. import org.quartz.Job;
  15. import org.quartz.JobExecutionContext;
  16. import org.quartz.JobExecutionException;
  17. import org.quartz.SchedulerContext;
  18. import org.quartz.SchedulerException;
  19. import org.slf4j.Logger;
  20. import org.slf4j.LoggerFactory;
  21. import org.springframework.context.ApplicationContext;
  22. import org.springframework.dao.DataAccessException;
  23. import org.springframework.jdbc.core.ConnectionCallback;
  24. import org.springframework.jdbc.core.JdbcTemplate;
  25. import org.springframework.jdbc.core.ResultSetExtractor;
  26. import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
  27. import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
  28. import org.springframework.jdbc.core.namedparam.SqlParameterSource;
  29. import org.springframework.util.StringUtils;
  30. import com.alibaba.fastjson.JSONArray;
  31. import com.alibaba.fastjson.JSONObject;
  32. import com.xt.dsp.bean.TaskCacheBean;
  33. import com.xt.dsp.jdbc.JdbcTemplateUtils;
  34. import com.xt.dsp.model.DataSourceBean;
  35. import com.xt.dsp.model.JobBean;
  36. import com.xt.dsp.model.TaskBean;
  37. import com.xt.dsp.model.TaskSqlBean;
  38. import com.xt.dsp.service.CacheBeanService;
  39. import com.xt.dsp.service.DataSourceService;
  40. import com.xt.dsp.service.JobService;
  41. import com.xt.dsp.service.TaskService;
  42. import com.xt.dsp.service.TaskSqlService;
/**
 * Job scheduling control: Quartz job that runs the SQL synchronization tasks of a scheduled job.
 *
 * @author yuanxd
 */
  49. public class SqlSynJob implements Job {
  50. /** 日志记录 */
  51. private final Logger log = LoggerFactory.getLogger(SqlSynJob.class);
  52. /** 任务服务接口 */
  53. private TaskService taskService;
  54. /** sql抽取任务服务接口 */
  55. private TaskSqlService taskSqlService;
  56. /** 数据源服务接口 */
  57. private DataSourceService dataSourceService;
  58. /** 当前执行的任务对象 */
  59. private JobBean job = null;
  60. /** 任务缓存服务 */
  61. private CacheBeanService cacheBeanService;
  62. private JobService jobService = null;
  63. private void initServices(ApplicationContext appCtx) {
  64. this.taskService = appCtx.getBean(TaskService.class);
  65. this.taskSqlService = appCtx.getBean(TaskSqlService.class);
  66. this.dataSourceService = appCtx.getBean(DataSourceService.class);
  67. this.cacheBeanService = appCtx.getBean(CacheBeanService.class);
  68. this.jobService = appCtx.getBean(JobService.class);
  69. }
  70. @Override
  71. public void execute(JobExecutionContext context) throws JobExecutionException {
  72. long start = System.currentTimeMillis();
  73. SchedulerContext schCtx = null;
  74. try {
  75. schCtx = context.getScheduler().getContext();
  76. } catch (SchedulerException e) {
  77. log.error("get scheduler error:{}", e.getMessage());
  78. return;
  79. }
  80. initServices((ApplicationContext) schCtx.get("applicationContext"));
  81. String jobId = context.getJobDetail().getJobDataMap().getString(JobService.PARAM_JOB);
  82. job = jobService.findOne(jobId);
  83. if (null == job) {
  84. throw new IllegalArgumentException("未找到执行任务信息!");
  85. }
  86. if (StringUtils.isEmpty(job.getCode())) {
  87. throw new IllegalArgumentException("job code不可为空!");
  88. }
  89. List<TaskBean> tasks = taskService.selectByJobCode(job.getCode());
  90. for (TaskBean t : tasks) {
  91. // SQL 同步任务
  92. if (TaskBean.TYPE_SQL.equals(t.getType())) {
  93. doTaskSqlBeans(t);
  94. }
  95. }
  96. log.info("任务执行完成,耗时:{}", System.currentTimeMillis() - start);
  97. }
  98. /**
  99. * SQL同步任务处理
  100. *
  101. * @param taskSqlBeans
  102. */
  103. private void doTaskSqlBeans(TaskBean t) {
  104. // 执行缓存
  105. executeCache(t);
  106. List<TaskSqlBean> taskSqlBeans = taskSqlService.selectByTaskId(t.getId());
  107. if (taskSqlBeans == null || taskSqlBeans.size() == 0) {
  108. return;
  109. }
  110. for (TaskSqlBean ts : taskSqlBeans) {
  111. JSONObject result = getSqlResultJsonObject(ts);
  112. if (null == result || result.getJSONArray("data").size() == 0) {
  113. continue;
  114. }
  115. if (TaskBean.CACHE_USE.equals(t.getCacheUse())) {
  116. saveCache(t, ts, result);
  117. } else {
  118. runTaskSqlWithJdbcTemplate(t, ts, result);
  119. }
  120. }
  121. }
  122. /**
  123. * 执行缓存,执行成功返回0,否则返回其他
  124. *
  125. * @param t
  126. * @return
  127. */
  128. private int executeCache(TaskBean t) {
  129. long start = System.currentTimeMillis();
  130. List<TaskCacheBean> taskDatas = cacheBeanService.readCache(t.getCacheFolder());
  131. // 无缓存数据
  132. if (taskDatas == null || taskDatas.size() == 0) {
  133. t.setCacheUse(TaskBean.CACHE_UNUSE);
  134. taskService.updateByPrimaryKey(t);
  135. return 0;
  136. }
  137. Collections.sort(taskDatas);
  138. Map<String, TaskSqlBean> tsbs = new HashMap<>();
  139. for (TaskCacheBean tc : taskDatas) {
  140. String tsbid = tc.getTaskId();
  141. TaskSqlBean bean = null;
  142. if (tsbs.containsKey(tsbid)) {
  143. bean = tsbs.get(tsbid);
  144. } else {
  145. bean = taskSqlService.selectByPrimaryKey(tsbid);
  146. if (null == bean) {
  147. return -1;
  148. }
  149. tsbs.put(tsbid, bean);
  150. }
  151. if (null == bean) {
  152. return -1;
  153. }
  154. runTaskSqlWithJdbcTemplate(t, bean, JSONObject.parseObject((String) tc.getCacheData()));
  155. }
  156. t.setCacheUse(TaskBean.CACHE_UNUSE);
  157. taskService.updateByPrimaryKey(t);
  158. long cost = System.currentTimeMillis() - start;
  159. log.info("execute cache cost:" + cost);
  160. return 0;
  161. }
  162. private void saveCache(TaskBean t, TaskSqlBean ts, JSONObject result) {
  163. long start = System.currentTimeMillis();
  164. String fileName = ts.getId() + "-" + System.currentTimeMillis();
  165. TaskCacheBean cacheBean = new TaskCacheBean();
  166. cacheBean.setAvalidCacheFolder(t.getCacheFolder());
  167. cacheBean.setCacheData(result);
  168. cacheBean.setFileName(fileName);
  169. cacheBeanService.writeCache(cacheBean);
  170. cacheBeanService.writeCache(cacheBean);
  171. cacheBeanService.writeCache(cacheBean);
  172. t.setCacheUse(TaskBean.CACHE_USE);
  173. taskService.updateByPrimaryKey(t);
  174. log.info("Task[] saveCache cost:{}", ts.getId(), System.currentTimeMillis() - start);
  175. }
  176. /**
  177. * 获取sql检索结果
  178. *
  179. * @param taskSqlBean
  180. * @return
  181. */
  182. private JSONObject getSqlResultJsonObject(TaskSqlBean taskSqlBean) {
  183. long start = System.currentTimeMillis();
  184. String dsid = taskSqlBean.getSrcConn();
  185. DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
  186. JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
  187. String sql = taskSqlBean.getQuerySql();
  188. JSONObject result = template.query(sql, new ResultSetExtractor<JSONObject>() {
  189. @Override
  190. public JSONObject extractData(ResultSet rs) throws SQLException, DataAccessException {
  191. JSONObject result = new JSONObject();
  192. ResultSetMetaData rsmd = rs.getMetaData();
  193. StringBuilder columns = new StringBuilder();
  194. for (int i = 0; i < rsmd.getColumnCount(); i++) {
  195. columns.append(rsmd.getColumnName(i + 1).toUpperCase());
  196. columns.append(";");
  197. }
  198. if (columns.length() > 0) {
  199. columns.deleteCharAt(columns.length() - 1);
  200. }
  201. JSONArray results = new JSONArray();
  202. while (rs.next()) {
  203. JSONObject resultJson = new JSONObject();
  204. for (String column : columns.toString().split(";")) {
  205. resultJson.put(column, rs.getObject(column));
  206. }
  207. results.add(resultJson);
  208. }
  209. if (results.size() == 0) {
  210. return null;
  211. }
  212. result.put("columns", columns.toString());
  213. result.put("data", results);
  214. return result;
  215. }
  216. });
  217. log.debug("Task[{}] result:{}", taskSqlBean.getId(), result);
  218. log.info("Task[{}] getSqlResultJsonObject cost:{}", taskSqlBean.getId(), (System.currentTimeMillis() - start));
  219. return result;
  220. }
  221. private void prepare(String srcCols, final TaskSqlBean taskSqlBean, JdbcTemplate template) {
  222. long start = System.currentTimeMillis();
  223. final Collection<String> srcColumnsCollection = Arrays.asList(srcCols.split(";"));
  224. Map<String, List<String>> targetTableColumns = template
  225. .execute(new ConnectionCallback<Map<String, List<String>>>() {
  226. @Override
  227. public Map<String, List<String>> doInConnection(Connection con)
  228. throws SQLException, DataAccessException {
  229. Map<String, List<String>> tColumns = new HashMap<String, List<String>>();
  230. DatabaseMetaData dmd = con.getMetaData();
  231. ResultSet rs = dmd.getColumns(null, dmd.getUserName(),
  232. taskSqlBean.getTargetTable().toUpperCase(), null);
  233. List<String> columns = new ArrayList<>();
  234. while (rs.next()) {
  235. String column = rs.getString("COLUMN_NAME").toUpperCase();
  236. // 只保留对应的列
  237. if (srcColumnsCollection.contains(column)) {
  238. columns.add(column);
  239. }
  240. }
  241. tColumns.put("columns", columns);
  242. try {
  243. rs.close();
  244. } catch (Exception e) {
  245. }
  246. rs = dmd.getPrimaryKeys(null, dmd.getUserName(), taskSqlBean.getTargetTable().toUpperCase());
  247. List<String> keys = new ArrayList<>();
  248. while (rs.next()) {
  249. String key = rs.getString(4);
  250. if (srcColumnsCollection.contains(key)) {
  251. keys.add(key);
  252. }
  253. }
  254. tColumns.put("keys", keys);
  255. try {
  256. rs.close();
  257. } catch (Exception e) {
  258. }
  259. return tColumns;
  260. }
  261. });
  262. List<String> tableColumns = targetTableColumns.get("columns");
  263. taskSqlBean.setColumns(StringUtils.collectionToDelimitedString(tableColumns, ","));
  264. List<String> keys = targetTableColumns.get("keys");
  265. StringBuilder insertSql = new StringBuilder();
  266. StringBuilder insertParaSql = new StringBuilder();
  267. StringBuilder updateSql = new StringBuilder();
  268. insertSql.append("INSERT INTO ").append(taskSqlBean.getTargetTable()).append("(");
  269. updateSql.append("UPDATE ").append(taskSqlBean.getTargetTable()).append(" SET ");
  270. for (int i = 0; i < tableColumns.size(); i++) {
  271. insertSql.append(tableColumns.get(i)).append(",");
  272. insertParaSql.append(":").append(tableColumns.get(i)).append(",");
  273. if (!keys.contains(tableColumns.get(i))) {
  274. updateSql.append(tableColumns.get(i)).append("=").append(":").append(tableColumns.get(i)).append(",");
  275. }
  276. }
  277. insertSql.deleteCharAt(insertSql.length() - 1);
  278. insertParaSql.deleteCharAt(insertParaSql.length() - 1);
  279. updateSql.deleteCharAt(updateSql.length() - 1);
  280. updateSql.append(" WHERE ");
  281. if (keys.size() > 0) {
  282. for (int i = 0; i < keys.size(); i++) {
  283. if (i != 0) {
  284. updateSql.append(" AND ");
  285. }
  286. updateSql.append(keys.get(i)).append(" = :").append(keys.get(i));
  287. }
  288. } else {
  289. updateSql = null;
  290. }
  291. insertSql.append(")VALUES(").append(insertParaSql);
  292. insertSql.append(")");
  293. taskSqlBean.setInsertSql(insertSql.toString());
  294. taskSqlBean.setUpdateSql(updateSql.toString());
  295. taskSqlBean.setRefreshSql(TaskSqlBean.REFRESH_SQL_FALSE);
  296. taskSqlService.updateByPrimaryKey(taskSqlBean);
  297. log.info("Task[]do {} sql task prepare finished, cost:{}", taskSqlBean.getId(),
  298. (System.currentTimeMillis() - start));
  299. }
  300. public static String listToString(List<String> stringList) {
  301. if (stringList == null) {
  302. return null;
  303. }
  304. StringBuilder result = new StringBuilder();
  305. boolean flag = false;
  306. for (String string : stringList) {
  307. if (flag) {
  308. result.append(",");
  309. } else {
  310. flag = true;
  311. }
  312. result.append(string);
  313. }
  314. return result.toString();
  315. }
  316. /**
  317. * 结果同步到目标源
  318. *
  319. * @param taskSqlBean
  320. * @param result
  321. * @return
  322. */
  323. private int runTaskSqlWithJdbcTemplate(final TaskBean t, final TaskSqlBean taskSqlBean, JSONObject result) {
  324. try {
  325. // 此处应在插入或更新失败时回滚,并把查询结果以文件方式保存,留待以后再执行此任务;
  326. String dsid = taskSqlBean.getTargetConn();
  327. DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
  328. if (null == dsb) {
  329. throw new RuntimeException("数据源获取失败:" + dsid);
  330. }
  331. JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
  332. if (TaskSqlBean.REFRESH_SQL_TRUE.equals(taskSqlBean.getRefreshSql())) {
  333. prepare(result.getString("columns"), taskSqlBean, template);
  334. }
  335. JSONArray data = (JSONArray) result.get("data");
  336. return TaskSqlBean.MODE_ALL.equals(taskSqlBean.getMode()) ? doAllCommit(data, taskSqlBean, template)
  337. : doOneCommit(data, taskSqlBean, template);
  338. } catch (Exception e) {
  339. log.error("Task[{}] 执行插入失败:{}", taskSqlBean.getId(), e.getMessage());
  340. e.printStackTrace();
  341. saveCache(t, taskSqlBean, result);
  342. return -1;
  343. }
  344. }
  345. /**
  346. * 处理事务提交
  347. *
  348. * @param data
  349. * @param insertSql
  350. * @param updateSql
  351. * @return
  352. */
  353. private int doOneCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
  354. long start = System.currentTimeMillis();
  355. String insertSql = taskSqlBean.getInsertSql();
  356. String updateSql = taskSqlBean.getUpdateSql();
  357. List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
  358. try {
  359. NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
  360. for (int i = 0; i < data.size(); i++) {
  361. Map<String, Object> param = new HashMap<>();
  362. JSONObject jo = data.getJSONObject(i);
  363. for (String col : tableColumns) {
  364. param.put(col, jo.get(col));
  365. }
  366. int count = nameJdbcTemplate.update(updateSql, new MapSqlParameterSource(param));
  367. if (count == 0) {
  368. nameJdbcTemplate.update(insertSql, param);
  369. }
  370. }
  371. log.info("Task[{}] update {} records", taskSqlBean.getId(), data.size());
  372. } catch (Exception e) {
  373. return -1;
  374. }
  375. log.info("Task[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
  376. return 0;
  377. }
  378. /**
  379. * 处理事务提交
  380. *
  381. * @param data
  382. * @param insertSql
  383. * @param updateSql
  384. * @return
  385. */
  386. private int doAllCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
  387. long start = System.currentTimeMillis();
  388. String insertSql = taskSqlBean.getInsertSql();
  389. List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
  390. try {
  391. NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
  392. List<SqlParameterSource> params = new ArrayList<>();
  393. for (int i = 0; i < data.size(); i++) {
  394. Map<String, Object> param = new HashMap<>();
  395. JSONObject jo = data.getJSONObject(i);
  396. for (String col : tableColumns) {
  397. param.put(col, jo.get(col));
  398. }
  399. params.add(new MapSqlParameterSource(param));
  400. }
  401. SqlParameterSource[] sqlParams = new SqlParameterSource[params.size()];
  402. String deleteAllSql = "DELETE FROM " + taskSqlBean.getTargetTable();
  403. Map<String, ?> paramMap = new HashMap<>();
  404. nameJdbcTemplate.update(deleteAllSql, paramMap);
  405. log.info("Task[{}] delete all:{}", taskSqlBean.getId(), deleteAllSql);
  406. nameJdbcTemplate.batchUpdate(insertSql, params.toArray(sqlParams));
  407. log.info("Task[{}] update {} records", taskSqlBean.getId(), data.size());
  408. } catch (Exception e) {
  409. return -1;
  410. }
  411. log.info("Task[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
  412. return 0;
  413. }
  414. }