【发布时间】:2017-10-02 15:06:20
【问题描述】:
我有一种情况,我需要检查某个特定消息是否已经存在于某个主题中,我绝对不需要该主题中的重复。
任何人都可以提出任何优雅的方式来做到这一点,而不是使用所有消息并检查它们。
【问题讨论】:
标签: apache-kafka kafka-consumer-api apache-kafka-streams
我有一种情况,我需要检查某个特定消息是否已经存在于某个主题中,我绝对不需要该主题中的重复。
任何人都可以提出任何优雅的方式来做到这一点,而不是使用所有消息并检查它们。
【问题讨论】:
标签: apache-kafka kafka-consumer-api apache-kafka-streams
我不认为自己是卡夫卡的专家,但我认为你假装是“反对”卡夫卡的本质。
不过,我提出了一个使用 Java 版 Kafka Streams 库的解决方案。基本上,流程如下:
将每条消息映射到一个新的键值对,其中键是前一个键及其值的组合:(key1, message1) -> (key1-message1, message1)
使用键对消息进行分组,作为此操作的结果,您将获得 KGroupedStream。
应用reduce函数,将值修改为一些自定义值,例如字符串“重复值”。
将reduce后生成的KTable转换成KStream,并推送到新的Kafka Topic中。
前面的解释有这么多假设,我将提供一些代码来说明一下:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> resources = builder.stream("topic-where-the-messages-are-sent");
KeyValueMapper<String, String, KeyValue<String,String>> kvMapper = new KeyValueMapper<String, String, KeyValue<String,String>>() {
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<String, String>(key + "-" + value, value);
}
};
Reducer<String> reducer = new Reducer<String>() {
public String apply(String value1, String value2) {
return "Duplicated message";
}
};
resources.map(kvMapper)
.groupByKey()
.reduce(reducer, "test-store-name")
.toStream()
.to("unique-message-output");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
请记住,这可能不是最佳解决方案,也许您不会将其视为解决问题的“优雅”方式。
希望对你有帮助。
【讨论】: