[Question Title]: How to create a Spark DataFrame using a Snowflake connection in Python?
[Posted]: 2024-05-19 19:05:02
[Question Description]:

I am new to Spark and Python. I have a SQL statement stored in a Python variable, and we use a Snowflake database. How can I create a Spark DataFrame from that SQL using a Snowflake connection?

    import sf_connectivity  # internal module that establishes the connection to Snowflake
    emp = 'Select * From Employee'
    snowflake_connection = sf_connectivity.collector()  # method that establishes the Snowflake connection
    # requirement 1: create a Spark DataFrame (sf_df) using 'emp' and 'snowflake_connection'
    # requirement 2: sf_df.createOrReplaceTempView("Temp_Employee")

Which packages or libraries does this require, and how can I achieve it?

[Question Discussion]:

Tags: dataframe apache-spark pyspark apache-spark-sql snowflake-cloud-data-platform


[Solution 1]:

The documentation that helped me solve this is here: https://docs.databricks.com/data/data-sources/snowflake.html

It took me a while to figure out how to make this work! After asking a lot of questions, I had my company's IT department configure a Snowflake user account with private/public key authentication, and they made that account accessible from our corporate Databricks workspace.

Once that was set up, the following code shows how to pass a SQL command as a variable to Spark and have Spark turn the result into a DataFrame.

    optionsSource = dict(
        sfUrl="mycompany.east-us-2.azure.snowflakecomputing.com",  # Snowflake account URL
        sfUser="my_service_acct",
        pem_private_key=dbutils.secrets.get("my_scope", "my_secret"),  # private key from a Databricks secret scope
        sfDatabase="mydatabase",    # Snowflake database
        sfSchema="myschema",        # Snowflake schema
        sfWarehouse="mywarehouse",
        sfRole="myrole"
    )

    sqlcmd = '''
    select current_date;
    '''

    df = spark.read.format("snowflake").options(**optionsSource).option("query", sqlcmd).load()
    display(df)
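
To cover the question's second requirement, the resulting DataFrame can be registered as a temporary view and queried with Spark SQL. A minimal sketch, reusing `df` from above and the view name from the question:

    # Requirement 2 from the question: register the DataFrame as a temp view
    # so it can be queried with Spark SQL.
    df.createOrReplaceTempView("Temp_Employee")
    spark.sql("select * from Temp_Employee").show()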

[Discussion]:

[Solution 2]:

With public/private key authentication, you first need to generate a key pair; see https://community.snowflake.com/s/article/How-to-connect-snowflake-with-Spark-connector-using-Public-Private-Key. Then you can use the code below.
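
If you prefer to stay in Python rather than run the openssl commands from that article, here is a minimal sketch that generates an equivalent encrypted PKCS#8 key pair with the same cryptography library used below. The file names and the PRIVATE_KEY_PASSPHRASE environment variable are assumptions, not part of the original answer:

    import os
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    # Generate a 2048-bit RSA key pair, matching the openssl steps in the article.
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    # Write the private key as encrypted PKCS#8 PEM (rsa_key.p8), protected by
    # the same passphrase the Spark job reads from the environment later.
    passphrase = os.environ["PRIVATE_KEY_PASSPHRASE"].encode()
    with open("rsa_key.p8", "wb") as f:
        f.write(private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.BestAvailableEncryption(passphrase),
        ))

    # Write the public key as PEM (rsa_key.pub); its base64 body (without the
    # PEM header/footer) goes into ALTER USER <user> SET RSA_PUBLIC_KEY='...'.
    with open("rsa_key.pub", "wb") as f:
        f.write(private_key.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        ))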

    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    import os
    import re
    from pyspark.sql import SparkSession
    
    # Build a Spark session with the Snowflake JDBC driver and the
    # spark-snowflake connector jars on the classpath.
    spark = SparkSession \
        .builder \
        .config("spark.jars", "<path/to/>/snowflake-jdbc-<version>.jar,<path/to/>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
        .config("spark.repl.local.jars",
                "<path/to/>/snowflake-jdbc-<version>.jar,<path/to/>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
        .config("spark.sql.catalogImplementation", "in-memory") \
        .getOrCreate()
    
    # Optionally disable query pushdown to Snowflake (the connector pushes
    # eligible operations down to Snowflake by default).
    spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(
        spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
    
    # Load the encrypted PKCS#8 private key; the passphrase comes from an
    # environment variable.
    with open("<path/to/>/rsa_key.p8", "rb") as key_file:
        p_key = serialization.load_pem_private_key(
            key_file.read(),
            password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
            backend=default_backend()
        )

    # Re-serialize the key unencrypted, then strip the PEM header/footer and
    # newlines: the connector expects the raw base64 key body.
    pkb = p_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )
    pkb = pkb.decode("UTF-8")
    pkb = re.sub(r"-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")
    
    
    sfOptions = {
        "sfURL": "<URL>",
        "sfAccount": "<ACCOUNTNAME>",
        "sfUser": "<USER_NAME",
        "pem_private_key": pkb,
        # "sfPassword": "xxxxxxxxx",
        "sfDatabase": "<DBNAME>",
        "sfSchema": "<SCHEMA_NAME>",
        "sfWarehouse": "<WH_NAME>",
        "sfRole": "<ROLENAME>",
    }
    
    SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
    
    # Use "dbtable" for a table name; for an arbitrary SQL statement use
    # .option("query", "select * from <TABLENAME>") instead.
    df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
        .options(**sfOptions) \
        .option("dbtable", "<TABLENAME>") \
        .load()
    
    df.show()
    

[Discussion]: