【问题标题】:How to call StepExcecutionListener in spring batch with kafka integration?如何在带有kafka集成的spring批处理中调用StepExecutionListener?
【发布时间】:2021-02-14 22:55:36
【问题描述】:

下面是etl.xml中job的配置

<batch:step id="Produce">

    <batch:partition partitioner="partitioner">

        <batch:handler grid-size="${ partitioner.limit}"></batch:handler>

        <batch:step>

            <batch:tasklet>

                <batch:chunk reader="Reader" writer="kafkaProducer"
                             commit-interval="20000">

                </batch:chunk>

                <batch:listeners>

                    <batch:listener ref="producingListener" />

                    
                </batch:listeners>

            </batch:tasklet>

        </batch:step>

    </batch:partition>

</batch:step>

以下是用于向主题发送消息的代码。

ListenableFuture> listenableFuture = kafkaTemplate.send(message);

listenableFuture.addCallback(new ListenableFutureCallback>() {

@Override
public void onSuccess(SendResult<String, message > result) {
    log.info("marking as SUCCESS");
    manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
}

@Override
public void onFailure(Throwable ex) {
    log.info("marking as FAILURE");
    manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
}

}

一旦 kafkaTemplate.send(message) 被执行,监听器被调用并且作业完成。我看到 onSuccess(), onFailure() 在作业完成后被调用。 如何更改作业的配置,以便在收到来自 kafka 主题的确认后调用侦听器?

【问题讨论】:

  • 您使用的是自定义 Kafka ItemWriter
  • 是的。它实现了spring批处理ItemWriter接口
  • kafkaTemplate.send 的调用是异步的,因此注册的回调可能会在之后被调用(即在作业完成之后)。如果您希望写入操作在作业继续后续块之前完成(或者如果当前块是最后一个块,则完成),您应该阻止等待未来返回并检查其结果。
  • 有没有办法使用 CompletableFuture 类?
  • 您想要一些代码示例来说明您建议阻止等待未来的内容吗?这可能会有所帮助。

标签: apache-kafka spring-batch listener kafka-producer-api spring-batch-tasklet


【解决方案1】:

您想要一些代码示例来说明您建议阻止等待未来的内容吗?这可能会有所帮助。

我没有尝试以下方法,但想法如下:

ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(message);
try {
    SendResult<String, Message> sendResult = future.get();
    // inspect sendResult
    log.info("marking as SUCCESS");
    manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
} catch (Exception e) {
     log.info("marking as FAILURE");
     manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
     // do something with e
}

【讨论】:

  • future.get() 给出了我发送给主题的消息。它没有告诉我消息是成功发送到kafka还是由于任何原因失败。在这里,我想从 kafka 获得认可,并基于此我想调用 updateKafkaStatus()。
  • 我使用了你的建议来使用 future.get() 并修改了如下代码,它按照我想要的方式工作。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-05-13
  • 2015-05-24
  • 1970-01-01
  • 1970-01-01
  • 2021-02-03
  • 1970-01-01
相关资源
最近更新 更多