【问题标题】:Logstash - input file plugin to keep data in memoryLogstash - 将数据保存在内存中的输入文件插件
【发布时间】:2021-02-14 02:07:48
【问题描述】:

我有 1- 一个 CSV 文件和 2- 一个实时 KAFKA 流。 KAFKA 流引入了实时流日志,CSV 文件包含元数据记录,我需要将它们与流日志连接起来,然后再将它们发送到 Elastic Search。

Kafka 流日志和 CSV 记录示例:

KAFKA log: MachineID: 2424, MachineType: 1, MessageType: 9
CSV record: MachineID: 2424, MachineOwner: JohnDuo

记录我需要在发送到 ES 之前在 logstash 中构建:

MachineID: 2424
MachineOwner: JohnDuo
MachineType: 1
MessageType: 9

我想要一个解决方案,一个 Ruby 或 Logstash 插件或其他任何东西来读取这个 CSV 文件一次 并将它们引入并加入到 Logstash conf 文件中。我需要保留内容 CSV 文件在内存中的数量,否则 CSV 在每个实时 Kafka 日志上的查找都会影响我的 Logstash 性能。

【问题讨论】:

    标签: ruby elasticsearch join apache-kafka logstash


    【解决方案1】:

    试试translate 过滤器。

    你会需要这样的东西。

    filter {
        translate {
            dictionary_path => "/path/to/your/csv/file.csv"
            field => "[MachineId]"
            destination => "[MachineOwner]"
            fallback => "not found"
        }
    }
    

    然后你在你的file.csv你将拥有以下内容。

    2424,JohnDuo
    2425,AnotherUser
    

    对于每个具有MachineId 字段的事件,此过滤器将在字典中查找此 id,如果找到匹配项,它将创建一个名为 MachineOwner 的字段,其中包含匹配项的值,如果它找不到匹配项,它将创建字段MachineOwner,其值为not found,如果您不想在不匹配的情况下创建该字段,您可以删除fallback选项。

    字典在 logstash 启动时加载到内存中,并且每 300 秒重新加载一次,您也可以更改该行为。

    【讨论】:

    • 如果我的 CSV 文件有多个字段,例如 MachineID、MachineOwner、地址、电话等,我可以在一个过滤器中创建多个目标字段吗
    猜你喜欢
    • 1970-01-01
    • 2014-07-24
    • 1970-01-01
    • 1970-01-01
    • 2013-12-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-06-16
    相关资源
    最近更新 更多