【问题标题】:Apache Camel - aggregate messages based on bodyApache Camel - 基于正文聚合消息
【发布时间】:2017-08-26 21:47:04
【问题描述】:

我目前正在使用 apache camel 来使用 SQS 消息,并且一切正常。

作为一个项目的一部分,我在上传文件时使用 S3 通知事件。这些文件被上传到一个可预测的 S3 密钥 - (<type>/<account-id>/<id>/<file>)

处理时,我使用骆驼将消息聚合到单个交换中(等待 10 条消息,或 1 秒后超时)。我想知道,但有没有一种方法可以基于 S3 进行聚合 - 例如,聚合具有相同类型或 id 的消息。

根据我从阅读骆驼文档中了解到的情况,有一些方法可以查询 Json 有效负载或标头值 - 这是一种可能的方法吗(因为 S3 事件通知是 Json 消息,并且根据 AWS 文档,PUT 操作将只生成一个记录条目)?还是我需要实现自己的聚合器?

添加一些上下文 - 我有一个收集数据并将数据上传到 S3 的服务。然后,另一个服务将在收到通知时下载此数据,对其进行处理并上传到另一个存储桶。如果我可以聚合 S3 通知,我就可以合并数据并上传它,从而减少上传和 API 调用等的数量。

【问题讨论】:

    标签: java amazon-s3 apache-camel


    【解决方案1】:

    如果您使用camel-aws s3 组件,那么您可以从消息的CamelAwsS3Key 标头中访问/获取 S3 密钥,并且您不需要查询正文,但您需要从 S3 密钥中提取必填字段.

    【讨论】:

    • 是的,我已经检查了 s3 组件,我遇到的问题是我无法删除已处理的文件(其他应用程序可能需要它),并且需要跟踪哪些文件已被处理投票后
    • 该组件有一个参数可以禁止删除已处理文件:deleteAfterRead=false。 “需要跟踪哪些文件已被处理”是什么意思?如果您要过滤掉重复项,则可以使用 idempotent repositoryCamelAwsS3Key 作为键。
    • 是的,这些文件将保留用于备份和其他应用程序处理 - 存储库看起来像一个解决方案,但我试图避免额外的基础设施等,并且会担心随着时间的推移性能。如果不需要保留文件,这可能是最好的解决方案(因为我已经在使用 Camel 进行 SQS,所以我也有一些经验,这是一个奖励)。
    【解决方案2】:

    虽然不是最好或最通用的解决方案,但我确实找到了一种方法来解决这个问题 -

    我只是添加了一个额外的处理器,它在传递给聚合器之前被调用。处理器只检查 S3 键的事件记录(因为我正在侦听来自 S3 的 PUT 事件,根据 AWS 文档,应该只有一个记录)并在消息上设置标头。

    然后聚合器能够根据这些标头(简单地说 S3-TypeS3-Account-IdS3-Id 组合 Exchange em>)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-01-30
      • 2018-03-19
      • 1970-01-01
      • 2014-01-01
      相关资源
      最近更新 更多