【问题标题】:Dask Distributed: Reading .csv from HDFSDask 分布式:从 HDFS 读取 .csv
【发布时间】:2019-02-17 03:03:36
【问题描述】:

我正在使用 "Distributed Pandas on a Cluster with Dask DataFrames" 作为指导对 Dask 进行性能测试。

在 Matthew 的示例中,他有一个 20GB 的文件和 64 个工作器(8 个物理节点)。

就我而言,我有一个 82GB 的文件和 288 个工作人员(12 个物理节点;每个节点上都有一个 HDFS 数据节点)。

在所有 12 个节点上,我可以访问 HDFS 并执行显示文件信息的简单 Python 脚本:

import pyarrow as pa
fs = pa.hdfs.connect([url], 8022)
print(str(fs.info('/path/to/file.csv')))

如果我只使用运行 Dask Scheduler 的机器创建一个单节点集群(只有 24 个工作人员),我可以从 HDFS 读取 .csv 并打印长度:

import dask
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
dask.config.set(hdfs_backend='pyarrow')
df = dd.read_csv('hdfs://[url]:8022/path/to/file.csv')
df = client.persist(df)
print(str(len(df)))

最后一行给出了 1046250873(很好!),运行时间为 3 分 17 秒。

但是,当我使用完整的集群时,最后一行调用 len(df) 会死掉,我会收到此错误:

KilledWorker: ("('pandas_read_text-read-block-from-delayed-9ad3beb62f0aea4a07005d5c98749d7e', 1201)", 'tcp://[url]:42866')

这类似于here 提到的问题,它有一个解决方案here 涉及 Dask Yarn 和一个配置 (?),如下所示:worker_env={'ARROW_LIBHDFS_DIR': ...}

但是,我没有使用 Yarn,尽管我的猜测是 Dask Worker 没有配置他们连接所需的 HDFS/Arrow 路径。

我没有看到任何关于此的文档,因此我想问我缺少什么。

编辑:

这是我在 Dask Workers 的输出中看到的错误回溯:

distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95N\x05\x00\x00\x00\x00\x00\x00(\x8c\x14dask.dataframe.utils\x94\x8c\ncheck_meta\x94\x93\x94(\x8c\x12dask
.compatibility\x94\x8c\x05apply\x94\x93\x94\x8c\x15dask.dataframe.io.csv\x94\x8c\x10pandas_read_text\x94\x93\x94]\x94(\x8c\x11pandas.io.parsers\x94\x8c\x08read_csv\x94\x93\x94(
\x8c\x0fdask.bytes.core\x94\x8c\x14read_block_from_file\x94\x93\x94h\r\x8c\x08OpenFile\x94\x93\x94(\x8c\x12dask.bytes.pyarrow\x94\x8c\x17PyArrowHadoopFileSystem\x94\x93\x94)\x8
1\x94}\x94(\x8c\x02fs\x94\x8c\x0cpyarrow.hdfs\x94\x8c\x10HadoopFileSystem\x94\x93\x94(\x8c\r10.255.200.91\x94MV\x1fNN\x8c\x07libhdfs\x94Nt\x94R\x94\x8c\x08protocol\x94\x8c\x04h
dfs\x94ub\x8c\x1a/path/to/file.csv\x94\x8c\x02rb\x94NNNt\x94R\x94K\x00J\x00\x90\xd0\x03C\x01\n\x94t\x94C\x12animal,weight,age\n\x94\x8c\x08builtins\x94\x8c\x04dict\x94
\x93\x94]\x94\x86\x94h*]\x94(]\x94(\x8c\x06animal\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02O8\x94K\x00K\x01\x87\x94R\x94(K\x03\x8c\x01|\x94NNNJ\xff\xff\xff\xffJ\xff
\xff\xff\xffK?t\x94be]\x94(\x8c\x06weight\x94h2\x8c\x02i8\x94K\x00K\x01\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94be]\x94(\x8c\x03age\x94h<e
e\x86\x94]\x94(h/h9h@eeh*]\x94(]\x94(\x8c\x0cwrite_header\x94\x89e]\x94(\x8c\x07enforce\x94\x89e]\x94(\x8c\x04path\x94Nee\x86\x94t\x94\x8c\x11pandas.core.frame\x94\x8c\tDataFra
me\x94\x93\x94)\x81\x94}\x94(\x8c\x05_data\x94\x8c\x15pandas.core.internals\x94\x8c\x0cBlockManager\x94\x93\x94)\x81\x94(]\x94(\x8c\x18pandas.core.indexes.base\x94\x8c\n_new_In
dex\x94\x93\x94hW\x8c\x05Index\x94\x93\x94}\x94(\x8c\x04data\x94\x8c\x15numpy.core.multiarray\x94\x8c\x0c_reconstruct\x94\x93\x94h0\x8c\x07ndarray\x94\x93\x94K\x00\x85\x94C\x01
b\x94\x87\x94R\x94(K\x01K\x03\x85\x94h5\x89]\x94(h/h9h@et\x94b\x8c\x04name\x94Nu\x86\x94R\x94hY\x8c\x19pandas.core.indexes.range\x94\x8c\nRangeIndex\x94\x93\x94}\x94(hjN\x8c\x0
5start\x94K\x00\x8c\x04stop\x94K\x00\x8c\x04step\x94K\x01u\x86\x94R\x94e]\x94(h`hbK\x00\x85\x94hd\x87\x94R\x94(K\x01K\x02K\x00\x86\x94h<\x89C\x00\x94t\x94bh`hbK\x00\x85\x94hd\x
87\x94R\x94(K\x01K\x01K\x00\x86\x94h5\x89]\x94t\x94be]\x94(hYh[}\x94(h]h`hbK\x00\x85\x94hd\x87\x94R\x94(K\x01K\x02\x85\x94h5\x89]\x94(h9h@et\x94bhjNu\x86\x94R\x94hYh[}\x94(h]h`
hbK\x00\x85\x94hd\x87\x94R\x94(K\x01K\x01\x85\x94h5\x89]\x94h/at\x94bhjNu\x86\x94R\x94e}\x94\x8c\x060.14.1\x94}\x94(\x8c\x04axes\x94hV\x8c\x06blocks\x94]\x94(}\x94(\x8c\x06valu
es\x94hy\x8c\x08mgr_locs\x94h(\x8c\x05slice\x94\x93\x94K\x01K\x03K\x01\x87\x94R\x94u}\x94(h\x9dh\x7fh\x9eh\xa0K\x00K\x01K\x01\x87\x94R\x94ueust\x94b\x8c\x04_typ\x94\x8c\tdatafr
ame\x94\x8c\t_metadata\x94]\x94ub\x8c\x0cfrom_delayed\x94t\x94.'
Traceback (most recent call last):
  File "/usr/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
  File "/usr/lib64/python3.6/site-packages/pyarrow/hdfs.py", line 38, in __init__
    self._connect(host, port, user, kerb_ticket, driver, extra_conf)
  File "pyarrow/io-hdfs.pxi", line 89, in pyarrow.lib.HadoopFileSystem._connect
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: Unable to load libjvm

再次,我可以使用 pyarrow 从 12 个节点中的任何一个从 HDFS 成功读取文件。

【问题讨论】:

    标签: python hdfs dask dask-distributed


    【解决方案1】:

    查看回溯我的猜测是 PyArrow 没有正确安装在工作节点上。我可能会在 PyArrow 问题跟踪器上询问他们是否可以帮助您诊断该回溯。

    【讨论】:

      【解决方案2】:

      嘿男孩!在从头构建 libhdfs3 并部署到集群的一部分并找到相同的确切结果 (ImportError: Can not find the shared library: libhdfs3.so) 后,我意识到问题是我一直在通过 pssh 启动 Dask 工作人员,所以他们没有捕获环境变量他们应该。

      【讨论】:

      • 我正在尝试在集群中运行 8gb csv 文件。我收到错误“FileNotFoundError:[Errno 2] 没有这样的文件或目录:'hadoop':'hadoop'”。 pyarrow 已经安装并且 hdfs3 也安装了。请逐步告诉您如何能够从客户端读取_csv 在工作人员中执行 csv。工作人员和客户在不同的笔记本电脑上。
      猜你喜欢
      • 2015-03-28
      • 2019-07-16
      • 1970-01-01
      • 1970-01-01
      • 2013-06-13
      • 1970-01-01
      • 2016-03-19
      • 2016-12-09
      相关资源
      最近更新 更多