【Question title】: Transmitting data between components in Airflow
【Posted】: 2019-04-05 09:01:57
【Question】:

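I am very new to Airflow and have read most of its documentation. From the docs, I understand that small amounts of data can be shared between the components of a DAG using the XCom class. The component that publishes the data has to push it, and the component that subscribes to the data has to pull it.

However, I am not clear on the syntax for pushing and pulling. I referred to the section on XCom in the documentation and put together a code template. Suppose I have the following code, with just two components: a pusher and a puller. The pusher publishes the current time, which the puller must consume and write to a log file.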

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

log_file_location = '/usr/local/airflow/logs/time_log.log'

default_args = {'owner':'apache'}
dag = DAG('pushpull', default_args = default_args)

def push_function():
    #push this data on the DAG as key-value pair
    return(datetime.now()) #current time

def pull_function():
    with open(log_file_location, 'a') as logfile:
        current_time = '' #pull data from the pusher as key - value pair
        logfile.writelines('current time = '+current_time)
    logfile.close()

with dag:
    t1 = PythonOperator(
        task_id = 'pusher', 
        python_callable = push_function)

    t2 = PythonOperator(
        task_id = 'puller', 
        python_callable = pull_function)

    t2.set_upstream(t1)

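I need help from Airflow experts with two pieces of syntax:

  1. How to push data, together with a key, from the push function.
  2. How to have the pull function retrieve that data using the key.

Thanks!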

【Question comments】:

    Tags: python publish-subscribe airflow


    【Solution 1】:

    Example of pushing to XCom with a key:

    def push_function(**context):
        msg='the_message'
        print("message to push: '%s'" % msg)
        task_instance = context['task_instance']
        task_instance.xcom_push(key="the_message", value=msg)
    

    Example of pulling from XCom with a key:

    def pull_function(**kwargs):
        ti = kwargs['ti']
        msg = ti.xcom_pull(task_ids='push_task',key='the_message')
        print("received message: '%s'" % msg)
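    The two snippets above only work together if `task_ids` and `key` in `xcom_pull` match the pushing task's `task_id` and the key passed to `xcom_push`. Here is a minimal sketch of that round trip that runs without Airflow. `FakeTaskInstance` is a hypothetical in-memory stand-in written for this illustration, not an Airflow class; it mimics only the two methods used here.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance (illustration only)."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store  # shared dict: (task_id, key) -> value

    def xcom_push(self, key, value):
        # Values are stored under the pushing task's id and the given key.
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids=None, key='return_value'):
        # Return None when nothing was pushed for this (task id, key) pair.
        return self.store.get((task_ids, key))


def push_function(**context):
    task_instance = context['task_instance']
    task_instance.xcom_push(key='the_message', value='the_message')


def pull_function(**kwargs):
    ti = kwargs['ti']
    return ti.xcom_pull(task_ids='push_task', key='the_message')


store = {}
push_function(task_instance=FakeTaskInstance('push_task', store))

puller = FakeTaskInstance('pull_task', store)
received = pull_function(ti=puller)
# Pulling with a task id that never pushed (e.g. 'pusher') finds nothing.
missing = puller.xcom_pull(task_ids='pusher', key='the_message')
print(received, missing)  # prints: the_message None
```

    In a real Airflow context, `context['task_instance']` and `context['ti']` refer to the same object, so either name works in both functions. With the task ids from the question, the pull would need `task_ids='pusher'`.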
    

    Example DAG:

    from datetime import datetime, timedelta
    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator
    
    # Use a lowercase name so the variable does not shadow the DAG class.
    dag = DAG(
      dag_id='simple_xcom',
      start_date=datetime(2017, 10, 26),
      schedule_interval=timedelta(days=1)
    )
    
    def push_function(**context):
        msg='the_message'
        print("message to push: '%s'" % msg)
        # 'task_instance' and 'ti' are the same object in the context.
        task_instance = context['task_instance']
        task_instance.xcom_push(key="the_message", value=msg)
    
    # provide_context=True is required in Airflow 1.x so that the
    # callable receives the context (including the task instance).
    push_task = PythonOperator(
        task_id='push_task', 
        python_callable=push_function,
        provide_context=True,
        dag=dag)
    
    def pull_function(**kwargs):
        ti = kwargs['ti']
        # task_ids must match the task_id of the pushing task.
        msg = ti.xcom_pull(task_ids='push_task',key='the_message')
        print("received message: '%s'" % msg)
    
    pull_task = PythonOperator(
        task_id='pull_task', 
        python_callable=pull_function,
        provide_context=True,
        dag=dag)
    
    # pull_task runs after push_task
    push_task >> pull_task
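    For the specific case in the question (publishing the current time and logging it), an explicit `xcom_push` is not strictly needed: the value returned by a `PythonOperator` callable is stored in XCom under the default key `return_value`, and `xcom_pull` reads that key when no `key` is given. The following is a sketch of the two callables under these assumptions: the tasks keep the question's ids `pusher` and `puller`, the time is sent as an ISO string to avoid relying on how XCom serializes `datetime` objects, and `log_path` is a hypothetical parameter added here so the function can be tried against a temporary file.

```python
from datetime import datetime

# Path taken from the question; adjust it for your deployment.
LOG_FILE_LOCATION = '/usr/local/airflow/logs/time_log.log'


def push_function(**context):
    # The returned value is pushed to XCom under the key 'return_value'.
    return datetime.now().isoformat()


def pull_function(log_path=LOG_FILE_LOCATION, **context):
    # log_path is a hypothetical parameter for illustration; when Airflow
    # calls this function without it, the default path above is used.
    ti = context['ti']
    # No key argument: xcom_pull reads the default 'return_value' key.
    current_time = ti.xcom_pull(task_ids='pusher')
    with open(log_path, 'a') as logfile:
        logfile.write('current time = %s\n' % current_time)
```

    These callables would be wired up exactly like `push_task` and `pull_task` in the example DAG, with `task_id='pusher'` and `task_id='puller'`. The `with` block closes the file on exit, so the extra `logfile.close()` from the question is not needed.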
    

    【Comments】:

    • Thanks :) I can now get the operators to communicate.