【发布时间】:2013-01-16 05:23:42
【问题描述】:
我正在尝试在 Spring 集成中实现自定义入站通道适配器,以使用来自 apache kafka 的消息。基于spring集成示例,我发现我需要创建一个实现MessageSource接口的类并实现receive()方法,该方法将从kafka返回消费的Message。但是基于consumer example in kafka,KafkaStream 中的消息迭代器是由 BlockingQueue 支持的。因此,如果队列中没有消息,则线程将被阻塞。
那么实现 receive() 方法的最佳方法是什么,因为该方法可能会阻塞,直到有东西要消耗为止.. ?
从更一般的意义上说,我们如何为流式消息源实现自定义入站通道,该通道会阻塞直到有东西可供消费......?
【问题讨论】:
标签: java spring-integration apache-kafka