【问题标题】:Execute a Databricks Notebook with PySpark code using Apache Airflow使用 Apache Airflow 使用 PySpark 代码执行 Databricks Notebook
【发布时间】:2019-11-07 11:35:41
【问题描述】:

我正在使用 Airflow、Databricks 和 PySpark。我想知道当我想通过 Airflow 执行 Databricks Notebook 时是否可以添加更多参数。

我在 Python 中编写了名为 MyETL 的下一个代码:

def main(**kwargs):
      spark.sql("CREATE TABLE {0} {1}".format(table, columns))
      print("Running my ETL!")

    if __name__== "__main__":
      main(arg1, arg2)

我想定义其他任务参数来运行具有更多参数的 Databricks 笔记本,我想添加方法的名称以及这些方法的参数。例如,当我想在 Airflow 中的 DAG 中注册任务时:

   notebook_task_params = {
        'new_cluster': new_cluster,
        'notebook_task': {
            'notebook_path': '/Users/airflow@example.com/MyETL',
            'method_name': 'main',
            'params':'[{'table':'A'},{'columns':['a', 'b']}]'
        },
    }

我不知道这是否可能,因为我没有找到类似的例子。

# Example of using the JSON parameter to initialize the operator.
notebook_task = DatabricksSubmitRunOperator(
    task_id='notebook_task',
    dag=dag,
    json=notebook_task_params)

换句话说,我想使用 Airflow 执行带有参数的笔记本。我的问题是我该怎么做?

【问题讨论】:

    标签: python apache-spark airflow databricks


    【解决方案1】:

    您也可以将method_name 添加为params,然后在笔记本上解析出您的逻辑。

    但是,这里更常见的模式是确保该方法已安装在您的集群上。

    params = '[{'table':'A'},{'columns':['a', 'b']}]'

    然后在数据块上的笔记本中:

    table = getArgument("table", "DefaultValue")
    columns = getArgument("columns", "DefaultValue")
    
    result = method(table, columns)
    

    如果您可以在笔记本作业运行中看到您的参数(如上图所示),您还可以通过 getArgument() 访问这些参数。

    【讨论】:

    • getArgument 是 Airflow 方法吗?那么关键字“params”就可以了吗?你将如何编写 notebook_task_params?
    • getArgument 是一个特殊的数据块函数,它从环境中获取变量。现在显然不赞成使用getdocs.databricks.com/user-guide/dev-tools/…。除了method_name,您的 DAG 似乎还不错。尝试提交作业运行并在笔记本中调用print get('table') 以测试参数是否正确传递。您还应该在作业运行的数据块笔记本中看到您的参数。我会在我之前的回答中附上一张图片。
    猜你喜欢
    • 2023-02-12
    • 1970-01-01
    • 2020-12-26
    • 2019-12-02
    • 2017-06-06
    • 1970-01-01
    • 2018-05-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多