【问题标题】:Kafka aggregate single log event lines to a combined log eventKafka 将单个日志事件行聚合到一个组合日志事件中
【发布时间】:2017-04-25 04:46:09
【问题描述】:

我正在使用 Kafka 来处理日志事件。我具备 Kafka Connect 和 Kafka Streams 的基本知识,可用于简单的连接器和流转换。

现在我有一个具有以下结构的日志文件:

timestamp event_id event

一个日志事件有多个由 event_id 连接的日志行(例如邮件日志)

例子:

1234 1 START
1235 1 INFO1
1236 1 INFO2
1237 1 END

而且通常有多个事件:

例子:

1234 1 START
1234 2 START
1235 1 INFO1
1236 1 INFO2
1236 2 INFO3
1237 1 END
1237 2 END

时间窗口(START 和 END 之间)最长可达 5 分钟。

因此我想要一个类似的主题

event_id combined_log

例子:

1 START,INFO1,INFO2,END
2 START,INFO2,END

实现这一目标的正确工具是什么?我试图用 Kafka Streams 解决它,但我可以弄清楚如何..

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    在您的用例中,您实际上是在基于消息有效负载重建会话或事务。目前还没有对此类功能的内置、即用型支持。但是,您可以使用 Kafka 的 Streams API 的处理器 API 部分自己实现此功能。您可以编写自定义处理器,使用状态存储来跟踪对于给定键的会话/事务何时开始、添加和结束。

    邮件列表中的一些用户一直在做 IIRC,尽管我不知道我可以指出你的现有代码示例。

    您需要注意的是正确处理乱序数据。在上面的示例中,您按正确顺序列出了所有输入数据:

    1234 1 START
    1234 2 START
    1235 1 INFO1
    1236 1 INFO2
    1236 2 INFO3
    1237 1 END
    1237 2 END
    

    但实际上,消息/记录可能会乱序到达,就像这样(我只显示带有键 1 的消息以简化示例):

    1234 1 START
    1237 1 END
    1236 1 INFO2
    1235 1 INFO1
    

    即使发生这种情况,我知道在您的用例中,您仍然希望将此数据解释为:START -> INFO1 -> INFO2 -> END 而不是 START -> END(忽略/删除 INFO1INFO2 = 数据丢失)或 @987654328 @(不正确的顺序,可能也违反了你的语义约束)。

    【讨论】:

    • 感谢您的回答,我将看看处理器 API。是的,也应该考虑订单问题。
    • @imehl:也许你想用一些关于你最终为解决问题所做的工作的信息来更新你的问题,既然你找到了解决方案?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多