【问题标题】:Return multiple files from Python ExecuteScript in NiFi从 NiFi 中的 Python ExecuteScript 返回多个文件
【发布时间】:2020-06-09 13:49:24
【问题描述】:

我编写了一个 Python/Jython 脚本以在 NiFi 的 ExecuteScript 处理器中运行来解析我的无效 JSON 文档。我根据this question 中的脚本和 Matt Burgess 的精彩cookbook 编写了下面的脚本,但它没有返回多个流文件。相反,它返回应用了正则表达式更正的输入流文件,但仅作为一个文件。我需要修改什么来为循环中的每一行返回 1 个流文件?

脚本

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import re

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    # regex out invalid escapes -- WORKS
    text = re.sub(r"[(\\)]", "", text)
    # split out each line into a separate file -- DOES NOT WORK
    for t in text.splitlines():
        outputStream.write(t)
# end class
flowFile = session.get()
if(flowFile != None):
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        session.transfer(flowFile, REL_SUCCESS)
    except Exception as e:
        log.error('Something went wrong', e)
        session.transfer(flowFile, REL_FAILURE)
# implicit return at the end

json -- 目标是每一行 == 一个流文件

{"fruit":"apple", "vegetable":"celery", "location":{"country":"nor\\way", "city":"oslo"}, "color":"blue"}
{"fruit":"cherry", "vegetable":"kale", "location":{"country":"france", "city":"calais"}, "color":"green"}
{"fruit":"peach", "vegetable":"peas", "location":{"country":"united\\kingdom", "city":"london"}, "color":"yellow"}

ETA 添加了session.create() 并根据this 删除了带有session.remove(flowFile) 的原始流文件,但NiFi 说流文件未定义?

# imports not changed 
class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    text = re.sub(r"[(\\)]", "", text)
    flowfiles_list = []
    for t in text.splitlines():
        flowFile = session.create()
        if (flowFile != None):
            flowFile = session.write(flowfile, t)
            flowfiles_list.append(flowFile)
    for flow in flowfiles_list:
        session.transfer(flow, REL_SUCCESS)

originalFlowFile = session.get()
if(originalFlowFile != None):
    # NiFi says flowFile not defined
    originalFlowFile = session.write(flowFile, PyStreamCallback()) 
    session.remove(originalFlowFile)

【问题讨论】:

  • 来自同一本食谱检查 session.create() 示例community.cloudera.com/t5/Community-Articles/…
  • @daggett 我试图合并session.create()(添加到原始帖子)但我收到一个错误,即未指定传输关系,尽管我认为我在session.transfer(new_flowFile, REL_SUCCESS) 中这样做了。我只是没有把代码放在正确的位置,还是我仍然错过了标记?
  • 您必须对所有流文件进行处理 - transferremove。可能您忘记删除原始流文件...
  • @daggett 修改了 op,但我添加了 session.remove(flowFile),但它说它已经标记为转移到其他地方,但我看不到除了块之外的位置
  • originalFlowFile = session.write(flowFile, PyStreamCallback()) 我认为应该是originalFlowFile = session.write(originalFlowFile, PyStreamCallback())

标签: python-3.x apache-nifi


【解决方案1】:

【讨论】:

  • 这看起来像我需要的,但我不知道 Groovy,所以我不确定如何在 Python 中实现类似的东西。此外,我的实际 JSON 文件非常大,当通过其他处理器/尝试运行时会导致内存错误——将其加载到列表中会导致内存错误,还是会因为它在流中而被否定?跨度>
  • 在 python 中编写代码。只需注意在提供的示例中如何使用会话和流文件即可。
猜你喜欢
  • 1970-01-01
  • 2021-02-03
  • 2018-04-12
  • 1970-01-01
  • 1970-01-01
  • 2018-02-27
  • 2022-09-27
  • 2018-01-11
  • 1970-01-01
相关资源
最近更新 更多