【问题标题】:Airflow SparkSubmitOperator push value to xcom气流 SparkSubmitOperator 推送值到 xcom
【发布时间】:2019-06-10 15:30:35
【问题描述】:

在我的气流火花作业中,我需要将火花作业统计信息传递给工作流中的其他任务。如何将 SparkSubmitOperator 中的值推送到 xcom?

task1 = SparkSubmitOperator(
    task_id='spark_task',
    conn_id='spark_default',
    java_class='com.example',
    application='example.jar',
    name='spark-job',
    verbose=True,
    application_args=["10"],  
    conf={'master':'yarn'},
    dag=dag,
)


#pass value from task1 to task 2 via xcom

def somefunc(**kwargs):
    #pull value from task1
    kwargs["ti"].xcom_pull(task_ids='spark_task')

task2 = PythonOperator(task_id='task2',
                       python_callable=somefunc,
                       provide_context=True,
                       dag=dag)

【问题讨论】:

    标签: python pyspark pipeline airflow


    【解决方案1】:

    目前,SparkSubmitOperator/SparkSubmitHook 并非旨在将作业统计信息返回给 XCom。您可以轻松更新运算符以满足您的需求:

    from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
    
    class SparkSubmitOperatorXCom(SparkSubmitOperator):
    
        def execute(self, context):
            super().execute(context)
            return self._hook._driver_status
    

    然后可以初始化操作符,将execute方法的返回值发送给XCom:

    task1 = SparkSubmitOperatorXCom(
        do_xcom_push=True,
        ...
    )
    

    注意:在这种情况下,我们正在访问私有属性。这是 SparkSubmitHook 提供驱动程序状态的唯一方式。对于更复杂的工作统计信息,您必须实施自己的解决方案,因为钩子似乎不够灵活,无法为您提供一切。

    【讨论】:

    • 这个解决方案似乎不起作用。您是否成功通过此实现推送 XCom 消息?
    猜你喜欢
    • 2020-01-16
    • 1970-01-01
    • 1970-01-01
    • 2022-11-26
    • 2022-01-03
    • 1970-01-01
    • 2019-01-22
    • 1970-01-01
    相关资源
    最近更新 更多