【发布时间】:2020-04-12 05:20:46
【问题描述】:
我正在使用 apache flink(v1.10.0) 来计算 RabbitMQ 消息,将结果接收到 MySQL,现在我是这样计算的:
consumeRecord.keyBy("gameType")
.timeWindowAll(Time.seconds(5))
.reduce((d1, d2) -> {
d1.setRealPumpAmount(d1.getRealPumpAmount() + d2.getRealPumpAmount());
d1.setPumpAmount(d1.getPumpAmount() + d2.getPumpAmount());
return d1;
})
.addSink(new SinkFunction<ReportPump>() {
@Override
public void invoke(ReportPump value, Context context) throws Exception {
// save to mysql
}
});
但是现在sink方法每次调用只获取一行,如果这批中的一行失败了,我无法回滚批处理操作。现在我想获取一个窗口的批处理,如果失败了一次sink到数据库,我回滚插入和 Apache Flink 的检查点。这就是我现在要做的:
consumeRecord.keyBy("gameType")
.timeWindowAll(Time.seconds(5)).reduce(new ReduceFunction<ReportPump>() {
@Override
public ReportPump reduce(ReportPump d1, ReportPump d2) throws Exception {
d1.setRealPumpAmount(d1.getRealPumpAmount() + d2.getRealPumpAmount());
d1.setPumpAmount(d1.getPumpAmount() + d2.getPumpAmount());
return d1;
}
})
.apply(new AllWindowFunction<ReportPump, List<ReportPump>, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<ReportPump> values, Collector<List<ReportPump>> out) throws Exception {
ArrayList<ReportPump> employees = Lists.newArrayList(values);
if (employees.size() > 0) {
out.collect(employees);
}
}
})
.addSink(new SinkFunction<List<ReportPump>>() {
@Override
public void invoke(List<ReportPump> value, Context context) throws Exception {
PumpRealtimeHandler.invoke(value);
}
});
但是 apply 函数给出了提示:Cannot resolve method 'apply' in 'SingleOutputStreamOperator'。如何减少它并获取批处理数据列表并仅刷新一次到数据库?
【问题讨论】:
标签: apache-flink