【发布时间】:2021-04-16 11:34:25
【问题描述】:
我有 2 个使用 kafka 主题创建的流,我正在使用 DataStream API 加入它们。我希望将加入(应用)的结果发布到另一个 kafka 主题。在 out 主题中看不到 join 的结果。
我确认我正在向两个源主题发布正确的数据。不知道哪里出错了。这是代码sn-p,
创建的流如下所示。
DataStream<String> ms1=env.addSource(new FlinkKafkaConsumer("top1",new SimpleStringSchema(),prop))
.assignTimestampsAndWatermarks(new WatermarkStrategy() {
@Override
public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new AscendingTimestampsWatermarks<>();
}
@Override
public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return (event, timestamp) -> System.currentTimeMillis();
}
});
DataStream<String> ms2=env.addSource(new FlinkKafkaConsumer("top2",new SimpleStringSchema(),prop))
.assignTimestampsAndWatermarks(new WatermarkStrategy() {
@Override
public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new AscendingTimestampsWatermarks<>();
}
@Override
public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return (event, timestamp) -> System.currentTimeMillis();
}
});
使用 join-where-equals 执行流连接,如下所示。
DataStream joinedStreams = ms1.join(ms2)
.where(o -> {String[] tokens = ((String) o).split("::"); return tokens[0];})
.equalTo(o -> {String[] tokens = ((String) o).split("::"); return tokens[0];})
.window(EventTimeSessionWindows.withGap(Time.seconds(60)))
.apply(new JoinFunction<String, String, CountryData>() {
@Override
public CountryData join(String o, String o2) throws Exception {
String[] tokens1 = o.split("::");
String[] tokens2 = o2.split("::");
CountryData countryData = new CountryData(tokens1[0], tokens1[1], tokens1[2], Long.parseLong(tokens1[3])+Long.parseLong(tokens2[3]));
return countryData;
}});
添加如下接收器,
joinedStreams.addSink(new FlinkKafkaProducer<CountryData>("localhost:9095","flink-output", new CustomSchema()));
dataStreamSink.setParallelism(1);
dataStreamSink.name("KAFKA-TOPIC");
任何线索,哪里出错了?我可以看到拓扑中可用的消息 谢谢
【问题讨论】: