【发布时间】:2018-12-13 20:59:33
【问题描述】:
最近我一直在玩 Airflow 和 PySpark。我看到 Airflow 有许多变量。我的目标是解析其中一个变量并将其导入到我的 pySpark 脚本中。到目前为止,我试图回显变量的值(有效),但是我找不到导入 pySpark 的方法(我想将该变量的值传递给我的 pyspark 脚本中的另一个变量)。我还附上了我的代码(job_id 是我正在谈论的变量)。
test_bash = """
export un_id={{ti.job_id}}
echo $un_id
"""
bash_task = BashOperator(
task_id='test',
bash_command=test_bash,
xcom_push=True,
provide_context=True,
dag=dag)
def pull_function(**kwargs):
ti = kwargs['ti']
rt = ti.xcom_pull(task_ids='test')
print(rt)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=dag
)
#############
bash_task >> pull_task
知道我应该如何继续或者我做错了什么吗?
【问题讨论】: