【问题标题】:How to migrate a KTable-KTable leftJoin with explicit state store from Kafka 0.11 to Kafka 2.0?如何将具有显式状态存储的 KTable-KTable leftJoin 从 Kafka 0.11 迁移到 Kafka 2.0?
【发布时间】:2018-11-28 18:26:26
【问题描述】:

由于 API 从 Kafka 0.11 更改为 Kafka 2.0,我们面临一个问题。在我们基于 0.11 的 Kafka 流应用程序中,我们在两个使用命名状态存储的 KTables[String,Something] 之间建立了连接:

val joinedTable = {
    myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table,new
        MyJoiner, new MySerde[MyClass1Class2],"my-join-store")
}

但是,当迁移到 2.0 时,明确提供状态存储的唯一方法如下:

val joinedTable = {
    val materialized = Materialized.as[String,MyClass1,KeyValueStore[Bytes,Array[Byte]]]("join-store").withValueSerde(new Serde[MyClass1Class2])
    myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table,new
        MyJoiner,materialized)
}

使用此代码,替换生产中的应用实例失败,因为 Kafka 0.11 中的状态存储可能使用了 myTable1 和 myTable2 的密钥 serde。

org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:174)

除了执行 kafka-streams-application-reset 之外,还有其他更好的方法来处理这个问题吗?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    您可以显式传入密钥 serde:

    val joinedTable = {
      val materialized = Materialized
        .as[String,MyClass1,KeyValueStore[Bytes,Array[Byte]]]("join-store")
        .withKeySerde(new StringSerde())
        .withValueSerde(new Serde[MyClass1Class2])
    
      myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table, new MyJoiner, materialized)
    }
    

    【讨论】:

    • 我认为应该使用它来实际强制使用与 kafka 0.11 中相同的 keySerde 设计,这不会破坏操作的语义(尽管 API 已经改变,这很好)
    • 我不是 Scala 专家,所以不确定。但是,您似乎正在针对 Java API 编写 Scala。从 2.0.0 开始,Kafka Streams 也附带了一个 Scala API。我建议使用它,并假设它可以解决问题。
    • 这不是我想说的。我想说的是,如果您想在 0.11 和 2.0.0 中具有相同的行为,则无需明确指定连接的密钥序列化程序。在 0.11 中不需要它,所以在 2.0.0 中应该不需要它
    • 虽然我同意你的说法,但 Scala 在 0.11 中不受官方支持,因此,我们不测试 Scala 的兼容性,而只测试 Java。看来,有些东西无意中被打破了,很不幸:(
    猜你喜欢
    • 2017-08-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-04-24
    • 1970-01-01
    • 1970-01-01
    • 2020-11-04
    • 2018-12-28
    相关资源
    最近更新 更多