【发布时间】:2019-04-05 09:01:57
【问题描述】:
我对 Airflow 非常陌生,并且已经阅读了它的大部分文档。从文档中,我了解到可以使用 XCom 类共享 DAG 中组件之间的小数据。 DAG 中发布数据的组件必须推送,订阅数据的组件必须拉取。
但是,我对推拉的语法部分不是很清楚。我指的是documentation 上关于 XCom 的部分,并开发了一个代码模板。假设我有以下代码,它只有两个组件,一个推杆和一个拉杆。 pusher 发布 puller 必须消耗的当前时间并将其写入日志文件。
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
log_file_location = '/usr/local/airflow/logs/time_log.log'
default_args = {'owner':'apache'}
dag = DAG('pushpull', default_args = default_args)
def push_function():
#push this data on the DAG as key-value pair
return(datetime.now()) #current time
def pull_function():
with open(log_file_location, 'a') as logfile:
current_time = '' #pull data from the pusher as key - value pair
logfile.writelines('current time = '+current_time)
logfile.close()
with dag:
t1 = PythonOperator(
task_id = 'pusher',
python_callable = push_function)
t2 = PythonOperator(
task_id = 'puller',
python_callable = pull_function)
t2.set_upstream(t1)
我在这里需要 Airflow 大师的帮助,有两种语法:
- 如何从推送功能中推送数据以及按键
- 如何获取 pull 函数使用 key 拉取数据。
谢谢!
【问题讨论】:
标签: python publish-subscribe airflow