【发布时间】:2019-04-10 11:14:39
【问题描述】:
我正在研究 Flink 中的数据倾斜处理以及如何更改低level control of physical partition 以便对元组进行均匀处理。我已经创建了合成倾斜数据源,我的目标是在一个窗口上处理(聚合)它们。这是complete code。
streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.rebalance() // or .rescale() .shuffle()
.keyBy(new StationPlatformKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.setParallelism(4)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;
根据 Flink 仪表板,我看不出 .shuffle()、.rescale() 和 .rebalance() 之间有太大区别。尽管文档说 rebalance() 转换更适合数据倾斜。
之后我尝试使用.partitionCustom(partitioner, "someKey")。然而,令我惊讶的是,我无法在窗口操作中使用 setParallelism(4)。文档说
注意:此操作本质上是非并行的,因为所有元素 必须通过同一个算子实例。
我不明白为什么。如果允许我做partitionCustom,那我为什么不能在那之后使用并行性呢?这是complete code。
streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;
谢谢, 费利佩
【问题讨论】:
-
我正在查看这个答案 stackoverflow.com/questions/34681887/…,它说要实现 OneInputStreamOperator (ci.apache.org/projects/flink/flink-docs-stable/api/java/…),但我不确定它是否能解决我的问题
标签: java partitioning flink-streaming skew