【发布时间】:2021-03-06 09:17:23
【问题描述】:
在固定间隔后连续调用方法的最佳方式是什么?
我想设计一个轮询器,它可以在定义的时间间隔后自动从 AWS SQS 中提取消息。
非常感谢任何好的建议。
【问题讨论】:
-
“固定间隔后连续方法”是什么意思?你能再解释一下,你想达到什么目标?
标签: amazon-web-services spring-boot amazon-sqs
在固定间隔后连续调用方法的最佳方式是什么?
我想设计一个轮询器,它可以在定义的时间间隔后自动从 AWS SQS 中提取消息。
非常感谢任何好的建议。
【问题讨论】:
标签: amazon-web-services spring-boot amazon-sqs
有两种轮询机制短轮询 - 如果您希望更频繁地获取数据,如果您期望数据不那么频繁,则需要长轮询。 你应该使用上述混合的东西,即 在 10ms 的超时时间内递归拉取, 如果拉取包含任何消息(成功拉取)继续以相同的速度轮询,否则 将超时更改为 5000 毫秒。 示例:
//timeout is in ms
timeout = 10;
function pullFromSQS() {
message = sqs.pull();
if (message.length) {
processMessage(message);
timeout = 10;
} else {
timeout = 5000;
}
wait(timeout);
pullFromSQS();
}
您可以根据自己的方便更改超时以获得更好的优化(成本和性能)
【讨论】:
您可以使用 AWS 提供的 SDK 进行消息轮询
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
public class SQSRealtimePoller implements Runnable {
public static final int MAX_MESSAGES = 10;
public static final int DEFAULT_VISIBILITY_TIMEOUT = 15;
//Value greater that 0 makes it long polling, which will reduce SQS cost
public static final int WAIT_TIME = 20;
public static final int PROCESSORS = 2;
ExecutorService executor = Executors.newFixedThreadPool(1);
private String queueUrl;
private AmazonSQS amazonSqs;
ArrayBlockingQueue<Message> messageHoldingQueue = new ArrayBlockingQueue<Message>(
1);
public SQSRealtimePoller(String topic, String queueUrl
) {
this.queueUrl = queueUrl;
this.amazonSqs = getSQSClient();
messageHoldingQueue = new ArrayBlockingQueue<Message>(PROCESSORS);
//process more than 1 messages at a time.
executor = Executors.newFixedThreadPool(PROCESSORS);
}
@Override
public void run() {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
.withQueueUrl(queueUrl)
.withMaxNumberOfMessages(MAX_MESSAGES)
.withVisibilityTimeout(DEFAULT_VISIBILITY_TIMEOUT)
.withWaitTimeSeconds(WAIT_TIME);
while(true){
try {
List<Message> messages = amazonSqs
.receiveMessage(receiveMessageRequest).getMessages();
if (messages == null || messages.size() == 0) {
// If there were no messages during this poll period, SQS
// will return this list as null. Continue polling.
continue;
} else {
for (Message message : messages) {
try {
//will wait here till the queue has free space to add new messages. Read documentation
messageHoldingQueue.put(message);
} catch (InterruptedException e) {
}
Runnable run = new Runnable() {
@Override
public void run() {
try {
Message messageToProcess = messageHoldingQueue
.poll();
//Process your message here
System.out.println(messageToProcess);
//Delete the messages from queue
amazonSqs.deleteMessage(queueUrl,
messageToProcess
.getReceiptHandle());
} catch (Exception e) {
e.printStackTrace();
}
}
};
executor.execute(run);
}}
} catch (Exception e) {
e.printStackTrace();
}
}
}
//Make this singleton
public static AmazonSQS getSQSClient(){
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider();
try {
credentialsProvider.getCredentials();
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location , and is in valid format.",
e);
}
AmazonSQS sqs = AmazonSQSClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(Regions.US_WEST_2)
.build();
return sqs;
}}
【讨论】: