【发布时间】:2017-04-09 07:29:43
【问题描述】:
我有 2 个 Kafka 主题流式传输来自不同来源的完全相同的内容,因此我可以在其中一个来源出现故障时获得高可用性。 我正在尝试使用 Kafka Streams 0.10.1.0 将 2 个主题合并为 1 个输出主题,这样我就不会错过任何有关失败的消息,并且在所有源都启动时没有重复。
使用 KStream 的leftJoin 方法时,其中一个主题(次要主题)可以正常关闭,但是当主要主题关闭时,没有任何内容发送到输出主题。这似乎是因为,根据Kafka Streams developer guide,
KStream-KStream leftJoin 总是由来自主流的记录驱动
所以如果没有来自主流的记录,它不会使用来自辅助流的记录,即使它们存在。一旦主流重新联机,输出就会正常恢复。
我还尝试使用outerJoin(添加重复记录),然后转换为 KTable 和 groupByKey 以消除重复,
KStream mergedStream = stream1.outerJoin(stream2,
(streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
JoinWindows.of(2000L))
mergedStream.groupByKey()
.reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
.toStream((key,value) -> value)
.to(outputStream)
但我仍然偶尔会得到重复。我还经常使用commit.interval.ms=200 让 KTable 发送到输出流。
处理此合并以从多个相同的输入主题中获得一次性输出的最佳方法是什么?
【问题讨论】:
-
一般来说,我会推荐Processor API来解决这个问题。您也可以尝试切换到当前的
trunk版本(不确定这是否适合您)。连接已重新设计,这可能会解决您的问题:cwiki.apache.org/confluence/display/KAFKA/… 新的连接语义将包含在 Kafka0.10.2中,其目标发布日期为 2017 年 1 月 (cwiki.apache.org/confluence/display/KAFKA/…)。 -
@MatthiasJ.Sax 我切换到主干,看起来
leftJoin现在的行为类似于 KStream-KStream 连接的outerJoin,所以我想我会回到 10.1 语义.我现在尝试的是创建一个输出空值的假流,我将在 leftJoin 中将其用作主要的,以前是主要的,并在 leftJoin 中使用该合并与辅助。我希望这将导致在主流中始终具有值,即使我的主流关闭(因为我只会从第一个 leftJoin 获得 null)。 -
新的
leftJoin确实会像旧的outerJoin一样从双方触发(我想这就是您所说的“看起来leftJoin 现在表现得像outerJoin”的意思?)——这个比旧的leftJoin更接近 SQL 语义——但leftJoin与outerJoin仍然不同:如果右侧触发并且没有找到连接伙伴,它会丢弃记录并且不会发出任何结果。 -
我还想知道您的密钥是如何分布的,以及同一个密钥在单个主题中使用的频率。也许您可以只使用一个同时使用两个主题的 KTable 来帮助消除......但如前所述,我强烈建议使用处理器 API!
-
啊,好吧,我没有想到新的
leftJoin和outerJoin之间的区别。我确实最终使用了处理器 API 和您对另一个问题 (stackoverflow.com/a/40837977/6167108) 的回答,它运行良好。您可以在此处将其添加为答案,我会接受。谢谢!
标签: java high-availability apache-kafka-streams