【问题标题】:How can I pause (turn on/off) stream processing w/ Spring Cloud Stream & Kafka Streams Binder?如何使用 Spring Cloud Stream 和 Kafka Streams Binder 暂停(打开/关闭)流处理?
【发布时间】:2020-06-16 14:42:46
【问题描述】:

我正在使用 Spring Cloud Stream (3.0.4.RELEASE) 和 Kafka-Streams binder (3.0.0.RELEASE)。我也在使用“函数式编程模型”(所以没有@StreamListener 等)。多么可爱的技术啊!

我需要能够在一天中的特定时间暂停流处理/消费新事件。这为事件创建了一个“停电期”。在“停电期”结束后,我将恢复流处理。因此,我希望能够使用代码暂停或打开/关闭 KStream 消费者。我似乎无法管理它!

到目前为止,我尝试了什么? - 使用 /actuator/bindings 端点启动/停止 kafka-streams 绑定。似乎这不适用于 kafka-streams binder,仅适用于 kafka binder :(。

任何帮助将不胜感激!谢谢!

【问题讨论】:

    标签: java apache-kafka-streams spring-cloud-stream


    【解决方案1】:

    Kafka Streams binder 不支持开箱即用的用于控制流处理的执行器绑定端点。这个用例已经出现before

    如果您可以在 Kafka 流处理器前添加额外的输入/输出主题(以及潜在的延迟,具体取决于多种因素),那么有一种方法可以解决这个问题。请参阅添加的 cmets here

    基本思想是第一个处理器是一个简单的直通处理器,它不使用 Kafka Streams,而是 Spring Cloud Stream 中基于标准消息传递的绑定器。在那里,您可以使用执行器绑定端点控制事件流。该处理器的输出成为 Kafkfa Streams 处理器的输入。

    再次重申,实现此模式不需要太多代码(可能 3 或 4 行),但根据应用程序的要求和吞吐量可能会影响性能。不过,如果这不是问题,您可以尝试这种模式。

    希望这会有所帮助。

    【讨论】:

    • 谢谢!在这种情况下,我可以使用 Kafka Binder 而不是 Kafka-Streams binder,所以理论上这很好......我实际上从来没有能够让带有 Kafka Binder 的 Spring Cloud Stream 工作:(您知道使用 JAAS 安全性、模式注册表、本机 avro 编码和函数式编程模型的示例配置吗?实际上花了几天时间试图让一个简单的 avro 使用者工作!感谢 SobyChacko
    • 您检查了示例存储库吗?那里提供了一大堆不同的示例应用程序,尽管不是您在单个应用程序中提到的所有内容。 github.com/spring-cloud/spring-cloud-stream-samples
    • 如果你有一些示例代码/应用程序要分享,我们可以看看它卡在哪里。
    • Yaya 我只是暂时放弃 Avro 和模式注册表并使用字符串来让它工作。我已经设法通过带有 POST 请求的“绑定可视化和控制”来暂停和启动消费者。您是否知道 Spring Cloud Stream 或类似配置中是否有让我在“暂停”状态下“启动”消费者的配置?我想在消费者暂停的情况下启动应用程序,然后有一个@PostConstruct 检查一天中的时间,如果它不在我的停电期间,我使用'bindingsEndpoint bean'以编程方式'恢复'消费者到.changeState( )。谢谢!如果是这样,那就太棒了!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-19
    • 1970-01-01
    相关资源
    最近更新 更多