【问题标题】:Apache Flink Stream join using DataStream API not outputting anythingApache Flink Stream join 使用 DataStream API 不输出任何东西
【发布时间】:2021-04-16 11:34:25
【问题描述】:

我有 2 个使用 kafka 主题创建的流,我正在使用 DataStream API 加入它们。我希望将加入(应用)的结果发布到另一个 kafka 主题。在 out 主题中看不到 join 的结果。

我确认我正在向两个源主题发布正确的数据。不知道哪里出错了。这是代码sn-p,

创建的流如下所示。

DataStream<String> ms1=env.addSource(new FlinkKafkaConsumer("top1",new SimpleStringSchema(),prop))
            .assignTimestampsAndWatermarks(new WatermarkStrategy() {
                @Override
                public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                    return new AscendingTimestampsWatermarks<>();
                }
                @Override
                public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                    return (event, timestamp) -> System.currentTimeMillis();
                }
            });
DataStream<String> ms2=env.addSource(new FlinkKafkaConsumer("top2",new SimpleStringSchema(),prop))
            .assignTimestampsAndWatermarks(new WatermarkStrategy() {
                @Override
                public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                    return new AscendingTimestampsWatermarks<>();
                }
                @Override
                public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                    return (event, timestamp) -> System.currentTimeMillis();
                }
            });

使用 join-where-equals 执行流连接,如下所示。

DataStream joinedStreams = ms1.join(ms2)
            .where(o -> {String[] tokens = ((String) o).split("::"); return tokens[0];})
            .equalTo(o -> {String[] tokens = ((String) o).split("::"); return tokens[0];})
            .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
            .apply(new JoinFunction<String, String, CountryData>() {
                @Override
                public CountryData join(String o, String o2) throws Exception {
                    String[] tokens1 = o.split("::");
                    String[] tokens2 = o2.split("::");
                    CountryData countryData = new CountryData(tokens1[0], tokens1[1], tokens1[2], Long.parseLong(tokens1[3])+Long.parseLong(tokens2[3]));
                    return countryData;
                }});

添加如下接收器,

joinedStreams.addSink(new FlinkKafkaProducer<CountryData>("localhost:9095","flink-output", new CustomSchema()));
dataStreamSink.setParallelism(1);
dataStreamSink.name("KAFKA-TOPIC");

任何线索,哪里出错了?我可以看到拓扑中可用的消息 谢谢

【问题讨论】:

    标签: apache-kafka apache-flink


    【解决方案1】:

    我认为这两个 FlinkKafkaConsumer 实例缺少时间提取器和水印配置。

    由于代码使用事件时间窗口连接,它需要与在 Kafka 中找到的数据相关联的某种时间信息,以便知道每个事件对应于哪个时间窗口。

    否则,来自两个流的事件可能永远不会足够接近 在事件时间 以匹配 EventTimeSessionWindows.withGap(Time.seconds(60)) 定义的 60 秒窗口。

    你还需要设置 watermark 参数告诉 Flink什么时候停止等待新数据并物化输出 s.t.可以看到加入结果。

    查看Kafka connector time and watermark configuration,了解您拥有的各种时间提取和水印可能性。

    最后,确保您将分布在足够长时间段内的测试数据发送到您的应用程序。通过事件时间处理,只有“足够老”的数据才能输出,年轻的数据总是“卡在传输中”。例如,对于 60 秒的时间窗口和 30 秒的水印,您至少需要 90 秒的数据才能在输出中看到任何内容。

    【讨论】:

    • 感谢 Svend,它通过将 assignTimestampsAndWatermarks 添加到两个数据流中来工作,并使用工作代码 sn-p 更新了帖子。
    猜你喜欢
    • 2020-09-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-10-31
    • 1970-01-01
    • 1970-01-01
    • 2022-10-13
    相关资源
    最近更新 更多