【Question Title】: Dask DataFrame Coiled KilledWorker read_sql
【Posted】: 2022-01-02 07:28:26
【Question Description】:

I am trying to run a Dask cluster alongside a Dash application to analyze very large datasets. Running a LocalCluster works, and the Dask DataFrame computations complete successfully. The Dash application is started with a gunicorn command.

Unfortunately, my problems start when I try to move the cluster to Coiled.

import coiled
import dask.dataframe as dd
from distributed import Client

coiled.create_software_environment(
    name="my-conda-env",
    conda={
        "channels": ["conda-forge", "defaults"],
        "dependencies": ["dask", "dash"],
    },
)

coiled.create_cluster_configuration(
    name="my-cluster-config",
    scheduler_cpu=1,
    scheduler_memory="1 GiB",
    worker_cpu=2,
    worker_memory="1 GiB",
    software="my-conda-env"
)

cluster = coiled.Cluster(n_workers=2)
CLIENT = Client(cluster)

dd_bills_df = dd.read_sql_table(
    table, conn_string, npartitions=10, index_col='DB_BillID'
)
CLIENT.publish_dataset(bills=dd_bills_df)
del dd_bills_df

log.debug(CLIENT.list_datasets())

x = CLIENT.get_dataset('bills').persist()
log.debug(x.groupby('BillType').count().compute())

The cluster is created successfully, the dataset publishes to the cluster without issue, and the client pulls the dataset into the variable x. The failure occurs during the groupby() computation.

[2021-12-03 17:40:30 -0600] [78928] [ERROR] Exception in worker process
Traceback (most recent call last):
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/arbiter.py", line 589, in spawn_worker
    worker.init_process()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/workers/base.py", line 134, in init_process
    self.load_wsgi()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/workers/base.py", line 146, in load_wsgi
    self.wsgi = self.app.wsgi()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/base.py", line 67, in wsgi
    self.callable = self.load()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/wsgiapp.py", line 58, in load
    return self.load_wsgiapp()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/app/wsgiapp.py", line 48, in load_wsgiapp
    return util.import_app(self.app_uri)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/gunicorn/util.py", line 359, in import_app
    mod = importlib.import_module(module)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 855, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/Users/leowotzak/PenHole/test-containers2/src/application.py", line 61, in <module>
    log.debug(x.groupby('BillType').count().compute())
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/dask/base.py", line 288, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/dask/base.py", line 571, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 2725, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 1980, in gather
    return self.sync(
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 868, in sync
    return sync(
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/utils.py", line 332, in sync
    raise exc.with_traceback(tb)
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/utils.py", line 315, in f
    result[0] = yield future
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/Users/leowotzak/PenHole/test-containers2/venv/lib/python3.9/site-packages/distributed/client.py", line 1845, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('_read_sql_chunk-5519d13b-b80d-468e-afd5-5f072b9adbec', <WorkerState 'tls://10.4.27.61:40673', name: coiled-dask-leowotzc2-75566-worker-6a0538671d, status: closed, memory: 0, processing: 10>)

Here is the log output just before the crash:

DEBUG:application:Dask DataFrame Structure:
               BillName BillType ByRequest Congress EnactedAs    IntroducedAt BillNumber OfficialTitle PopularTitle ShortTitle CurrentStatus BillSubjectTopTerm     URL TextURL  DB_LastModDate  DB_CreatedDate
npartitions=10                                                                                                                                                                                                 
1.0              object   object     int64   object    object  datetime64[ns]     object        object       object     object        object             object  object  object  datetime64[ns]  datetime64[ns]
2739.9              ...      ...       ...      ...       ...             ...        ...           ...          ...        ...           ...                ...     ...     ...             ...             ...
...                 ...      ...       ...      ...       ...             ...        ...           ...          ...        ...           ...                ...     ...     ...             ...             ...
24651.1             ...      ...       ...      ...       ...             ...        ...           ...          ...        ...           ...                ...     ...     ...             ...             ...
27390.0             ...      ...       ...      ...       ...             ...        ...           ...          ...        ...           ...                ...     ...     ...             ...             ...
Dask Name: from-delayed, 20 tasks
DEBUG:application:('bills',)

I have tried increasing the memory allocated to each worker and the number of partitions in the Dask DataFrame, to no avail. I am struggling to figure out what is killing the workers. Has anyone else run into this error?

【Question Comments】:

    Tags: python sqlalchemy dask


    【Solution 1】:

    Solution

    The root cause of the error was a misconfigured software environment for the dask-worker and dask-scheduler processes; it had nothing to do with Coiled or the code sample in the original post.

    The dask-scheduler and dask-worker processes run in Docker containers on EC2 instances. They were started with the following command:

    sudo docker run -it --net=host daskdev/dask:latest dask-worker <host>:<port>
    

    The daskdev/dask image is described in the documentation as follows:

    This is a normal debian + miniconda image with the full Dask conda package (including the distributed scheduler), NumPy, and Pandas. This image is about 1GB in size.

    The problem is that dask.dataframe.read_sql_table(...) uses SQLAlchemy and, by extension, a database driver such as pymysql. Neither is included in this base image. To fix this, the earlier docker run command can be modified to:

    sudo docker run -it -e EXTRA_PIP_PACKAGES="sqlalchemy pymysql" --net=host daskdev/dask:latest dask-worker <host>:<port>
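
    To verify the fix, each worker can be asked whether it can import the driver stack. A minimal sketch using a LocalCluster (the same client.run call works against a Coiled Client; the check function simply returns False where pymysql is genuinely missing):

```python
from distributed import Client, LocalCluster

def can_import_db_stack():
    # True only if this worker's environment has both sqlalchemy and pymysql
    try:
        import sqlalchemy  # noqa: F401
        import pymysql     # noqa: F401
        return True
    except ImportError:
        return False

if __name__ == "__main__":
    # processes=False keeps the local check quick; on Coiled, attach the
    # Client to the remote cluster instead of starting a LocalCluster
    cluster = LocalCluster(n_workers=2, processes=False)
    client = Client(cluster)
    print(client.run(can_import_db_stack))  # maps worker address -> bool
    client.close()
    cluster.close()
```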
    

    【Comments】:

      【Solution 2】:

      If the dataset is very large, 1 GiB for the workers and the scheduler is likely to be very limiting. There are two options:

      1. Set worker and scheduler memory to something comparable to your local machine.

      2. Try the Coiled version of the code on a reasonably small subset of the table.

      【Comments】:

      • I tried setting the scheduler and worker memory to 8GB, unfortunately with no luck. On your second point, is there a way to read only selected rows from a SQL table with Dask? Everything I have seen suggests I need to pull the whole table. docs.dask.org/en/latest/generated/…
      • Hmm, it's not ideal, but one option is to manually copy a subset of the rows into a new table via SQL and use that for testing...
      【Solution 3】:

      When a groupby operation produces a large result, you can try the following:

      【Comments】:

      • I followed your first two points; unfortunately they did not work. My command was: dd_bills_df.groupby('BillType', observed=True).size(split_out=True).compute(). I'm starting to believe the problem is with how the SQL table is read: dd_bills_df = dd.read_sql_table(table, conn_string, index_col='DB_BillID', npartitions=10), since I can't even run dd_bills_df.head(3) without an error.
      • Oh, I see - so this isn't a groupby problem at all. Could the chunked query results be too large?
      • Apparently not. I'll investigate further and update my question, thanks.