【问题标题】:Implementing Spring Integration InboundChannelAdapter for Kafka为 Kafka 实现 Spring Integration InboundChannelAdapter
【发布时间】:2013-01-16 05:23:42
【问题描述】:

我正在尝试在 Spring 集成中实现自定义入站通道适配器,以使用来自 apache kafka 的消息。基于spring集成示例,我发现我需要创建一个实现MessageSource接口的类并实现receive()方法,该方法将从kafka返回消费的Message。但是基于consumer example in kafka,KafkaStream 中的消息迭代器是由 BlockingQueue 支持的。因此,如果队列中没有消息,则线程将被阻塞。

那么实现 receive() 方法的最佳方法是什么,因为该方法可能会阻塞,直到有东西要消耗为止.. ?

从更一般的意义上说,我们如何为流式消息源实现自定义入站通道,该通道会阻塞直到有东西可供消费......?

【问题讨论】:

    标签: java spring-integration apache-kafka


    【解决方案1】:

    receive() 方法可以阻塞(只要底层操作正确响应中断的线程),并且从入站通道适配器的角度来看,根据底层源的期望,它可能更适合使用固定延迟触发器。例如,“长轮询”可以在提供非常小的延迟值时模拟事件驱动的行为。

    我们的 JMS 轮询 MessageSource 实现中也有类似的情况。在那里,底层行为由 JmsTemplate 的 receive() 方法之一处理。 JmsTemplate 本身允许配置超时值。这意味着,例如,您可以选择最多阻塞 5 秒,然后在每个阻塞接收调用之间有一个非常短的延迟触发。或者,您可以指定无限期的接收超时。该决定最终取决于对底层资源、消息吞吐量等的期望。

    另外,我想让您知道我们正在自己探索 Kafka 适配器。也许您想在 spring-integration-extensions 存储库中进行协作?

    问候, 标记

    【讨论】:

    • 谢谢@mfisher,我如何联系您讨论与spring-integration-extensions 存储库合作的事宜..?
    猜你喜欢
    • 2017-12-14
    • 1970-01-01
    • 1970-01-01
    • 2020-07-25
    • 1970-01-01
    • 1970-01-01
    • 2015-12-12
    • 2018-09-19
    • 2016-07-27
    相关资源
    最近更新 更多