【问题标题】:Apache NiFi to split data based on conditionApache NiFi 根据条件拆分数据
【发布时间】:2016-12-30 02:56:45
【问题描述】:

我们的要求是根据条件拆分流数据。 我们想为此使用“ExecuteStreamCommand”处理器(实习生将使用 java 类),但它只提供单流数据文件。我们想要两个流数据文件,一个用于匹配条件,另一个用于不匹配条件。

我查看了“RouteText”处理器,但它没有使用 java 类作为其中一部分的功能。

如果有人有任何建议,请告诉我。

【问题讨论】:

  • 什么条件,为什么需要使用java类?
  • 嗨,如果指定位置的特定字符串是“ABC”,那么我必须将该消息移至一个 Kafka 主题,否则将移至另一个主题。 Soi 想从 DB 中动态获取字符串 "ABC" 并检查,未来我们可能会添加更多不同的命令类型。
  • 换句话说,你有一个传入的FlowFile。您想使用 DB 查询的结果来检查结果是否包含在特定位置的 FlowFile 的内容中。然后根据是否是路由。这个对吗?您还使用什么“数据库”?最后,传入的 FlowFile 的内容是特定格式的吗?
  • 嗨,是的,你是正确的。我使用 MongoDB 作为数据库,是的,我有特定的传入数据格式。

标签: apache-nifi


【解决方案1】:

我认为您可以使用GetMongo 读取这些定义值并将它们存储在DistributedMapCacheClientService 访问的映射中,然后使用RouteOnContent 根据检索到的值的缺失/存在来路由传入的流文件。

如果这不起作用,您可以将查询结果从GetMongo 路由到PutFile,然后使用ScanContent,它从文件系统上的字典文件中读取并根据缺席/路由流文件/这些关键字在内容中的存在。

最后,如果所有其他方法都失败了,您可以使用ExecuteScript 将这些步骤组合到一个处理器中并路由到matched/unmatched 关系。它可以轻松处理 Groovy 代码,因此您可以在必要时直接调用现有的 Java 类。

【讨论】:

  • 第一个不工作。所以在第二个选项中,我们如何访问“目录文件”中的 mongo 数据?我需要仅供参考吗,ScanContent 的输入是 Mongo 和 ListeningPort。
  • 您必须将 Mongo 查询的结果写入存储在磁盘上的平面文件,并且根据查询的输出格式,可能使用ReplaceText 正确格式化。然后,使用PutFile 的输出路径作为RouteOnContent 的字典文件路径,以便读取这些定义。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-07-17
  • 1970-01-01
  • 2017-06-17
  • 2010-10-31
  • 2018-03-03
相关资源
最近更新 更多