|
@@ -23,7 +23,10 @@ import org.springframework.jdbc.core.ResultSetExtractor;
|
|
|
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
|
|
|
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
|
|
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
|
|
|
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.transaction.TransactionStatus;
|
|
|
+import org.springframework.transaction.support.DefaultTransactionDefinition;
|
|
|
import org.springframework.util.StringUtils;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONArray;
|
|
@@ -345,8 +348,21 @@ public class TaskSqlServiceImpl implements TaskSqlService {
|
|
|
prepare(result.getString("columns"), taskSqlBean, template);
|
|
|
}
|
|
|
JSONArray data = (JSONArray) result.get("data");
|
|
|
- return TaskSqlBean.MODE_ALL.equals(taskSqlBean.getMode()) ? doAllCommit(data, taskSqlBean, template)
|
|
|
- : doOneCommit(data, taskSqlBean, template);
|
|
|
+ int runResult = -1;
|
|
|
+ DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(
|
|
|
+ template.getDataSource());
|
|
|
+ DefaultTransactionDefinition def = new DefaultTransactionDefinition();
|
|
|
+ TransactionStatus status = transactionManager.getTransaction(def);
|
|
|
+ try {
|
|
|
+ runResult = TaskSqlBean.MODE_ALL.equals(taskSqlBean.getMode())
|
|
|
+ ? doAllCommit(data, taskSqlBean, template) : doOneCommit(data, taskSqlBean, template);
|
|
|
+ transactionManager.commit(status);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("datasource[{}] commit error:{}", dsid, e.getMessage());
|
|
|
+ transactionManager.rollback(status);
|
|
|
+ runResult = -1;
|
|
|
+ }
|
|
|
+ return runResult;
|
|
|
} catch (Exception e) {
|
|
|
log.error("SqlTask[{}] 执行插入失败:{}", t.getCode(), e.getMessage());
|
|
|
e.printStackTrace();
|
|
@@ -368,26 +384,20 @@ public class TaskSqlServiceImpl implements TaskSqlService {
|
|
|
String insertSql = taskSqlBean.getInsertSql();
|
|
|
String updateSql = taskSqlBean.getUpdateSql();
|
|
|
List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
|
|
|
- try {
|
|
|
- NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
|
|
|
- for (int i = 0; i < data.size(); i++) {
|
|
|
- Map<String, Object> param = new HashMap<>();
|
|
|
- JSONObject jo = data.getJSONObject(i);
|
|
|
- for (String col : tableColumns) {
|
|
|
- param.put(col, jo.get(col));
|
|
|
- }
|
|
|
- int count = nameJdbcTemplate.update(updateSql, new MapSqlParameterSource(param));
|
|
|
- if (count == 0) {
|
|
|
- nameJdbcTemplate.update(insertSql, param);
|
|
|
- }
|
|
|
- System.err.println(i + 1);
|
|
|
+ NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
|
|
|
+ for (int i = 0; i < data.size(); i++) {
|
|
|
+ Map<String, Object> param = new HashMap<>();
|
|
|
+ JSONObject jo = data.getJSONObject(i);
|
|
|
+ for (String col : tableColumns) {
|
|
|
+ param.put(col, jo.get(col));
|
|
|
+ }
|
|
|
+ int count = nameJdbcTemplate.update(updateSql, new MapSqlParameterSource(param));
|
|
|
+ if (count == 0) {
|
|
|
+ nameJdbcTemplate.update(insertSql, param);
|
|
|
}
|
|
|
- log.info("SqlTask[{}] update {} records", taskSqlBean.getId(), data.size());
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- return -1;
|
|
|
}
|
|
|
- log.info("SqlTask[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
|
|
|
+ log.info("SqlTask[{}] doCommit {} records cost:{}", taskSqlBean.getId(), data.size(),
|
|
|
+ System.currentTimeMillis() - start);
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
@@ -403,28 +413,25 @@ public class TaskSqlServiceImpl implements TaskSqlService {
|
|
|
long start = System.currentTimeMillis();
|
|
|
String insertSql = taskSqlBean.getInsertSql();
|
|
|
List<String> tableColumns = Arrays.asList(taskSqlBean.getColumns().split(","));
|
|
|
- try {
|
|
|
- NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
|
|
|
- List<SqlParameterSource> params = new ArrayList<>();
|
|
|
- for (int i = 0; i < data.size(); i++) {
|
|
|
- Map<String, Object> param = new HashMap<>();
|
|
|
- JSONObject jo = data.getJSONObject(i);
|
|
|
- for (String col : tableColumns) {
|
|
|
- param.put(col, jo.get(col));
|
|
|
- }
|
|
|
- params.add(new MapSqlParameterSource(param));
|
|
|
+ NamedParameterJdbcTemplate nameJdbcTemplate = new NamedParameterJdbcTemplate(template);
|
|
|
+ List<SqlParameterSource> params = new ArrayList<>();
|
|
|
+ for (int i = 0; i < data.size(); i++) {
|
|
|
+ Map<String, Object> param = new HashMap<>();
|
|
|
+ JSONObject jo = data.getJSONObject(i);
|
|
|
+ for (String col : tableColumns) {
|
|
|
+ param.put(col, jo.get(col));
|
|
|
}
|
|
|
- SqlParameterSource[] sqlParams = new SqlParameterSource[params.size()];
|
|
|
- String deleteAllSql = "DELETE FROM " + taskSqlBean.getTargetTable();
|
|
|
- Map<String, ?> paramMap = new HashMap<>();
|
|
|
- nameJdbcTemplate.update(deleteAllSql, paramMap);
|
|
|
- log.info("SqlTask[{}] delete all:{}", taskSqlBean.getId(), deleteAllSql);
|
|
|
- nameJdbcTemplate.batchUpdate(insertSql, params.toArray(sqlParams));
|
|
|
- log.info("SqlTask[{}] update {} records", taskSqlBean.getId(), data.size());
|
|
|
- } catch (Exception e) {
|
|
|
- return -1;
|
|
|
+ params.add(new MapSqlParameterSource(param));
|
|
|
}
|
|
|
- log.info("SqlTask[{}] doCommit cost:{}", taskSqlBean.getId(), System.currentTimeMillis() - start);
|
|
|
+ SqlParameterSource[] sqlParams = new SqlParameterSource[params.size()];
|
|
|
+ String deleteAllSql = "DELETE FROM " + taskSqlBean.getTargetTable();
|
|
|
+ Map<String, ?> paramMap = new HashMap<>();
|
|
|
+ nameJdbcTemplate.update(deleteAllSql, paramMap);
|
|
|
+ log.info("SqlTask[{}] delete all:{}", taskSqlBean.getId(), deleteAllSql);
|
|
|
+ nameJdbcTemplate.batchUpdate(insertSql, params.toArray(sqlParams));
|
|
|
+ log.info("SqlTask[{}] update {} records", taskSqlBean.getId(), data.size());
|
|
|
+ log.info("SqlTask[{}] doAllCommit {} records cost:{}", taskSqlBean.getId(), data.size(),
|
|
|
+ System.currentTimeMillis() - start);
|
|
|
return 0;
|
|
|
}
|
|
|
|