【问题标题】:Spring batch - JdbcPagingItemReader, SQLServerException: The column name XYZ is not validSpring batch - JdbcPagingItemReader, SQLServerException: 列名 XYZ 无效
【发布时间】:2020-11-03 05:19:14
【问题描述】:

我有一个从 SQL Server 数据库中读取文档列表的工作。文档需要处于某种状态并按列status_updated_time 排序。 我想阅读document.id,然后在作业处理器中将其处理为Driving Query Based ItemReaders
writer 中的列状态已更改,因此由于this problem,我无法使用JpaPagingItemReader
我使用了JdbcPagingItemReader,但在按status_updated_time 排序时出错。 然后我尝试添加和id 进行排序,但这没有帮助。
我想得到的查询是:

SELECT id 
FROM document 
WHERE status IN (0, 1, 2)
ORDER BY status_updated_time ASC, id ASC 

我的读者:

@StepScope
@Bean
private ItemReader<Long> statusReader() {
JdbcPagingItemReader<Long> reader = new JdbcPagingItemReader<>();
...
reader.setRowMapper(SingleColumnRowMapper.newInstance(Long.class));
...

Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("status_updated_time", Order.ASCENDING);
sortKeys.put("id", Order.ASCENDING);

SqlServerPagingQueryProvider queryProvider = new SqlServerPagingQueryProvider();
queryProvider.setSelectClause(SELECT_CLAUSE);
queryProvider.setFromClause(FROM_CLAUSE);
queryProvider.setWhereClause(WHERE_CLAUSE);
queryProvider.setSortKeys(sortKeys);

reader.setQueryProvider(queryProvider);
...
return reader;
}

常量在哪里:

private static final String SELECT_CLAUSE = "id";
private static final String FROM_CLAUSE = "document";
private static final String WHERE_CLAUSE = "status IN (0, 1, 2) ";

执行作业时出现错误:

org.springframework.dao.TransientDataAccessResourceException: StatementCallback; SQL [SELECT TOP 10 id FROM document WHERE status IN (0, 1, 2) ORDER BY id ASC, status_updated_time ASC]; The column name status_updated_time is not valid.; nested exception is com.microsoft.sqlserver.jdbc.SQLServerException: The column name status_updated_time is not valid.
at org.springframework.jdbc.support.SQLStateSQLExceptionTranslator.doTranslate(SQLStateSQLExceptionTranslator.java:110)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:72)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81)
at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1443)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:388)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:452)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:462)
at org.springframework.batch.item.database.JdbcPagingItemReader.doReadPage(JdbcPagingItemReader.java:210)
at org.springframework.batch.item.database.AbstractPagingItemReader.doRead(AbstractPagingItemReader.java:108)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:92)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94)
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119)

我在堆栈溢出 (this...) 上看到了一些关于 The column name XYZ is not valid 的问题,但在我需要按另一列排序的情况下,我没有看到任何可行的方法。
另一个问题是对列进行排序。
无论我是先将status_updated_timeid 添加到生成脚本中的地图排序始终是ORDER BY id ASC, status_updated_time ASC

编辑: 阅读this question,特别是这一行:

JdbcPagingItemReader 在这里假设排序键和 select 子句中的列被调用完全相同

我意识到我在结果集中需要列status_updated_time,所以我重构了:

private static final String SELECT_CLAUSE = "id, status_updated_time";
...
queryProvider.setSelectClause(SELECT_CLAUSE);
...
reader.setRowMapper(
    (rs, i) -> {
      Document document = new Document();
      document.setId(rs.getLong(1));
      document.setStatusUpdatedTime(rs.getObject(2, Timestamp.class));
      return document;
    }
);

现在应用程序可以编译并且作业可以运行了。

但是,排序问题保持不变。我不能先订购status_updated_time,然后订购idid 永远是第一位的。
我试图从排序中删除id 并遇到了另一个问题。 在测试环境中。我有 1600 行要处理。我的工作流程行并将status_updated_time 更新为now()。当作业开始处理时,他并没有在 1600 处停止,而是继续处理,因为每一行都有新的 status_updated_time 并且读者认为它是新行,并无休止地继续处理。
当仅按id 排序时,作业处理了 1600 行然后停止。
因此,由于排序问题,我似乎无法使用JdbcPagingItemReader
而且我想要一些可以并行运行的阅读器来加快这项工作(每天每小时运行大约 20 分钟)。
有什么建议吗?

