【问题标题】:Questions about parallelism with Flink on YARN cluster关于 YARN 集群上的 Flink 并行性的问题
【发布时间】:2017-08-17 07:04:03
【问题描述】:

对于 Apache Flink 以及流处理框架的一般方式,我对它有几个问题,尤其是在并行性方面。

首先这是我的代码:

object KafkaConsuming {

  def main(args: Array[String]) {

    // **** CONFIGURATION & PARAMETERS ****
    val params: ParameterTool = ParameterTool.fromArgs(args)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(8)
        env.getConfig.setGlobalJobParameters(params)

    // **** Kafka CONNECTION ****
    val properties = new Properties();
    properties.setProperty("bootstrap.servers", params.get("server"));

    // **** Get KAFKA source ****
    val stream: DataStream[String] = env.addSource(new FlinkKafkaConsumer010[String](params.get("topic"), new SimpleStringSchema(), properties))

    // **** PROCESSING ****
    val logs: DataStream[MinifiedLog] = stream.map(x => LogParser2.parse(x))

    val sessions = logs.map { x => (x.timestamp, x.bytesSent, 1l, 1)}

    val sessionCnt: DataStream[(Long, Long, Long, Int)] = sessions
      .keyBy(3).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .reduce( (x: (Long, Long, Long, Int), y: (Long, Long, Long, Int)) => (x._1, x._2 + y._2, x._3 + y._3, x._4))
      .map { z => (z._1, z._2 / 10, z._3 / 10, z._4)}


    // **** OUTPUT ****
    val output: DataStream[String] = sessionCnt.map(x => (x.toString() + "\n"))
    output.writeToSocket("X.X.X.X", 3333, new SimpleStringSchema)

    env.execute("Kafka consuming")

  }
}

当我想在我的集群上运行它时,我运行这个命令:

./bin/flink run -m yarn-cluster -yn 8 /directories/myjar.jar --server X.X.X.X --topic mytopic

这工作正常。现在这是我的问题:

我在 Flink Web UI 中得到了这个:

1.为什么收到的记录总是发送记录的一半,而数据量却是一样的?

然后,如果我进入窗口的详细信息:

显然所有的过程都是在我的奴隶 4 上完成的,而且只有一个线程!来源也是如此。只用一个线程来接收数据。

2。为什么 Flink 没有为该步骤使用所有可能的线程?

我注意到源、窗口和接收器由不同的从属设备处理,但我仍然希望该过程在我的集群上并行完成。

我在这篇文章中读到:https://stackoverflow.com/a/32329010/5035392 有效地,如果 Kafka 源只有一个分区(这是我的情况),Flink 无法在不同的节点上共享任务。不过,我的窗口处理应该可以做到吧?

如果这些是微不足道的问题,我很抱歉。我不确定我做错了什么是 Flink 还是我的集群配置。谢谢。

【问题讨论】:

    标签: scala parallel-processing apache-flink


    【解决方案1】:

    广告。 2 同一键的所有值都在单个TaskManager 上处理。 在您的情况下,sessions.keyBy(3) 流的每个元素都具有相同的键 -> 1 因此所有计算都在单个任务槽中执行。

    【讨论】:

    • 你对第一个问题有想法吗?
    猜你喜欢
    • 2015-04-25
    • 2021-10-29
    • 1970-01-01
    • 1970-01-01
    • 2020-12-27
    • 1970-01-01
    • 1970-01-01
    • 2020-04-26
    • 2023-03-15
    相关资源
    最近更新 更多