【问题标题】:Kafka streams : how to handle dynamic conditions in a filter?Kafka 流:如何处理过滤器中的动态条件?
【发布时间】:2019-09-13 09:45:40
【问题描述】:

有没有办法处理 kafka 流中的动态条件? 我需要通过用户更改的值列表过滤我的流数据,但是这个值列表不在流中,它们可以通过 http 调用获得。

stream(myTopic)
    .filter(isDataOK())
    ...

private Predicate<> isDataOK() {
     http_call;
     return predicate_value_based_on_http_answer;
}

是否可以在 kafka 流处理期间处理 http 调用,还是我需要其他流中的条件数据?

提前致谢, 问候

【问题讨论】:

  • 有可能,但这不是正确的方法。 http 调用可能繁重、抛出异常等,它可能会减慢处理速度甚至停止处理。

标签: java filter apache-kafka stream


【解决方案1】:

我对动态流配置有类似的要求。为每个消息处理调用 http 不是一个好主意,因为它很耗时。最好在处理组件中缓存值并定期更新它们,例如每小时一次。您还应该考虑 http 调用不可用时的场景。

在另一个项目中,我从另一个 Kafka 主题异步接收动态配置,这对于流处理来说效果更好、更自然。

【讨论】:

  • 您好,感谢您的回答,我分享您的意见,我将创建一个日程安排功能作为快速解决方案,并使用我的设置创建另一个 kafka 主题以实现长期解决方案。但事实上,我也对没有时间表或提供主题的技术解决方案感兴趣,只是为了了解如何做到这一点。
猜你喜欢
  • 2019-10-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-10-20
  • 2020-02-11
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多