【问题讨论】:

  • 从您的堆栈跟踪中,这是 Spring Batch 生成的查询:SELECT TOP 10 id FROM document WHERE status IN (0, 1, 2) ORDER BY id ASC, status_updated_time ASC。您是否尝试过使用 sql 客户端运行此查询并查看它是否按预期工作?
  • 是的,我试过了。 sql客户端一切正常。看起来我需要在 select 子句中使用列 status_updated_time。我找到了解决方法(请参阅问题的编辑),但由于按status_updated_time 排序而面临另一个问题,这是这项工作的强制性要求。
  • 所以如果我理解正确 same 查询从您的 sql 客户端正确运行,但在您的 Spring Batch 作业中失败,对吗?您是否对 javadoc 中提到的排序键有唯一键约束(即使这仅用于可重新启动性)?从您的更新中,我了解到您的目标是提高性能。我建议你使用分区。
  • 是的,该查询在 sql 客户端中正确运行,并在编译 Spring Batch 应用程序时报告错误。由status_updated_time asc, id asc 订购将是唯一的(我需要真正的数据库约束还是仅订单列在操作上是唯一的?)。
    是的,作业已经存在并且使用 JpaPagingItemReader 单线程工作。我需要优化和加快执行速度。
    分区的好主意。谢谢你。我将尝试实现这一点,并将在此处发布结果。
  • 该列上应该有一个真正的数据库唯一键约束(这就是 javadoc 提到的)。对于分区,这可能会有所帮助:github.com/spring-projects/spring-batch/blob/master/….

标签: java spring-batch


【解决方案1】:

我要感谢 Mahmoud 监控 Spring Batch 问题并提供帮助。但是他的建议对我没有帮助,所以我使用了不同的方法。 我使用临时(辅助)表为主要步骤执行准备数据,并且在主要步骤中,阅读器正在从该表中读取数据。

第一步将删除帮助表:

@Bean
private Step dropHelpTable() {
  return stepBuilderFactory
    .get(STEP_DROP_HELP_TABLE)
    .transactionManager(cronTransactionManager)
    .tasklet(dropHelpTableTasklet())
    .build();
}

private Tasklet dropHelpTableTasklet() {
  return (contribution, chunkContext) -> {
    jdbcTemplate.execute(DROP_SCRIPT);
    return RepeatStatus.FINISHED;
  };
}

private static final String STEP_DROP_HELP_TABLE = "dropHelpTable";
private static final String DROP_SCRIPT = "IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES "
  + "WHERE TABLE_NAME = 'query_document_helper') "
  + "BEGIN "
  + " DROP TABLE query_document_helper "
  + "END";

第二步将准备数据。插入稍后处理的文档的id:

@Bean
private Step insertDataToHelpTable() {
  return stepBuilderFactory
    .get(STEP_INSERT_HELP_TABLE)
    .transactionManager(cronTransactionManager)
    .tasklet(insertDataToHelpTableTasklet())
    .build();
}

private Tasklet insertDataToHelpTableTasklet() {
  return (contribution, chunkContext) -> {
    jdbcTemplate.execute("SELECT TOP " + limit + " id " + INSERT_SCRIPT);
    return RepeatStatus.FINISHED;
  };
}

private static final String STEP_INSERT_HELP_TABLE = "insertHelpTable";
private static final String INSERT_SCRIPT = "INTO query_document_helper "
  + "FROM dbo.document "
  + "WHERE status IN (0, 1, 2) "
  + "ORDER BY status_updated_time ASC";

@Value("${cron.batchjob.queryDocument.limit}")
private Integer limit;

在此之后,我拥有了将在一个作业执行中使用的所有数据,因此不再需要按 status_updated_time 排序(条件不是在此作业执行中处理最年轻的文档,而是在稍后执行时当它们变得最旧时)。 然后在下一步中我使用普通阅读器。

@Bean
private Step queryDocumentStep() {
  return stepBuilderFactory
    .get(STEP_QUERY_NEW_DOCUMENT_STATUS)
    .transactionManager(cronTransactionManager)
    .<Long, Document>chunk(chunk)
    .reader(documentReader())
    ...
    .taskExecutor(multiThreadingTaskExecutor.threadPoolTaskExecutor())
    .build();
}

@StepScope
@Bean
private ItemReader<Long> documentReader() {
  JdbcPagingItemReader<Long> reader = new JdbcPagingItemReader<>();
  reader.setDataSource(coreBatchDataSource);
  reader.setMaxItemCount(limit);
  reader.setPageSize(chunk);
  ...
  Map<String, Order> sortKeys = new HashMap<>();
  sortKeys.put("id", Order.ASCENDING);

  SqlServerPagingQueryProvider queryProvider = new SqlServerPagingQueryProvider();
  queryProvider.setSelectClause(SELECT_CLAUSE);
  queryProvider.setFromClause(FROM_CLAUSE);
  queryProvider.setSortKeys(sortKeys);

  reader.setQueryProvider(queryProvider);
  ...
  return reader;
}

private static final String STEP_QUERY_NEW_DOCUMENT_STATUS = "queryNewDocumentStatus";
private static final String SELECT_CLAUSE = "id";
private static final String FROM_CLAUSE = "query_archive_document_helper";

工作看起来像这样:

@Bean
public Job queryDocumentJob() {
return jobBuilderFactory
    .get(JOB_QUERY_DOCUMENT)
    .incrementer(new RunIdIncrementer())
    .start(dropHelpTable())
    .next(insertDataToHelpTable())
    .next(queryDocumentStep())
    .build();
}

private static final String JOB_QUERY_DOCUMENT = "queryDocument";

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-09-02
    • 2021-09-12
    • 2018-03-29
    • 2018-01-25
    • 1970-01-01
    • 2019-08-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多