【问题标题】:Redshift COPY operation doesn't work in SQLAlchemyRedshift COPY 操作在 SQLAlchemy 中不起作用
【发布时间】:2015-04-01 00:05:58
【问题描述】:

我正在尝试在 SQLAlchemy 中进行 Redshift COPY。

当我在 psql 中执行以下 SQL 时,它会正确地将对象从我的 S3 存储桶复制到我的 Redshift 表中:

COPY posts FROM 's3://mybucket/the/key/prefix' 
WITH CREDENTIALS 'aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' 
JSON AS 'auto';

我有几个名为

的文件
s3://mybucket/the/key/prefix.001.json
s3://mybucket/the/key/prefix.002.json   
etc.

我可以使用select count(*) from posts 验证新行是否已添加到表中。

但是,当我在 SQLAlchemy 中执行完全相同的 SQL 表达式时,执行完成且没有错误,但没有任何行添加到我的表中。

session = get_redshift_session()
session.bind.execute("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey'    JSON AS 'auto';")
session.commit()

我做上面的没关系

from sqlalchemy.sql import text 
session = get_redshift_session()
session.execute(text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey'    JSON AS 'auto';"))
session.commit()

【问题讨论】:

    标签: python sqlalchemy amazon-redshift


    【解决方案1】:

    我已成功使用核心表达式语言和Connection.execute()(与 ORM 和会话相反)通过以下代码将分隔文件复制到 Redshift。也许您可以将其调整为 JSON。

    def copy_s3_to_redshift(conn, s3path, table, aws_access_key, aws_secret_key, delim='\t', uncompress='auto', ignoreheader=None):
        """Copy a TSV file from S3 into redshift.
    
        Note the CSV option is not used, so quotes and escapes are ignored.  Empty fields are loaded as null.
        Does not commit a transaction.
        :param Connection conn: SQLAlchemy Connection
        :param str uncompress: None, 'gzip', 'lzop', or 'auto' to autodetect from `s3path` extension.
        :param int ignoreheader: Ignore this many initial rows.
        :return: Whatever a copy command returns.
        """
        if uncompress == 'auto':
            uncompress = 'gzip' if s3path.endswith('.gz') else 'lzop' if s3path.endswith('.lzo') else None
    
        copy = text("""
            copy "{table}"
            from :s3path
            credentials 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key}'
            delimiter :delim
            emptyasnull
            ignoreheader :ignoreheader
            compupdate on
            comprows 1000000
            {uncompress};
            """.format(uncompress=uncompress or '', table=text(table), aws_access_key=aws_access_key, aws_secret_key=aws_secret_key))    # copy command doesn't like table name or keys single-quoted
        return conn.execute(copy, s3path=s3path, delim=delim, ignoreheader=ignoreheader or 0)
    

    【讨论】:

      【解决方案2】:

      我基本上也有同样的问题,但在我的情况下它更多:

      engine = create_engine('...')
      engine.execute(text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey'    JSON AS 'auto';"))
      

      通过单步执行 pdb,问题显然是缺少 .commit() 被调用。我不知道为什么 session.commit() 在您的情况下不起作用(可能会话“丢失跟踪”已发送命令?)所以它实际上可能无法解决您的问题。

      无论如何,就像explained in the sqlalchemy docs

      鉴于此要求,SQLAlchemy 实现了自己的“自动提交”功能,该功能在所有后端完全一致。这是通过检测表示数据更改操作的语句来实现的,即 INSERT、UPDATE、DELETE [...] 如果语句是纯文本语句并且未设置标志,则使用正则表达式来检测 INSERT、UPDATE 、DELETE 以及用于特定后端的各种其他命令。

      所以,有两种解决方案:

      • text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';").execution_options(autocommit=True).
      • 或者,获得一个固定版本的红移方言...我只是 opened a PR 讨论它

      【讨论】:

        【解决方案3】:

        在对我有用的副本末尾添加一个提交:

        <your copy sql>;commit;
        

        【讨论】:

          猜你喜欢
          • 2019-02-24
          • 2017-11-19
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2017-06-04
          • 1970-01-01
          相关资源
          最近更新 更多