|
@@ -1,453 +0,0 @@
|
|
|
-package com.xt.dsp.serviceImpl;
|
|
|
-
|
|
|
-import java.sql.Connection;
|
|
|
-import java.sql.DatabaseMetaData;
|
|
|
-import java.sql.ResultSet;
|
|
|
-import java.sql.ResultSetMetaData;
|
|
|
-import java.sql.SQLException;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.Collection;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.dao.DataAccessException;
|
|
|
-import org.springframework.jdbc.core.ConnectionCallback;
|
|
|
-import org.springframework.jdbc.core.JdbcTemplate;
|
|
|
-import org.springframework.jdbc.core.ResultSetExtractor;
|
|
|
-import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
|
|
|
-import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
|
|
-import org.springframework.jdbc.core.namedparam.SqlParameterSource;
|
|
|
-import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
|
|
-import org.springframework.stereotype.Service;
|
|
|
-import org.springframework.transaction.TransactionStatus;
|
|
|
-import org.springframework.transaction.support.DefaultTransactionDefinition;
|
|
|
-import org.springframework.util.StringUtils;
|
|
|
-
|
|
|
-import com.alibaba.fastjson.JSONArray;
|
|
|
-import com.alibaba.fastjson.JSONObject;
|
|
|
-import com.xt.dsp.bean.TaskCacheBean;
|
|
|
-import com.xt.dsp.jdbc.JdbcTemplateUtils;
|
|
|
-import com.xt.dsp.mappers.TaskSqlBeanMapper;
|
|
|
-import com.xt.dsp.model.DataSourceBean;
|
|
|
-import com.xt.dsp.model.TaskBean;
|
|
|
-import com.xt.dsp.model.TaskSqlBean;
|
|
|
-import com.xt.dsp.service.CacheBeanService;
|
|
|
-import com.xt.dsp.service.DataSourceService;
|
|
|
-import com.xt.dsp.service.TaskService;
|
|
|
-import com.xt.dsp.service.TaskSqlService;
|
|
|
-
|
|
|
-@Service
|
|
|
-public class TaskSqlServiceImpl implements TaskSqlService {
|
|
|
- /** 日志记录 */
|
|
|
- private final Logger log = LoggerFactory.getLogger(TaskSqlServiceImpl.class);
|
|
|
- @Autowired
|
|
|
- private TaskSqlBeanMapper mapper;
|
|
|
- /** sql抽取任务服务接口 */
|
|
|
- @Autowired
|
|
|
- private TaskService taskService;
|
|
|
- /** 任务缓存服务 */
|
|
|
- @Autowired
|
|
|
- private CacheBeanService cacheBeanService;
|
|
|
- /** 数据源服务接口 */
|
|
|
- @Autowired
|
|
|
- private DataSourceService dataSourceService;
|
|
|
-
|
|
|
- public List<TaskSqlBean> selectByTaskCode(String taskCode) {
|
|
|
- return mapper.selectByTaskCode(taskCode);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 根据主键查找记录
|
|
|
- *
|
|
|
- * @param id
|
|
|
- * String
|
|
|
- * @return T
|
|
|
- */
|
|
|
- public TaskSqlBean selectByPrimaryKey(String id) {
|
|
|
- return mapper.selectByPrimaryKey(id);
|
|
|
- }
|
|
|
-
|
|
|
- public int updateByPrimaryKey(TaskSqlBean record) {
|
|
|
- return mapper.updateByPrimaryKey(record);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * SQL同步任务处理<br>
|
|
|
- * 事务处理
|
|
|
- *
|
|
|
- * @param taskSqlBeans
|
|
|
- */
|
|
|
- public int runTask(TaskBean t, String condition) {
|
|
|
- log.info("SqlTask[{}] running......", t.getCode());
|
|
|
- if (!TaskBean.TYPE_SQL.equals(t.getType())) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- long start = System.currentTimeMillis();
|
|
|
- // 执行缓存
|
|
|
- executeCache(t);
|
|
|
- List<TaskSqlBean> taskSqlBeans = selectByTaskCode(t.getCode());
|
|
|
- if (taskSqlBeans == null || taskSqlBeans.size() == 0) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- for (TaskSqlBean ts : taskSqlBeans) {
|
|
|
- JSONObject result = getSqlResultJsonObject(ts, condition);
|
|
|
- if (null == result || result.getJSONArray("data").size() == 0) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (TaskBean.CACHE_USE.equals(t.getCacheUse())) {
|
|
|
- saveCache(t, ts, result);
|
|
|
- } else {
|
|
|
- runTaskSqlWithJdbcTemplate(t, ts, result);
|
|
|
- }
|
|
|
- }
|
|
|
- log.info("SqlTask[{}] finished,cost:{}", t.getCode(), System.currentTimeMillis() - start);
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 执行缓存,执行成功返回0,否则返回其他
|
|
|
- *
|
|
|
- * @param t
|
|
|
- * @return
|
|
|
- */
|
|
|
- private int executeCache(TaskBean t) {
|
|
|
- long start = System.currentTimeMillis();
|
|
|
- List<TaskCacheBean> taskDatas = cacheBeanService.readCache(t.getCacheFolder());
|
|
|
- // 无缓存数据
|
|
|
- if (taskDatas == null || taskDatas.size() == 0) {
|
|
|
- t.setCacheUse(TaskBean.CACHE_UNUSE);
|
|
|
- taskService.updateByPrimaryKey(t);
|
|
|
- return 0;
|
|
|
- }
|
|
|
- Collections.sort(taskDatas);
|
|
|
- Map<String, TaskSqlBean> tsbs = new HashMap<>();
|
|
|
- for (TaskCacheBean tc : taskDatas) {
|
|
|
- String tsbid = tc.getTaskId();
|
|
|
- TaskSqlBean bean = null;
|
|
|
- if (tsbs.containsKey(tsbid)) {
|
|
|
- bean = tsbs.get(tsbid);
|
|
|
- } else {
|
|
|
- bean = selectByPrimaryKey(tsbid);
|
|
|
- if (null == bean) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- tsbs.put(tsbid, bean);
|
|
|
- }
|
|
|
- if (null == bean) {
|
|
|
- return -1;
|
|
|
- }
|
|
|
- runTaskSqlWithJdbcTemplate(t, bean, tc.getCacheData());
|
|
|
- }
|
|
|
- t.setCacheUse(TaskBean.CACHE_UNUSE);
|
|
|
- taskService.updateByPrimaryKey(t);
|
|
|
- long cost = System.currentTimeMillis() - start;
|
|
|
- log.info("SqlTask[{}],execute cache cost:{}", t.getCode(), cost);
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- private Map<String, Object> getParamMapFromCondition(String condition) {
|
|
|
- Map<String, Object> param = new HashMap<>();
|
|
|
- if (null == condition) {
|
|
|
- return param;
|
|
|
- }
|
|
|
- String[] conditions = condition.split(";");
|
|
|
- int index = -1;
|
|
|
- for (String c : conditions) {
|
|
|
- index = c.indexOf('=');
|
|
|
- if (index < 0) {
|
|
|
- throw new IllegalArgumentException("条件不合法:" + c);
|
|
|
- }
|
|
|
- String name = c.substring(0, index);
|
|
|
- if (name.endsWith("_IN")) {
|
|
|
- param.put(name, Arrays.asList(c.substring(index + 1).split(",")));
|
|
|
- } else {
|
|
|
- param.put(name, c.substring(index + 1));
|
|
|
- }
|
|
|
- }
|
|
|
- return param;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取sql检索结果
|
|
|
- *
|
|
|
- * @param taskSqlBean
|
|
|
- * @return
|
|
|
- */
|
|
|
- private JSONObject getSqlResultJsonObject(TaskSqlBean taskSqlBean, String condition) {
|
|
|
- long start = System.currentTimeMillis();
|
|
|
- String dsid = taskSqlBean.getSrcConn();
|
|
|
- DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
|
|
|
- NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(
|
|
|
- JdbcTemplateUtils.getJdbcTemplate(dsb));
|
|
|
- String sql = taskSqlBean.getQuerySql();
|
|
|
- JSONObject result = nameJdbcTemplate.query(sql, getParamMapFromCondition(condition),
|
|
|
- new ResultSetExtractor<JSONObject>() {
|
|
|
- @Override
|
|
|
- public JSONObject extractData(ResultSet rs) throws SQLException, DataAccessException {
|
|
|
- JSONObject result = new JSONObject();
|
|
|
- ResultSetMetaData rsmd = rs.getMetaData();
|
|
|
- StringBuilder columns = new StringBuilder();
|
|
|
- for (int i = 0; i < rsmd.getColumnCount(); i++) {
|
|
|
- columns.append(rsmd.getColumnName(i + 1).toUpperCase());
|
|
|
- columns.append(";");
|
|
|
- }
|
|
|
- if (columns.length() > 0) {
|
|
|
- columns.deleteCharAt(columns.length() - 1);
|
|
|
- }
|
|
|
- JSONArray results = new JSONArray();
|
|
|
- while (rs.next()) {
|
|
|
- JSONObject resultJson = new JSONObject();
|
|
|
- for (String column : columns.toString().split(";")) {
|
|
|
- resultJson.put(column, rs.getObject(column));
|
|
|
- }
|
|
|
- results.add(resultJson);
|
|
|
- }
|
|
|
- if (results.size() == 0) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- result.put("columns", columns.toString());
|
|
|
- result.put("data", results);
|
|
|
- return result;
|
|
|
- }
|
|
|
- });
|
|
|
- if (log.isInfoEnabled()) {
|
|
|
- int size = 0;
|
|
|
- if (result != null && result.getJSONArray("data") != null) {
|
|
|
- size = result.getJSONArray("data").size();
|
|
|
- }
|
|
|
- log.info("TaskSql[{}] getSqlResultJsonObject get {} records, cost:{}", taskSqlBean.getId(), size,
|
|
|
- (System.currentTimeMillis() - start));
|
|
|
- }
|
|
|
-
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
- private void prepare(String srcCols, final TaskSqlBean taskSqlBean, JdbcTemplate template) {
|
|
|
- long start = System.currentTimeMillis();
|
|
|
- final Collection<String> srcColumnsCollection = Arrays.asList(srcCols.split(";"));
|
|
|
- Map<String, List<String>> targetTableColumns = template
|
|
|
- .execute(new ConnectionCallback<Map<String, List<String>>>() {
|
|
|
- @Override
|
|
|
- public Map<String, List<String>> doInConnection(Connection con)
|
|
|
- throws SQLException, DataAccessException {
|
|
|
- Map<String, List<String>> tColumns = new HashMap<String, List<String>>();
|
|
|
- DatabaseMetaData dmd = con.getMetaData();
|
|
|
- ResultSet rs = dmd.getColumns(null, dmd.getUserName(),
|
|
|
- taskSqlBean.getTargetTable().toUpperCase(), null);
|
|
|
- List<String> columns = new ArrayList<>();
|
|
|
- while (rs.next()) {
|
|
|
- String column = rs.getString("COLUMN_NAME").toUpperCase();
|
|
|
- // 只保留对应的列
|
|
|
- if (srcColumnsCollection.contains(column)) {
|
|
|
- columns.add(column);
|
|
|
- }
|
|
|
- }
|
|
|
- tColumns.put("columns", columns);
|
|
|
- try {
|
|
|
- rs.close();
|
|
|
- } catch (Exception e) {
|
|
|
-
|
|
|
- }
|
|
|
- rs = dmd.getPrimaryKeys(null, dmd.getUserName(), taskSqlBean.getTargetTable().toUpperCase());
|
|
|
- List<String> keys = new ArrayList<>();
|
|
|
- while (rs.next()) {
|
|
|
- String key = rs.getString(4);
|
|
|
- if (srcColumnsCollection.contains(key)) {
|
|
|
- keys.add(key);
|
|
|
- }
|
|
|
- }
|
|
|
- tColumns.put("keys", keys);
|
|
|
- try {
|
|
|
- rs.close();
|
|
|
- } catch (Exception e) {
|
|
|
-
|
|
|
- }
|
|
|
- return tColumns;
|
|
|
- }
|
|
|
- });
|
|
|
- List<String> tableColumns = targetTableColumns.get("columns");
|
|
|
- taskSqlBean.setColumns(StringUtils.collectionToDelimitedString(tableColumns, ","));
|
|
|
- List<String> keys = targetTableColumns.get("keys");
|
|
|
- StringBuilder insertSql = new StringBuilder();
|
|
|
- StringBuilder insertParaSql = new StringBuilder();
|
|
|
- StringBuilder updateSql = new StringBuilder();
|
|
|
- insertSql.append("INSERT INTO ").append(taskSqlBean.getTargetTable()).append("(");
|
|
|
- updateSql.append("UPDATE ").append(taskSqlBean.getTargetTable()).append(" SET ");
|
|
|
- for (int i = 0; i < tableColumns.size(); i++) {
|
|
|
- insertSql.append(tableColumns.get(i)).append(",");
|
|
|
- insertParaSql.append(":").append(tableColumns.get(i)).append(",");
|
|
|
- if (!keys.contains(tableColumns.get(i))) {
|
|
|
- updateSql.append(tableColumns.get(i)).append("=").append(":").append(tableColumns.get(i)).append(",");
|
|
|
- }
|
|
|
- }
|
|
|
- insertSql.deleteCharAt(insertSql.length() - 1);
|
|
|
- insertParaSql.deleteCharAt(insertParaSql.length() - 1);
|
|
|
- updateSql.deleteCharAt(updateSql.length() - 1);
|
|
|
- updateSql.append(" WHERE ");
|
|
|
- if (keys.size() > 0) {
|
|
|
- for (int i = 0; i < keys.size(); i++) {
|
|
|
- if (i != 0) {
|
|
|
- updateSql.append(" AND ");
|
|
|
- }
|
|
|
- updateSql.append(keys.get(i)).append(" = :").append(keys.get(i));
|
|
|
- }
|
|
|
- } else {
|
|
|
- updateSql = null;
|
|
|
- }
|
|
|
-
|
|
|
- insertSql.append(")VALUES(").append(insertParaSql);
|
|
|
- insertSql.append(")");
|
|
|
- taskSqlBean.setInsertSql(insertSql.toString());
|
|
|
- taskSqlBean.setUpdateSql(String.valueOf(updateSql));
|
|
|
- taskSqlBean.setRefreshSql(TaskSqlBean.REFRESH_SQL_FALSE);
|
|
|
- updateByPrimaryKey(taskSqlBean);
|
|
|
- log.info("SqlTask[]do {} sql task prepare finished, cost:{}", taskSqlBean.getId(),
|
|
|
- (System.currentTimeMillis() - start));
|
|
|
- }
|
|
|
-
|
|
|
- public static String listToString(List<String> stringList) {
|
|
|
- if (stringList == null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- StringBuilder result = new StringBuilder();
|
|
|
- boolean flag = false;
|
|
|
- for (String string : stringList) {
|
|
|
- if (flag) {
|
|
|
- result.append(",");
|
|
|
- } else {
|
|
|
- flag = true;
|
|
|
- }
|
|
|
- result.append(string);
|
|
|
- }
|
|
|
- return result.toString();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 结果同步到目标源
|
|
|
- *
|
|
|
- * @param taskSqlBean
|
|
|
- * @param result
|
|
|
- * @return
|
|
|
- */
|
|
|
- private int runTaskSqlWithJdbcTemplate(final TaskBean t, final TaskSqlBean taskSqlBean, JSONObject result) {
|
|
|
- try {
|
|
|
- // 此处应在插入或更新失败时回滚,并把查询结果以文件方式保存,留待以后再执行此任务;
|
|
|
- String dsid = taskSqlBean.getTargetConn();
|
|
|
- DataSourceBean dsb = dataSourceService.selectByPrimaryKey(dsid);
|
|
|
- if (null == dsb) {
|
|
|
- throw new RuntimeException("数据源获取失败:" + dsid);
|
|
|
- }
|
|
|
- JdbcTemplate template = JdbcTemplateUtils.getJdbcTemplate(dsb);
|
|
|
- if (TaskSqlBean.REFRESH_SQL_TRUE.equals(taskSqlBean.getRefreshSql())) {
|
|
|
- prepare(result.getString("columns"), taskSqlBean, template);
|
|
|
- }
|
|
|
- JSONArray data = (JSONArray) result.get("data");
|
|
|
- int runResult = -1;
|
|
|
- DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(
|
|
|
- template.getDataSource());
|
|
|
- DefaultTransactionDefinition def = new DefaultTransactionDefinition();
|
|
|
- TransactionStatus status = transactionManager.getTransaction(def);
|
|
|
- try {
|
|
|
- runResult = TaskSqlBean.MODE_ALL.equals(taskSqlBean.getMode())
|
|
|
- ? doAllCommit(data, taskSqlBean, template) : doOneCommit(data, taskSqlBean, template);
|
|
|
- transactionManager.commit(status);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("datasource[{}] commit error:{}", dsid, e.getMessage());
|
|
|
- // 回滚事务
|
|
|
- transactionManager.rollback(status);
|
|
|
- // 抛出异常到外层进行缓存处理
|
|
|
- throw e;
|
|
|
- }
|
|
|
- return runResult;
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("SqlTask[{}] 执行插入失败:{}", t.getCode(), e.getMessage());
|
|
|
- e.printStackTrace();
|
|
|
- saveCache(t, taskSqlBean, result);
|
|
|
- return -1;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 处理事务提交
|
|
|
- *
|
|
|
- * @param data
|
|
|
- * @param insertSql
|
|
|
- * @param updateSql
|
|
|
- * @return
|
|
|
- */
|
|
|
- private int doOneCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
|
|
|
- long start = System.currentTimeMillis();
|
|
|
- String insertSql = taskSqlBean.getInsertSql();
|
|
|
- String updateSql = taskSqlBean.getUpdateSql();
|
|
|
- List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
|
|
|
- NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
|
|
|
- for (int i = 0; i < data.size(); i++) {
|
|
|
- Map<String, Object> param = new HashMap<>();
|
|
|
- JSONObject jo = data.getJSONObject(i);
|
|
|
- for (String col : tableColumns) {
|
|
|
- param.put(col, jo.get(col));
|
|
|
- }
|
|
|
- int count = nameJdbcTemplate.update(updateSql, new MapSqlParameterSource(param));
|
|
|
- if (count == 0) {
|
|
|
- nameJdbcTemplate.update(insertSql, param);
|
|
|
- }
|
|
|
- }
|
|
|
- log.info("SqlTask[{}] doCommit {} records cost:{}", taskSqlBean.getId(), data.size(),
|
|
|
- System.currentTimeMillis() - start);
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 处理事务提交
|
|
|
- *
|
|
|
- * @param data
|
|
|
- * @param insertSql
|
|
|
- * @param updateSql
|
|
|
- * @return
|
|
|
- */
|
|
|
- private int doAllCommit(JSONArray data, TaskSqlBean taskSqlBean, JdbcTemplate template) {
|
|
|
- long start = System.currentTimeMillis();
|
|
|
- String insertSql = taskSqlBean.getInsertSql();
|
|
|
- List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
|
|
|
- NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
|
|
|
- List<SqlParameterSource> params = new ArrayList<>();
|
|
|
- for (int i = 0; i < data.size(); i++) {
|
|
|
- Map<String, Object> param = new HashMap<>();
|
|
|
- JSONObject jo = data.getJSONObject(i);
|
|
|
- for (String col : tableColumns) {
|
|
|
- param.put(col, jo.get(col));
|
|
|
- }
|
|
|
- params.add(new MapSqlParameterSource(param));
|
|
|
- }
|
|
|
- SqlParameterSource[] sqlParams = new SqlParameterSource[params.size()];
|
|
|
- String deleteAllSql = "DELETE FROM " + taskSqlBean.getTargetTable();
|
|
|
- Map<String, ?> paramMap = new HashMap<>();
|
|
|
- nameJdbcTemplate.update(deleteAllSql, paramMap);
|
|
|
- log.info("SqlTask[{}] delete all:{}", taskSqlBean.getId(), deleteAllSql);
|
|
|
- nameJdbcTemplate.batchUpdate(insertSql, params.toArray(sqlParams));
|
|
|
- log.info("SqlTask[{}] update {} records", taskSqlBean.getId(), data.size());
|
|
|
- log.info("SqlTask[{}] doAllCommit {} records cost:{}", taskSqlBean.getId(), data.size(),
|
|
|
- System.currentTimeMillis() - start);
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- private void saveCache(TaskBean t, TaskSqlBean ts, JSONObject result) {
|
|
|
- long start = System.currentTimeMillis();
|
|
|
- String fileName = ts.getId() + "-" + System.currentTimeMillis();
|
|
|
- TaskCacheBean cacheBean = new TaskCacheBean();
|
|
|
- cacheBean.setAvalidCacheFolder(t.getCacheFolder());
|
|
|
- cacheBean.setCacheData(result);
|
|
|
- cacheBean.setFileName(fileName);
|
|
|
- cacheBeanService.writeCache(cacheBean);
|
|
|
- t.setCacheUse(TaskBean.CACHE_USE);
|
|
|
- taskService.updateByPrimaryKey(t);
|
|
|
- log.info("SqlTask[] saveCache cost:{}", t.getCode(), System.currentTimeMillis() - start);
|
|
|
- }
|
|
|
-
|
|
|
-}
|