【问题标题】:Nifi - Process the files based on count or time elapsed?Nifi - 根据计数或经过的时间处理文件?
【发布时间】:2018-12-14 19:20:31
【问题描述】:

我有以下流程, ListFile ---> FetchFile ---> ? ExecuteScript(也许)---> 通知

基本上,我想去通知,如果

  • 总流文件(来自提取文件)是 200;
  • 经过的时间(从最后一个信号开始)是 3 小时。

我认为第一个条件很容易实现。我可以有一个 groovy 脚本,它可以读取流文件的数量,如果 200 进入 SUCCESS 或 ROLLBACK 会话。

但我想知道如何检查队列中 n(数量可以小于 200)个流文件的经过时间是否超过 3 小时左右?

更新 问题是:我们目前有一个批处理(约200个文件,将来可以根据业务增加)。我们有一个 NiFi 管道,即 List、Fetch、校验和的基本验证等以及运行良好的进程(调用 SQL)。 根据业务,我们可以全天对数据进行更正,以便我们可以让所有或部分文件“重新处理”。这也很好,可以工作。

现在,根据新的要求,我们需要在这个“批次”完成后构建流程。所以在最好的情况下,我可以拥有最大 bin 为 n 的 MergeContent 处理器,并向我的新处理器发出信号或通知。 但是,如上所述,在这一天中,我们可以再次处理很少或所有文件。所以现在我的“n”可能与重新处理的文件的新“数量”不匹配。因此,即使在这种情况下,如果我们已经过去了 3 个小时,那么无论“n”不等于重新处理的新文件数,我都应该通知新进程再次运行。 因此,我正在寻找 n 个文件或 m 小时经过的检查。

【问题讨论】:

    标签: apache-nifi


    【解决方案1】:

    我认为这可能是XY problem 的一个示例——您正在尝试解决问题并相信计算获取的文件数或经过的时间会有所帮助,但这种模式在 Apache NiFi 和原始问题还有其他解决方案。我鼓励你更全面地描述你试图解决的更高层次的问题,看看是否有更好的解决方案。

    不过,我会回答这个问题(这些都不是理想的解决方案)。

    • 您可以使用MergeContent 处理器,最小 bin 数为 200
    • 您可以按照您的说明使用ExecuteScript 处理器
    • 您可以在Notify 处理器执行时将一个值(当前时间戳)写入DistributedCacheMapServer,并使用FetchDistributedCacheMap 处理器检查该值与当前时间戳,并使用简单的表达式语言语句来比较时间戳值

    我认为您可能还想阅读Wait/Notify 逻辑的一些示例,因为创建诸如“200 个传入流文件|| 3 小时经过时间”之类的阈值是Wait 处理器所做的。

    【讨论】:

    • 嗨,安迪,感谢您回来。我已按问题编辑了有关当前流程和新要求的更多详细信息。等待并通知确实有意义,我将对此进行更多探索,以了解如何在此处放置“n 个文件 || m 分钟”检查。谢谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-11
    • 1970-01-01
    • 2010-09-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多