【问题标题】:Python read Cassandra data into pandasPython 将 Cassandra 数据读入 pandas
【发布时间】:2017-05-05 23:11:07
【问题描述】:

将 Cassandra 数据读入 pandas 的正确且最快的方法是什么?现在我使用下面的代码,但是速度很慢……

import pandas as pd

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

df = pd.DataFrame()

for row in session.execute(sql_query):
    df = df.append(pd.DataFrame(row, index=[0]))

df = df.reset_index(drop=True).fillna(pd.np.nan)

读取 1000 行需要 1 分钟,而我还有“多一点”... 如果我运行相同的查询,例如。在 DBeaver 中,我可以在一分钟内获得全部结果(约 40k 行)。

谢谢!!!

【问题讨论】:

  • 如果session.execute(sql_query) 的输出是一个字典列表,我会尝试df = pd.DataFrame(session.execute(sql_query)) 或在该列表的某些部分运行pd.DataFrame。将行逐一追加到数据帧是低效的。
  • session.execute(sql_query) 的结果是一个特殊的<cassandra.cluster.ResultSet at 0x1b4b61d0> 可迭代对象。它的行可以是元组、named_tuples 或字典。
  • 我明白了。不过,最好先将其转换为列表,例如lst=[]; for row in session...: lst.append(row),如果没有其他方法。然后连接结果:df = pd.concat(lst)。这样您就可以避免对 pd.DataFrame.append 进行昂贵的 4 万次调用。

标签: python pandas cassandra


【解决方案1】:

我在官方mailing list得到了答案(效果很好):

嗨,

尝试定义自己的 pandas 行工厂:

def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

session.row_factory = pandas_factory
session.default_fetch_size = None

query = "SELECT ..."
rslt = session.execute(query, timeout=None)
df = rslt._current_rows

我就是这样做的——它应该更快......

如果您找到更快的方法 - 我有兴趣:)

迈克尔

【讨论】:

  • 这个应该标记为答案,它整洁、简洁、通用。
  • 即使对于陌生的 cassandra 类型也能发挥魅力
  • 适用于读取,但在写回数据时可能会出现问题,因为 pandas 必须猜测 dtypes。例如:我读取了一个包含许多空行的 int 列,pandas 猜测为 float,然后 CQL 在插入类似表时出错,因为该列的类型错误。
【解决方案2】:

我所做的(在 python 3 中)是:

query = "SELECT ..."
df = pd.DataFrame(list(session.execute(query)))

【讨论】:

    【解决方案3】:

    我一直致力于将数据从 Cassandra 移动到 mssql,并使用此处给出的答案作为参考,我能够移动数据,但我在 cassandra 中的源表很大,我的查询从 cassandra 收到超时错误,事情是我们不能增加超时,我只剩下在我的查询中批量选择行的选项,我的代码也将 cassandra 集合数据类型转换为 str,因为我想在 mssql 中插入这些数据类型然后解析它,请让我知道如果有人遇到类似的问题,我构建的代码如下:

    import sys
    import pandas as pd
    import petl as etl
    import pyodbc
    import sqlalchemy
    from cassandra.auth import PlainTextAuthProvider
    from cassandra.cluster import Cluster
    from sqlalchemy import *
    from cassandra.query import SimpleStatement
    
    
    def pandas_factory(colnames, rows):
        return pd.DataFrame(rows, columns=colnames)
        engine = sqlalchemy.create_engine('sql_server_connection string')
    
    cluster = Cluster(
        contact_points=['cassandra_host'], 
        auth_provider = PlainTextAuthProvider(username='username', password='passwrd')
    )
    
    session = cluster.connect('keyspace',wait_for_all_pools=True)
    
    session.row_factory = pandas_factory
    request_timeout = 60000
    query = "SELECT * FROM cassandratable"
    statement = SimpleStatement(query, fetch_size=5000) 
    rows = session.execute(statement)
    
    df = rows._current_rows
    df['attributes'] = df.attributes.astype(str)
    df['attributesgenerated'] = df.attributesgenerated.astype(str)
    df['components'] = df.components.astype(str)
    df['distributioncenterinfo'] = df.distributioncenterinfo.astype(str)
    df['images'] = df.images.astype(str)
    df['itemcustomerzonezoneproductids'] = 
    df.itemcustomerzonezoneproductids.astype(str)
    df['itempodconfigids'] = df.itempodconfigids.astype(str)
    df['keywords'] = df.keywords.astype(str)
    df['validationmessages'] = df.validationmessages.astype(str)
    df['zones'] = df.zones.astype(str)
    #error_bad_lines=False
    #print(df)
    df.to_sql(
               name='mssql_table_name',
               con=engine,
               index=False,
               if_exists='append',
               chunksize=1
             )
    

    【讨论】:

      【解决方案4】:

      通过页面自动迭代将 Cassandra 数据读入 pandas 的最快方法。创建字典并通过自动迭代所有页面来添加每个字典。然后,用这个字典创建数据框。

      import pandas as pd
      from cassandra.cluster import Cluster
      from cassandra.auth import PlainTextAuthProvider
      from cassandra.query import dict_factory
      
      auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
      cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
          auth_provider=auth_provider)
      
      session = cluster.connect(CASSANDRA_DB)
      session.row_factory = dict_factory
      
      sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)
      
      dictionary ={"column1":[],"column2":[]}
      
      for row in session.execute(sql_query):
          dictionary["column1"].append(row.column1)
          dictionary["column1"].append(row.column1)
      
      df = pd.DataFrame(dictionary)
      

      【讨论】:

        【解决方案5】:

        我使用 row_factory 解决方案几周,然后在尝试将数据帧写入另一个具有相同结构的表时遇到数据类型问题。 Pandas 猜测 float 数据类型是一个包含许多空字段的 int 列。在写入过程中,cassandra 驱动程序抱怨类型不匹配。

        TypeError: Received an argument of invalid type for column "frequency". Expected: <class 'cassandra.cqltypes.Int32Type'>, Got: <class 'float'>; (required argument is not an integer)
        

        Pandas int 列不支持 NaN 或 None,因此最好的选择可能是将该列设为 python 对象。

        一个快速的 hack 正在调整 pandas_factory 以避免 pandas 推断。不是一个理想的一揽子政策:

        def pandas_factory(colnames, rows):
            df = pd.DataFrame(rows, columns=colnames, dtype=object)
            return df
        

        我还发现如果我不想要行工厂,我可以这样做:df = pandas.DataFrame(result.all())

        作为一个临时解决方案,我希望有一个健壮的result_to_df() 函数,它使用result.column_types(例如:cassandra.cqltypes.Int32Type)并很好地猜测将它们转换为python 对象或numpy 类型。如果/当我有时间写这个答案时,我会编辑这个答案。 Pandas read_cqlto_cql 会很理想,但可能超出我的带宽。

        【讨论】:

          猜你喜欢
          • 2018-02-27
          • 2017-09-25
          • 1970-01-01
          • 2015-01-29
          • 1970-01-01
          • 1970-01-01
          • 2021-09-07
          • 2021-07-28
          • 2014-01-03
          相关资源
          最近更新 更多