【问题标题】:Delayed queue / message processing in StormStorm 中的延迟队列/消息处理
【发布时间】:2013-05-16 17:50:27
【问题描述】:

在我的Storm 拓扑中,在处理流时,我想将一些消息的处理延迟到未来的某个时间点。这样做有哪些合理的选择?

到目前为止,我想到了以下几点:

  • 使用 Java 的 Thread.sleep。 (但是,根据一些讨论,这不是有效利用 Storm 资源的推荐方式。)
  • 使用延迟队列...
  • Storm 是否有一些 API 用于延迟我忽略的消息?
  • ZeroMQ 是否提供 Storm(如果已修改)可以利用的延迟消息传递 API?

【问题讨论】:

  • 您能说明一下您为什么要这样做吗?如果你还没有准备好处理这些东西,你为什么要把它传递给你的风暴拓扑?
  • 我的第一反应:为什么要问为什么?理解或回答问题重要吗?延迟(或重新安排)元组有用的原因有很多。我的元组处理不仅仅是纯粹的数据功能转换。就我而言,处理元组涉及捕获系统外部事物的状态并与其他流集成。由于它会随着时间而变化,因此我想以受控的时间间隔捕获该状态。其中一项要求是不要过于频繁地消耗外部资源。

标签: java clojure apache-storm


【解决方案1】:

我们正在使用拓扑标记元组来批量处理待处理的元组。它基本上只是将它们存储在每个普通元组的内存中,当它接收到一个滴答元组时,它使用批量/流水线处理将它们处理成存储/索引。

我们还使用 redis 的情况下,我们有巨大的卷峰值,如果卷峰值检测到所有元组重定向到每个主机上的本地 redis 存储,然后在卷消失后被推回拓扑处理。我们的情况可能不适用于您的情况,仅适用于我的 2c。

【讨论】:

    【解决方案2】:

    使用外部消息队列实现延时队列。

    由于 Storm 具有容错性和水平分布,因此选择适合该样式的消息队列是有意义的,例如:

    • 卡夫卡
    • 亚马逊 SQS
    • RabbitMQ

    【讨论】:

      猜你喜欢
      • 2014-06-10
      • 1970-01-01
      • 1970-01-01
      • 2010-10-02
      • 1970-01-01
      • 2017-05-05
      • 2015-11-06
      • 2019-10-16
      • 1970-01-01
      相关资源
      最近更新 更多