【问题标题】:How to pass an argument to a spark submit job in airflow如何将参数传递给气流中的火花提交作业
【发布时间】:2022-01-02 14:04:06
【问题描述】:

我必须使用 sparksubmit 运算符从气流中触发 pyspark 模块。但是,pyspark 模块需要将 spark 会话变量作为参数。我使用 application_args 将参数传递给 pyspark 模块。但是,当我运行 dag 时,spark submit 运算符失败了,我传入的参数被视为 None 类型变量。 需要知道如何将参数传递给通过 spark_submit_operator 触发的 pyspark 模块。

DAG 代码如下:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PRJT").enableHiveSupport().getOrCreate()

spark_config = {
    'conn_id': 'spark_default',
    'driver_memory': '1g',
    'executor_cores': 1,
    'num_executors': 1,
    'executor_memory': '1g'
}

dag = DAG(
    dag_id="spark_session_prgm",
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False)

spark_submit_task1 = SparkSubmitOperator(
    task_id='spark_submit_task1',
    application='/home/airflow_home/dags/tmp_spark_1.py',
    application_args=['spark'],
    **spark_config, dag=dag)

tmp_spark_1.py程序中的示例代码:

【问题讨论】:

    标签: pyspark airflow


    【解决方案1】:

    经过一番调试,我找到了解决问题的方法。

    argparse 是它不起作用的原因。相反,我将syssys.argv[1] 一起使用,它完成了这项工作。

    【讨论】:

    • 请仅使用答案发布解决方案。不要问问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多