【问题标题】:Not able to connect to Snowflake from EMR Cluster using Pyspark using airflow emr operator无法使用 Pyspark 使用气流 emr 运算符从 EMR 集群连接到雪花
【发布时间】:2021-02-20 20:17:35
【问题描述】:

我正在尝试从气流 EMR 操作员启动的 EMR 集群连接到雪花,但出现以下错误

py4j.protocol.Py4JJavaError: 调用时出错 o147.load。 : java.lang.ClassNotFoundException: 找不到数据 来源:net.snowflake.spark.snowflake。请在以下位置找到包裹 http://spark.apache.org/third-party-projects.html

这些是我添加到我的 EMRaddsteps 操作符以运行脚本 load_updates.py 的步骤,我在“Args”中描述了我的雪花包

STEPS = [
    {
        "Name" : "convo_facts",
        "ActionOnFailure" : "TERMINATE_CLUSTER",
        "HadoopJarStep" : {
            "Jar" : "command-runner.jar",
            "Args" : ["spark-submit", "s3://dev-data-lake/spark_files/cf/load_updates.py", \
                      "--packages net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4", \
                      "INPUT=s3://dev-data-lake/table_exports/public/", \
                      "OUTPUT=s3://dev-data-lake/emr_output/cf/"]
        }
    }
]

JOB_FLOW_OVERRIDES = {
    'Name' : 'cftest',
    'LogUri' : 's3://dev-data-lake/emr_logs/cf/log.txt',
    'ReleaseLabel' : 'emr-5.32.0',
    'Instances' : {
        'InstanceGroups' : [
            {
                'Name' : 'Master nodes',
                'Market' : 'ON_DEMAND',
                'InstanceRole' : 'MASTER',
                'InstanceType' : 'r6g.4xlarge',
                'InstanceCount' : 1,
            },
            {
                'Name' : 'Slave nodes',
                'Market' : 'ON_DEMAND',
                'InstanceRole' : 'CORE',
                'InstanceType' : 'r6g.4xlarge',
                'InstanceCount' : 3,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps' : True,
        'TerminationProtected' : False
    },
    'Applications' : [{
        'Name' : 'Spark'
    }],
    'JobFlowRole' : 'EMR_EC2_DefaultRole',
    'ServiceRole' : 'EMR_DefaultRole'
}

而且,这就是我在 load_updates.py 脚本中添加雪花凭据以提取到 pyspark 数据帧中的方式。

# Set options below
sfOptions = {
  "sfURL" : "xxxx.us-east-1.snowflakecomputing.com",
  "sfUser" : "user",
  "sfPassword" : "xxxx",
  "sfDatabase" : "",
  "sfSchema" : "PUBLIC",
  "sfWarehouse" : ""
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

query_sql = """select * from cf""";

messages_new = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query", query_sql) \
  .load()

不确定我是否在这里遗漏了什么或者我在哪里做错了。

【问题讨论】:

    标签: amazon-web-services apache-spark pyspark airflow amazon-emr


    【解决方案1】:

    在 spark-submit 命令中,选项 --package 应该放在 s3://.../load_updates.py 之前。否则,将被视为应用程序参数。

    试试这个:

    STEPS = [
        {
            "Name": "convo_facts",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--packages",
                    "net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4",
                    "s3://dev-data-lake/spark_files/cf/load_updates.py",
                    "INPUT=s3://dev-data-lake/table_exports/public/",
                    "OUTPUT=s3://dev-data-lake/emr_output/cf/"
                ]
            }
        }
    ]
    

    【讨论】:

    • 您的回答有效,非常感谢您。我还将数据从 EMR 写入雪花表,我可以看到这种情况正在发生,但是从 EMR 到雪花或 s3 的数据传输也需要将近 1 个小时。想知道您是否遇到过类似的情况,如果遇到过,您是如何面对的?计算部分几乎不需要 3-5 分钟。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-30
    • 2017-04-16
    • 1970-01-01
    • 2022-10-14
    • 1970-01-01
    • 2016-09-15
    相关资源
    最近更新 更多