【问题标题】:Kafka Streams API GroupBy behaviourKafka Streams API GroupBy 行为
【发布时间】:2020-04-19 21:04:30
【问题描述】:

我是 kafka 流的新手,我正在尝试使用 groupBy 函数将一些流数据聚合到 KTable 中。问题如下:

产生的消息是一个json msg,格式如下:

{ "current_ts": "2019-12-24 13:16:40.316952",
  "primary_keys": ["ID"],
  "before": null,
  "tokens": {"txid":"3.17.2493", 
             "csn":"64913009"},
  "op_type":"I",
  "after":  { "CODE":"AAAA41",
              "STATUS":"COMPLETED",
              "ID":24},
  "op_ts":"2019-12-24 13:16:40.316941",
  "table":"S_ORDER"} 

我想隔离json字段"after",然后用"key" = "ID"value创建一个KTable整个 json “之后”

首先,我创建了一个 KStream 来隔离 "after" json,它工作正常。

KStream代码块:(不要注意if语句,因为“before”和“after”格式相同。)

KStream<String, String> s_order_list = s_order
                .mapValues(value -> {
                    String time;
                    JSONObject json = new JSONObject(value);
                    if (json.getString("op_type").equals("I")) {
                        time = "after";
                    }else {
                        time = "before";
                    }
                    JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
                    return json2.toString();
                });

正如预期的那样,输出如下:

...
null {"CODE":"AAAA48","STATUS":"SUBMITTED","ID":6}
null {"CODE":"AAAA16","STATUS":"COMPLETED","ID":1}
null {"CODE":"AAAA3","STATUS":"SUBMITTED","ID":25}
null {"CODE":"AAAA29","STATUS":"SUBMITTED","ID":23}
...

之后,我实现了一个 KTable 来分组 json 的“ID”。

KTable代码块:

  KTable<String, String> s_table = s_order_list
                .groupBy((key, value) -> {
                    JSONObject json = new JSONObject(value);
                    return json.getString("ID");
                });

我想创建KTable&lt;String, String&gt;,但我正在创建GroupedStream&lt;Object,String&gt;

Required type: KTable<String,String>
Provided:KGroupedStream<Object,String>
no instance(s) of type variable(s) KR exist so that KGroupedStream<KR, String> conforms to KTable<String, String>

总之,问题是 KGroupedStreams 到底是什么以及如何正确实现 KTable?

【问题讨论】:

  • 为什么不能直接把ID作为key放在producer里面呢? (你在使用 Debezium 吗?)
  • 你如何创建你的上游对象,即s_order——这似乎只是一个普遍的问题。
  • 无法将 KStream 转换为 KTable。那么为什么不使用 KTable 本身对数据进行共同分区呢。
  • @cricket_007 不,我没有使用 Debezium。在我正在进行的项目中,我无法更改生产者发布的消息。
  • @MatthiasJ.Sax s_order 是另一个使用builder.stream("topic-name") 构建的流。实际上是多余的,可能只是 s_order_list 直接从主题中“读取消息”。

标签: json group-by apache-kafka apache-kafka-streams ktable


【解决方案1】:

groupBy 处理器之后,您可以使用有状态处理器,例如 aggregatereduce(处理器返回 KTable)。你可以这样做:

KGroupedStream<String, String> s_table = s_order_list
                     .groupBy((key, value) ->
                         new JSONObject(value).getString("ID"),
                         Grouped.with(
                                 Serdes.String(),
                                 Serdes.String())
                     );

KTable<String, StringAggregate> aggregateStrings = s_table.aggregate(
                     (StringAggregate::new),
                     (key, value, aggregate) -> aggregate.addElement(value));

StringAggregate 看起来像:

public class StringAggregate {

    private static List<String> elements = new ArrayList<>();

    public StringAggregate addElement(String element){
        elements.add(element);
        return this;
    }
    //other methods
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-12-30
    • 1970-01-01
    • 1970-01-01
    • 2019-03-30
    • 2018-03-04
    相关资源
    最近更新 更多