【发布时间】:2020-08-27 12:50:19
【问题描述】:
我们可以直接将数据写入雪花表而不使用Python使用雪花内部阶段吗????
先在stage中写入,然后转换,然后加载到表中,这似乎是辅助任务。是否可以像RDBMS中的JDBC连接一样一步完成。
【问题讨论】:
标签: python-3.x snowflake-cloud-data-platform
我们可以直接将数据写入雪花表而不使用Python使用雪花内部阶段吗????
先在stage中写入,然后转换,然后加载到表中,这似乎是辅助任务。是否可以像RDBMS中的JDBC连接一样一步完成。
【问题讨论】:
标签: python-3.x snowflake-cloud-data-platform
将数据加载到 Snowflake 中绝对最快的方法是从内部或外部阶段的文件中加载。时期。所有连接器都能够使用标准插入命令插入数据,但这不会很好。也就是说,许多 Snowflake 驱动程序现在透明地使用 PUT/COPY 命令通过内部阶段将大数据加载到 Snowflake。如果这是您所追求的,那么您可以利用 pandas write_pandas 命令在单个命令中将数据从 pandas 数据帧加载到 Snowflake。在幕后,它将为您执行 PUT 和 COPY INTO。
我强烈推荐这种模式,而不是任何驱动程序中的 INSERT 命令。而且我还建议在加载到雪花之后进行转换,而不是之前。
【讨论】:
write_pandas 的parallel 和chunk_size 参数。调整这些可能会对总加载时间产生巨大影响。
如果有人在处理大型数据集时遇到问题。尝试使用 dask 并生成划分为块的数据帧。然后您可以将 dask.delayed 与 sqlalchemy 一起使用。在这里,我们使用雪花的本机连接器方法,即 pd_writer,它在后台使用 write_pandas 并最终使用 PUT COPY 和压缩的 parquet 文件。最后,老实说,它归结为您的 I/O 带宽。您拥有的吞吐量越多,它在雪花表中加载的速度就越快。但是这个 sn-p 总体上提供了相当多的并行度。
import functools
from dask.diagnostics import ProgressBar
from snowflake.connector.pandas_tools import pd_writer
import dask.dataframe as dd
df = dd.read_csv(csv_file_path, blocksize='64MB')
ddf_delayed = df.to_sql(
table_name.lower(),
uri=str(engine.url),
schema=schema_name,
if_exists=if_exists,
index=False,
method=functools.partial(
pd_writer,quote_identifiers=False),
compute=False,
parallel=True
)
with ProgressBar():
dask.compute(ddf_delayed, scheduler='threads', retries=3)
【讨论】:
Java:
加载驱动类:
Class.forName("net.snowflake.client.jdbc.SnowflakeDriver")
Maven:
添加以下代码块作为依赖项
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>{version}</version>
春天:
application.yml:
spring:
datasource
hikari:
maximumPoolSize: 4 # Specify maximum pool size
minimumIdle: 1 # Specify minimum pool size
driver-class-name: com.snowflake.client.jdbc.SnowflakeDriver
Python:
import pyodbc
# pyodbc connection string
conn = pyodbc.connect("Driver={SnowflakeDSIIDriver}; Server=XXX.us-east-2.snowflakecomputing.com; Database=VAQUARKHAN_DB; schema=public; UID=username; PWD=password")
# Cursor
cus=conn.cursor()
# Execute SQL statement to get current datetime and store result in cursor
cus.execute("select current_date;")
# Display the content of cursor
row = cus.fetchone()
print(row)
Apache Spark:
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>spark-snowflake_2.11</artifactId>
<version>2.5.9-spark_2.4</version>
</dependency>
代码
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame
/ Use secrets DBUtil to get Snowflake credentials.
val user = dbutils.secrets.get("data-warehouse", "<snowflake-user>")
val password = dbutils.secrets.get("data-warehouse", "<snowflake-password>")
val options = Map(
"sfUrl" -> "<snowflake-url>",
"sfUser" -> user,
"sfPassword" -> password,
"sfDatabase" -> "<snowflake-database>",
"sfSchema" -> "<snowflake-schema>",
"sfWarehouse" -> "<snowflake-cluster>"
)
// Generate a simple dataset containing five values and write the dataset to Snowflake.
spark.range(5).write
.format("snowflake")
.options(options)
.option("dbtable", "<snowflake-database>")
.save()
// Read the data written by the previous cell back.
val df: DataFrame = spark.read
.format("snowflake")
.options(options)
.option("dbtable", "<snowflake-database>")
.load()
display(df)
将数据加载到 Snowflake 中的最快方法是从文件中
【讨论】: