【问题标题】:How to load SUBSET of large Oracle table into Dask dataframe?如何将大型 Oracle 表的 SUBSET 加载到 Dask 数据帧中?
【发布时间】:2019-12-19 12:30:39
【问题描述】:

这是我尝试过的:

dask_rf = dd.from_pandas(pd.read_sql('select ...)', conn_cx_Oracle), npartitions = 10)

这给了我一个“大对象”警告并建议使用 client.scatter。问题是 client.scatter 似乎需要首先将数据加载到 Pandas 数据帧中,这就是为什么我首先使用 Dask 的原因,因为 RAM 限制。

Oracle 表太大,无法使用 Dask 的 read_sql_table 读取,因为 read_sql_table 不会以任何方式过滤该表。

想法? Dask 不适用于我的用例?

编辑 - 根据下面的答案和研究如何这样做之后,这是我尝试转换为使用 sqlalchemy 表达式:

from sqlalchemy import create_engine, Table, Column, String, MetaData, select

sql_engine = create_engine(f'oracle+cx_oracle://username:password@environment')

metadata = MetaData(bind=sql_engine)

table_reference = Table('table', metadata, autoload=True, schema='schema')

s = select([table_reference ]).where(table_reference .c.field_to_filter == filtered_value)

import dask.dataframe as dd

dask_df = dd.read_sql_table(s, 'sqlalchemy_connection_string', 'index_col', schema = 'schema')

dask_df.count()

Dask 系列结构:npartitions=1 action_timestamp int64 vendor_name ... dtype:int64 Dask 名称:dataframe-count-agg, 1996年任务

dask_df.count().compute()

DatabaseError: (cx_Oracle.DatabaseError) ORA-02391: 超出 同时 SESSIONS_PER_USER 限制(此错误的背景: http://sqlalche.me/e/4xp6)

为什么要尝试连接到 Oracle?

编辑 #2 - 以防万一,我进行了额外的测试。我想证明 sqlalchemy 可以独立工作,所以我通过以下方式证明了这一点:

result = sql_engine.execute(s)

type(result)

sqlalchemy.engine.result.ResultProxy

result.fetchone()

结果已显示

这似乎排除了 SQLAlchemy/Oracle 问题,那么有什么想法可以尝试下一步吗?

【问题讨论】:

    标签: dask


    【解决方案1】:

    我现在正在寻找同样的东西。

    为了不被卡住...您可能没有足够的 RAM,但您可能有很多可用存储空间。所以...现在的建议

    # imports
    import pandas as pd 
    import cx_Oracle as cx
    import dask.dataframe as dd
    
    # Connection stuff
    ...
    conn = ...
    
    # Query
    qry = "SELECT * FROM HUGE_TABLE"
    
    # Pandas Chunks
    for ix , chunk in enumerate(pd.io.sql.read_sql(qry , conn , ... , chunksize=1000000)):
        pd.DataFrame(chunk).to_csv(f"chunk_{ix}.csv" , sep=";") # or to_parquet 
    
    # Dask dataframe reading from files (chunks)
    dataset = dd.read_csv("chunk_*.csv" , sep=";" , blocksie=32e6) # or read_parquet
    

    由于这是 IO 密集型操作,并且您正在执行顺序操作,因此可能需要一段时间。

    我对“导出”更快的建议是对表进行分区并按每个分区并行执行块导出。

    【讨论】:

    • 谢谢。我会看看。我还能够通过将表导出到 csv 并将 csv 加载到 dask 来解决它,但这当然不是自动化的。我报告了一个错误,并被要求创建一个最小的错误报告,我没有时间使用 ATM,但是当我最终能够了解它时会在这里报告。与此同时,我还没有看到任何人从 oracle 成功加载 dask,所以就是这样。
    【解决方案2】:

    问题在于 client.scatter 似乎需要先将数据加载到 Pandas 数据帧中

    那是因为你在这里调用 Pandas 代码

    dd.from_pandas(pd.read_sql('select ...)', conn_cx_Oracle), npartitions = 10)
    #              pd.read_sql('select ...)', conn_cx_Oracle) # <<-----
    

    根据read_sql_table docstring 你应该能够传入一个 SQLAlchemy 表达式对象。

    read_sql_table 不接受任意 SQL 查询字符串的原因是因为它需要能够对您的查询进行分区,因此每个任务只加载整体的一部分。对于那里的许多方言来说,这是一件棘手的事情,所以我们依靠 sqlalchemy 来进行格式化。

    【讨论】:

    • 转换为使用 slqalchemy 表达式。有关详细信息,请参阅已编辑的问题。
    • 请参阅编辑#2 以测试 SQLAlchemy/Oracle 是否正常运行。
    猜你喜欢
    • 2018-11-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-01
    • 1970-01-01
    • 2013-03-29
    • 2020-06-09
    相关资源
    最近更新 更多