【发布时间】:2013-11-15 06:23:35
【问题描述】:
我正在进行的一个项目需要从 SQS 读取消息,我决定使用 Akka 来分发这些消息的处理。
由于 Camel 支持 SQS,并且在 Consumer 类中内置了用于 Akka 中使用的功能,我认为最好以这种方式实现端点并读取消息,尽管我没有看到很多人的例子这样做。
我的问题是我无法足够快地轮询队列以保持队列为空或接近空。我最初的想法是,我可以让消费者以 X/s 的速率通过 Camel 从 SQS 接收消息。从那里,我可以简单地创建更多的消费者来达到我需要处理消息的速度。
我的消费者:
import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}
class MyConsumer() extends Consumer {
def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
var count = 0
def receive = {
case msg: CamelMessage => {
count += 1
}
case _ => {
println("Got something else")
}
}
override def postStop(){
println("Count for actor: " + count)
}
}
如图所示,我设置了 delay=1 和 &maxMessagesPerPoll=10 来提高消息速率,但我无法使用同一端点生成多个消费者。
我在文档中读到 By default endpoints are assumed not to support multiple consumers.,我相信这也适用于 SQS 端点,因为产生多个消费者只会给我一个消费者,在运行系统一分钟后,输出消息是 Count for actor: x而不是其他输出Count for actor: 0。
如果这有用的话;在单个消费者上使用此当前实现,我每秒可以读取大约 33 条消息。
这是从 Akka 的 SQS 队列中读取消息的正确方法吗?如果是这样,有没有办法让它向外扩展,以便我可以将消息消耗率提高到接近 900 条消息/秒?
【问题讨论】:
标签: scala apache-camel akka amazon-sqs