【问题标题】:Kafka Streams join by key with complex conditionKafka Streams 键加入,条件复杂
【发布时间】:2019-12-27 03:50:10
【问题描述】:

我正在尝试通过键将KStreamGlobalKTable 加入,但具有特定的逻辑。

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Integer> stream = builder.stream(inputTopic1); // key = "ABC"
    GlobalKTable<String, Integer> table = builder.globalTable(inputTopic2); // key = "ABC"

    stream.join(table, // join first by "ABC" = "ABC", then by "AB" = "AB", then by "A" = "A"
            (key, value) -> key,
            (valueLeft, valueRigth) -> {/* identify by which condition the join was performed */});

例如,如果 key = "ABC",那么:

  • 首先,按完整键加入 - 即“ABC”=“ABC”
  • 然后,如果未加入,则通过前两个符号加入(删除一个符号) - 即“AB”=“AB”
  • 最后,尝试只加入一个符号 - 即“A” = “A”

此外,还需要知道执行连接的条件 - 例如,按 3 个字母/按 2 个字母/按 1 个字母。

问题是,有可能还是我应该寻找解决方法?例如,使用相应的键(带有“ABC”键的表,一个带有“AB”键的表和一个带有“A”键的表)复制 GlobalKTable 并执行 3 个单独的连接?或者有什么其他建议?

提前致谢!

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams kafka-join


    【解决方案1】:

    对多个表使用一系列左连接是可能的(如果您知道经常想尝试连接)。如果连接成功,则跳过下一个连接。使用leftJoin()branch() 的组合应该允许您在每次加入后将流拆分为“加入”和“重试”。最后,您可以根据需要将不同的结果流merge() 放在一起。

    【讨论】:

      猜你喜欢
      • 2022-08-24
      • 1970-01-01
      • 2019-12-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-07-26
      相关资源
      最近更新 更多