【发布时间】:2021-02-14 22:55:36
【问题描述】:
下面是etl.xml中job的配置
<batch:step id="Produce">
<batch:partition partitioner="partitioner">
<batch:handler grid-size="${ partitioner.limit}"></batch:handler>
<batch:step>
<batch:tasklet>
<batch:chunk reader="Reader" writer="kafkaProducer"
commit-interval="20000">
</batch:chunk>
<batch:listeners>
<batch:listener ref="producingListener" />
</batch:listeners>
</batch:tasklet>
</batch:step>
</batch:partition>
</batch:step>
以下是用于向主题发送消息的代码。
ListenableFuture
listenableFuture.addCallback(new ListenableFutureCallback
@Override
public void onSuccess(SendResult<String, message > result) {
log.info("marking as SUCCESS");
manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
}
@Override
public void onFailure(Throwable ex) {
log.info("marking as FAILURE");
manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
}
}
一旦 kafkaTemplate.send(message) 被执行,监听器被调用并且作业完成。我看到 onSuccess(), onFailure() 在作业完成后被调用。 如何更改作业的配置,以便在收到来自 kafka 主题的确认后调用侦听器?
【问题讨论】:
-
您使用的是自定义 Kafka
ItemWriter? -
是的。它实现了spring批处理ItemWriter接口
-
对
kafkaTemplate.send的调用是异步的,因此注册的回调可能会在之后被调用(即在作业完成之后)。如果您希望写入操作在作业继续后续块之前完成(或者如果当前块是最后一个块,则完成),您应该阻止等待未来返回并检查其结果。 -
有没有办法使用 CompletableFuture 类?
-
您想要一些代码示例来说明您建议阻止等待未来的内容吗?这可能会有所帮助。
标签: apache-kafka spring-batch listener kafka-producer-api spring-batch-tasklet