【问题标题】:send message to topic based on message key using kafka streams使用 kafka 流根据消息密钥向主题发送消息
【发布时间】:2019-09-17 17:57:16
【问题描述】:

我希望能够根据消息键的键将 Kafkastream 中的所有记录发送到不同的主题。 前任。 Kafka 中的流包含名称作为键和记录作为值。我想根据记录的键将这些记录扇出到不同的主题

数据: (jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}), 预计

  • topic1 :name: jhon ->(jhon -> {jhonsRecord}),(jhon -> {jhonsRecord2})
  • topic2 :sean-> (sean -> {seansRecord})
  • topic3 :mary -> (mary -> {marysRecord})

以下是我现在执行此操作的方式,但是由于名称列表很长,所以速度很慢。另外,即使记录了几个名字,我也需要遍历整个列表请提出修复建议

    for( String name : names )
    {
        recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
    } 

【问题讨论】:

  • 注意:这可能会导致一堆主题(使用默认设置)。这里的用例是什么?
  • @cricket_007 感谢您指出这一点,但这就是这里的意图。每个名称有 100 条记录,每条记录都需要单独处理和汇总。

标签: java apache-kafka apache-kafka-streams


【解决方案1】:

我认为你应该使用KStream::to(final TopicNameExtractor<K, V> topicExtractor) 函数。它使您能够计算每条消息的主题名称。

示例代码:

final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);

【讨论】:

  • 假设与输出主题没有任何关系,我比我的答案更喜欢这个
【解决方案2】:

我认为您正在寻找的是KStream#branch

以下内容未经测试,但显示了大致思路

// get a list of predicates to branch a topic on
final List<String> names = Arrays.asList("jhon", "sean", "mary");
final Predicate[] predicates = names.stream()
    .map((Function<String, Predicate<String, Object>>) n -> (s, o) -> s.equals(n))
    .toArray(Predicate[]::new);

// example input
final KStream<Object, Object> stream = new StreamsBuilder().stream("names");

// split the topic
KStream<String, Object>[] branches = stream.branch(predicates);
for (int i = 0; i < names.size(); i++) {
    branches[i].to(names.get(i));
}

// KStream branches[0] contains all records whose keys are "jhon"
// KStream branches[1] contains all records whose keys are "sean"
...

【讨论】:

    【解决方案3】:

    如果您需要为每个用户生成聚合数据,则无需为每个用户写入单独的主题。你最好在源流上写一个聚合。这样一来,您不会最终每个键都有一个主题,但您仍然可以独立地对每个用户运行操作。

    Serde<UserRecord> recordSerde = ...
    KStream<Stream, UserAggregate> aggregateByName = recordsByName
       .groupByKey(Grouped.with(Serdes.String(), recordSerde))
       .aggregate(...)
       .toStream()
    

    详情请见https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#aggregating

    这种方法将扩展到数百万用户,您目前无法通过每个用户一个主题的方法来实现这一目标。

    【讨论】:

    • 这是一个很好的想法,这里有一个观点是 KPI 需要在聚合数据和原始数据上运行。所以在任何时候,我都可能需要访问用户的原始数据和汇总数据。例如,一项工作可能需要查看单个用户的原始数据,而另一项工作需要查看单个用户的聚合数据。
    • 还是有可能的。在上述情况下,您可以将聚合结果写入另一个主题,但具有原始数据的原始主题仍然可供任何其他消费者读取。您甚至可以在同一个 Kafka Streams 应用程序中执行此操作,您可以在同一个源主题上运行多个不同的转换。在实践中,Kafka Streams 将读取您的每条输入消息一次,然后依次将它们传递给这些单独的转换中的每一个。
    • 是的,您是对的,原始数据和聚合数据仅在 2 个主题中可用,但是当我只想查看一个特定用户的数据时,我没有任何选择,只能订阅该用户的所有内容和过滤记录。如果我的理解不正确,请告诉我。
    • 没错。 Kafka 只是将每条消息视为一个字节桶,因此任何过滤都需要在消费者级别完成。如果您只想直接从 Kafka 获取给定用户的记录,则需要将它们分开到单独的主题中。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-13
    • 1970-01-01
    • 2019-10-13
    • 2017-12-15
    • 2019-02-14
    相关资源
    最近更新 更多