【问题标题】:how to transfer flowfiles one by one using a custom processor in NIFI如何使用 NIFI 中的自定义处理器一一传输流文件
【发布时间】:2018-01-20 02:26:03
【问题描述】:

我正在 Nifi v 1.3 中编写自定义处理器

处理器执行从结果集中读取的 SQL 查询并将每一行转换为 json 文档并将其存储到 ArrayList 中,最后它将每 1000 个文档(fetchSize 参数)传输到一个流文件,这对我有用,但它发送所有流文件一次。

我想要的是它在我调用 transferFlowFile 方法时独立传输每个流文件,而无需等待 onTrigger 方法结束一次传输所有内容。

这里是代码:

public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile fileToProcess = null;
    if (context.hasIncomingConnection()) {
       fileToProcess = session.get();
       if (fileToProcess == null && context.hasNonLoopConnection()) {
          return;
       }
    }

    final ResultSet resultSet = st.executeQuery();
    final ResultSetMetaData meta = resultSet.getMetaData();
    final int nrOfColumns = meta.getColumnCount();
    List<Map<String, Object>> documentList = new ArrayList<>();

        while (resultSet.next()) {

           final AtomicLong nrOfRows = new AtomicLong(0L);
           cpt++;

           Map<String, Object> item = new HashMap<>();
           for (int i = 1; i <= nrOfColumns; i++) {
               int javaSqlType = meta.getColumnType(i);
               String nameOrLabel = StringUtils.isNotEmpty(meta.getColumnLabel(i)) ? meta.getColumnLabel(i)
                    : meta.getColumnName(i);
               Object value = null;
               value = resultSet.getObject(i);
               if (value != null) {
                  item.put(nameOrLabel, value.toString());
               }
           }
           Document document = new Document(item);
           documentList.add(document);

           if (fetchSize!=0 && cpt % fetchSize == 0) {
            FlowFile flowFile = session.create();
            transferFlowFile(flowFile, session, documentList, fileToProcess, nrOfRows, stopWatch);
           }
       }

       if (!documentList.isEmpty()) {
          final AtomicLong nrOfRows = new AtomicLong(0L);
          FlowFile flowFile = session.create();
          transferFlowFile(flowFile, session, documentList, fileToProcess, nrOfRows, stopWatch);
       }
}


public void transferFlowFile(FlowFile flowFile, ProcessSession session, List<Map<String, Object>> documentList,
        FlowFile fileToProcess, AtomicLong nrOfRows, StopWatch stopWatch) {

    flowFile = session.write(flowFile, out -> {
        ObjectMapper mapper = new ObjectMapper();
        IOUtils.write(mapper.writeValueAsBytes(documentList), out);
    });

    documentList.clear();

    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");

    session.getProvenanceReporter().modifyContent(flowFile, "Retrieved " + nrOfRows.get() + " rows",
            stopWatch.getElapsed(TimeUnit.MILLISECONDS));

    session.transfer(flowFile, REL_SUCCESS);
}

【问题讨论】:

    标签: apache-nifi


    【解决方案1】:

    之后拨打session.commit()

    session.transfer(flowFile, REL_SUCCESS)

    自上次提交以来创建的任何流文件,或者如果从未提交过,则从一开始创建的任何流文件都将在提交会话时传输。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-22
      相关资源
      最近更新 更多