【问题标题】:How to understand kafka streams aggregation?如何理解 kafka 流聚合?
【发布时间】:2019-10-07 17:38:09
【问题描述】:

我是 kafka 的新手,正在学习它。我只是在为员工汇总数据,但遇到了问题。有人可以帮忙吗。

我有一个主题 timeoffs,其中 key time_off_id 和 value 类型的 object 还包含员工 ID。所以我想建立一个商店,其中员工 ID 应该是键,值应该是该员工休假的列表。但我遵循以下方法但遇到了问题。聚合数据时,方法参考中的返回类型错误:无法将 ArrayList 转换为 VR。你能帮帮我吗?

代码:

KTable<String, TimeOff> timeoffs = builder.table(topic);
KGroupedTable<String, TimeOff> groupedTable = timeoffs.groupBy(
    (key, value) -> KeyValue.pair(value.getEmployeeId(), value)
);
groupedTable.aggregate(ArrayList<TimeOff>::new, (k, newValue, aggValue) -> {
  aggValue.add(newValue);
  return aggValue;
}, Materialized.as("NewStore"));

我也尝试过这种方法,但同样没有解决问题。

TimeOffList 类:

package com.kafka.productiontest.models;

import java.util.ArrayList;

public class TimeOffList {
  ArrayList list = new ArrayList<TimeOff>();

  public TimeOffList add(Object s) {
    list.add(s);
    return this;
  }
}

在流媒体类中:

groupedTable.aggregate(TimeOffList::new,
    (k, newValue, aggValue) -> (TimeOffList) aggValue.add(newValue));

实施您的解决方案后,此问题已消失,但现在面临 serde 问题。我已经实现了 TimeOffListSerde。请检查以下代码

KStream<String, TimeOff> source = builder.stream(topic);
source.groupBy((k, v) -> v.getEmployeeId())
    .aggregate(ArrayList::new,
        (key, value, aggregate) -> {
          aggregate.add(value);
          return aggregate;
        }, Materialized.as("NewStore").withValueSerde(new TimeOffListSerde(TimeOff.class)));

TimeOffListSerde.java

package com.kafka.productiontest.models;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import java.util.ArrayList;
import java.util.Map;

public class TimeOffListSerde implements Serde<ArrayList<TimeOff>> {
  private Serde<ArrayList<TimeOff>> inner;

  public TimeOffListSerde() {
  }

  public TimeOffListSerde(Serde<TimeOff> serde){
    inner = Serdes.serdeFrom(new TimeOffListSerializer(serde.serializer()), new TimeOffListDeserializer(serde.deserializer()));
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    inner.serializer().configure(configs, isKey);
    inner.deserializer().configure(configs, isKey);
  }

  @Override
  public void close() {
    inner.serializer().close();
    inner.deserializer().close();
  }

  @Override
  public Serializer<ArrayList<TimeOff>> serializer() {
    return inner.serializer();
  }

  @Override
  public Deserializer<ArrayList<TimeOff>> deserializer() {
    return inner.deserializer();
  }
}

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    你想要这个吗?

    KStream<String, TimeOff> source = builder.stream(sourceTopic);
    KTable<String, List<TimeOff>> table = source.groupBy((k, v) -> v.getId())
        .aggregate(ArrayList::new,
                (key, value, aggregate) -> {
                    aggregate.add(value);
                    return aggregate;
                }, Materialized.as("NewStore"));
    

    【讨论】:

    • 这解决了编译问题,但是在流式传输记录时,它仍然给出了这个问题。无法从 START_ARRAY 令牌中反序列化 com.kafka.productiontest.models.TimeOff 的实例
    • 你必须为 TimeOff 类实现 Serde 类,并使用那个 ex) builder.stream("source", Consumed.with(Serdes.String(), Serdes.forTimeOff() ));
    • KStream source = builder.stream(topic); source.groupBy((k, v) -> v.getEmployeeId()) .aggregate(ArrayList::new, (key, value, aggregate) -> { aggregate.add(value); return aggregate; }, Materialized.as ("NewStore").withValueSerde(new TimeOffListSerde(TimeOff.class)));
    • 我已经更新了这个问题,面临 serde 的问题。如果你能帮忙,请。
    • 你的回答是对的。但是,请您指导下一个 Serdes 问题。我也发布了我的 TimeOffListSerde 课程。 @小酒馆