【Question Title】: Python: How to Connect to Snowflake Using Apache Beam?
【Posted】: 2020-05-14 13:46:48
【Question】:

I see that BigQuery has a built-in I/O connector, but a lot of our data is stored in Snowflake. Is there a workaround for connecting to Snowflake? The only thing I can think of is to run the query with sqlalchemy and dump the output to Cloud Storage buckets, from which Apache Beam can then pick up its input data.
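A rough, purely illustrative sketch of that workaround: query Snowflake through SQLAlchemy (using the snowflake-sqlalchemy dialect), write the result to a CSV file, and upload it to a Cloud Storage bucket that the Beam pipeline will later read from. The account, credential and bucket names below are placeholders.

    # Export a Snowflake query result to Cloud Storage so Beam can pick it up later.
    # Requires: snowflake-sqlalchemy, pandas, google-cloud-storage.
    import pandas as pd
    import sqlalchemy
    from google.cloud import storage

    # Placeholder Snowflake connection URL (user, password, account, db, schema, warehouse).
    engine = sqlalchemy.create_engine(
        "snowflake://MY_USER:MY_PASSWORD@MY_ACCOUNT/MY_DB/MY_SCHEMA?warehouse=MY_WH"
    )

    # Run the query and dump the result set to a local CSV file.
    df = pd.read_sql("SELECT * FROM my_table", engine)
    df.to_csv("export.csv", index=False)

    # Upload the CSV to a Cloud Storage bucket.
    client = storage.Client()
    bucket = client.bucket("my-staging-bucket")
    bucket.blob("snowflake/export.csv").upload_from_filename("export.csv")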

【Comments】:

    Tags: python google-cloud-dataflow pipeline apache-beam snowflake-cloud-data-platform


    【Solution 1】:

    Snowflake Python and Java connectors were added to Beam recently.

    As of version 2.24, only the ReadFromSnowflake operation is supported, in apache_beam.io.external.snowflake.

    In version 2.25, WriteToSnowflake will also be available, in the apache_beam.io.snowflake module. You can still use the old import path, but it will be considered deprecated as of that release.

    For now it runs only on the Flink Runner, but there is ongoing work to make it available on other runners as well.

    Additionally, it is a cross-language transform, so some extra setup may be needed - it is well documented in the pydoc here (I'm pasting it below): https://github.com/apache/beam/blob/release-2.24.0/sdks/python/apache_beam/io/external/snowflake.py

    Snowflake transforms tested against Flink portable runner.
      **Setup**
      Transforms provided in this module are cross-language transforms
      implemented in the Beam Java SDK. During the pipeline construction, Python SDK
      will connect to a Java expansion service to expand these transforms.
      To facilitate this, a small amount of setup is needed before using these
      transforms in a Beam Python pipeline.
      There are several ways to setup cross-language Snowflake transforms.
      * Option 1: use the default expansion service
      * Option 2: specify a custom expansion service
      See below for details regarding each of these options.
      *Option 1: Use the default expansion service*
      This is the recommended and easiest setup option for using Python Snowflake
      transforms. This option requires the following prerequisites
      before running the Beam pipeline.
      * Install Java runtime in the computer from where the pipeline is constructed
        and make sure that 'java' command is available.
      In this option, Python SDK will either download (for released Beam version) or
      build (when running from a Beam Git clone) an expansion service jar and use
      that to expand transforms. Currently Snowflake transforms use the
      'beam-sdks-java-io-expansion-service' jar for this purpose.
      *Option 2: specify a custom expansion service*
      In this option, you startup your own expansion service and provide that as
      a parameter when using the transforms provided in this module.
      This option requires the following prerequisites before running the Beam
      pipeline.
      * Startup your own expansion service.
      * Update your pipeline to provide the expansion service address when
        initiating Snowflake transforms provided in this module.
      Flink Users can use the built-in Expansion Service of the Flink Runner's
      Job Server. If you start Flink's Job Server, the expansion service will be
      started on port 8097. For a different address, please set the
      expansion_service parameter.
      **More information**
      For more information regarding cross-language transforms see:
      - https://beam.apache.org/roadmap/portability/
      For more information specific to Flink runner see:
      - https://beam.apache.org/documentation/runners/flink/
    

    Snowflake (like most portable IOs) has its own Java expansion service, which should be downloaded automatically when you don't specify your own custom one. I don't think it will be needed, but I'm mentioning it just to be safe. You can download the jar and start it with java -jar <PATH_TO_JAR> <PORT>, then pass it to snowflake.ReadFromSnowflake as expansion_service='localhost:<PORT>'. Link for version 2.24: https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-snowflake-expansion-service/2.24.0
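    As a minimal illustration, assuming Beam 2.24 on the Flink portable runner, a read could look roughly like the sketch below. The parameter names follow the pydoc linked above, but the account, bucket, integration and credential values are placeholders, and the exact signature should be checked against your Beam version.

        # Minimal ReadFromSnowflake sketch for the Flink portable runner (Beam 2.24).
        # All connection values below are placeholders.
        import apache_beam as beam
        from apache_beam.io.external.snowflake import ReadFromSnowflake
        from apache_beam.options.pipeline_options import PipelineOptions

        def csv_mapper(fields):
            # Each Snowflake row arrives as a list of CSV field strings.
            return {"id": int(fields[0]), "name": fields[1]}

        options = PipelineOptions([
            "--runner=FlinkRunner",
            "--flink_master=localhost:8081",
            "--environment_type=LOOPBACK",
        ])

        with beam.Pipeline(options=options) as p:
            rows = p | ReadFromSnowflake(
                server_name="my_account.snowflakecomputing.com",
                schema="MY_SCHEMA",
                database="MY_DB",
                staging_bucket_name="gs://my-staging-bucket",
                storage_integration_name="my_storage_integration",
                csv_mapper=csv_mapper,
                username="MY_USER",
                password="MY_PASSWORD",
                table="MY_TABLE",
                # Only needed if you started your own expansion service:
                # expansion_service="localhost:8097",
            )
            rows | beam.Map(print)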

    Please note that it is still experimental, so feel free to report any issues on the Beam Jira.

    【Comments】:

    • Great to hear! Have you been playing with it / contributing to it? Thanks
    • Yes, I've been involved in the cross-language effort :)
    【Solution 2】:

    Google Cloud Support here!

    There is no direct connector from Snowflake to Cloud Dataflow, but the workaround you mentioned works: first dump the output to Cloud Storage, and then connect Cloud Storage to Cloud Dataflow.
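    A minimal sketch of that second leg, reading the dumped CSV files from Cloud Storage in a Beam pipeline on Dataflow; the project, region and bucket values are placeholders.

        # Read the CSV files dumped from Snowflake out of Cloud Storage on Dataflow.
        import apache_beam as beam
        from apache_beam.options.pipeline_options import PipelineOptions

        options = PipelineOptions([
            "--runner=DataflowRunner",
            "--project=my-project",
            "--region=us-central1",
            "--temp_location=gs://my-staging-bucket/tmp",
        ])

        with beam.Pipeline(options=options) as p:
            (p
             | "ReadFromGCS" >> beam.io.ReadFromText(
                   "gs://my-staging-bucket/snowflake/export.csv", skip_header_lines=1)
             | "SplitFields" >> beam.Map(lambda line: line.split(","))
             | "Print" >> beam.Map(print))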

    Hope this helps.

    【Comments】:

    • Thanks, Alejandro. Would you recommend a simple Python script hosted on Kubernetes Engine + Cloud Scheduler to handle dumping the data to Cloud Storage, or should I use PySpark + Dataproc?
    【Solution 3】:

    For anyone who finds this later and is looking for a tutorial on how to get started with Snowflake and Apache Beam, I can recommend the one below, written by the creators of the connector.

    https://www.polidea.com/blog/snowflake-and-apache-beam-on-google-dataflow/

    【Comments】: