【问题标题】:How to map the column wise data in flowfile in NiFi?如何在 NiFi 的流文件中映射列数据?
【发布时间】:2017-03-02 05:03:01
【问题描述】:

我有 csv 文件,其结构如下。,

Alfreds,Centro,Ernst,Island,Bacchus
Germany,Mexico,Austria,UK,Canada
01,02,03,04,05

现在我必须像下面那样将该数据移动到数据库中。

Name,City,ID
Alfreds,Germay,01
Centro,Mexico,02
Ernst,Austria,03
Island,UK,04
Bacchus,Canda,05

我尝试映射这些列,但我无法按列提取数据。

这里我的输入数据按列排列,但我需要在 SQLServer 中按行插入这些数据

任何人都可以建议在 sql server 中将列式数据传输到行式的方法吗?

谢谢

【问题讨论】:

    标签: sql sql-server apache-nifi


    【解决方案1】:

    没有现有的 Apache NiFi 处理器来执行列转置。问题之一是这很难以流方式完成,因为大多数 NiFi 组件都是设计的,因为在幼稚的实现中,您需要同时将流文件的全部内容保存在活动内存中。

    我建议使用ExecuteScript 处理器来执行此操作 (here's a 6 line Python example)。这样做要小心,因为如果设置不正确/您将意外的大文件读入内存,您很容易导致堆溢出。

    您可以编写一个自定义处理器,通过迭代每一行 n 行并读取到您的分隔符,每行存储一个字节计数器,结合 n 元素作为单个输出行,并从每行的相应字节计数器开始重复该过程。 (给定 m 列,这是O(m * n))。

    另一种解决方案是使用 SplitText 处理器将 CSV 输入拆分为单独的行,使用 ExecuteScript 或自定义处理器将单行转换为单列,然后使用自定义合并操作(扩展现有的MergeContent 处理器或编写脚本来执行此操作)将传入的列横向连接到重构矩阵中。 (O(n) + O(n) + O(m) => O(2n + m) 但单独的转置操作可以并行执行,因此 x 线程是O(n + n/x + m))。

    任何这些方法都需要一定程度的定制开发。如果您真的犹豫不决,可以尝试使用ExecuteStreamCommandmany bash solutions 之一在命令行上进行换位。

    【讨论】:

      【解决方案2】:

      @安迪,

      在 NiFi 中也可以不使用 ExecuteScript。

      我已将 3 个输入行提取为 ExtractText 中的 input.1、input.2、input.3。然后使用 expression language 中的 AnydelinateValues 计算“input.1”中的列数,并将其存储在 “TotalCount” 属性中。

      最初制作“Count=1”。

      使用循环概念通过使用“Count”获取第一列,然后增加“Count”检查RouteOnAttribute中的“Count” “le(总数)”

      现在使用 "Count" 属性形成插入查询。

      它对我很有效。它可能对某人有用。

      【讨论】:

      • 这将无法很好地扩展,并且会将流文件的全部内容加载到堆空间中——具有数千行的流文件可能会溢出您的堆。
      • 是的,它占用了堆空间。对于大规模,这些方法不适用,它会影响堆内存。
      猜你喜欢
      • 1970-01-01
      • 2017-10-03
      • 2021-05-08
      • 2021-09-29
      • 1970-01-01
      • 2021-07-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多