【问题标题】:How can I timestamp messages in nifi?如何在 nifi 中为消息添加时间戳?
【发布时间】:2021-01-06 23:18:02
【问题描述】:

免责声明:我对 nifi 一无所知。

我需要从ListenHTTP处理器接收消息,然后将每条消息转换成带时间戳的json消息。

所以,假设我在凌晨 5 点收到消息 hello world。它应该将其转换为{"timestamp": "5 am", "message":"hello world"}

我该怎么做?

【问题讨论】:

    标签: apache-nifi


    【解决方案1】:

    每个流文件都有属性,它们是存储在内存中键/值对中的元数据片段(可用于快速读/写)。发生任何操作时,NiFi 框架都会将元数据片段写入与流文件相关的 provenance 事件,有时还会写入流文件本身。例如,如果ListenHTTP 是流中的第一个处理器,则任何进入流的流文件都将具有属性entryDate,其起源时间的值采用Thu Jan 24 15:53:52 PST 2019 格式。您可以使用各种处理器(即UpdateAttributeRouteOnAttribute 等)读取和写入这些属性。

    对于您的用例,您可以在 ListenHTTP 处理器之后立即使用 ReplaceText 处理器,搜索值为 (?s)(^.*$)(整个流文件内容,或“您通过 HTTP 调用收到的内容”)和替换{"timestamp_now":"${now():format('YYYY-MM-dd HH:mm:ss.SSS Z')}", "timestamp_ed": "${entryDate:format('YYYY-MM-dd HH:mm:ss.SSS Z')}", "message":"$1"} 的值。

    上面的例子提供了两个选项:

    1. entryDate 是流文件通过ListenHTTP 处理器生成的时间
    2. now() 函数获取自纪元以来的当前时间戳(以毫秒为单位)

    这两个值可能会因性能/排队/等而略有不同。在我的简单示例中,它们相隔 2 毫秒。您可以使用format() 方法和普通的Java time format syntax 格式化它们,因此您可以使用h a(完整示例:now():format('h a'):toLower())获得“5 am”。

    示例

    • ListenHTTP 在端口 9999 上运行,路径为 contentListener
    • ReplaceText如上
    • LogAttribute 带有日志有效负载 true

    卷曲命令:curl -d "helloworld" -X POST http://localhost:9999/contentListener

    示例输出:

    2019-01-24 16:04:44,529 INFO [Timer-Driven Process Thread-6] o.a.n.processors.standard.LogAttribute LogAttribute[id=8246b0a0-0168-1000-7254-2c2e43d136a7] logging for flow file StandardFlowFileRecord[uuid=5e1c6d12-298d-4d9c-9fcb-108c208580fa,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1548374015429-1, container=default, section=1], offset=3424, length=122],offset=0,name=5e1c6d12-298d-4d9c-9fcb-108c208580fa,size=122]
    --------------------------------------------------
    Standard FlowFile Attributes
    Key: 'entryDate'
        Value: 'Thu Jan 24 16:04:44 PST 2019'
    Key: 'lineageStartDate'
        Value: 'Thu Jan 24 16:04:44 PST 2019'
    Key: 'fileSize'
        Value: '122'
    FlowFile Attribute Map Content
    Key: 'filename'
        Value: '5e1c6d12-298d-4d9c-9fcb-108c208580fa'
    Key: 'path'
        Value: './'
    Key: 'restlistener.remote.source.host'
        Value: '127.0.0.1'
    Key: 'restlistener.remote.user.dn'
        Value: 'none'
    Key: 'restlistener.request.uri'
        Value: '/contentListener'
    Key: 'uuid'
        Value: '5e1c6d12-298d-4d9c-9fcb-108c208580fa'
    --------------------------------------------------
    {"timestamp_now":"2019-01-24 16:04:44.518 -0800", "timestamp_ed": "2019-01-24 16:04:44.516 -0800", "message":"helloworld"}
    

    【讨论】:

      【解决方案2】:

      所以,我用这段代码添加了一个ExecuteScript 处理器:

      import org.apache.commons.io.IOUtils
      import java.nio.charset.StandardCharsets
      import java.time.LocalDateTime
      
      flowFile = session.get()
      if(!flowFile)return
      def text = ''
      // Cast a closure with an inputStream parameter to InputStreamCallback
      session.read(flowFile, {inputStream ->
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        // Do something with text here
      } as InputStreamCallback)
      
      
      def outputMessage = '{\"timestamp\":\"' + LocalDateTime.now().toString() + '\", \"message:\":\"' + text + '\"}'
      
      flowFile = session.write(flowFile, {inputStream, outputStream ->
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        outputStream.write(outputMessage.getBytes(StandardCharsets.UTF_8))
      } as StreamCallback)
      
      session.transfer(flowFile, REL_SUCCESS)
      

      它成功了。

      【讨论】:

        猜你喜欢
        • 2017-11-12
        • 1970-01-01
        • 2013-09-19
        • 2018-10-30
        • 1970-01-01
        • 2013-09-16
        • 1970-01-01
        • 1970-01-01
        • 2017-11-07
        相关资源
        最近更新 更多