【发布时间】:2020-11-17 16:06:50
【问题描述】:
我对 Spark 集群配置和运行 Pyspark 管道非常有经验,但我只是从 Beam 开始。因此,我试图在 Spark PortableRunner 上对 Pyspark 和 Beam python SDK 进行苹果对苹果的比较(在同一个小型 Spark 集群上运行,4 个工作人员,每个工作人员有 4 个内核和 8GB RAM),我已经确定了一个相当大的数据集的字数统计工作,将结果存储在 Parquet 表中。
因此,我下载了 50GB 的 Wikipedia 文本文件,拆分为大约 100 个未压缩文件,并将它们存储在目录 /mnt/nfs_drive/wiki_files/(/mnt/nfs_drive 是安装在所有工作人员上的 NFS 驱动器)。
首先,我正在运行以下 Pyspark 字数统计脚本:
from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'
spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()
spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add) \
.map(lambda x: Row(word=x[0], count=x[1]))
spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', mode='overwrite')
脚本运行良好,大约 8 分钟后将 parquet 文件输出到所需位置。主要阶段(读取和拆分令牌)被划分为合理数量的任务,以便高效地使用集群:
我现在正在尝试使用 Beam 和便携式跑步者实现相同的目标。首先,我使用以下命令启动了 Spark 作业服务器(在 Spark 主节点上):
docker run --rm --net=host -e SPARK_EXECUTOR_MEMORY=8g apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://localhost:7077
然后,在主节点和工作节点上,我正在运行 SDK Harness,如下所示:
docker run --net=host -d --rm -v /mnt/nfs_drive:/mnt/nfs_drive apache/beam_python3.6_sdk:2.25.0 --worker_pool
现在 Spark 集群已设置为运行 Beam 管道,我可以提交以下脚本:
import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio
options = PipelineOptions([
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_type=EXTERNAL",
"--environment_config=localhost:50000",
"--job_name=WordCountBeam"
])
wiki_files = '/mnt/nfs_drive/wiki_files/*'
p = beam.Pipeline(options=options)
beam_counts = (
p
| fileio.MatchFiles(wiki_files)
| beam.Map(lambda x: x.path)
| beam.io.ReadAllFromText()
| 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
| beam.combiners.Count.PerElement()
| beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)
_ = beam_counts | 'Write' >> beam.io.WriteToParquet('/mnt/nfs_drive/beam_output',
pyarrow.schema(
[('word', pyarrow.binary()), ('count', pyarrow.int64())]
)
)
result = p.run().wait_until_finish()
代码提交成功,我可以在 Spark UI 上看到作业,并且工作人员正在执行它。但是,即使运行超过 1 小时,它也不会产生任何输出!
因此,我想确保我的设置没有问题,因此我在较小的数据集(只有 1 个 Wiki 文件)上运行了完全相同的脚本。这在大约 3.5 分钟内成功完成(同一数据集上的 Spark 字数统计需要 16 秒!)。
我想知道 Beam 怎么会那么慢,所以我开始查看 Beam 管道通过作业服务器提交给 Spark 的 DAG。我注意到 Spark 作业大部分时间都在以下阶段:
打印调试行表明此任务是执行“繁重”(即从 wiki 文件中读取行并拆分令牌)的地方 - 但是,由于这仅发生在 2 个任务中,因此工作将分布在 2 个最多工人。同样有趣的是,在大型 50GB 数据集上运行完全相同相同的 DAG 和 完全相同相同数量的任务。
我很不确定如何进一步进行。 Beam 管道似乎降低了并行度,但我不确定这是否是由于作业服务器对管道的次优转换,或者我是否应该以其他方式指定我的 PTransforms 以增加 Spark 上的并行度.
任何建议表示赞赏!
【问题讨论】:
标签: python apache-spark apache-beam