【Question Title】: Spring Boot Poller for AWS SQS
【Posted】: 2021-03-06 09:17:23
【Question Description】:

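What is the best way to call a method repeatedly at a fixed interval?

I want to design a poller that automatically pulls messages from AWS SQS after a defined time interval.

Any good suggestions would be much appreciated.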

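【Question Discussion】:

  • What do you mean by "continuous method after a fixed interval"? Could you explain further what you are trying to achieve?

Tags: amazon-web-services spring-boot amazon-sqs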


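【Solution 1】:

There are two polling mechanisms: short polling, which fits when you expect messages to arrive frequently, and long polling, which fits when you expect them less often. You can use a hybrid of the two: pull repeatedly with a 10 ms wait between pulls; if a pull returns any messages (a successful pull), keep polling at the same rate; otherwise raise the wait to 5000 ms. Example: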

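// timeout is the wait between pulls, in ms
let timeout = 10;
// sqs.pull(), processMessage() and wait() are placeholders for your own code
async function pullFromSQS() {
  // Loop instead of recursing so the call stack does not grow without bound
  while (true) {
    const messages = await sqs.pull();
    if (messages.length) {
      processMessage(messages);
      timeout = 10;   // messages found: keep polling quickly
    } else {
      timeout = 5000; // queue empty: back off
    }
    await wait(timeout);
  }
}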

You can tune these timeouts to suit your needs and get a better balance of cost and performance.

【Discussion】:
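
For a Java service, the same adaptive-delay idea could be sketched roughly as follows. This is only an illustration under stated assumptions: `AdaptivePoller`, its `source` supplier (a stand-in for the real SQS receive call), and its `handler` are hypothetical names, not part of the AWS SDK, and the 10 ms / 5000 ms values are simply the ones from the example above.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper: polls quickly while messages keep arriving, slowly otherwise.
class AdaptivePoller {
    static final long FAST_DELAY_MS = 10;   // wait after a successful pull
    static final long IDLE_DELAY_MS = 5000; // wait after an empty pull

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Supplier<List<String>> source; // assumption: wraps the SQS receive call
    private final Consumer<String> handler;      // assumption: processes one message

    AdaptivePoller(Supplier<List<String>> source, Consumer<String> handler) {
        this.source = source;
        this.handler = handler;
    }

    // Picks the next wait from the number of messages in the last pull.
    static long nextDelayMillis(int receivedCount) {
        return receivedCount > 0 ? FAST_DELAY_MS : IDLE_DELAY_MS;
    }

    void start() {
        scheduler.schedule(this::pollOnce, 0, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    private void pollOnce() {
        int received = 0;
        try {
            List<String> messages = source.get();
            received = messages.size();
            messages.forEach(handler);
        } catch (RuntimeException e) {
            // If the pull itself fails, received stays 0 and the idle delay applies.
            e.printStackTrace();
        }
        try {
            // Re-schedule instead of recursing, so the stack never grows.
            scheduler.schedule(this::pollOnce, nextDelayMillis(received), TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // stop() was called in the meantime; nothing more to schedule.
        }
    }
}
```

Calling `new AdaptivePoller(source, handler).start()` begins polling on a single background thread, and `stop()` ends it. Keeping the delay choice in the small static method `nextDelayMillis` makes the two values easy to tune and to test in isolation.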

    【Solution 2】:

    You can use the SDK provided by AWS to poll for messages:

    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import com.amazonaws.AmazonClientException;
    import com.amazonaws.auth.profile.ProfileCredentialsProvider;
    import com.amazonaws.regions.Regions;
    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
    import com.amazonaws.services.sqs.model.Message;
    import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
    
    public class SQSRealtimePoller implements Runnable {

        public static final int MAX_MESSAGES = 10;

        // Visibility timeout, in seconds
        public static final int DEFAULT_VISIBILITY_TIMEOUT = 15;

        // A value greater than 0 enables long polling, which reduces SQS cost
        public static final int WAIT_TIME = 20;
        public static final int PROCESSORS = 2;

        private final String queueUrl;
        private final AmazonSQS amazonSqs;

        // Bounded queue: put() blocks while it is full, which throttles the poller
        private final ArrayBlockingQueue<Message> messageHoldingQueue;

        // Processes up to PROCESSORS messages at a time
        private final ExecutorService executor;

        // Note: the topic parameter is not used by this class
        public SQSRealtimePoller(String topic, String queueUrl) {
            this.queueUrl = queueUrl;
            this.amazonSqs = getSQSClient();
            this.messageHoldingQueue = new ArrayBlockingQueue<Message>(PROCESSORS);
            this.executor = Executors.newFixedThreadPool(PROCESSORS);
        }

        @Override
        public void run() {
            ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
                    .withQueueUrl(queueUrl)
                    .withMaxNumberOfMessages(MAX_MESSAGES)
                    .withVisibilityTimeout(DEFAULT_VISIBILITY_TIMEOUT)
                    .withWaitTimeSeconds(WAIT_TIME);
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    List<Message> messages = amazonSqs
                            .receiveMessage(receiveMessageRequest).getMessages();
                    if (messages == null || messages.isEmpty()) {
                        // No messages arrived during this poll period. Continue polling.
                        continue;
                    }
                    for (Message message : messages) {
                        // Waits here until the holding queue has free space
                        messageHoldingQueue.put(message);
                        executor.execute(this::processNextMessage);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the loop condition stops polling
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        private void processNextMessage() {
            try {
                Message messageToProcess = messageHoldingQueue.poll();
                if (messageToProcess == null) {
                    return;
                }
                // Process your message here
                System.out.println(messageToProcess);

                // Delete the message from the SQS queue once it has been processed
                amazonSqs.deleteMessage(queueUrl, messageToProcess.getReceiptHandle());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Consider creating the client once and reusing it (singleton)
        public static AmazonSQS getSQSClient() {
            ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider();
            try {
                credentialsProvider.getCredentials();
            } catch (Exception e) {
                throw new AmazonClientException(
                        "Cannot load the credentials from the credential profiles file. " +
                        "Please make sure that your credentials file is at the correct " +
                        "location and is in a valid format.",
                        e);
            }

            return AmazonSQSClientBuilder.standard()
                    .withCredentials(credentialsProvider)
                    .withRegion(Regions.US_WEST_2)
                    .build();
        }
    }
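
The poller above long-polls in a tight loop. If a fixed pause between polls is wanted instead, as the question asks, one possible sketch uses the JDK's `ScheduledExecutorService`. `FixedDelayPoller` and its `pollTask` argument are hypothetical names introduced for illustration; `pollTask` would wrap whatever code receives and processes messages.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs a polling task with a fixed pause between runs.
class FixedDelayPoller {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Runs pollTask immediately, then again `delay` after each run finishes.
    ScheduledFuture<?> start(Runnable pollTask, long delay, TimeUnit unit) {
        Runnable guarded = () -> {
            try {
                pollTask.run();
            } catch (RuntimeException e) {
                // An exception escaping the task would suppress all later runs,
                // so it is logged here and polling continues.
                e.printStackTrace();
            }
        };
        return scheduler.scheduleWithFixedDelay(guarded, 0, delay, unit);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

For example, `new FixedDelayPoller().start(task, 5, TimeUnit.SECONDS)` would run `task` every five seconds, measured from the end of the previous run. In a Spring Boot application, a similar effect is commonly obtained by annotating a bean method with `@Scheduled(fixedDelay = ...)` and enabling scheduling with `@EnableScheduling`.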
    

    【Discussion】:
