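[Posted]: 2020-06-09 13:49:24
[Question]:
I wrote a Python/Jython script to run in NiFi's ExecuteScript processor to parse my invalid JSON documents. I based the script below on the script in this question and on Matt Burgess's excellent cookbook, but it does not return multiple flowfiles. Instead, it returns the input flowfile with the regex corrections applied, but only as a single file. What do I need to change to return one flowfile for each line in the loop?
Script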
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import re
# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # regex out invalid escapes -- WORKS
        text = re.sub(r"[(\\)]", "", text)
        # split out each line into a separate file -- DOES NOT WORK
        for t in text.splitlines():
            outputStream.write(t)
# end class
flowFile = session.get()
if(flowFile != None):
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        session.transfer(flowFile, REL_SUCCESS)
    except Exception as e:
        log.error('Something went wrong', e)
        session.transfer(flowFile, REL_FAILURE)
# implicit return at the end
JSON -- the goal is each line == one flowfile
{"fruit":"apple", "vegetable":"celery", "location":{"country":"nor\\way", "city":"oslo"}, "color":"blue"}
{"fruit":"cherry", "vegetable":"kale", "location":{"country":"france", "city":"calais"}, "color":"green"}
{"fruit":"peach", "vegetable":"peas", "location":{"country":"united\\kingdom", "city":"london"}, "color":"yellow"}
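The text handling can be checked on its own in plain Python, outside NiFi. This is a minimal sketch that assumes the sample above is the complete input; `clean_and_split` and `SAMPLE` are hypothetical names used only for illustration, and nothing here touches the NiFi session.

```python
import json
import re

# The three sample lines from the question, kept verbatim in a raw string
# so the doubled backslashes survive exactly as they appear in the file.
SAMPLE = r'''{"fruit":"apple", "vegetable":"celery", "location":{"country":"nor\\way", "city":"oslo"}, "color":"blue"}
{"fruit":"cherry", "vegetable":"kale", "location":{"country":"france", "city":"calais"}, "color":"green"}
{"fruit":"peach", "vegetable":"peas", "location":{"country":"united\\kingdom", "city":"london"}, "color":"yellow"}'''


def clean_and_split(text):
    """Apply the question's regex, then return one string per non-empty line."""
    # Same pattern as in the question: strips "(", ")" and backslashes.
    cleaned = re.sub(r"[(\\)]", "", text)
    return [line for line in cleaned.splitlines() if line.strip()]


# Each cleaned line parses as its own JSON document.
records = [json.loads(line) for line in clean_and_split(SAMPLE)]
print(len(records))
print(records[0]["location"]["country"])
```

Note that the character class `[(\\)]` also removes parentheses, so any `(` or `)` inside a value would be dropped along with the backslashes.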
ETA: I added session.create() and, following this, removed the original flowfile with session.remove(flowFile), but NiFi says flowFile is not defined?
# imports not changed
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        text = re.sub(r"[(\\)]", "", text)
        flowfiles_list = []
        for t in text.splitlines():
            flowFile = session.create()
            if (flowFile != None):
                flowFile = session.write(flowfile, t)
                flowfiles_list.append(flowFile)
        for flow in flowfiles_list:
            session.transfer(flow, REL_SUCCESS)
originalFlowFile = session.get()
if(originalFlowFile != None):
    # NiFi says flowFile not defined
    originalFlowFile = session.write(flowFile, PyStreamCallback())
    session.remove(originalFlowFile)
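[Comments]:
- From the same cookbook, check the session.create() example: community.cloudera.com/t5/Community-Articles/…
- @daggett I tried to incorporate session.create() (added to the original post), but I get an error that the transfer relationship is not specified, although I thought I did that in session.transfer(new_flowFile, REL_SUCCESS). Did I just put the code in the wrong place, or am I still missing the mark?
- You must do something with every flowfile: transfer or remove. You probably forgot to remove the original flowfile...
- @daggett I modified the OP, but when I add session.remove(flowFile) it says the flowfile is already marked for transfer elsewhere, and I can't see where other than the except block.
- originalFlowFile = session.write(flowFile, PyStreamCallback()) I think that should be originalFlowFile = session.write(originalFlowFile, PyStreamCallback())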
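The rule from the comments (every flowfile obtained from the session has to end up either transferred or removed) can be sketched outside NiFi as follows. `FakeSession` and `split_into_flowfiles` are hypothetical names invented for this illustration. `FakeSession` only mimics the shape of `session.create`, `session.write`, `session.transfer`, and `session.remove`, and its `write` accepts a plain string, whereas the real `session.write` expects a callback object such as the `StreamCallback` subclass in the question.

```python
import re


class FakeSession(object):
    """Hypothetical stand-in for the NiFi session; NOT the real API."""

    def __init__(self):
        self.transferred = []  # (content, relationship) pairs
        self.removed = []      # flowfiles dropped from the flow

    def create(self, parent):
        # A child flowfile that remembers which flowfile it came from.
        return {"parent": parent, "content": None}

    def write(self, flow_file, content):
        # Assumption: the real call takes a callback, not a string.
        flow_file["content"] = content
        return flow_file

    def transfer(self, flow_file, relationship):
        self.transferred.append((flow_file["content"], relationship))

    def remove(self, flow_file):
        self.removed.append(flow_file)


def split_into_flowfiles(session, original, text, rel_success="success"):
    """Create, write, and transfer one child per line, then remove the original."""
    cleaned = re.sub(r"[(\\)]", "", text)
    for line in cleaned.splitlines():
        if not line.strip():
            continue  # skip blank lines instead of emitting empty flowfiles
        child = session.create(original)
        child = session.write(child, line)
        session.transfer(child, rel_success)
    # The original is handled exactly once: removed, never also transferred.
    session.remove(original)


session = FakeSession()
original = {"parent": None, "content": "raw input"}
split_into_flowfiles(session, original, '{"a":"x\\\\y"}\n{"a":"z"}\n')
print(len(session.transferred), len(session.removed))
```

In a real ExecuteScript body the same order would presumably apply: read the original content first, create and transfer one child per line, and only then remove the original, so that no flowfile is both transferred and removed.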