【问题标题】:Understanding --archive in dataproc pyspark了解 dataproc pyspark 中的 --archive
【发布时间】:2022-02-18 02:03:02
【问题描述】:

命令帮助是这样说的:

--archives=[存档,...] 要提取到工作中的档案的逗号分隔列表 每个执行者的目录。必须是以下文件格式之一: .zip、.tar、.tar.gz 或 .tgz。

而且,这个答案here 告诉我--archives 只会在工作节点上提取

我正在通过以下方式测试 --archive 行为: tl;dr - 1. 我创建一个存档并压缩它。 2.我创建了一个简单的rdd并将其元素映射到os. walk('./')。 3. archive.zip 被列为一个目录,但os.walk 没有遍历这个分支

我的archive 目录:

.
├── archive
│   ├── a1.py
│   ├── a1.txt
│   └── archive1
│       ├── a1_in.py
│       └── a1_in.txt
├── archive.zip
└── main.py

2 directories, 6 files

测试代码:

import os
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize(range(1))
walk_worker = rdd.map(lambda x: str(list(os.walk('./')))).distinct().collect()
walk_driver = list(os.walk('./'))
print('driver walk:', walk_driver)
print('worker walk:',walk_worker)

Dataproc 运行命令:

gcloud dataproc jobs submit pyspark main.py --cluster pyspark-monsoon31 --region us-central1 --archives archive.zip

输出:

driver walk: [('./', [], ['.main.py.crc', 'archive.zip', 'main.py', '.archive.zip.crc'])]
worker walk: ["[('./', ['archive.zip', '__spark_conf__', 'tmp'], ['pyspark.zip', '.default_container_executor.sh.crc', '.container_tokens.crc', 'default_container_executor.sh', 'launch_container.sh', '.launch_container.sh.crc', 'default_container_executor_session.sh', '.default_container_executor_session.sh.crc', 'py4j-0.10.9-src.zip', 'container_tokens']), ('./tmp', [], ['liblz4-java-5701923559211144129.so.lck', 'liblz4-java-5701923559211144129.so'])]"]

驱动节点的输出archive.zip 可用但未提取 - 预期

工作节点的输出os.walkarchive.zip 列为提取目录。可用的 3 个目录是 ['archive.zip', '__spark_conf__', 'tmp']。但是,令我惊讶的是,只有./tmp 被进一步遍历,仅此而已

我使用os.listdir 检查了archive.zip 实际上是一个目录而不是一个zip。它的结构是:

└── archive.zip
    └── archive
        ├── a1.py
        ├── a1.txt
        └── archive1
            ├── a1_in.py
            └── a1_in.txt

那么,为什么os.walk 没有走下archive.zip 目录?

【问题讨论】:

    标签: apache-spark pyspark google-cloud-dataproc dataproc


    【解决方案1】:

    archive.zip 作为符号链接添加到工作节点。默认情况下不遍历符号链接。

    如果您更改为walk_worker = rdd.map(lambda x: str(list(os.walk('./', followlinks=True)))).distinct().collect(),您将获得您正在寻找的输出:

    worker walk: ["[('./', ['__spark_conf__', 'tmp', 'archive.zip'], ...
     ('./archive.zip', ['archive'], []), ('./archive.zip/archive', ['archive1'], ['a1.txt', 'a1.py']), ...."]
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-06-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-01-01
      • 1970-01-01
      相关资源
      最近更新 更多