【问题标题】:AttributeError: 'generator' object has no attribute 'to_sql' While creating datframe using generatorAttributeError:“生成器”对象没有属性“to_sql”使用生成器创建数据框时
【发布时间】:2018-05-01 15:54:20
【问题描述】:

我正在尝试从固定宽度文件创建一个 datafrmae 并加载到 postgresql 数据库中。我的输入文件非常大(~16GB)和 2000 万条记录。因此,如果我创建数据框,它会消耗大部分可用的 RAM。需要很长时间才能完成。所以我想到了使用 chunksize(使用 python 生成器)选项并将记录提交到表中。但它因'AttributeError: 'generator' object has no attribute 'to_sql' 错误而失败。

受此答案启发 https://stackoverflow.com/a/47257676/2799214

输入文件:test_file.txt

XOXOXOXOXOXO9
AOAOAOAOAOAO8
BOBOBOBOBOBO7
COCOCOCOCOCO6
DODODODODODO5
EOEOEOEOEOEO4
FOFOFOFOFOFO3
GOGOGOGOGOGO2
HOHOHOHOHOHO1

sample.py

import pandas.io.sql as psql
import pandas as pd
from sqlalchemy import create_engine

def chunck_generator(filename, header=False,chunk_size = 10 ** 5):
    for chunk in pd.read_fwf(filename, colspecs=[[0,12],[12,13]],index_col=False,header=None, iterator=True, chunksize=chunk_size):
        yield (chunk)

def _generator( engine, filename, header=False,chunk_size = 10 ** 5):
    chunk = chunck_generator(filename, header=False,chunk_size = 10 ** 5)
    chunk.to_sql('sample_table', engine, if_exists='replace', schema='sample_schema', index=False)
    yield row

if __name__ == "__main__":
    filename = r'test_file.txt'
    engine = create_engine('postgresql://ABCD:ABCD@ip:port/database')
    c = engine.connect()
    conn = c.connection
    generator = _generator(engine=engine, filename=filename)
    while True:
       print(next(generator))
    conn.close()

错误:

    chunk.to_sql('sample_table', engine, if_exists='replace', schema='sample_schema', index=False)
AttributeError: 'generator' object has no attribute 'to_sql'

我的主要目标是提高性能。请帮助我解决问题或建议更好的方法。提前致谢。

【问题讨论】:

  • chunck_generator 是一个没有to_sql() 方法的生成器对象。您可能需要使用current_chunk = next(chunk) 来获取块。另外,row 没有定义。
  • @TwistedSim 是的,我同意。无论如何我可以解决这个问题。我应该保留数据框属性。
  • 您希望在哪里定义这个to_sql 方法?当然不是在所有的生成器上,或者所有的可迭代对象上,或者你通过一个函数的 yielding 值创建的特定生成器?如果您也想调用 DataFrame 的方法,则必须在 DataFrame 上调用它,而不是在其他类型的对象上调用。

标签: python python-3.x pandas dataframe generator


【解决方案1】:

'chunck_generator' 将返回一个 'generator' 对象,而不是块的实际元素。您需要迭代对象以从中取出块。

>>> def my_generator(x):
...     for y in range(x):
...         yield y
...
>>> g = my_generator(10)
>>> print g.__class__
<type 'generator'>
>>> ele = next(g, None)
>>> print ele
0
>>> ele = next(g, None)
>>> print ele
1

所以要修复你的代码,你只需要循环生成器

for chunk in chunck_generator(filename, header=False,chunk_size = 10 ** 5):
    yield chunk.to_sql()

但这似乎令人费解。我会这样做:

import pandas.io.sql as psql
import pandas as pd
from sqlalchemy import create_engine

def sql_generator(engine, filename, header=False,chunk_size = 10 ** 5):
    frame = pd.read_fwf(
        filename, 
        colspecs=[[0,12],[12,13]],
        index_col=False,
        header=None, 
        iterator=True, 
        chunksize=chunk_size
    ):
   
    for chunk in frame:
        yield chunk.to_sql(
            'sample_table', 
            engine, 
            if_exists='replace', 
            schema='sample_schema', 
            index=False
        )


if __name__ == "__main__":
    filename = r'test_file.txt'
    engine = create_engine('postgresql://USEE:PWD@IP:PORT/DB')
    for sql in sql_generator(engine, filename):
        print sql

【讨论】:

  • 代码正在运行。我可以使用 del[df] 或抓取收集器释放每个块内存吗?
  • 它应该自行收集垃圾。
  • 代码运行良好,但 to_sql 运行速度很慢。插入 100k 条记录(100k 行,98 列所有文本类型列)花了 30 分钟。有什么见解吗?
  • 您不想分块——您可能想使用可以关闭事务的数据库加载实用程序。 Postgres 似乎提供了一个 COPY 功能来使大文件加载更有效。 dba.stackexchange.com/questions/151930/…
  • 谢谢。我找到了一种加载 980 万条记录的方法。我已经使用了您的部分代码和 psycopg2 包来加载数据。数据在 30 分钟内加载
【解决方案2】:

结论: to_sql 方法加载大文件效率不高。所以我在包 psycopg2 中使用了 copy_from 方法,并在创建数据帧时使用了 chunksize 选项。 在 30 分钟内加载了 980 万条记录(~17GB),每条记录 98 列。

我已经删除了我的实际文件的原始引用(我在原始帖子中使用示例文件)。

import pandas as pd
import psycopg2
import io

def sql_generator(cur,con, filename, boundries, col_names, header=False,chunk_size = 2000000):
    frame = pd.read_fwf(filename,colspecs=boundries,index_col=False,header=None,iterator=True,chunksize=chunk_size,names=col_names)
    for chunk in frame:
        output = io.StringIO()
        chunk.to_csv(output, sep='|', quoting=3, escapechar='\\' , index=False, header=False,encoding='utf-8')
        output.seek(0)
        cur.copy_from(output, 'sample_schema.sample_table', null="",sep="|")
        yield con.commit()

if __name__ == "__main__":
    boundries = [[0,12],[12,13]]
    col_names = ['col1','col2']
    filename = r'test_file.txt'  #Refer to sample file in the original post
    con = psycopg2.connect(database='database',user='username', password='pwd', host='ip', port='port')
    cur = con.cursor()
    for sql in sql_generator(cur,con, filename, boundries, col_names):
        print(sql)
    con.close()

【讨论】:

    【解决方案3】:

    我建议你这样:

    def _generator( engine, filename, ...):
        for chunk in pd.read_fwf(filename, ...):
            yield chunk.to_sql('sample_table', engine, ...)  # not sure about this since row was not define
    
    for row in _generator(engine=engine, filename=filename)
        print(row)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-08-30
      • 1970-01-01
      • 2023-04-10
      相关资源
      最近更新 更多