【问题标题】:Kafka Tombstoning with a DSL KStream to KTable Transformation使用 DSL KStream 到 KTable 转换的 Kafka Tombstoning
【发布时间】:2020-03-06 14:12:01
【问题描述】:

我有一个KStream<String, X>,我基本上想将其转换为KTable<String, Y>

我能找到使用 DSL 实现此目的的唯一方法是使用映射、分组然后归约。

val stream: KStream<String, X> = ...
val table: KTable<String, Y> = stream
  .mapValues({ value -> toYOrNull(value)})
  .groupByKey(Grouped.with(Serdes.String(), ySerde))
  .reduce(
    {old: Y?, updated: Y? -> updated},
    Materialized.`as`<String, Y, KeyValueStore<Bytes, ByteArray>>("y-store")
      .withKeySerde(Serdes.String()
      .withValueSerde(ySerde)
  )

我希望这可以处理reduceupdated 的值为null 的情况,但是当我使用TopologyTestDriver 检查商店时,它似乎仍然具有旧版本。我做错了什么?

这是我的测试:

@Test
fun shouldDeleteFromTableWhenNull() {
  val store = testDriver.getKeyValueStore<String, Y?>("y-store")
  store.put("key", Y())

  inputTopic.pipeInput("key", anXThatMapsToANullY)

  assertThat(store.get("key")).isNull() // Fails as the old entry is still there
}

【问题讨论】:

    标签: java kotlin apache-kafka-streams


    【解决方案1】:

    值为 null 的记录将被忽略。

    根据文档,这是预期的行为:KGroupedStream::reduce(...) Java Doc

    通过分组键组合此流中记录的值。具有空键或值的记录将被忽略

    【讨论】:

      【解决方案2】:

      在即将发布的 Apache Kafka 2.5 版本中,添加了一个新运算符 KStream#toTable() 来解决此用例(参见 https://issues.apache.org/jira/browse/KAFKA-7658

      在旧版本中,您需要使用非空“代理删除值”来避免记录被删除,并让您的 reduce 函数在看到“代理删除值”时返回 null

      【讨论】:

      • 感谢您的回复,您知道什么时候发布吗?
      • 发布计划可以在 Apache Kafka wiki 中找到:cwiki.apache.org/confluence/pages/… -- devlopment 已关闭,RC 投票已经开始。因此,假设在测试期间没有发现更多的阻止程序,它应该很快就会发布。
      • @Eduardo 在旧版本中,您还可以通过使用 ProcessorContext 获取 KeyValueStore 直接使用 transformValues 访问和更新存储区 y-store
      • @TuyenLuong 你可以这样做,但你没有得到KTable 回复——因此这取决于你想要执行的其他操作——对于“交互式查询”,你的建议会起作用。但是,如果您想进一步处理数据,例如在表-表或流-表连接中,使用transformValues() 将无济于事。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-12-02
      • 1970-01-01
      • 1970-01-01
      • 2018-02-23
      • 2021-12-11
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多