【问题标题】:Import Airflow variable to PySpark将气流变量导入 PySpark
【发布时间】:2018-12-13 20:59:33
【问题描述】:

最近我一直在玩 Airflow 和 PySpark。我看到 Airflow 有许多变量。我的目标是解析其中一个变量并将其导入到我的 pySpark 脚本中。到目前为止,我试图回显变量的值(有效),但是我找不到导入 pySpark 的方法(我想将该变量的值传递给我的 pyspark 脚本中的另一个变量)。我还附上了我的代码(job_id 是我正在谈论的变量)。

test_bash = """
export un_id={{ti.job_id}}
echo $un_id
"""

bash_task = BashOperator(
    task_id='test',
    bash_command=test_bash,
    xcom_push=True,
    provide_context=True,
    dag=dag)

def pull_function(**kwargs):
    ti = kwargs['ti']
    rt = ti.xcom_pull(task_ids='test')
    print(rt)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag
)

#############
bash_task >> pull_task

知道我应该如何继续或者我做错了什么吗?

【问题讨论】:

    标签: pyspark airflow


    【解决方案1】:

    这个值实际上叫做run_id,可以通过上下文或者宏来访问。

    Pythonoperator 中,这是通过上下文访问的,而在BashOperator 中,这是通过bash_command 字段上的jinja 模板访问的。

    有关宏中可用功能的更多信息:

    https://airflow.incubator.apache.org/code.html#macros

    更多关于 jinja 的信息:

    https://airflow.incubator.apache.org/concepts.html#jinja-templating

    from airflow.models import DAG
    from datetime import datetime
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator
    
    
    dag = DAG(
        dag_id='run_id',
        schedule_interval=None,
        start_date=datetime(2017, 2, 26)
    )
    
    def my_func(**kwargs):
        context = kwargs
        print(context['dag_run'].run_id)
    
    t1 = PythonOperator(
        task_id='python_run_id',
        python_callable=my_func,
        provide_context=True,
        dag=dag
        )
    
    t2 = BashOperator(
        task_id='bash_run_id',
        bash_command='echo {{run_id}}',
        dag=dag)
    
    t1.set_downstream(t2)
    

    以这个 dag 为例,检查每个操作员的日志,你应该会看到日志中打印了run_id

    【讨论】:

    • 谢谢它正在工作!我还想问你一件事,为了在我的 pySpark 脚本中检索 run_id,我是否必须像导入系统变量一样遵循该过程?或者是否有一种特殊的方式来检索和引用该变量?
    • 很高兴它有帮助,您能否接受答案,以便偶然发现此问题的未来用户知道它:) 关于在 PySpark 脚本中引入run_id,取决于您用来运行的运算符。如果你使用BashOperator,你可以在你的脚本中以同样的方式使用{{ run_id }}
    • 我还在为导入而苦苦挣扎。我认为这将是一个环境变量,但显然不是。我也试过这个我在你提供的链接中找到的:testing = "{{run_id}}" bash_task = BashOperator( task_id='bash_run_id', #bash_command='echo {{run_id}}', bash_command= 'tmp/myfile.py', provide_context=True, dag=dag, env = {'UNIQUE_ID': testing}) 但它也没有用。我无法理解我错过了什么。
    • 嘿,不,您应该使用PythonOperator 运行该文件,然后它就可以工作了。如果要使用 BashOperator,则需要使用 bash 脚本,例如 bash_command=run_pyspark.sh。而你的run_pyspark.sh,第一行可以是export RUNID={{run_id}}; python tmp/myfile.py,在你的python文件中你可以通过os.environ[RUNID]使用它
    【解决方案2】:

    我没有尝试过@kaxil 的建议,但如果我理解正确你的问题,你想从 Airflow 中检索 run_id 变量并在你的 python (pySpark) 脚本中使用它。如果是这种情况,我假设您使用 BashOperatorspark-submit 您的工作。提交 spark-job 时,您可以提交(连同您的工作)some arguments。这些参数显示为系统参数,如果您执行print(sys.argv),您可以看到这些参数(有助于查看您的变量在哪个位置)。 由于您已经使用 bash_task 推送了变量,因此您必须将其拉出。因此,当您提交 Spark 作业时,您还应该添加一个额外的参数,如下所示:

    cmd=""spark-submit your-pyspark-file.py {{ ti.xcom_pull("test") }}
    
    retrieval = BashOperator(
        namespace='randomname',
        arguments=[cmd],
        name='example-dag1',
        task_id='name-you-desire',
        provide_context=True,
        get_logs=True, 
        dag=dag)
    

    然后,如果您确实执行了print(sys.argv),您将能够将您的变量视为一个参数,并且在您的脚本中您可以通过sys.argv[1] 引用该变量(如果它在第二个位置,则为 0在第一个等)。

    【讨论】:

      猜你喜欢
      • 2022-07-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-15
      • 2023-02-11
      • 2020-01-16
      • 2023-01-28
      • 1970-01-01
      相关资源
      最近更新 更多