【问题标题】:Consumer Poll Rate with Akka, SQS, and CamelAkka、SQS 和 Camel 的消费者投票率
【发布时间】:2013-11-15 06:23:35
【问题描述】:

我正在进行的一个项目需要从 SQS 读取消息,我决定使用 Akka 来分发这些消息的处理。

由于 Camel 支持 SQS,并且在 Consumer 类中内置了用于 Akka 中使用的功能,我认为最好以这种方式实现端点并读取消息,尽管我没有看到很多人的例子这样做。

我的问题是我无法足够快地轮询队列以保持队列为空或接近空。我最初的想法是,我可以让消费者以 X/s 的速率通过 Camel 从 SQS 接收消息。从那里,我可以简单地创建更多的消费者来达到我需要处理消息的速度。

我的消费者:

import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}

class MyConsumer() extends Consumer {
  def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
  var count = 0

  def receive = {
    case msg: CamelMessage => {
      count += 1
    }
    case _ => {
      println("Got something else")
    }
  }

  override def postStop(){
    println("Count for actor: " + count)
  }
}

如图所示,我设置了 delay=1&maxMessagesPerPoll=10 来提高消息速率,但我无法使用同一端点生成多个消费者。

我在文档中读到 By default endpoints are assumed not to support multiple consumers.,我相信这也适用于 SQS 端点,因为产生多个消费者只会给我一个消费者,在运行系统一分钟后,输出消息是 Count for actor: x而不是其他输出Count for actor: 0

如果这有用的话;在单个消费者上使用此当前实现,我每秒可以读取大约 33 条消息。

这是从 Akka 的 SQS 队列中读取消息的正确方法吗?如果是这样,有没有办法让它向外扩展,以便我可以将消息消耗率提高到接近 900 条消息/秒?

【问题讨论】:

    标签: scala apache-camel akka amazon-sqs


    【解决方案1】:

    遗憾的是 Camel 目前不支持在 SQS 上并行消费消息。

    http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

    为了解决这个问题,我编写了自己的 Actor 来使用 aws-java-sdk 轮询批处理消息 SQS。

      def receive = {
        case BeginPolling => {
          // re-queue sending asynchronously
          self ! BeginPolling
          // traverse the response
          val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry]
          val messages = sqs.receiveMessage(receiveMessageRequest).getMessages
          messages.toList.foreach {
            node => {
              deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle))
              //log.info("Node body: {}", node.getBody)
              filterSupervisor ! node.getBody
            }
          }
          if(deleteEntryList.size() > 0){
            val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList)
            sqs.deleteMessageBatch(deleteMessageBatchRequest)
          }
        }
    
        case _ => {
          log.warning("Unknown message")
        }
      }
    

    虽然我不确定这是否是最好的实现,当然可以对其进行改进,以使请求不会经常遇到空队列,但它确实适合我目前能够从同一队列轮询消息的需求排队。

    使用这个从 SQS 获取大约 133(消息/秒)/actor。

    【讨论】:

      【解决方案2】:

      Camel 2.15 支持 concurrentConsumers,但不确定这有多大用处,因为我不知道 akka camel 是否支持 2.15,而且我不知道即使有多个消费者,只有一个 Consumer Actor 是否会产生影响。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2015-09-19
        • 1970-01-01
        • 2020-09-13
        • 1970-01-01
        • 1970-01-01
        • 2016-10-22
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多