【问题标题】:Starting Apache Nifi with Apache Airflow - how to generate a flow file使用 Apache Airflow 启动 Apache Nifi - 如何生成流文件
【发布时间】:2021-06-12 11:30:15
【问题描述】:

我为我的 ETL 管道设置了 Apache Nifi,并希望使用 Apache Airflow 启动(然后监控)特定处理器。

我看到了从气流 DAG 中实现这一目标的两种方法:

  1. 从头开始生成流文件并将其插入到 Nifi 队列/处理器中
  2. 触发“生成流文件处理器”以创建流文件,然后将其插入队列中

我查看了气流官方文档并知道如何使用 PythonOperator 编写(基本)DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id='python_nifi_operator',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
)

def generate_flow_file():
    """Generate and insert a flow file"""
    # connect to Nifi
    pass
    # access processor
    pass
    # create flow file
    pass 
    # insert flow file 
    pass 
    return 'Success-message for the log'

run_this = PythonOperator(
    task_id='generate_a_custom_flow_file',
    python_callable=generate_flow_file,
    dag=dag,
)

问题是:如何使用 Python 生成流文件? 我一直在寻找一个库,但我只找到了其他带有代码摘录的 stackoverflow 帖子,这些帖子对我没有帮助,我可以甚至找不到他们使用的软件包的文档。欢迎任何提示/完整的代码示例/链接。

【问题讨论】:

    标签: python airflow apache-nifi flowfile


    【解决方案1】:

    没有用于“生成”FlowFile 的 API,拥有一个也没有多大意义。

    也就是说,您可以使用 GenerateFlowFile 处理器并使用 REST API 停止/启动它 - 之前有一些问题询问如何使用 API 执行此操作 https://nifi.apache.org/docs/nifi-docs/rest-api/index.html https://pypi.org/project/nipyapi/

    或者你可以让一个 ListenHTTP/HandleHttpRequest 在一个端点上监听 Airflow,你可以在 Python 中通过向配置的端点发送一个空的 HTTP 请求来触发它,从而生成一个 FlowFile

    【讨论】:

    • 感谢有关 REST API 的提示!我会调查的
    猜你喜欢
    • 1970-01-01
    • 2017-01-16
    • 2019-04-09
    • 1970-01-01
    • 1970-01-01
    • 2020-10-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多