【问题标题】:Low parallelism when running Apache Beam wordcount pipeline on Spark with Python SDK使用 Python SDK 在 Spark 上运行 Apache Beam wordcount 管道时并行度低
【发布时间】:2020-11-17 16:06:50
【问题描述】:

我对 Spark 集群配置和运行 Pyspark 管道非常有经验,但我只是从 Beam 开始。因此,我试图在 Spark PortableRunner 上对 Pyspark 和 Beam python SDK 进行苹果对苹果的比较(在同一个小型 Spark 集群上运行,4 个工作人员,每个工作人员有 4 个内核和 8GB RAM),我已经确定了一个相当大的数据集的字数统计工作,将结果存储在 Parquet 表中。

因此,我下载了 50GB 的 Wikipedia 文本文件,拆分为大约 100 个未压缩文件,并将它们存储在目录 /mnt/nfs_drive/wiki_files//mnt/nfs_drive 是安装在所有工作人员上的 NFS 驱动器)。

首先,我正在运行以下 Pyspark 字数统计脚本:

from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'

spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()

spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
    .flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: Row(word=x[0], count=x[1]))

spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', mode='overwrite')

脚本运行良好,大约 8 分钟后将 parquet 文件输出到所需位置。主要阶段(读取和拆分令牌)被划分为合理数量的任务,以便高效地使用集群:

我现在正在尝试使用 Beam 和便携式跑步者实现相同的目标。首先,我使用以下命令启动了 Spark 作业服务器(在 Spark 主节点上):

docker run --rm --net=host -e SPARK_EXECUTOR_MEMORY=8g apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://localhost:7077

然后,在主节点和工作节点上,我正在运行 SDK Harness,如下所示:

 docker run --net=host -d --rm -v /mnt/nfs_drive:/mnt/nfs_drive apache/beam_python3.6_sdk:2.25.0 --worker_pool

现在 Spark 集群已设置为运行 Beam 管道,我可以提交以下脚本:

import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
    "--job_name=WordCountBeam"
])


wiki_files = '/mnt/nfs_drive/wiki_files/*'

p = beam.Pipeline(options=options)
beam_counts = (
    p
    | fileio.MatchFiles(wiki_files)
    | beam.Map(lambda x: x.path)
    | beam.io.ReadAllFromText()
    | 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
    | beam.combiners.Count.PerElement()
    | beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)


_ = beam_counts | 'Write' >> beam.io.WriteToParquet('/mnt/nfs_drive/beam_output',
      pyarrow.schema(
          [('word', pyarrow.binary()), ('count', pyarrow.int64())]
      )
)

result = p.run().wait_until_finish()

代码提交成功,我可以在 Spark UI 上看到作业,并且工作人员正在执行它。但是,即使运行超过 1 小时,它也不会产生任何输出!

因此,我想确保我的设置没有问题,因此我在较小的数据集(只有 1 个 Wiki 文件)上运行了完全相同的脚本。这在大约 3.5 分钟内成功完成(同一数据集上的 Spark 字数统计需要 16 秒!)。

我想知道 Beam 怎么会那么慢,所以我开始查看 Beam 管道通过作业服务器提交给 Spark 的 DAG。我注意到 Spark 作业大部分时间都在以下阶段:

这只是分成 2 个任务,如下所示:

打印调试行表明此任务是执行“繁重”(即从 wiki 文件中读取行并拆分令牌)的地方 - 但是,由于这仅发生在 2 个任务中,因此工作将分布在 2 个最多工人。同样有趣的是,在大型 50GB 数据集上运行完全相同相同的 DAG 和 完全相同相同数量的任务。

我很不确定如何进一步进行。 Beam 管道似乎降低了并行度,但我不确定这是否是由于作业服务器对管道的次优转换,或者我是否应该以其他方式指定我的 PTransforms 以增加 Spark 上的并行度.

任何建议表示赞赏!

