【问题标题】:how to flush batch data to sink in apache flink如何刷新批处理数据以在 apache flink 中下沉
【发布时间】:2020-04-12 05:20:46
【问题描述】:

我正在使用 apache flink(v1.10.0) 来计算 RabbitMQ 消息,将结果接收到 MySQL,现在我是这样计算的:

   consumeRecord.keyBy("gameType")
                .timeWindowAll(Time.seconds(5))
                .reduce((d1, d2) -> {
                    d1.setRealPumpAmount(d1.getRealPumpAmount() + d2.getRealPumpAmount());
                    d1.setPumpAmount(d1.getPumpAmount() + d2.getPumpAmount());
                    return d1;
                })
                .addSink(new SinkFunction<ReportPump>() {
                    @Override
                    public void invoke(ReportPump value, Context context) throws Exception {
                        // save to mysql
                    }
                });

但是现在sink方法每次调用只获取一行,如果这批中的一行失败了,我无法回滚批处理操作。现在我想获取一个窗口的批处理,如果失败了一次sink到数据库,我回滚插入和 Apache Flink 的检查点。这就是我现在要做的:

consumeRecord.keyBy("gameType")
                .timeWindowAll(Time.seconds(5)).reduce(new ReduceFunction<ReportPump>() {
                    @Override
                    public ReportPump reduce(ReportPump d1, ReportPump d2) throws Exception {
                        d1.setRealPumpAmount(d1.getRealPumpAmount() + d2.getRealPumpAmount());
                        d1.setPumpAmount(d1.getPumpAmount() + d2.getPumpAmount());
                        return d1;
                    }
                })
                .apply(new AllWindowFunction<ReportPump, List<ReportPump>, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<ReportPump> values, Collector<List<ReportPump>> out) throws Exception {
                        ArrayList<ReportPump> employees = Lists.newArrayList(values);
                        if (employees.size() > 0) {
                            out.collect(employees);
                        }
                    }
                })
                .addSink(new SinkFunction<List<ReportPump>>() {
                    @Override
                    public void invoke(List<ReportPump> value, Context context) throws Exception {
                        PumpRealtimeHandler.invoke(value);
                    }
                });

但是 apply 函数给出了提示:Cannot resolve method 'apply' in 'SingleOutputStreamOperator'。如何减少它并获取批处理数据列表并仅刷新一次到数据库?

【问题讨论】:

    标签: apache-flink


    【解决方案1】:

    SingleOutputStreamOperator 没有 apply 方法,因为 apply 只能在窗口化之后发出。 你在这里想念的是:

    .windowAll(GlobalWindows.create())
    

    在reduce和apply之间,它会将所有reduce结果聚合到一个包含所有reduce结果列表的全局窗口中,而不是您可以针对一个列表而不是针对数据库的多个批次进行collect。


    我不知道你的结果是不是一个好的做法,因为你会失去 apache flink 的并行性。

    您应该阅读有关 TableApi 和 JDBC 接收器的信息,也许它会对您有所帮助。 (有关它的更多信息:https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbc-connector)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-09-20
      相关资源
      最近更新 更多