【问题标题】:Batch processing flowfiles in apache nifiapache nifi中的批处理流文件
【发布时间】:2019-04-26 10:12:55
【问题描述】:

我编写了自定义 nifi 处理器,它尝试批处理输入流文件。

但是,它的行为似乎不如预期。这是发生的事情:

我在服务器上复制粘贴一些文件。 FethFromServerProcessor 从服务器获取这些文件并将其放入 queue1MyCustomProcessorqueue1 批量读取文件。我在MyCustomProcessor 及其onTrigger() 方法中定义了batchSize 属性,我通过执行以下操作从当前批次中的queue1 获取所有流文件:

session.get(context.getProperty(batchSize).asInteger())

onTrigger() 的第一行创建时间戳并将此时间戳添加到所有流文件中。所以批处理中的所有文件都应该有相同的时间戳。然而,这并没有发生。通常第一个流文件获取一个时间戳,其余流文件获取另一个时间戳。

似乎当FetchFromServerProcessor 从服务器获取第一个文件并将其放入queue1 时,MyCustomProcessor 被触发并从队列中获取所有文件。顺便说一句,曾经有一个文件,在这批中被选为唯一的文件。当MyCustomProcessor 处理完这个文件时,FetchFromServerProcessor 已经从服务器获取所有文件并将它们放在queue1 中。因此,在处理完第一个文件后,MyCustomProcessor 获取queue1 中的所有文件并形成第二批,而我希望所有文件都在一个批次中拾取。

如何避免形成两个批次?我看到人们在这种情况下讨论等待通知:12。但我无法从这些帖子中快速理解。有人可以给我最小的步骤来使用等待通知处理器来实现这一点,或者有人可以指出我的最小教程,它给出了使用等待通知处理器的分步过程?还有等待通知模式标准方法来解决我解释的批处理相关问题吗?还是有其他标准方法可以完成这项工作?

【问题讨论】:

  • 您能否描述一种逻辑来确定队列不完整?因为它可能完全不同。从你的话听起来你只需要添加一些延迟和回滚,以防你的文件太“年轻”session.get(n)
  • 这是什么FetchFromServerProcessor
  • FetchFromServerProcessor 可以是从远程服务器获取文件的任何东西,比如 SFTP 服务器或 Amazon S3,因此需要时间一个一个地获取文件并将它们放入 queue1
  • 我目前没有逻辑来确定队列是否不完整。问题是它应该是什么逻辑?我不能依赖一些延迟,对吧? (因为我无法确定通过网络获取当前批次中的所有文件需要多长时间)。我可以将放在服务器上的文件数作为流文件的属性,但是我该如何使用它呢?
  • 看来你需要改变 FetchFromServerProcessor 的逻辑...因为只有这个处理器知道它什么时候结束...

标签: apache-nifi


【解决方案1】:

听起来好像这个批量大小是传入流文件到CustomProcessor 所需的计数,所以为什么不写你的CustomProcessor#onTrigger() 如下:

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    final ComponentLog logger = getLogger();
    // Try to get n flowfiles from incoming queue
    final Integer desiredFlowfileCount = context.getProperty(batchSize).asInteger();
    final int queuedFlowfileCount = session.getQueueSize().getObjectCount();
    if (queuedFlowfileCount < desiredFlowfileCount) {
        // There are not yet n flowfiles queued up, so don't try to run again immediately
        if (logger.isDebugEnabled()) {
            logger.debug("Only {} flowfiles queued; waiting for {}", new Object[]{queuedFlowfileCount, desiredFlowfileCount});
        }
        context.yield();
        return;
    }

    // If we're here, we do have at least n queued flowfiles
    List<FlowFile> flowfiles = session.get(desiredFlowfileCount);

    try {
        // TODO: Perform work on all flowfiles
        flowfiles = flowfiles.stream().map(f -> session.putAttribute(f, "timestamp", "my static timestamp value")).collect(Collectors.toList());
        session.transfer(flowfiles, REL_SUCCESS);

        // If extending AbstractProcessor, this is handled for you and you don't have to explicitly commit
        session.commit();
    } catch (Exception e) {
        logger.error("Helpful error message");
        if (logger.isDebugEnabled()) {
            logger.error("Further stacktrace: ", e);
        }
        // Penalize the flowfiles if appropriate (also done for you if extending AbstractProcessor and an exception is thrown from this method
        session.rollback(true);
        //  --- OR ---
        // Transfer to failure if they can't be retried
        session.transfer(flowfiles, REL_FAILURE);
    }
}

如果不熟悉 Java 8 stream 语法,可以用这个替换:

        for (int i = 0; i < flowfiles.size(); i++) {
            // Write the same timestamp value onto all flowfiles
            FlowFile f = flowfiles.get(i);
            flowfiles.set(i, session.putAttribute(f, "timestamp", "my timestamp value"));
        }

semantics between penalization(告诉处理器延迟对特定流文件执行工作)和让步(告诉处理器等待一段时间以再次尝试执行任何工作)很重要。

您可能还希望在您的自定义处理器上使用@TriggerSerially annotation,以确保您没有运行多个线程,从而可能出现竞争条件。

【讨论】:

  • 在你的回答中重新扩展 Java 8 的内容多么有礼貌,Andy。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-14
  • 2019-06-13
  • 2016-09-21
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多