【问题讨论】:

    标签: python apache-spark apache-beam


    【解决方案1】:

    管道的文件IO部分可以使用apache_beam.io.textio.ReadFromText(file_pattern='/mnt/nfs_drive/wiki_files/*')进行简化。

    Fusion 是另一个可能阻止并行的原因。解决方案是在读取所有文件后输入apache_beam.transforms.util.Reshuffle

    【讨论】:

    • 我同意第一点,其实我之前是直接用ReadFromText,不过改成上面的方式是希望先用MatchFiles能提高并行度。关于重新洗牌:我试过了,但似乎没有帮助,但我会再试一次。但是,强制改组会在工作人员之间移动数据......
    • 不幸的是,我将管道更改为beam_counts = p | beam.io.ReadFromText(textfile, validate=False, coder=CustomCoder()) | beam.Reshuffle() | beam.FlatMap(split_word) | beam.combiners.Count.PerElement(),我可以确认无论更改为ReadFromText 还是添加Reshuffle 都不会提高管道的并行性。
    • 能否请您确认 spark 版本:是 2.x、3 吗?你如何启动它?也许那里隐藏着一个小的并行化。
    • Spark 版本为 2.4.6(Beam 中尚不支持 3.x)。在提交任何作业之前,spark 集群在独立模式下手动启动,使用默认并行度。我还怀疑存在一些隐藏的低默认并行度,但它肯定不在我的集群设置中(纯 Spark 作业在同一集群上具有高并行度)。正如stackoverflow.com/questions/64488060/… 中提到的,作业服务器配置了自己的 spark 上下文,所以它可能在那里......
    【解决方案2】:

    花了一段时间,但我弄清楚了问题所在和解决方法。

    根本问题在于 Beam 的便携式运行器,特别是 Beam 作业被转换为 Spark 作业的地方。

    翻译代码(由作业服务器执行)根据对sparkContext().defaultParallelism() 的调用将阶段拆分为任务。作业服务器没有显式配置默认并行度(并且不允许用户通过管道选项设置),因此理论上它会根据执行器的数量配置并行度(请参阅此处的说明https://spark.apache.org/docs/latest/configuration.html#execution-behavior)。这似乎是调用defaultParallelism()时翻译代码的目标。

    现在,在实践中,众所周知,当依赖回退机制时,过早调用 sparkContext().defaultParallelism() 可能会导致数量低于预期,因为执行程序可能尚未在上下文中注册然而。特别是,过早调用defaultParallelism() 会得到 2 个结果,并且阶段将仅被拆分为 2 个任务。

    因此,我的“肮脏黑客”解决方法包括修改作业服务器的源代码,只需在实例化 SparkContext 之后和执行任何其他操作之前添加 3 秒的延迟:

    $ git diff                                                                                                                                                                                                                                                                                                                         v2.25.0
    diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
    index aa12192..faaa4d3 100644
    --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
    +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
    @@ -95,7 +95,13 @@ public final class SparkContextFactory {
           conf.setAppName(contextOptions.getAppName());
           // register immutable collections serializers because the SDK uses them.
           conf.set("spark.kryo.registrator", SparkRunnerKryoRegistrator.class.getName());
    -      return new JavaSparkContext(conf);
    +      JavaSparkContext jsc = new JavaSparkContext(conf);
    +      try {
    +        Thread.sleep(3000);
    +      } catch (InterruptedException e) {
    +      }
    +      return jsc;
         }
       }
     }
    

    在重新编译作业服务器并使用此更改启动它之后,对defaultParallelism() 的所有调用都在执行程序注册之后完成,并且阶段很好地分为 16 个任务(与数字相同)执行人)。正如预期的那样,这项工作现在完成得更快了,因为有更多的工人在做这项工作(但它仍然比纯 Spark 字数慢 3 倍)。

    虽然这可行,但它当然不是一个很好的解决方案。更好的解决方案是以下之一:

    • 更改翻译引擎,使其能够根据可用执行器的数量以更稳健的方式推断任务数量;
    • 允许用户通过管道选项配置作业服务器用于翻译作业的默认并行度(这是 Flink 便携式运行器所做的)。

    在找到更好的解决方案之前,它显然会阻止在生产集群中使用任何 Beam Spark 作业服务器。我会将问题发布到 Beam 的工单队列,以便可以实施更好的解决方案(希望很快)。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多