【问题标题】:Apache Kafka : Check existence of message in a TopicApache Kafka:检查主题中是否存在消息
【发布时间】:2017-10-02 15:06:20
【问题描述】:

我有一种情况,我需要检查某个特定消息是否已经存在于某个主题中,我绝对不需要该主题中的重复。

任何人都可以提出任何优雅的方式来做到这一点,而不是使用所有消息并检查它们。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api apache-kafka-streams


    【解决方案1】:

    我不认为自己是卡夫卡的专家,但我认为你假装是“反对”卡夫卡的本质。

    不过,我提出了一个使用 Java 版 Kafka Streams 库的解决方案。基本上,流程如下:

    • 将每条消息映射到一个新的键值对,其中键是前一个键及其值的组合:(key1, message1) -> (key1-message1, message1)

    • 使用键对消息进行分组,作为此操作的结果,您将获得 KGroupedStream

    • 应用reduce函数,将值修改为一些自定义值,例如字符串“重复值”。

    • 将reduce后生成的KTable转换成KStream,并推送到新的Kafka Topic中。

    前面的解释有这么多假设,我将提供一些代码来说明一下:

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> resources =  builder.stream("topic-where-the-messages-are-sent");
    
    KeyValueMapper<String, String, KeyValue<String,String>> kvMapper = new KeyValueMapper<String, String, KeyValue<String,String>>() {
        public KeyValue<String, String> apply(String key, String value) {
            return new KeyValue<String, String>(key + "-" + value, value);
        }
    };
    
    Reducer<String> reducer = new Reducer<String>() {
        public String apply(String value1, String value2) {
            return "Duplicated message";
        }
    };
    
    resources.map(kvMapper)
        .groupByKey()
        .reduce(reducer, "test-store-name")
        .toStream()
        .to("unique-message-output");
    
    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();
    

    请记住,这可能不是最佳解决方案,也许您不会将其视为解决问题的“优雅”方式。

    希望对你有帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-09-05
      • 1970-01-01
      • 2018-07-07
      • 2020-02-16
      • 1970-01-01
      • 1970-01-01
      • 2016-04-18
      • 2017-02-20
      相关资源
      最近更新 更多