【问题标题】:Writing data into snowflake using Python使用 Python 将数据写入雪花
【发布时间】:2020-08-27 12:50:19
【问题描述】:

我们可以直接将数据写入雪花表而不使用Python使用雪花内部阶段吗????

先在stage中写入,然后转换,然后加载到表中,这似乎是辅助任务。是否可以像RDBMS中的JDBC连接一样一步完成。

【问题讨论】:

    标签: python-3.x snowflake-cloud-data-platform


    【解决方案1】:

    将数据加载到 Snowflake 中绝对最快的方法是从内部或外部阶段的文件中加载。时期。所有连接器都能够使用标准插入命令插入数据,但这不会很好。也就是说,许多 Snowflake 驱动程序现在透明地使用 PUT/COPY 命令通过内部阶段将大数据加载到 Snowflake。如果这是您所追求的,那么您可以利用 pandas write_pandas 命令在单个命令中将数据从 pandas 数据帧加载到 Snowflake。在幕后,它将为您执行 PUT 和 COPY INTO。

    https://docs.snowflake.com/en/user-guide/python-connector-api.html#label-python-connector-api-write-pandas

    我强烈推荐这种模式,而不是任何驱动程序中的 INSERT 命令。而且我还建议在加载到雪花之后进行转换,而不是之前。

    【讨论】:

    • 另外,查看write_pandasparallelchunk_size 参数。调整这些可能会对总加载时间产生巨大影响。
    【解决方案2】:

    如果有人在处理大型数据集时遇到问题。尝试使用 dask 并生成划分为块的数据帧。然后您可以将 dask.delayed 与 sqlalchemy 一起使用。在这里,我们使用雪花的本机连接器方法,即 pd_writer,它在后台使用 write_pandas 并最终使用 PUT COPY 和压缩的 parquet 文件。最后,老实说,它归结为您的 I/O 带宽。您拥有的吞吐量越多,它在雪花表中加载的速度就越快。但是这个 sn-p 总体上提供了相当多的并行度。

    import functools
    from dask.diagnostics import ProgressBar
    from snowflake.connector.pandas_tools import pd_writer
    import dask.dataframe as dd
    df = dd.read_csv(csv_file_path, blocksize='64MB')
    ddf_delayed = df.to_sql(
            table_name.lower(),
            uri=str(engine.url),
            schema=schema_name,
            if_exists=if_exists,
            index=False,
            method=functools.partial(
            pd_writer,quote_identifiers=False),
            compute=False,
            parallel=True
        )
    with ProgressBar():
        dask.compute(ddf_delayed, scheduler='threads', retries=3)
    

    【讨论】:

    • 感谢您的回复 ....但它再次使用舞台仅加载数据
    • 是的,但这就是雪花推荐的方法,因为这是将数据放入雪花表的最快方法。但是当然可以像 pandas 一样使用 sqlalchemy 和 dask to_sql 方法。
    【解决方案3】:

    Java:

    加载驱动类:

    Class.forName("net.snowflake.client.jdbc.SnowflakeDriver")

    Maven:

    添加以下代码块作为依赖项

    <dependency>
      <groupId>net.snowflake</groupId>
      <artifactId>snowflake-jdbc</artifactId>
      <version>{version}</version>
    

    春天:

    application.yml:

        spring:
          datasource
            hikari:
              maximumPoolSize: 4 # Specify maximum pool size
              minimumIdle: 1 # Specify minimum pool size
              driver-class-name: com.snowflake.client.jdbc.SnowflakeDriver
    

    Python:

        import pyodbc
    
        # pyodbc connection string
        conn = pyodbc.connect("Driver={SnowflakeDSIIDriver}; Server=XXX.us-east-2.snowflakecomputing.com; Database=VAQUARKHAN_DB; schema=public; UID=username; PWD=password")
    
        #  Cursor
        cus=conn.cursor()
    
        # Execute SQL statement to get current datetime and store result in cursor
        cus.execute("select current_date;")
    
        # Display the content of cursor
        row = cus.fetchone()
    
        print(row)
    

    Apache Spark:

        <dependency>
             <groupId>net.snowflake</groupId>
             <artifactId>spark-snowflake_2.11</artifactId>
             <version>2.5.9-spark_2.4</version>
        </dependency>
    

    代码

       import org.apache.spark.sql.DataFrame
       import org.apache.spark.sql.DataFrame
    
    
            / Use secrets DBUtil to get Snowflake credentials.
            val user = dbutils.secrets.get("data-warehouse", "<snowflake-user>")
            val password = dbutils.secrets.get("data-warehouse", "<snowflake-password>")
    
            val options = Map(
              "sfUrl" -> "<snowflake-url>",
              "sfUser" -> user,
              "sfPassword" -> password,
              "sfDatabase" -> "<snowflake-database>",
              "sfSchema" -> "<snowflake-schema>",
              "sfWarehouse" -> "<snowflake-cluster>"
            )
    
    
            // Generate a simple dataset containing five values and write the dataset to Snowflake.
            spark.range(5).write
              .format("snowflake")
              .options(options)
              .option("dbtable", "<snowflake-database>")
              .save()
    
    
            // Read the data written by the previous cell back.
            val df: DataFrame = spark.read
              .format("snowflake")
              .options(options)
              .option("dbtable", "<snowflake-database>")
              .load()
    
            display(df)
    

    将数据加载到 Snowflake 中的最快方法是从文件中


    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-08-25
      • 1970-01-01
      • 2019-03-27
      • 1970-01-01
      • 1970-01-01
      • 2020-09-29
      • 1970-01-01
      相关资源
      最近更新 更多