【问题标题】:Spring batch understanding chunk processingSpring批处理理解块处理
【发布时间】:2018-11-26 22:34:31
【问题描述】:

我在表中总共有 8 条记录,其中 6 条在 spring 批处理调用读取时符合 jpareader 的条件。现在我将页面大小和块大小设置为 1 进行测试。期望当作业运行时,它应该进行 6 次读取调用,然后它应该一个一个地处理它们并一个一个地写入它们。但实际上发生的情况是它只是调用 read 4 次(从日志中我可以看到像这样读取页面 0...1)并处理 4 ,其中一个被过滤掉,因为不符合写入条件,然后它只更新 3 条记录和作业标记为已成功完成。

就像这个作业需要运行 3 次才能处理所有记录。我们还不清楚。试图理解块处理,但我认为块只是聚合结果以进行写入调用..之后我希望读取和处理应该继续。

通过这个测试,我们对于为生产设置块大小设置什么感到困惑,如果我们设置为较大的数字,它将需要更多的内存(堆)。

【问题讨论】:

  • 页面大小是什么意思?你能发布你的配置的相关部分吗?

标签: spring-batch


【解决方案1】:

我看到了混乱。 JpaPagingItemReaderpageSize参数与面向块的步骤的chunkSize(或commit-interval)无关。

如果您使用JpaPagingItemReader 并在pageSize = 4 的面向块的步骤之外使用它,它将一次获取4 个项目(即每页)。现在这 4 个项目可以以 2 个为一组进行处理,每页将有两个块。 JpaPagingItemReader 将读取第一页(4 个项目的列表),然后在面向块的步骤每次调用 read 时从该列表返回项目。这是一个带有pageSize = 4chunkSize = 2totalItems = 8 和一个块监听器的示例:

ChunkListener.beforeChunk
Reading page 0
Reading item1
Reading item2
Writing item1
Writing item2
ChunkListener.afterChunk
ChunkListener.beforeChunk
Reading item3
Reading item4
Writing item3
Writing item4
ChunkListener.afterChunk
ChunkListener.beforeChunk
Reading page 1
Reading item5
Reading item6
Writing item5
Writing item6
ChunkListener.afterChunk
ChunkListener.beforeChunk
Reading item7
Reading item8
Writing item7
Writing item8
ChunkListener.afterChunk
ChunkListener.beforeChunk
Reading page 2
Reading item = null
ChunkListener.afterChunk

我使用此配置创建了一个sample app,因此您可以使用它并查看它是如何工作的。

希望这有助于理解与分页项目阅读器一起使用时面向块的处理模型。

【讨论】:

  • 哇,我以前没见过这个。现在我将使用您的示例应用程序。但无论如何,为什么在我的情况下它没有在一次作业运行中处理所有项目(有很多读取和块)。谢谢。
  • 我可以用您的样本重现我的问题。我又添加了一条名字为 Tom 的记录(从数据开始,第一条记录)。我将查询更改为“来自客户,其中 firstName='Tom'”并添加了 ItemProcessor,我将名字设置为大写。现在将页面大小和块大小更改为 1。如果我运行应用程序,它只会处理第一个记录并且作业正在完成,而第二个 Tom 没有转换为大写。我期待一个一个地处理两个汤姆。
  • 在开头的数据集中添加了一条记录“汤姆克鲁斯”+更新了查询/处理器,仍然按预期工作:MyJob.beforeChunk Reading page 0 Reading customer = Customer[id=1, firstName='Tom', lastName='Cruise'] Writing customer = Customer[id=1, firstName='TOM', lastName='CRUISE'] MyJob.afterChunk MyJob.beforeChunk Reading page 1 Reading customer = Customer[id=8, firstName='Tom', lastName='Hanks'] Writing customer = Customer[id=8, firstName='TOM', lastName='HANKS'] MyJob.afterChunk MyJob.beforeChunk Reading page 2 Reading customer = null MyJob.afterChunk
  • 我只能看到这些重大变化,如果可行,我将创建你的仓库的分支来提交。@Component public class MyItemProcessor implements ItemProcessor { @Override public Customer process(Customer客户)抛出异常 { customer.setFirstName(customer.getFirstName().toUpperCase());回头客; } } 并返回 steps.get("step") .chunk(CHUNK_SIZE) .reader(itemReader(null)) .processor(myItemProcessor) .writer(itemWriter()) .listener(chunkListener()) 。构建();
  • 块和页面大小也设置为 1
猜你喜欢
  • 1970-01-01
  • 2019-01-30
  • 2014-12-03
  • 2021-10-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多