【发布时间】:2021-06-12 11:30:15
【问题描述】:
我为我的 ETL 管道设置了 Apache Nifi,并希望使用 Apache Airflow 启动(然后监控)特定处理器。
我看到了从气流 DAG 中实现这一目标的两种方法:
- 从头开始生成流文件并将其插入到 Nifi 队列/处理器中
- 触发“生成流文件处理器”以创建流文件,然后将其插入队列中
我查看了气流官方文档并知道如何使用 PythonOperator 编写(基本)DAG:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
dag = DAG(
dag_id='python_nifi_operator',
schedule_interval=None,
start_date=days_ago(2),
tags=['example'],
)
def generate_flow_file():
"""Generate and insert a flow file"""
# connect to Nifi
pass
# access processor
pass
# create flow file
pass
# insert flow file
pass
return 'Success-message for the log'
run_this = PythonOperator(
task_id='generate_a_custom_flow_file',
python_callable=generate_flow_file,
dag=dag,
)
问题是:如何使用 Python 生成流文件? 我一直在寻找一个库,但我只找到了其他带有代码摘录的 stackoverflow 帖子,这些帖子对我没有帮助,我可以甚至找不到他们使用的软件包的文档。欢迎任何提示/完整的代码示例/链接。
【问题讨论】:
标签: python airflow apache-nifi flowfile