【问题标题】:Apache Flink: shared state between two (or more) Task ManagersApache Flink:两个(或多个)任务管理器之间的共享状态
【发布时间】:2020-02-07 01:07:30
【问题描述】:

假设我有两个任务管理器,每个任务管理器只有一个任务槽。现在,我有以下工作:

    KeyedStream<...> streamA = env.addSource(...).keyBy(...);
    KeyedStream<...> streamB = env.addSource(...).keyBy(...);

    streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);

一个任务管理器将使用来自 Kafka 主题的数据,另一个将使用来自另一个 Kafka 主题的数据。

我将作业发送给作业管理器以执行它。 Flink 分配 both 任务管理器来处理 flatMap(因为任务管理器只有一个任务槽)。

flatMap 在事件之间进行简单的连接(使用两个键控状态):

    public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {
        private ValueState<A> AState;
        private ValueState<B> BState;

        @Override
        public void open(Configuration config) {
            AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
            BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
        }

        @Override
        public void flatMap1(A event, Collector<String> out) throws Exception {
            B secondEvent = BState.value();

            if (secondEvent == null)
                AState.update(event);
            else {
                out.collect(...);
                BState.clear();
            }
        }

        @Override
        public void flatMap2(A event, Collector<String> out) throws Exception {
            A firstEvent = AState.value();

            if (firstEvent == null)
                BState.update(event);
            else {
                out.collect(...);
                AState.clear();
            }
        }
    }

如果我理解正确的话,在 connect 方法之后,流就变成了只有一个。现在,实现的 flatMap 需要共享状态,因为操作员必须控制相关事件是否到达以应用连接,但它以两个并行执行,因此使用两个任务管理器。这意味着每次必须更新状态时,任务管理器都应该保存在另一个任务管理器的状态中(在连接方法之后共享),或者它可能需要简单地读取状态。那么任务管理器如何通信呢?是否会影响性能,因为任务管理器可能运行在不同的集群节点上?

编辑:我在 Flink 的博客上找到了以下article,似乎两个任务管理器可以通过 TCP 连接进行通信,这对我来说很有意义,因为有些情况我们需要在事件之间共享状态。如果这是错误的,请您向我解释一下Flink如何管理以下场景?

假设总是有两个任务管理器,物理上位于两个集群节点上。每个任务管理器始终只有一个插槽。我运行上述作业并将并行度设置为 2(例如,在将作业发送到作业管理器时使用 -p 参数)。现在,Flink 将从我的作业中创建两个结构相同的子任务,并将它们发送到任务管理器。两个任务管理器都将执行“相同”的作业,但使用不同的事件。该作业消耗来自两个 Kafka 主题的事件:A 和 B。这意味着第一个和第二个任务管理器将同时消耗来自主题 A 和 B 的事件,但是不同的事件,否则会有重复。作业是相同的,即它执行上面的 RichCoFlatMapFunction,然后每个任务管理器将在本地处理其消费事件集和个人本地状态。现在问题来了:假设第一个任务管理器已经消费了一个键为“1”的事件。此事件到达 RichCoFlatMapFunction 内部,并存储在状态内部,因为操作员仍在等待另一个具有相同键的事件来生成连接。如果另一个具有“1”键的事件从第二个任务管理器中被消耗,并且它们不共享它们的状态或通信,则不可能进行加入。 我的推理有什么问题?

【问题讨论】:

    标签: join state apache-flink


    【解决方案1】:

    两个任务管理器不需要为了状态共享而进行通信——Flink 中没有状态共享。

    下面显示的这三个执行图中的任何一个都是可能的,具体取决于您如何安排源的详细信息。在每个图的左侧,我们看到 A 和 B 的源运算符,在右侧,两个输入运算符的两个并行实例通过 RichCoFlatMap 实现连接。

    keyBy 不是运算符,而是指定源和两个 RichCoFlatMap 实例如何连接。它安排这是一个散列连接,对源流进行重新分区。

    使用这三种场景中的哪一种并不重要,因为在所有三种情况下,keyBy 将具有相同的效果,即会将某些键的所有事件引导至 Join1,并将其他键的所有事件引导至 Join2。

    换句话说,对于任何给定的键,该键的所有事件都将在同一个任务槽中处理。您可以将ValueState&lt;A&gt; 视为分布式(分片)键/值存储,其中值的类型为 A。每个任务管理器都有该键/值存储切片的状态(用于键的不相交子集) ,并处理这些键的所有事件(并且仅处理那些键)。

    例如:在flatMap1 中,当使用来自streamA 的元素调用BState.value() 时,Flink 运行时将访问BState 的值当前在上下文中的键,表示与当前正在处理的来自streamA 的事件的键关联的值。在当前任务中,此状态将始终是本地的。同样,flatMap2 将始终使用来自 streamB 的元素调用。

    这种设计避免了任务管理器之间的任何耦合,这有利于可扩展性和性能。

    【讨论】:

    • 非常感谢您的回答。但是我对 connect() 方法的工作原理有点困惑。两个 TM 都将在各自的任务槽上执行从 addSource() 到 keyBy() 操作的作业。那么,当操作变成streamA.connect(streamB)时,究竟发生了什么?因为,第一个任务管理器使用来自第一个 kafka 主题的事件,而第二个来自第二个 Kafka 主题的事件,即它们处理不同的事件。
    • 我已经更新了我的答案。如果仍然不清楚这是如何工作的,请告诉我。
    • 谢谢大卫,非常感谢您的回答,但我仍然有些疑问。我已经编辑了我的疑问。
    • 问题在于,使用 keyBy,键为“1”的事件无法发送到两个任务管理器。键为“1”的所有连接源的所有事件都将发送到其中一个任务管理器。
    • 我还稍微扩展并澄清了这个答案的第一段。任务管理器之间的 TCP 连接用于跨作业图的数据流。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-02-22
    • 1970-01-01
    • 1970-01-01
    • 2022-11-30
    • 1970-01-01
    • 1970-01-01
    • 2017-05-03
    相关资源
    最近更新 更多