【问题标题】:Generating multiple flowfiles using the Nifi ExecuteScript processor使用 Nifi ExecuteScript 处理器生成多个流文件
【发布时间】:2017-03-29 22:49:57
【问题描述】:

我正在处理一个 Nifi 流程,我正在获取一个包含多个键值对的 JSON 文档。我正在使用ExecuteScript 处理器和python

我的目标是创建基于 JSON keys 的各种 URL。键是数字的,它们看起来像这样:

keys = [10200, 10201, 10202, ...]

我想要的 URL 有 3 种类型,它们应该如下所示:

http://google.com/10200
http://bing.com/10200
http://yahoo.com/10200

我正在尝试遍历我的 keys[] 并为其包含的每个数字键创建 3 个特定的 url。我正在尝试以下代码:

-->列表中读取一个数字键创建3个网址-->吐出一个流文件。

......并读取列表中的下一个数字键并继续循环......

我有以下代码,但是当我给它一个 JSON 流文件时,它现在什么都不做。有人可以告诉我我做错了什么吗?

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):

  def __init__(self):
        self.parentFlowFile = None
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = json.loads(text)
    flowfiles_list = [] 

    outputStream.write(bytearray(json.dumps(obj.keys(), indent=4).encode('utf-8')))


    for numerical_key in obj.keys():
      # create 1 flowfile for each numerical_key. Each flow file should have 3 url attributes 
      flowFile = session.create(self.parentFlowFile)
      if (flowFile != None):
        flowFile = session.write(flowFile, "Does not matter")
        flowFile = session.putAttribute(flowFile, "google", "http://google.com/"+ numerical_key)

        flowFile = session.putAttribute(flowFile, "google", "http://bing.com/"+ numerical_key)

        flowFile = session.putAttribute(flowFile, "google", "http://yahoo.com/"+ numerical_key)
        flowfiles_list.append(flowFile)

    for flow in flowfiles_list:
      session.transfer(flow, REL_SUCCESS)

【问题讨论】:

    标签: python apache-nifi


    【解决方案1】:

    问得好,这是流文件 API 的回调方法的细微差别。您已创建 StreamCallback 的子类,但尚未检索输入流文件或使用它通过您的类的实例覆盖内容。

    在定义你的 ModJSON 类之后试试这个:

    originalFlowFile = session.get()
    if(originalFlowFile != None):
        originalFlowFile = session.write(flowFile, ModJSON())
        session.remove(originalFlowFile)
    

    这将获得一个输入流文件(或等待一个出现),然后调用您的 StreamCallback 以覆盖您的流文件的内容。在我的示例中,您将丢弃输入流文件,因此如果这对您的用例来说是正确的行为,那么您可以只扩展 InputStreamCallback 而不是 StreamCallback 并删除 outputStream.write(),如果您不使用 outputStream 任何东西.为此,请将 StreamCallback 替换为 InputStreamCallback 并从 process() 方法中删除“outputStream”参数。

    在您的示例中,一旦您在上面添加了我的 sn-p,您将使用 json.dumps() 命令覆盖输入内容,以及创建和传输新文件,所有这些都具有相同的关系(成功),所以如果它们的格式不同,这可能会导致问题(这就是我添加 session.remove() 的原因)。如果您需要原始流文件与其他文件建立不同的关系,请考虑InvokeScriptedProcessor 而不是 ExecuteScript。如果你不关心处理后的输入流文件(添加 URL 属性完成),那么按照我上面的建议。如果他们都可以走出相同的关系(成功),那么将我的 session.remove() 替换为

    session.transfer(originalFlowFile, REL_SUCCESS)
    

    查看我的 ExecuteScript 食谱文章(part 2,共 3 篇),了解更多 Jython(和其他语言)用例的示例:)

    【讨论】:

      猜你喜欢
      • 2020-03-01
      • 1970-01-01
      • 2016-08-30
      • 1970-01-01
      • 2018-01-11
      • 2021-02-03
      • 2022-09-27
      • 2020-09-20
      • 2019-10-12
      相关资源
      最近更新 更多