【问题标题】:Apache NiFi: merge SQL lines into JsonApache NiFi:将 SQL 行合并到 Json 中
【发布时间】:2019-03-21 22:25:58
【问题描述】:

我有一个 SQL 数据库,我提取了一些行,将它们转换为 Json 以提供 MongoDB。我坚持转型步骤。我试过这个流程: 该过程在 MergeRecord 处理器上停止,我不知道为什么。

目的是转换这种(简化的)SQL查询结果:

ID      ROUTE_CODE  STATUS SITE_ID SITE_CODE      
379619  1801300001  10     220429   100001
379619  1801300001  10     219414   014037
379619  1801300001  10     220429   100001
379620  1801300002  10     220429   100001
379620  1801300002  10     219454   014075
379620  1801300002  10     220429   100001

到这个json:

[
  {
    "routeId": "379619",
    "routeCode": "1901300001",
    "routeStatus": "10",
    sites: [
        { "siteId": "220429", "siteCode" : "100001" },
        { "siteId": "219414", "siteCode" : "014037" }           
    ]
  },
  {
    "routeId": "379620",
    "routeCode": "1901300002",
    "routeStatus": "10",
    sites: [
        { "siteId": "220429", "siteCode" : "100001" },
        { "siteId": "219454", "siteCode" : "014075" }           
    ]
  }
]

MergeRecord 应该按“routeId”分组,我也不知道将站点分组为数组的正确 Jolt 变换...

【问题讨论】:

    标签: apache-nifi


    【解决方案1】:

    流被卡住,因为在 ConvertAvrToJson 和 MergeRecord 之间的队列上出现了反压,这可以通过红色指示器看到,表明队列已达到其最大大小 10k 流文件。这意味着 ConvertAvroToJson 处理器在队列的阈值降低之前将不再执行,除非 MergeRecord 可能正在等待更多文件,因此队列不会减少。

    您可以更改队列上的设置以将阈值增加到高于您正在等待的记录数,或者您可以以不同的方式实现流程...

    在 ExecuteSql 之后,看起来 3 个处理器基本上被用于拆分、转换为 json 并重新合并在一起。这可以通过不拆分并仅将 ConvertRecord 与 Avro 读取器和 JSON 写入器一起使用来更有效地完成,这样您就可以执行 ExecuteSQL -> ConvertRecord -> JOLT。

    此外,您可能希望将 JoltTransformRecord 视为 JoltTransformJson 的替代品。

    【讨论】:

    • 您也可以将 ExecuteSQLRecord 与 JSON Writer 一起使用,而不是 ExecuteSQL -> ConvertRecord
    • 听起来更好:)
    • 感谢您的所有 cmets。我会试试你的建议。
    【解决方案2】:

    在 ExecuteSQL(或 ExecuteSQLRecord)之后,您可以使用添加了以下用户定义属性的 PartitionRecord(属性名称在 = 左侧,值在右侧):

    routeId = /ROUTE_ID
    routeCode = /ROUTE_CODE
    routeStatus = /STATUS
    

    PartitionRecord 应该使用 JSON 编写器,然后您可以使用 JoltTransformJson 和以下规范:

    [
      {
        "operation": "shift",
        "spec": {
          "*": {
            "ID": "routeId",
            "ROUTE_CODE": "routeCode",
            "STATUS": "routeStatus",
            "SITE_ID": "sites[#2].siteId",
            "SITE_CODE": "sites[#2].siteCode"
          }
        }
      },
      {
        "operation": "modify-overwrite-beta",
        "spec": {
          "routeId": "=firstElement(@(1,routeId))",
          "routeCode": "=firstElement(@(1,routeCode))",
          "routeStatus": "=firstElement(@(1,routeStatus))"
        }
      }
    ]
    

    这会将每个站点 ID/代码分组到 sites 字段中。然后你只需要 MergeRecord 将它们重新组合在一起。不幸的是,PartitionRecord 还不支持fragment.* 属性(我已经写了NIFI-6139 来介绍这个改进),所以MergeRecord 不能保证来自原始输入文件的所有转换记录都在同一个合并流文件。但是,每个合并的流文件都将包含带有 sites 数组的记录,其中包含一些唯一的 routeId/routeCode/routeStatus 值。

    【讨论】:

    • 谢谢,非常有帮助!我只是想知道是否有办法避免站点数组中的重复?
    • 有一个开放的 Jira 案例 (issues.apache.org/jira/browse/NIFI-6047) 添加一个 DetectDuplicateRecord 处理器。同时,您可能能够使用 QueryRecord 来执行某种SELECT DISTINCTGROUP BY。另一个不太理想的解决方法是在 siteIdsiteCode 字段上再次使用 PartitionRecord,然后在每个流文件上使用 SELECT * FROM FLOWFILE LIMIT 1 QueryRecord
    猜你喜欢
    • 2017-05-20
    • 2019-09-21
    • 2020-03-22
    • 2023-03-13
    • 2020-10-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-24
    相关资源
    最近更新 更多