【问题标题】:Merge two JSON flowfile together in NiFi在 NiFi 中将两个 JSON 流文件合并在一起
【发布时间】:2018-09-02 13:31:02
【问题描述】:

我想通过相同的指定属性合并两个包含 JSON 对象的流文件...

流程1:

attribute:    
xuuid = 123456

content:
{
"sname":"jack",
"id":"00001",
"state":"NY"
}

flow2:

attribute:    
xuuid = 123456

content:
{
"country":"US",
"date":"1983"
}

我希望这种形式的数据在单个输出流中:

desired_flow:

attribute:    
xuuid = 123456

content:
{
"sname":"jack",
"id":"00001",
"state":"NY",
"country":"US",
"date":"1983"
}

我怎么玩这个? MergeContent 处理器还是 MergeRecord? 我认为 mergerecord 可以处理它,但我对此感到困惑。

【问题讨论】:

    标签: apache-nifi


    【解决方案1】:

    是的,MergeContent 可以为您做到这一点。

    我使用 EvalJson --> MergeContent --> AttributesToJson

    我在这里发布了一个模板,你可以用它来玩。 Apache NiFi Merge Json Template

    MergeContent 必须具有以下设置:“保留所有属性”、“2 个整数”、“分隔符策略为文本”

    【讨论】:

    • 属性(xuuid)怎么样?此合并必须通过此属性的相关性来完成...
    • 使用 EvalJson 比其他方法更消耗系统 RAM(因为将属性存储在 RAM 中),我不喜欢这种解决方案。我上面介绍的两个 json 是示例,在实际工作中,我的工作(多文本字段)的每个流大小都接近 10MB,那么使用该处理器是不可能的。
    • 我在其他项目中使用合并记录,这是非常有用和强大的处理器...我认为 JSON 树阅读器和 JSON 编写器可以处理这个问题,但由于它的复杂性,我很困惑,我需要谁有这方面的专家。
    • 在两个流文件中它是有效的,但在我的情况下,流不是序数,一个流必须与来自其他流的其他流合并,该属性与它相同....我测试你的流并改变记录生成器的时间然后结果出错!
    • 那不在你最初的问题范围内
    【解决方案2】:

    您要求的是流式连接,这不是 NiFi 真正做的事情,类似的问题和答案在这里:

    https://stackoverflow.com/a/42909221/5650316

    合并处理器用于一个接一个地合并数据,而不是执行流连接。例如,如果您有许多小的 json 消息,您可能希望在写入 HDFS 之前使用 MergeContent 或 MergeRecord 将数千条消息合并到一个流文件中。

    【讨论】:

      【解决方案3】:

      An answer to another question 展示了如何使用MergeContent 后跟JoltTransformJSON 来完成此操作。

      就像这里的 OP 一样,我想合并一个特定的属性(在我的例子中是 filename),所以我的 MergeContent 配置略有不同:

      Merge Strategy: Bin-Packing Algorithm
      Merge Format: Binary Concatenation
      Correlation Attribute Name: filename  # or xuuid, or whatever you want
      Minimum Number of Entries: 2
      Delimiter Strategy: Text
      Header: [
      Footer: ]
      Demarcator: ,
      

      之后第二部分的解法同理:

      然后转移到 JoltTrasnformJSON 并将 Jolt Transformation DSL 设置为 Shift 并将 Jolt Specification 设置为:

      {
        "*": {
          "*": "&"
        }
      }
      

      这应该可以完成工作:)

      粉碎解决方案,向@Ben Yaakobi致敬。

      作为解释,我唯一可以补充的是 @Bryan Bendeanswer 在技术上是正确的,即 NiFi 不是为这种事情而设计的。因此,上面的答案有点破解:

      • 在第一部分中,MergeContent 实际上完全忽略了我们正在使用 JSON 的事实(它的Binary Concatenation 意味着它只是将内容作为原始字节处理)。它只是通过使用 Header、Footer 和 Decmarcator 设置“伪造”将两条记录合并到一个 JSON 数组中,如图所示,这 恰好是 JSON 语法。
      • 然后在第二部分中,Jolt 能够将经过处理的文本解析为有效的 JSON,并应用其转换魔法。

      为了更好地理解所使用的 Jolt 语法,这里有一些关于该主题的有用资源:

      另见some alternative approaches mentioned here。特别是,我认为将使用 MergeRecord / MergeContent 与相关属性或碎片整理模式,然后使用 QueryRecordCOALESCEGROUP BY 结合在一起的方法将两个数据集中的列连接在一起,与这个问题最相关(尽管我自己没有尝试过)。

      【讨论】:

        猜你喜欢
        • 2020-03-27
        • 2011-10-23
        • 1970-01-01
        • 1970-01-01
        • 2017-12-21
        • 2018-04-16
        • 1970-01-01
        • 1970-01-01
        • 2021-08-13
        相关资源
        最近更新 更多