【问题标题】:Time semantics between KStream and KTableKStream 和 KTable 之间的时间语义
【发布时间】:2019-08-14 15:56:34
【问题描述】:

我正在尝试构建以下拓扑:

  1. 使用 Debezium 连接器,我拉出 2 个表(我们称它们为表 A 和 DA)。根据 DBZ,存储表行的主题具有结构 { before: "...", after: "..." }。

  2. 我的拓扑结构的第一步是从这两个“表”主题中创建“干净”的 KStream。那里的子拓扑大致如下:

private static KStream<String, TABLE_A.Value> getTableARowByIdStream(
    StreamsBuilder builder, Properties streamsConfig) {
  return builder
      .stream("TABLE_A", Consumed.withTimestampExtractor(Application::getRowDate))
      .filter((key, envelope) -> [ some filtering condition ] )
      .map((key, envelope) -> [ maps to TABLE_A.Value ] )
      .through(tableRowByIdTopicName);
}
  1. 请注意,我明确分配了记录时间,因为表行在最初发布后将被 CDC'ed “年”。该函数目前正在做的是伪造从 2010-01-01 开始的时间,并使用AtomicInteger,为每个消耗的实体增加 1 毫秒。它对表 A 执行此操作,但对 DA 不执行此操作(我稍后会解释原因)。

  2. 拓扑的第 2 阶段是基于表 A 的“已清理”主题构建 1 个 KTable,如下所示:

private static KTable<String, EntityInfoList> getEntityInfoListById(
    KStream<String, TABLE_A.Value> tableAByIdStream) {
  return tableAByIdStream
      .map((key, value) -> [ some mapping ] )
      .groupByKey()
      .aggregate(() -> [ builds up a EntityInfoList object ] ));
}
  1. 最后,准备好 KTable,我将通过 DA 加入 KStream,如下所示:
private static KStream<String, OutputTopicEntity> getOutputTopicEntityStream(
    KStream<String, Table_DA.Value> tableDAStream,
    KTable<String, EntityInfoList> tableA_KTable) {

  KStream<String, Table_DA>[] branches = tableDAStream.branch(
      (key, value) -> [ some logic ],
      (key, value) -> true);

  KStream<String, OutputTopicEntity> internalAccountRefStream = branches[0]
      .join(
          tableA_KTable,
          (streamValue, tableValue) -> [ some logic to build a list of OutputTopicEntity ])
      .flatMap((key, listValue) -> [ some logic to flatten it ]));

   [ similar logic with branch[1] ]
}

我的问题是,尽管我在“伪造”来自 Table_A 主题的记录(我已经验证它们使用 kafkacat 引用 2010/01/01)和 Table_DA 中的条目(流端)的时间加入的)在今天'2019/08/14'左右具有时间戳),Kafka Streams 似乎不会一直读取 Table_DA KStream 中的任何条目,直到它将 Table_A 中的所有记录摄取到 KTable 中。

因此,我没有预期的所有“加入命中”,而且它也是不确定的。我对What are the differences between KTable vs GlobalKTable and leftJoin() vs outerJoin()?这句话的理解正好相反:

对于流表连接,Kafka Stream 对齐基于记录时间戳排序的记录处理。因此,表的更新与您流的记录保持一致。

到目前为止,我的经验是这没有发生。我还可以很容易地看到我的应用程序在消耗完 Table_DA 流中的所有条目后如何继续通过 Table_A 主题方式搅动(它恰好小了 10 倍)。

我做错了吗?

【问题讨论】:

  • 你真的应该让代码更小,因为它太多难以消化,而且大多数时候与 Kafka Streams 几乎没有关系。目前看起来像“为什么这段代码不起作用?”问题,可以关闭。
  • @JacekLaskowski 我不同意。我只是为上下文提供完整的拓扑结构,并帮助知道时间语义应该是什么的人理解全貌。我绝对不会问为什么这段代码不工作,因为它工作得非常好,除了流 vs ktable 消耗优先级

标签: apache-kafka-streams


【解决方案1】:

时间戳同步最好在 2.1.0 版本之前进行(参见https://issues.apache.org/jira/browse/KAFKA-3514)。

从 2.1.0 开始,时间戳严格同步。但是,如果一个输入没有任何数据,Kafka Streams 将按照KIP-353 中的描述“强制”处理以避免永远阻塞。如果您有突发输入,并且如果一个输入没有数据,并且想要在一段时间内“阻止”处理,您可以增加配置参数 max.task.idle.ms(默认为 0),如 2.1.0 通过 KIP-353 介绍的那样。

【讨论】:

    猜你喜欢
    • 2021-12-11
    • 1970-01-01
    • 2019-10-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-23
    • 1970-01-01
    • 2020-10-17
    • 1970-01-01
    相关资源
    最近更新 更多