【问题标题】:How to set a Message Handler programmatically in Spring Cloud AWS SQS?如何在 Spring Cloud AWS SQS 中以编程方式设置消息处理程序?
【发布时间】:2017-02-06 21:33:10
【问题描述】:

也许有人对我的以下问题有想法:

我目前在一个项目中,我想将 AWS SQS 与 Spring Cloud 集成一起使用。对于接收者部分,我想提供一个 API,用户可以在队列上注册一个“消息处理程序”,这是一个接口,将包含用户的业务逻辑,例如

MyAwsSqsReceiver receiver = new MyAwsSqsReceiver();
receiver.register("a-queue-name", new MessageHandler(){
  @Override
  public void handle(String message){
    //... business logic for the received message
  }
});

我找到了例子,例如 https://codemason.me/2016/03/12/amazon-aws-sqs-with-spring-cloud/ 并阅读文档 http://cloud.spring.io/spring-cloud-aws/spring-cloud-aws.html#_sqs_support

但我在那里发现的唯一“连接”处理传入消息的功能是方法上的注释,例如@SqsListener@MessageMapping

不过,这些注释被固定为某个队列名称。所以现在我不知所措,如何将我提供的“MessageHandler”(来自我的 API)动态“连接”到指定队列名称的传入消息。

在配置示例中有一个SimpleMessageListenerContainer,它得到一个QueueMessageHandler 集,但是这个QueueMessageHandler 似乎没有 成为设置我的处理程序或覆盖其方法并提供我自己的QueueMessageHandler 子类的正确位置。

我已经使用 Spring Amqp 集成和 RabbitMq 做了类似的事情,并认为 AWS SQS 在这里也将类似。

有没有人有想法,如何做到这一点?

谢谢+再见, 西门

编辑

我发现 Spring JMS 实际上可以做到这一点,例如www.javacodegeeks.com/2016/02/aws-sqs-spring-jms-integration.html。有谁知道,使用 JMS 协议的后果是好是坏?

【问题讨论】:

  • 您好,您找到解决问题的方法了吗?我有同样的问题,正在寻找解决方案。谢谢
  • 试试我在@user1167253发布的解决方案

标签: spring rabbitmq spring-cloud amazon-sqs


【解决方案1】:

我也面临同样的问题。

我试图以一种不寻常的方式进行,在构建时设置 Aws 客户端 bean,然后我不使用 sqslistener 注释从特定队列中消费,而是使用我可以以编程方式进行的预定注释 pool(在我的情况下每 10 秒)我想从哪个队列消费。

我做了一个例子,它遍历属性中定义的队列,然后从每个队列中消费。

客户端 Bean:

@Bean
@Primary
public AmazonSQSAsync awsSqsClient() {
    return AmazonSQSAsyncClientBuilder
            .standard()
            .withRegion(Regions.EU_WEST_1.getName())
            .build();
}

消费者:

// injected in the constructor
private final AmazonSQSAsync awsSqsClient;

@Scheduled(fixedDelay = 10000)
public void pool() {
    properties.getSqsQueues()
            .forEach(queue -> {
                val receiveMessageRequest = new ReceiveMessageRequest(queue)
                        .withWaitTimeSeconds(10)
                        .withMaxNumberOfMessages(10);

                // reading the messages
                val result = awsSqsClient.receiveMessage(receiveMessageRequest);
                val sqsMessages = result.getMessages();
                log.info("Received Message on queue {}: message = {}", queue, sqsMessages.toString());

                // deleting the messages
                sqsMessages.forEach(message -> {
                    val deleteMessageRequest = new DeleteMessageRequest(queue, message.getReceiptHandle());
                    awsSqsClient.deleteMessage(deleteMessageRequest);
                });
            });
}

澄清一下,就我而言,我需要多个队列,每个租户一个,每个租户的队列 URL 都在属性文件中传递。当然,在您的情况下,您可以从另一个来源获取队列名称,也许是一个 ThreadLocal,其中包含您在运行时创建的队列。

如果您愿意,您还可以尝试 JMS 方法,在该方法中创建消息消费者并为每个您希望的消费者添加一个侦听器(请参阅文档 Aws Jms documentation)。

【讨论】:

    【解决方案2】:

    当我们使用 Spring 和 SQS 时,我们使用 spring-cloud-starter-aws-messaging。

    然后只需创建一个 Listener 类

    @Component
    public class MyListener {
    
    @SQSListener(value="myqueue")
    public void listen(MyMessageType message) {
    //process the message
    }
    }
    

    【讨论】:

    • 嗨,谢谢回答。不幸的是,这不是我想要的。在我的用例中,我不知道队列的名称 - 它是动态创建的,例如通过 API“createQueue(String name)”调用。之后我想动态设置一个监听器。
    • 哦,好吧,我从问题中并不清楚。我只是想用很少的代码给你一个非常有弹性的方法。我喜欢 Spring 包装 AWS 库的方式,但在我们的用例中,所有队列都是预定义的,因此您可以使用 SPEL 注入队列名称。这种方法不适用于您的情况,因为当您启动应用程序并且 Spring 尝试启动该组件时,它还不知道队列名称(即使您使用它设置了一个 env 变量并使用 SPEL 来访问它) .
    猜你喜欢
    • 2018-03-16
    • 2019-03-22
    • 1970-01-01
    • 2011-05-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-08-19
    相关资源
    最近更新 更多