【问题标题】:Apache Flink: How to execute in parallel but keep order of messages?Apache Flink:如何并行执行但保持消息顺序?
【发布时间】:2017-09-09 14:34:12
【问题描述】:

我有几个关于 flink 并行性的问题。这是我的设置:

我有 1 个主节点和 2 个从节点。在 flink 中,我创建了 3 个 kafka 消费者,每个消费者都来自不同的主题。
由于元素的顺序对我来说很重要,所以每个主题只有一个分区,并且我有 flink 设置来使用事件时间。

然后我在每个数据流上运行以下管道(以伪代码形式):

source
.map(deserialize)
.window
.apply
.map(serialize)
.writeTo(sink)

到目前为止,我使用参数 -p 2 启动我的 flink 程序,假设这将允许我使用我的两个节点。结果不是我所希望的,因为我的输出顺序有时会搞砸。

在阅读了 flink 文档并试图更好地理解它之后,有人可以确认我的以下“学习”吗?

1.) 传递-p 2 仅配置任务并行度,即一个任务(例如map(deserialize))将被拆分成的最大并行实例数。如果我想通过整个管道保持订单,我必须使用-p 1

2.) 这对我来说似乎是矛盾的/令人困惑的:即使并行度设置为 1,不同的任务仍然可以并行运行(同时)。因此,如果我通过-p 1,我的 3 个管道也将并行运行。

作为后续问题:有什么方法可以确定哪些任务映射到了哪个任务槽,以便我自己确认并行执行?

我将不胜感激!

更新

Here是flink对-p 2的执行计划。

【问题讨论】:

    标签: parallel-processing apache-kafka apache-flink


    【解决方案1】:

    我会尽量用我知道的回答。

    1) 是的,对于 CLI 客户端,可以使用 -p 指定并行度参数。您说得对,这是并行实例的最大数量。但是,我没有看到并行性和顺序之间的联系?据我所知,订单是由 Flink 使用事件中提供的时间戳或他自己的摄取时间戳来管理的。如果您想维护不同数据源的顺序,我觉得这很复杂,或者您可以将这些不同的数据源合并为一个。

    2) 如果您将并行度设置为 3,则您的 3 个管道可以并行运行。我认为这里的并行度意味着在不同的插槽上。

    后续问题)您可以在 JobManager 的 Web 前端 http://localhost:8081 上查看哪些任务映射到了哪个任务槽。

    【讨论】:

    • 我上传了我的flink程序的执行计划,在那里你可以看到在最后一个map之前有一个rebalance。根据these flink docs,rebalance() 不能保证订单。我假设在写入 kafka 时两个子任务之间存在竞争条件,这会打乱我的输出顺序。因此,我认为并行度 > 1 会破坏我的结果。
    【解决方案2】:

    Apache Flink user email list 上提出问题后,答案如下:

    1.) -p 选项定义每个作业的任务并行度。如果选择的并行度高于 1 并且数据被重新分配(例如通过 rebalance() 或 keyBy()),则无法保证顺序。

    2.) 将-p 设置为 1 时,仅使用 1 个任务槽,即 1 个 CPU 内核。因此,可能会有多个线程同时在一个内核上运行,而不是并行运行。

    至于我的要求:为了并行运行多个管道并且仍然保持顺序,我可以只运行多个 Flink 作业,而不是在同一个 Flink 作业中运行所有管道。

    【讨论】:

      【解决方案3】:

      请在下面找到使用侧输出和插槽组进行本地扩展的示例。

      package org.example
      
      /*
       * Licensed to the Apache Software Foundation (ASF) under one
       * or more contributor license agreements.  See the NOTICE file
       * distributed with this work for additional information
       * regarding copyright ownership.  The ASF licenses this file
       * to you under the Apache License, Version 2.0 (the
       * "License"); you may not use this file except in compliance
       * with the License.  You may obtain a copy of the License at
       *
       *     http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      
      import org.apache.flink.streaming.api.functions.ProcessFunction
      import org.apache.flink.streaming.api.scala._
      import org.apache.flink.util.Collector
      
      /**
        * This example shows an implementation of WordCount with data from a text socket.
        * To run the example make sure that the service providing the text data is already up and running.
        *
        * To start an example socket text stream on your local machine run netcat from a command line,
        * where the parameter specifies the port number:
        *
        * {{{
        *   nc -lk 9999
        * }}}
        *
        * Usage:
        * {{{
        *   SocketTextStreamWordCount <hostname> <port> <output path>
        * }}}
        *
        * This example shows how to:
        *
        *   - use StreamExecutionEnvironment.socketTextStream
        *   - write a simple Flink Streaming program in scala.
        *   - write and use user-defined functions.
        */
      object SocketTextStreamWordCount {
      
        def main(args: Array[String]) {
          if (args.length != 2) {
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
            return
          }
      
          val hostName = args(0)
          val port = args(1).toInt
          val outputTag1 = OutputTag[String]("side-1")
          val outputTag2 = OutputTag[String]("side-2")
      
          val env = StreamExecutionEnvironment.getExecutionEnvironment
          env.getConfig.enableObjectReuse()
      
          //Create streams for names and ages by mapping the inputs to the corresponding objects
          val text = env.socketTextStream(hostName, port).slotSharingGroup("processElement")
          val counts = text.flatMap {
            _.toLowerCase.split("\\W+") filter {
              _.nonEmpty
            }
          }
            .process(new ProcessFunction[String, String] {
              override def processElement(
                                           value: String,
                                           ctx: ProcessFunction[String, String]#Context,
                                           out: Collector[String]): Unit = {
                if (value.head <= 'm') ctx.output(outputTag1, String.valueOf(value))
                else ctx.output(outputTag2, String.valueOf(value))
              }
            })
      
          val sideOutputStream1: DataStream[String] = counts.getSideOutput(outputTag1)
          val sideOutputStream2: DataStream[String] = counts.getSideOutput(outputTag2)
      
          val output1 = sideOutputStream1.map {
            (_, 1)
          }.slotSharingGroup("map1")
            .keyBy(0)
            .sum(1)
      
          val output2 = sideOutputStream2.map {
            (_, 1)
          }.slotSharingGroup("map2")
            .keyBy(0)
            .sum(1)
      
          output1.print()
          output2.print()
      
          env.execute("Scala SocketTextStreamWordCount Example")
        }
      
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2017-06-19
        • 1970-01-01
        • 2016-05-28
        • 1970-01-01
        • 1970-01-01
        • 2012-06-21
        • 2020-08-13
        相关资源
        最近更新 更多