【问题标题】:How to join two CSVs with Apache Nifi如何使用 Apache Nifi 加入两个 CSV
【发布时间】:2017-03-20 16:22:45
【问题描述】:

我正在研究 ETL 工具(如 Talend)并调查是否可以使用 Apache Nifi。 Nifi 可用于执行以下操作:

  1. 提取两个放在本地磁盘上的 CSV 文件
  2. 在公共列上加入 CSV
  3. 将连接的 CSV 写入磁盘

我尝试在 Nifi 中设置工作,但看不到如何执行两个单独的 CSV 文件的连接。在 Apache Nifi 中是否可以执行此任务?

看起来QueryDNS processor 可用于使用另一个 CSV 文件来丰富一个 CSV 文件,但对于这个用例来说这似乎过于复杂。

这是一个输入 CSV 的示例,需要在 state_id 上加入:

输入文件

customers.csv

id | name | address      | state_id
---|------|--------------|---------
1  | John | 10 Blue Lane | 100
2  | Bob  | 15 Green St. | 200

states.csv

state_id | state
---------|---------
100      | Alabama
200      | New York

输出文件

输出.csv

id | name | address      | state
---|------|--------------|---------
1  | John | 10 Blue Lane | Alabama
2  | Bob  | 15 Green St. | New York

【问题讨论】:

    标签: etl apache-nifi


    【解决方案1】:

    Apache NiFi 更像是一种数据流工具,并不是真正用于执行流数据的任意连接。通常,这些类型的操作更适合 Storm、Flink、Apex 等流处理系统或 ETL 工具。

    NiFi 可以做得很好的连接类型是富集查找,其中存在固定大小的查找数据集,并且对于传入数据中的每条记录,您可以使用查找数据集来检索一些值。例如,在您的情况下,可能有一个名为 LookUpState 的处理器,它有一个属性“State Data”,指向一个包含所有状态的文件,那么 customers.csv 可能是该处理器的输入。

    一位社区成员启动了一个项目,为 NiFi 提供通用查找服务: https://github.com/jfrazee/nifi-lookup-service

    【讨论】:

      【解决方案2】:

      为此遵循的典型模式是将参考集加载到 NiFi 中的地图缓存控制器服务中。在这种情况下,这是 states.csv 数据。然后,客户数据的实时提要进来,并使用 ReplaceText 之类的参考数据丰富了此参考数据,或者您甚至可以在 Groovy 中编写自定义处理器。有很多方法可以切片。还有一个JIRA/PR 来让这更容易。实时流连接的某些元素最好在 Apache Storm、Spark 和 Flink 等处理系统中完成,但对于您提到的情况,它可以在 NiFi 中完成。

      【讨论】:

        【解决方案3】:

        workflow

        我还尝试使用 common 列连接两个 CSV 文件,并使用 nifi lookup record config 中的查找记录属性成功地做到了

        在这里,我使用simplecsvlookup 服务作为我的查找服务,我还附加了它的配置simplecsvlookup configuration

        我们首先要学习的是如何使用查找记录属性。在这里,我有两个 csv 文件:

        sample.csv: id,msisdn,recharge_amount 1,9048108594,399

        新1: msisdn,类型 9048108594,1

        输出: id,msisdn,recharge_amount,类型 1,9048108594,399,1

        最需要注意的是结果记录路径和key 在这种情况下,键是 msisdn (因为这是两个文件中的通用键) 并且,对于结果记录路径,我们应该使用我们需要与我们合并的列名,在这种情况下是“类型:”

        result record path--->> /type
        key----->> /msisdn
        

        并且,在查找服务中,给出各自的键名和值名。

        它会起作用的。

        【讨论】:

        • @Adrian Mole,你有这个模板吗?当我尝试上面的示例时,Flowfile 到达 LookupRecord 但不会生成任何输出到任何一个 Matched/Unmatched / Failure 。
        • @Vizag 你问错人了。我在这里所做的只是改进答案的印刷格式。但是您的 ping 也将到达此答案的作者。
        • 事实上,我什至没有自己进行编辑:我只是在审核队列中批准了它。
        • 啊,感谢 Adrain 提醒我我的错误并为此感到抱歉。 @Bruce,如果可能的话,请在这里分享模板 xml。
        • @Vizag 您可以查看这些图像并正确配置查找服务
        猜你喜欢
        • 1970-01-01
        • 2020-12-29
        • 1970-01-01
        • 2020-03-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-01-05
        • 1970-01-01
        相关资源
        最近更新 更多