【发布时间】:2018-02-28 10:44:30
【问题描述】:
我在 SQL Server 中有一个表,我想流式传输到 Kafka 主题,结构如下:
(UserID, ReportID)
此表将不断更改(添加、插入、无更新记录)
我想把这个变成这样的结构,放到 Elasticsearch 中:
{
"UserID": 1,
"Reports": [1, 2, 3, 4, 5, 6]
}
到目前为止,我看到的示例是日志或点击流,它们在我的情况下不起作用。
这种用例可能吗?我总是可以只看UserID 更改和查询数据库,但这似乎很幼稚,并不是最好的方法。
更新
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.util.ArrayList;
import java.util.Properties;
public class MyDemo {
public static void main(String... args) {
System.out.println("Hello KTable!");
final Serde<Long> longSerde = Serdes.Long();
KStreamBuilder builder = new KStreamBuilder();
KStream<Long, Long> reportPermission = builder.stream(TOPIC);
KTable<Long, ArrayList<Long>> result = reportPermission
.groupByKey()
.aggregate(
new Initializer<ArrayList<Long>>() {
@Override
public ArrayList<Long> apply() {
return null;
}
},
new Aggregator<Long, Long, ArrayList<Long>>() {
@Override
public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) {
aggregate.add(value);
return aggregate;
}
},
new Serde<ArrayList<Long>>() {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public void close() {}
@Override
public Serializer<ArrayList<Long>> serializer() {
return null;
}
@Override
public Deserializer<ArrayList<Long>> deserializer() {
return null;
}
});
result.to("report-aggregated-topic");
KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private static final String TOPIC = "report-permission";
private static final Properties createStreamProperties() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
return props;
}
}
我实际上陷入了聚合阶段,因为我无法为 ArrayList<Long> 编写合适的 SerDe(还没有足够的技能),lambda 似乎不适用于聚合器 - 它不知道 agg 的类型是什么:
KTable<Long, ArrayList<Long>> sample = builder.stream(TOPIC)
.groupByKey()
.aggregate(
() -> new ArrayList<Long>(),
(key, val, agg) -> agg.add(val),
longSerde
);
【问题讨论】:
-
使用 Lambda 的聚合应该可以工作。看看这个例子:github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/…
-
对于 Serds:您当然可以挑剔地重用现有的
LongSerde。基本模式是首先序列化数组列表的大小,然后是每个单独的长值。在反序列化中你也这样做。首先,反序列化大小,然后知道预期有多少条目并单独反序列化所有条目。或者你从头开始写一个列表序列化器stackoverflow.com/questions/23793885/…
标签: java apache-kafka apache-kafka-streams apache-kafka-connect