【问题标题】:How to Run SQLAlchemy Query on Dask Distributed?如何在 Dask 分布式上运行 SQLAlchemy 查询?
【发布时间】:2020-04-26 05:35:34
【问题描述】:

我正在尝试使用我设置的 dask 集群运行和并行化此 sqlalchemy 查询,因为我没有足够的内存从本地计算机执行它。

我的代码如下 - 我不确定这是否是实现此目的的最佳方法:

from dask.distributed import Client
import dask.dataframe as dd
from dask.delayed import delayed
client = Client(<IP Address>)

recent_dates = ['2020-04-24', '2020-04-23', 2020-04-22']

query = """SELECT * FROM table WHERE date = '%s'"""
queries = [query.format(d) for d in recent_dates]

from sqlalchemy.engine import create_engine
conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
                           connect_args={'protocol': 'https',
                                         'requests_kwargs': {'verify': key}})

con = engine.connect()
df = dd.from_delayed([delayed(pd.read_sql_query)(q, conn) for q in queries])

我收到以下错误:

TypeError: can't pickle _thread.RLock objects

【问题讨论】:

    标签: python sqlalchemy dask dask-distributed dask-dataframe


    【解决方案1】:

    您应该使用函数read_sql_table,它就是为此目的而设计的。如果您阅读文档字符串和/或代码,您会看到传递给工作人员的是查询本身,这些工作人员在本地创建自己的引擎实例。这是因为 sqlalchemy 实例具有无法在工作人员之间发送的状态,如您所见。

    请注意,read_sql_table 也关心对数据进行分区,因为这是 Dask,重点是处理大于内存的数据。在您的示例中,我猜索引/分区列是date,并且您想传递要明确拆分的“分区”。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-03-16
      • 1970-01-01
      • 2020-08-19
      • 2019-11-16
      • 2021-03-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多