【Question Title】: GCP Composer 2 (Airflow 2) Dataproc operators - pass packages to PYSPARK_JOB
【Posted】: 2022-06-18 10:07:21
【Question】:

I am using GCP Composer2 to schedule pyspark (Structured Streaming) jobs; the pyspark code reads from/writes to Kafka.

The DAG uses the operators DataprocCreateClusterOperator (to create the GKE cluster) and DataprocSubmitJobOperator (to run the pyspark job), and DataprocDeleteClusterOperator to delete the dataproc cluster.

In the code below, I pass the jars and files (certificates/config files) needed to run the pyspark code that reads from/writes to Kafka:


PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris" : ["gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar",
                               'gs://dataproc-spark-jars/bson-4.0.5.jar','gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar','gs://dataproc-spark-jars/mongodb-driver-core-4.0.5.jar',
                               'gs://dataproc-spark-jars/mongodb-driver-sync-4.0.5.jar','gs://dataproc-spark-jars/spark-avro_2.12-3.1.2.jar','gs://dataproc-spark-jars/spark-bigquery-with-dependencies_2.12-0.23.2.jar',
                           'gs://dataproc-spark-jars/spark-token-provider-kafka-0-10_2.12-3.2.0.jar','gs://dataproc-spark-jars/htrace-core4-4.1.0-incubating.jar','gs://dataproc-spark-jars/hadoop-client-3.3.1.jar','gs://dataproc-spark-jars/spark-sql-kafka-0-10_2.12-3.2.0.jar','gs://dataproc-spark-jars/hadoop-client-runtime-3.3.1.jar','gs://dataproc-spark-jars/hadoop-client-3.3.1.jar','gs://dataproc-spark-configs/kafka-clients-3.2.0.jar'],
        "file_uris":['gs://kafka-certs/versa-kafka-gke-ca.p12','gs://kafka-certs/syslog-vani.p12',
                     'gs://kafka-certs/alarm-compression-user.p12','gs://kafka-certs/appstats-user.p12',
                     'gs://kafka-certs/insights-user.p12','gs://kafka-certs/intfutil-user.p12',
                     'gs://kafka-certs/reloadpred-chkpoint-user.p12','gs://kafka-certs/reloadpred-user.p12',
                     'gs://dataproc-spark-configs/topic-customer-map.cfg','gs://dataproc-spark-configs/params.cfg','gs://kafka-certs/issues-user.p12','gs://kafka-certs/anomaly-user.p12']
        }
}

path = "gs://dataproc-spark-configs/pip_install.sh"

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone="us-east1-b",
    master_machine_type="n1-standard-4",
    worker_machine_type="n1-standard-4",
    num_workers=4,
    storage_bucket="dataproc-spark-logs",
    init_actions_uris=[path],
    metadata={'PIP_PACKAGES': 'pyyaml requests pandas openpyxl kafka-python'},
).make()

with models.DAG(
        'UsingComposer2',
        # Continue to run DAG twice per day
        default_args=default_dag_args,
        schedule_interval='0 0/12 * * *',
        catchup=False,
        ) as dag:


    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        cluster_name="composer2",
        region=REGION,
        cluster_config=CLUSTER_GENERATOR_CONFIG
    )

    run_dataproc_spark = DataprocSubmitJobOperator(
        task_id="run_dataproc_spark",
        job=PYSPARK_JOB,
        location=REGION,
        project_id=PROJECT_ID,
    )

    delete_dataproc_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION
    )


create_dataproc_cluster >> run_dataproc_spark >> delete_dataproc_cluster


The question is - how do I pass packages (rather than jars) for spark-kafka? With spark-submit I can pass a package; how do I do the same thing using Composer/Airflow?

Sample spark-submit command, in which I pass the spark-sql-kafka and mongo-spark-connector packages:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.mongodb.spark:mongo-spark-connector_2.12:3.0.2 /Users/karanalang/PycharmProjects/Kafka/StructuredStreaming-KafkaConsumer-insignts.py

TIA!

Update: per @Anjela B's suggestion, I tried the following, but it does not work.

Change to PYSPARK_JOB, to pass the packages:

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "properties": { #you can use this field to pass other properties
            "org.apache.spark": "spark-sql-kafka-0-10_2.12:3.1.3",
            "org.mongodb.spark": "mongo-spark-connector_2.12:3.0.2"
        },
        "file_uris":['gs://kafka-certs/versa-kafka-gke-ca.p12','gs://kafka-certs/syslog-vani.p12',
                     'gs://kafka-certs/alarm-compression-user.p12','gs://kafka-certs/appstats-user.p12',
                     'gs://kafka-certs/insights-user.p12','gs://kafka-certs/intfutil-user.p12',
                     'gs://kafka-certs/reloadpred-chkpoint-user.p12','gs://kafka-certs/reloadpred-user.p12',
                     'gs://dataproc-spark-configs/topic-customer-map.cfg','gs://dataproc-spark-configs/params.cfg','gs://kafka-certs/issues-user.p12','gs://kafka-certs/anomaly-user.p12']
        }
}

Error:

22/06/17 22:57:28 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1655505629376_0004
22/06/17 22:57:29 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at versa-insights2-m/10.142.0.70:8030
22/06/17 22:57:30 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl: Ignoring exception of type GoogleJsonResponseException; verified object already exists with desired state.
Traceback (most recent call last):
  File "/tmp/8991c714-7036-45ff-b61b-ece54cfffc51/alarm_insights.py", line 442, in <module>
    sys.exit(main())
  File "/tmp/8991c714-7036-45ff-b61b-ece54cfffc51/alarm_insights.py", line 433, in main
    main_proc = insightGen()
  File "/tmp/8991c714-7036-45ff-b61b-ece54cfffc51/alarm_insights.py", line 99, in __init__
    self.all_DF = self.spark.read \
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 210, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o63.load.
: java.lang.ClassNotFoundException: Failed to find data source: mongo. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:692)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:746)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: mongo.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:666)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:666)
    at scala.util.Failure.orElse(Try.scala:224)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
    ... 14 more 

【Comments】:

  • What command do you use to submit the job manually? Is the package you are submitting a .py?
  • Hi @AnjelaB - I either launch the job manually from the UI, or use the following command -> ``` gcloud composer environments run versa-composer2 --location us-east1 dags trigger -- Versa-Alarm-Insights-UsingComposer2 --run-id=5077 ```. Note - this is only for testing; once development is done it will be a scheduled dag run.
  • Is the package you are submitting a .py?
  • @AnjelaB - these are the packages I want to include: org.mongodb.spark:mongo-spark-connector_2.12:3.0.2, org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 (sample spark-submit command: ``` spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.mongodb.spark:mongo-spark-connector_2.12:3.0.2 /Users/karanalang/PycharmProjects/Kafka/StructuredStreaming-KafkaConsumer-insignts.py ```)
  • @AnjelaB - I have updated the description with the spark-submit command; I need to pass the same packages when I run the job with the Airflow operator DataprocSubmitJobOperator.

Tags: google-cloud-platform apache-kafka spark-structured-streaming google-cloud-composer airflow-2.x


【Solution 1】:

You can use the following code to pass the configuration:

import datetime

from airflow import models
from airflow.operators import bash
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
PYSPARK_JOB = {
    "pyspark_job": {
      "main_python_file_uri": 
        "gs://<bucket>/20220606.py", #this field is for .py packages
      "properties": { #you can use this field to pass other properties
        "org.apache.spark": "spark-sql-kafka-0-10_2.12:3.2.0",
        "org.mongodb.spark": "mongo-spark-connector_2.12:3.0.2"
      },
      "python_file_uris": ["gs://<bucket>/20220606.py"]
    },
    "reference": {
      "project_id": "<project_id>"
    },
    "placement": {
      "cluster_name": "<cluster_name>"
    }
  }


REGION = "us-central1"
PROJECT_ID = "<project_id>"

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'composer_quickstart',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')
    
    
    run_dataproc_spark = DataprocSubmitJobOperator(
        task_id="run_dataproc_spark",
        job=PYSPARK_JOB,
        location=REGION,
        project_id=PROJECT_ID,
    )
    print_dag_run_conf >> run_dataproc_spark

I followed this PySpark Job Documentation to find out which field to use to pass the required packages.
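Note that the Dataproc "properties" field is a map of Spark property names to values, so the Maven coordinates would normally be supplied through the standard spark.jars.packages property rather than as separate keys. A minimal sketch of the same job definition under that assumption (mirroring spark-submit --packages; not verified in the poster's environment):

# Hedged sketch (assumption): pass the connector coordinates through the
# standard Spark property "spark.jars.packages", which Dataproc forwards to
# the Spark job the same way spark-submit --packages would.
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "properties": {
            "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,"
                                   "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2",
        },
        # file_uris / python_file_uris unchanged from the job definitions above
    },
}

The ClassNotFoundException for the mongo data source in the question's update would be consistent with the connector never reaching the classpath, i.e. the custom keys in "properties" not being recognised as Spark properties.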

Airflow DAG logs:

*** Reading remote log from gs://us-central1-case-20220331-fde8f6be-bucket/logs/composer_quickstart/run_dataproc_spark/2022-06-06T06:53:24.637504+00:00/1.log.
[2022-06-06, 06:53:39 UTC] {taskinstance.py:1033} INFO - Dependencies all met for <TaskInstance: composer_quickstart.run_dataproc_spark manual__2022-06-06T06:53:24.637504+00:00 [queued]>
[2022-06-06, 06:53:39 UTC] {taskinstance.py:1033} INFO - Dependencies all met for <TaskInstance: composer_quickstart.run_dataproc_spark manual__2022-06-06T06:53:24.637504+00:00 [queued]>
[2022-06-06, 06:53:39 UTC] {taskinstance.py:1239} INFO - 
--------------------------------------------------------------------------------
[2022-06-06, 06:53:39 UTC] {taskinstance.py:1240} INFO - Starting attempt 1 of 2
[2022-06-06, 06:53:39 UTC] {taskinstance.py:1241} INFO - 
--------------------------------------------------------------------------------
[2022-06-06, 06:53:39 UTC] {taskinstance.py:1260} INFO - Executing <Task(DataprocSubmitJobOperator): run_dataproc_spark> on 2022-06-06 06:53:24.637504+00:00
[2022-06-06, 06:53:39 UTC] {standard_task_runner.py:52} INFO - Started process 65510 to run task
[2022-06-06, 06:53:39 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'composer_quickstart', 'run_dataproc_spark', 'manual__2022-06-06T06:53:24.637504+00:00', '--job-id', '21439', '--raw', '--subdir', 'DAGS_FOLDER/20220606_1.py', '--cfg-path', '/tmp/tmp7p1eyqqm', '--error-file', '/tmp/tmpdr2m4rwe']
[2022-06-06, 06:53:39 UTC] {standard_task_runner.py:77} INFO - Job 21439: Subtask run_dataproc_spark
[2022-06-06, 06:53:41 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: composer_quickstart.run_dataproc_spark manual__2022-06-06T06:53:24.637504+00:00 [running]> on host airflow-worker-7b5f8fc749-pd8f9
[2022-06-06, 06:53:44 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=
AIRFLOW_CTX_DAG_OWNER=Composer Example
AIRFLOW_CTX_DAG_ID=composer_quickstart
AIRFLOW_CTX_TASK_ID=run_dataproc_spark
AIRFLOW_CTX_EXECUTION_DATE=2022-06-06T06:53:24.637504+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-06-06T06:53:24.637504+00:00
[2022-06-06, 06:53:44 UTC] {dataproc.py:1878} INFO - Submitting job
[2022-06-06, 06:53:44 UTC] {credentials_provider.py:312} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-06-06, 06:53:45 UTC] {dataproc.py:1890} INFO - Job e7e800e7-fbfd-45e0-8021-eca4e2a7a377 submitted successfully.
[2022-06-06, 06:53:45 UTC] {dataproc.py:1903} INFO - Waiting for job e7e800e7-fbfd-45e0-8021-eca4e2a7a377 to complete
[2022-06-06, 06:54:16 UTC] {dataproc.py:1907} INFO - Job e7e800e7-fbfd-45e0-8021-eca4e2a7a377 completed successfully.
[2022-06-06, 06:54:16 UTC] {taskinstance.py:1268} INFO - Marking task as SUCCESS. dag_id=composer_quickstart, task_id=run_dataproc_spark, execution_date=20220606T065324, start_date=20220606T065339, end_date=20220606T065416
[2022-06-06, 06:54:16 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-06-06, 06:54:16 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check

The submitted job:

【Comments】:

  • thnx, will check and update
  • I tried the approach you mentioned, but it does not seem to work.. I have updated the description with the error