|
- package com.xt.dsp.job;
- import java.sql.Connection;
- import java.sql.DatabaseMetaData;
- import java.sql.ResultSet;
- import java.sql.ResultSetMetaData;
- import java.sql.SQLException;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import org.quartz.Job;
- import org.quartz.JobExecutionContext;
- import org.quartz.JobExecutionException;
- import org.quartz.SchedulerContext;
- import org.quartz.SchedulerException;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.context.ApplicationContext;
- import org.springframework.dao.DataAccessException;
- import org.springframework.jdbc.core.ConnectionCallback;
- import org.springframework.jdbc.core.JdbcTemplate;
- import org.springframework.jdbc.core.ResultSetExtractor;
- import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
- import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
- import org.springframework.jdbc.core.namedparam.SqlParameterSource;
- import org.springframework.util.StringUtils;
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.xt.dsp.bean.TaskCacheBean;
- import com.xt.dsp.jdbc.JdbcTemplateUtils;
- import com.xt.dsp.model.DataSourceBean;
- import com.xt.dsp.model.JobBean;
- import com.xt.dsp.model.TaskBean;
- import com.xt.dsp.model.TaskSqlBean;
- import com.xt.dsp.service.CacheBeanService;
- import com.xt.dsp.service.DataSourceService;
- import com.xt.dsp.service.JobService;
- import com.xt.dsp.service.TaskService;
- import com.xt.dsp.service.TaskSqlService;
- /**
- * 工作调度控制
- *
- * @author yuanxd
- *
- */
- public class SqlSynJob implements Job {
- /** 日志记录 */
- private final Logger log = LoggerFactory.getLogger(SqlSynJob.class);
- /** 任务服务接口 */
- private TaskService taskService;
- /** sql抽取任务服务接口 */
- private TaskSqlService taskSqlService;
- /** 数据源服务接口 */
- private DataSourceService dataSourceService;
- /** 当前执行的任务对象 */
- private JobBean job = null;
- /** 任务缓存服务 */
- private CacheBeanService cacheBeanService;
- private JobService jobService = null;
- private void initServices(ApplicationContext appCtx) {
- this.taskService = appCtx.getBean(TaskService.class);
- this.taskSqlService = appCtx.getBean(TaskSqlService.class);
- this.dataSourceService = appCtx.getBean(DataSourceService.class);
- this.cacheBeanService = appCtx.getBean(CacheBeanService.class);
- this.jobService = appCtx.getBean(JobService.class);
- }
- @Override
- public void execute(JobExecutionContext context) throws JobExecutionException {
- long start = System.currentTimeMillis();
- SchedulerContext schCtx = null;
- try {
- schCtx = context.getScheduler().getContext();
- } catch (SchedulerException e) {
- log.error("get scheduler error:{}", e.getMessage());
- return;
- }
- initServices((ApplicationContext) schCtx.get("applicationContext"));
- String jobId = context.getJobDetail().getJobDataMap().getString(JobService.PARAM_JOB);
- job = jobService.findOne(jobId);
- if (null == job) {
- throw new IllegalArgumentException("未找到执行任务信息!");
- }
- if (StringUtils.isEmpty(job.getCode())) {
- throw new IllegalArgumentException("job code不可为空!");
- }
- List<TaskBean> tasks = taskService.selectByJobCode(job.getCode());
- for (TaskBean t : tasks) {
- // SQL 同步任务
- if (TaskBean.TYPE_SQL.equals(t.getType())) {
- doTaskSqlBeans(t);
- }
- }
- log.info("任务执行完成,耗时:{}", System.currentTimeMillis() - start);
- }
- /**
- * SQL同步任务处理
- *
- * @param taskSqlBeans
- */
- private void doTaskSqlBeans(TaskBean t) {
- // 执行缓存
- executeCache(t);
- List<TaskSqlBean> taskSqlBeans = taskSqlService.selectByTaskId(t.getId());
- if (taskSqlBeans == null || taskSqlBeans.size() == 0) {
- return;
- }
- for (TaskSqlBean ts : taskSqlBeans) {
- JSONObject result = getSqlResultJsonObject(ts);
- if (null == result || result.getJSONArray("data").size() == 0) {
- continue;
- }
- if (TaskBean.CACHE_USE.equals(t.getCacheUse())) {
- saveCache(t, ts, result);
- } else {
- runTaskSqlWithJdbcTemplate(t, ts, result);
- }
- }
- }
- /**
- * 执行缓存,执行成功返回0,否则返回其他
- *
- * @param t
- * @return
- */
- private int executeCache(TaskBean t) {
- long start = System.currentTimeMillis();
- List<TaskCacheBean> taskDatas = cacheBeanService.readCache(t.getCacheFolder());
- // 无缓存数据
- if (taskDatas == null || taskDatas.size() == 0) {
- t.setCacheUse(TaskBean.CACHE_UNUSE);
- taskService.updateByPrimaryKey(t);
- return 0;
- }
- Collections.sort(taskDatas);
- Map<String, TaskSqlBean> tsbs = new HashMap<>();
- for (TaskCacheBean tc : taskDatas) {
- String tsbid = tc.getTaskId();
- TaskSqlBean bean = null;
- if (tsbs.containsKey(tsbid)) {
- bean = tsbs.get(tsbid);
- } else {
- bean = taskSqlService.selectByPrimaryKey(tsbid);
- if (null == bean) {
- return -1;
- }
- tsbs.put(tsbid, bean);
- }
- if (null == bean) {
- return -1;
- }
- runTaskSqlWithJdbcTemplate(t, bean, JSONObject.parseObject((String) tc.getCacheData()));
- }
- t.setCacheUse(TaskBean.CACHE_UNUSE);
- taskService.updateByPrimaryKey(t);
- long cost = System.currentTimeMillis() - start;
- log.info("execute cache cost:" + cost);
- return 0;
- }
- private void saveCache(TaskBean t, TaskSqlBean ts, JSONObject result) {
- long start = System.currentTimeMillis();
- String fileName = ts.getId() + "-" + System.currentTimeMillis();
- TaskCacheBean cacheBean = new TaskCacheBean();
- cacheBean.setAvalidCacheFolder(t.getCacheFolder());
- cacheBean.setCacheData(result);
- cacheBean.setFileName(fileName);
- cacheBeanService.writeCache(cacheBean);
- cacheBeanService.writeCache(cacheBean);
- cacheBeanService.writeCache(cacheBean);
- t.setCacheUse(TaskBean.CACHE_USE);
- taskService.updateByPrimaryKey(t);
- log.info("Task[] saveCache cost:{}", ts.getId(), System.currentTimeMillis() - start);
- }
- /**
- * 获取sql检索结果
- *
- * @param taskSqlBean
- * @return
- */
- private JSONObject getSqlResultJsonObject(TaskSqlBean taskSqlBean) {
- long start = System.currentTimeMillis();
- String dsid = taskSqlBean.getSrcConn();
- DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
- JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
- String sql = taskSqlBean.getQuerySql();
- JSONObject result = template.query(sql, new ResultSetExtractor<JSONObject>() {
- @Override
- public JSONObject extractData(ResultSet rs) throws SQLException, DataAccessException {
- JSONObject result = new JSONObject();
- ResultSetMetaData rsmd = rs.getMetaData();
- StringBuilder columns = new StringBuilder();
- for (int i = 0; i < rsmd.getColumnCount(); i++) {
- columns.append(rsmd.getColumnName(i + 1).toUpperCase());
- columns.append(";");
- }
- if (columns.length() > 0) {
- columns.deleteCharAt(columns.length() - 1);
- }
- JSONArray results = new JSONArray();
- while (rs.next()) {
- JSONObject resultJson = new JSONObject();
- for (String column : columns.toString().split(";")) {
- resultJson.put(column, rs.getObject(column));
- }
- results.add(resultJson);
- }
- if (results.size() == 0) {
- return null;
- }
- result.put("columns", columns.toString());
- result.put("data", results);
- return result;
- }
- });
- log.debug("Task[{}] result:{}", taskSqlBean.getId(), result);
- log.info("Task[{}] getSqlResultJsonObject cost:{}", taskSqlBean.getId(), (System.currentTimeMillis() - start));
- return result;
- }
- private void prepare(String srcCols, final TaskSqlBean taskSqlBean, JdbcTemplate template) {
- long start = System.currentTimeMillis();
- final Collection<String> srcColumnsCollection = Arrays.asList(srcCols.split(";"));
- Map<String, List<String>> targetTableColumns = template
- .execute(new ConnectionCallback<Map<String, List<String>>>() {
- @Override
- public Map<String, List<String>> doInConnection(Connection con)
- throws SQLException, DataAccessException {
- Map<String, List<String>> tColumns = new HashMap<String, List<String>>();
- DatabaseMetaData dmd = con.getMetaData();
- ResultSet rs = dmd.getColumns(null, dmd.getUserName(),
- taskSqlBean.getTargetTable().toUpperCase(), null);
- List<String> columns = new ArrayList<>();
- while (rs.next()) {
- String column = rs.getString("COLUMN_NAME").toUpperCase();
- // 只保留对应的列
- if (srcColumnsCollection.contains(column)) {
- columns.add(column);
- }
- }
- tColumns.put("columns", columns);
- try {
- rs.close();
- } catch (Exception e) {
- }
- rs = dmd.getPrimaryKeys(null, dmd.getUserName(), taskSqlBean.getTargetTable().toUpperCase());
- List<String> keys = new ArrayList<>();
- while (rs.next()) {
- String key = rs.getString(4);
- if (srcColumnsCollection.contains(key)) {
- keys.add(key);
- }
- }
- tColumns.put("keys", keys);
- try {
- rs.close();
- } catch (Exception e) {
- }
- return tColumns;
- }
- });
- List<String> tableColumns = targetTableColumns.get("columns");
- taskSqlBean.setColumns(StringUtils.collectionToDelimitedString(tableColumns, ","));
- List<String> keys = targetTableColumns.get("keys");
- StringBuilder insertSql = new StringBuilder();
- StringBuilder insertParaSql = new StringBuilder();
- StringBuilder updateSql = new StringBuilder();
- insertSql.append("INSERT INTO ").append(taskSqlBean.getTargetTable()).append("(");
- updateSql.append("UPDATE ").append(taskSqlBean.getTargetTable()).append(" SET ");
- for (int i = 0; i < tableColumns.size(); i++) {
- insertSql.append(tableColumns.get(i)).append(",");
- insertParaSql.append(":").append(tableColumns.get(i)).append(",");
- if (!keys.contains(tableColumns.get(i))) {
- updateSql.append(tableColumns.get(i)).append("=").append(":").append(tableColumns.get(i)).append(",");
- }
- }
- insertSql.deleteCharAt(insertSql.length() - 1);
- insertParaSql.deleteCharAt(insertParaSql.length() - 1);
- updateSql.deleteCharAt(updateSql.length() - 1);
- updateSql.append(" WHERE ");
- if (keys.size() > 0) {
- for (int i = 0; i < keys.size(); i++) {
- if (i != 0) {
- updateSql.append(" AND ");
- }
- updateSql.append(keys.get(i)).append(" = :").append(keys.get(i));
- }
- } else {
- updateSql = null;
- }
- insertSql.append(")VALUES(").append(insertParaSql);
- insertSql.append(")");
- taskSqlBean.setInsertSql(insertSql.toString());
- taskSqlBean.setUpdateSql(updateSql.toString());
- taskSqlBean.setRefreshSql(TaskSqlBean.REFRESH_SQL_FALSE);
- taskSqlService.updateByPrimaryKey(taskSqlBean);
- log.info("Task[]do {} sql task prepare finished, cost:{}", taskSqlBean.getId(),
- (System.currentTimeMillis() - start));
- }
- public static String listToString(List<String> stringList) {
- if (stringList == null) {
- return null;
- }
- StringBuilder result = new StringBuilder();
- boolean flag = false;
- for (String string : stringList) {
- if (flag) {
- result.append(",");
- } else {
- flag = true;
- }
- result.append(string);
- }
- return result.toString();
- }
- /**
- * 结果同步到目标源
- *
- * @param taskSqlBean
- * @param result
- * @return
- */
- private int runTaskSqlWithJdbcTemplate(final TaskBean t, final TaskSqlBean taskSqlBean, JSONObject result) {
- try {
- // 此处应在插入或更新失败时回滚,并把查询结果以文件方式保存,留待以后再执行此任务;
- String dsid = taskSqlBean.getTargetConn();
- DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
- if (null == dsb) {
- throw new RuntimeException("数据源获取失败:" + dsid);
- }
- JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
- if (TaskSqlBean.REFRESH_SQL_TRUE.equals(taskSqlBean.getRefreshSql())) {
- prepare(result.getString("columns"), taskSqlBean, template);
- }
- JSONArray data = (JSONArray) result.get("data");
- return TaskSqlBean.MODE_ALL.equals(taskSqlBean.getMode()) ? doAllCommit(data, taskSqlBean, template)
- : doOneCommit(data, taskSqlBean, template);
- } catch (Exception e) {
- log.error("Task[{}] 执行插入失败:{}", taskSqlBean.getId(), e.getMessage());
- e.printStackTrace();
- saveCache(t, taskSqlBean, result);
- return -1;
- }
- }
- /**
- * 处理事务提交
- *
- * @param data
- * @param insertSql
- * @param updateSql
- * @return
- */
- private int doOneCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
- long start = System.currentTimeMillis();
- String insertSql = taskSqlBean.getInsertSql();
- String updateSql = taskSqlBean.getUpdateSql();
- List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
- try {
- NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
- for (int i = 0; i < data.size(); i++) {
- Map<String, Object> param = new HashMap<>();
- JSONObject jo = data.getJSONObject(i);
- for (String col : tableColumns) {
- param.put(col, jo.get(col));
- }
- int count = nameJdbcTemplate.update(updateSql, new MapSqlParameterSource(param));
- if (count == 0) {
- nameJdbcTemplate.update(insertSql, param);
- }
- }
- log.info("Task[{}] update {} records", taskSqlBean.getId(), data.size());
- } catch (Exception e) {
- return -1;
- }
- log.info("Task[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
- return 0;
- }
- /**
- * 处理事务提交
- *
- * @param data
- * @param insertSql
- * @param updateSql
- * @return
- */
- private int doAllCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
- long start = System.currentTimeMillis();
- String insertSql = taskSqlBean.getInsertSql();
- List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
- try {
- NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
- List<SqlParameterSource> params = new ArrayList<>();
- for (int i = 0; i < data.size(); i++) {
- Map<String, Object> param = new HashMap<>();
- JSONObject jo = data.getJSONObject(i);
- for (String col : tableColumns) {
- param.put(col, jo.get(col));
- }
- params.add(new MapSqlParameterSource(param));
- }
- SqlParameterSource[] sqlParams = new SqlParameterSource[params.size()];
- String deleteAllSql = "DELETE FROM " + taskSqlBean.getTargetTable();
- Map<String, ?> paramMap = new HashMap<>();
- nameJdbcTemplate.update(deleteAllSql, paramMap);
- log.info("Task[{}] delete all:{}", taskSqlBean.getId(), deleteAllSql);
- nameJdbcTemplate.batchUpdate(insertSql, params.toArray(sqlParams));
- log.info("Task[{}] update {} records", taskSqlBean.getId(), data.size());
- } catch (Exception e) {
- return -1;
- }
- log.info("Task[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
- return 0;
- }
- }
|