【问题标题】:Transfer multiple flowFiles from ExecuteScript in Nifi在 Nifi 中从 ExecuteScript 传输多个流文件
【发布时间】:2019-03-21 16:21:34
【问题描述】:

我正在尝试使用 python 中的 ExecuteScript 处理器从一个流文件生成多个流文件。

输出流文件依赖于一个配置属性和输入流文件(xml 内容)。

我尝试了很多东西,但总是以如下错误结束:

  • 此流文件已标记为传输
  • 未指定转移关系

低于上一个版本:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import java.io
from org.python.core.util import StringUtil

class PyStreamCallback(StreamCallback):
    def __init__(self, flowFile):
        global matched
        self.parentFlowFile = flowFile
        pass

    def process(self, inputStream, outputStream):
        try:
            text_content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            flowfiles_list = []

            new_xml = "blabla"
            outputStream.write(bytearray(new_xml.encode('utf-8')))

            for n in range(0,5):
                flowFile = session.create(self.parentFlowFile)
                if (flowFile != None):
                    flowFile = session.write(flowFile, "Nothing")
                    flowfiles_list.append(flowFile)

            for flow in flowfiles_list:
                session.transfer(flow, REL_SUCCESS)
        except:
            print('Error inside process')
            raise

originalFlowFile = session.get()
if(originalFlowFile != None):
    try :
        originalFlowFile = session.write(originalFlowFile, PyStreamCallback(originalFlowFile))
        session.remove(originalFlowFile)

    except Exception as e:
        originalFlowFile = session.putAttribute(originalFlowFile,'python_error', str(e))
        session.transfer(originalFlowFile, REL_FAILURE)

谁能告诉我我做错了什么以及如何实现我想做的事情?

【问题讨论】:

  • 请编辑您的问题并提供当前代码的完整堆栈跟踪。

标签: python apache-nifi


【解决方案1】:

以下是脚本的一些注释:

1) 您正在继承 StreamCallback 并写入原始流文件,但稍后您将其删除。 StreamCallback 用于当您想要覆盖现有流文件的内容时。如果您不需要这样做,您可以使用 InputStreamCallback 作为基类,它不会采用 outputStream arg 但在这种情况下您不需要它。您还可以在原始流文件上使用session.read,而不是session.write

2) flowFile = session.write(flowFile, "Nothing") 行无效,因为 session.write 需要 OutputStreamCallback 或 StreamCallback 作为参数(与下面使用 PyStreamCallback 调用它的位置相同)。当这引发错误时,它会一直上升到脚本的顶层,但是到那时您已经创建了一个流文件并且没有到达将 flowfiles_list 传输到 REL_SUCCESS 的语句。考虑在session.write 周围添加try/except,然后您可以删除新创建的流文件,然后引发异常。

3) 如果您想将传入流文件的全部内容读入内存(您当前正在这样做),然后删除原始流文件并从中创建新的流文件,请考虑改用 @ 的版本返回 InputStream 的 987654328@(即不需要InputStreamCallback)。然后,您可以将内容保存到全局变量中和/或在您想要向创建的流文件写入内容时将其传递到 OutputStreamCallback。比如:

inputStream = session.read(originalFlowFile)
text_content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
inputStream.close()
flowfiles_list = []

for n in range(0,5):
    flowFile = session.create(originalFlowFile)
    if (flowFile != None):
        try:
            flowFile = session.write(flowFile, PyStreamCallback(text_content))
            flowfiles_list.append(flowFile)
        except Exception as e:
            session.remove(flowFile)
            raise

for flow in flowfiles_list:
    session.transfer(flow, REL_SUCCESS)

session.remove(originalFlowFile)

这不包括将 PyStreamCallback 重构为 OutputStreamCallback,它在构造函数中采用字符串 arg 而不是 FlowFile。

【讨论】:

  • 感谢您的回复。我试试看
猜你喜欢
  • 2021-02-03
  • 2020-06-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-11-13
  • 1970-01-01
  • 2017-04-04
  • 1970-01-01
相关资源
最近更新 更多