【发布时间】:2020-02-07 01:07:30
【问题描述】:
假设我有两个任务管理器,每个任务管理器只有一个任务槽。现在,我有以下工作:
KeyedStream<...> streamA = env.addSource(...).keyBy(...);
KeyedStream<...> streamB = env.addSource(...).keyBy(...);
streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);
一个任务管理器将使用来自 Kafka 主题的数据,另一个将使用来自另一个 Kafka 主题的数据。
我将作业发送给作业管理器以执行它。 Flink 分配 both 任务管理器来处理 flatMap(因为任务管理器只有一个任务槽)。
flatMap 在事件之间进行简单的连接(使用两个键控状态):
public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {
private ValueState<A> AState;
private ValueState<B> BState;
@Override
public void open(Configuration config) {
AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
}
@Override
public void flatMap1(A event, Collector<String> out) throws Exception {
B secondEvent = BState.value();
if (secondEvent == null)
AState.update(event);
else {
out.collect(...);
BState.clear();
}
}
@Override
public void flatMap2(A event, Collector<String> out) throws Exception {
A firstEvent = AState.value();
if (firstEvent == null)
BState.update(event);
else {
out.collect(...);
AState.clear();
}
}
}
如果我理解正确的话,在 connect 方法之后,流就变成了只有一个。现在,实现的 flatMap 需要共享状态,因为操作员必须控制相关事件是否到达以应用连接,但它以两个并行执行,因此使用两个任务管理器。这意味着每次必须更新状态时,任务管理器都应该保存在另一个任务管理器的状态中(在连接方法之后共享),或者它可能需要简单地读取状态。那么任务管理器如何通信呢?是否会影响性能,因为任务管理器可能运行在不同的集群节点上?
编辑:我在 Flink 的博客上找到了以下article,似乎两个任务管理器可以通过 TCP 连接进行通信,这对我来说很有意义,因为有些情况我们需要在事件之间共享状态。如果这是错误的,请您向我解释一下Flink如何管理以下场景?
假设总是有两个任务管理器,物理上位于两个集群节点上。每个任务管理器始终只有一个插槽。我运行上述作业并将并行度设置为 2(例如,在将作业发送到作业管理器时使用 -p 参数)。现在,Flink 将从我的作业中创建两个结构相同的子任务,并将它们发送到任务管理器。两个任务管理器都将执行“相同”的作业,但使用不同的事件。该作业消耗来自两个 Kafka 主题的事件:A 和 B。这意味着第一个和第二个任务管理器将同时消耗来自主题 A 和 B 的事件,但是不同的事件,否则会有重复。作业是相同的,即它执行上面的 RichCoFlatMapFunction,然后每个任务管理器将在本地处理其消费事件集和个人本地状态。现在问题来了:假设第一个任务管理器已经消费了一个键为“1”的事件。此事件到达 RichCoFlatMapFunction 内部,并存储在状态内部,因为操作员仍在等待另一个具有相同键的事件来生成连接。如果另一个具有“1”键的事件从第二个任务管理器中被消耗,并且它们不共享它们的状态或通信,则不可能进行加入。 我的推理有什么问题?
【问题讨论】:
标签: join state apache-flink