【问题标题】:How to create a state store with HashMap as value in Kafka streams?如何使用 HashMap 作为 Kafka 流中的值创建状态存储?
【发布时间】:2016-08-29 05:52:24
【问题描述】:

我需要创建一个以字符串键 HashMap 作为值的状态存储。我尝试了以下两种方法。

// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
          .withKeys(Serdes.String())
          .withValues(HashMap.class)
          .persistent()
          .build();

// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();

StateStoreSupplier avgStore1 = Stores.create("Avgs")
          .withKeys(Serdes.String())
          .withValues(Serdes.serdeFrom(h.getClass()))
          .persistent()
          .build();

代码编译正常,没有任何错误,但出现运行时错误

io.confluent.examples.streams.WordCountProcessorAPIException in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer

有人可以建议我创建国有商店的正确方法是什么吗?

【问题讨论】:

    标签: apache-kafka-streams


    【解决方案1】:

    如果要创建状态存储,则需要为要使用的类型提供 serializerdeserializer 类。在 Kafka Stream 中,有一个名为 Serde 的抽象,它将序列化器和反序列化器包装在一个类中。

    如果你使用.withValues(Class&lt;K&gt; keyClass),它必须持有它

    @param keyClass 键的类,必须是 Kafka 内置 serdes 的类型之一

    因为HashMap 没有内置的Serdes,所以您需要先实现一个(可能称为HashMapSerde),并将这个类赋予方法.withValues(Serde&lt;K&gt; keySerde)。此外,您还必须为HashMap 实现实际的序列化器和反序列化器。如果您知道 HashMap 的泛型类型,则应该指定它们(这使得序列化器和反序列化器的实现更加简单。

    类似这样的东西(只是一个草图;省略了泛型):

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.common.serialization.Deserializer;
    
    public class HashMapSerde implements Serde<HashMap> {
    
        void configure(Map<String, ?> configs, boolean isKey) {
            /* put your code here */
        }
    
        void close() {
            /* put your code here */
        }
    
        Serializer<HashMap> serializer() {
            return new Serializer<HashMap>() {
                public void configure(Map<String, ?> configs, boolean isKey) {
                    /* put your code here */
                }
    
                public byte[] serialize(String topic, T data) {
                    /* put your code here */
                }
    
                public void close() {
                    /* put your code here */
                }
            };
        }
    
        Deserializer<HashMap> deserializer() {
            return new Deserializer<HashMap>() {
                public void configure(Map<String, ?> configs, boolean isKey) {
                    /* put your code here */
                }
    
                public T deserialize(String topic, byte[] data) {
                    /* put your code here */
                }
    
                public void close() {
                    /* put your code here */
                }
            };
        }
    }
    

    如果您想查看如何实现(反)序列化程序和Serde 的示例,请查看https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serializationhttps://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java

    【讨论】:

    • 谢谢@matthias-j-sax。我会试试这个。
    • @matthias : 我们如何为 Hashmap 定义 serde,?> ?
    • 实现Serde接口。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-27
    • 2019-07-14
    • 1970-01-01
    • 2020-06-27
    相关资源
    最近更新 更多