【发布时间】:2019-12-19 12:30:39
【问题描述】:
这是我尝试过的:
dask_rf = dd.from_pandas(pd.read_sql('select ...)', conn_cx_Oracle), npartitions = 10)
这给了我一个“大对象”警告并建议使用 client.scatter。问题是 client.scatter 似乎需要首先将数据加载到 Pandas 数据帧中,这就是为什么我首先使用 Dask 的原因,因为 RAM 限制。
Oracle 表太大,无法使用 Dask 的 read_sql_table 读取,因为 read_sql_table 不会以任何方式过滤该表。
想法? Dask 不适用于我的用例?
编辑 - 根据下面的答案和研究如何这样做之后,这是我尝试转换为使用 sqlalchemy 表达式:
from sqlalchemy import create_engine, Table, Column, String, MetaData, select
sql_engine = create_engine(f'oracle+cx_oracle://username:password@environment')
metadata = MetaData(bind=sql_engine)
table_reference = Table('table', metadata, autoload=True, schema='schema')
s = select([table_reference ]).where(table_reference .c.field_to_filter == filtered_value)
import dask.dataframe as dd
dask_df = dd.read_sql_table(s, 'sqlalchemy_connection_string', 'index_col', schema = 'schema')
dask_df.count()
Dask 系列结构:npartitions=1 action_timestamp int64 vendor_name ... dtype:int64 Dask 名称:dataframe-count-agg, 1996年任务
dask_df.count().compute()
DatabaseError: (cx_Oracle.DatabaseError) ORA-02391: 超出 同时 SESSIONS_PER_USER 限制(此错误的背景: http://sqlalche.me/e/4xp6)
为什么要尝试连接到 Oracle?
编辑 #2 - 以防万一,我进行了额外的测试。我想证明 sqlalchemy 可以独立工作,所以我通过以下方式证明了这一点:
result = sql_engine.execute(s)
type(result)
sqlalchemy.engine.result.ResultProxy
result.fetchone()
结果已显示
这似乎排除了 SQLAlchemy/Oracle 问题,那么有什么想法可以尝试下一步吗?
【问题讨论】:
标签: dask