【发布时间】:2021-12-18 22:16:44
【问题描述】:
我想听一个 sqs 队列并使用了这个例子https://github.com/docusign/connect-php-worker-aws
这就是我所拥有的
class UpdateOrderDispatchedAtListener
{
private bool $restart = false;
private array $checkLogQ = [];
/**
* Create the event listener.
*
* @return void
*/
public function __construct()
{
//
}
/**
* Handle the event.
*
* @return void
*/
public function handle()
{
$sqsClient = new SqsClient(
[
'credentials' => ['key' => env('SQS_KEY'), 'secret' => env('SQS_SECRET')],
'region' => env('SQS_REGION'),
'version' => '2012-11-05'
]
);
$this->restart = true;
$this->listenForever($sqsClient);
}
private function listenForever($sqsClient)
{
while (true) {
if ($this->restart) {
print(date('Y/m/d H:i:s')." Starting queue worker\n");
$this->restart = false;
$this->startQueue($sqsClient);
}
sleep(5);
}
}
/**
* Maintain the array checkLogQ as a FIFO buffer with length 4.
* When a new entry is added, remove oldest entry and shuffle.
* @param $message
*/
private function addCheckLog($message)
{
$length = 4;
// If checkLogQ size is smaller than 4 add the message
if (count($this->checkLogQ) < $length) {
$this->checkLogQ[] = $message;
} // If checkLogQ size is bigger than 4 - Remove the oldest message and add the new one
else {
array_shift($this->checkLogQ);
$this->checkLogQ[] = $message;
}
}
/**
* Prints all checkLogQ messages to the console
*/
private function printCheckLogQ()
{
foreach ($this->checkLogQ as $message) {
print($message);
}
$this->checkLogQ = []; // reset a variable to an empty array
}
/**
* Receive and wait for messages from queue
*/
private function startQueue($sqsClient)
{
try {
while (true) {
$this->addCheckLog(date('Y/m/d H:i:s')." Awaiting a message...\n");
// Receive messages from queue, maximum waits for 20 seconds for message
// receive_request - contain all the queue messages
$receive_request = $sqsClient->receiveMessage(array(
'QueueUrl' => env('SQS_PREFIX').'/order_items',
'WaitTimeSeconds' => 20,
'MaxNumberOfMessages' => 10
));
// Count the amount of messages received
$msgCount = 0;
if ($receive_request->getPath('Messages') !== null) {
$msgCount = count($receive_request->getPath('Messages'));
}
$this->addCheckLog(date('Y/m/d H:i:s')." found $msgCount message(s)\n");
// If at least one message has been received
if ($msgCount != 0) {
$this->printCheckLogQ();
foreach ($receive_request->getPath('Messages') as $msg) {
$this->messageHandle($sqsClient, $msg, $receive_request);
}
}
}
} catch (\Exception $e) {
$this->printCheckLogQ();
print(date('Y/m/d H:i:s')." Queue receive error: $e");
sleep(5);
$this->restart = true;
}
}
/**
* @param $sqsClient
* @param $message
* @param $receive_request
*/
private function messageHandle($sqsClient, $message, $receive_request)
{
//if (env("DEBUG") === "true") {
$messageId = $message['MessageId'];
print(date('Y/m/d H:i:s')." Processing message id: $messageId \n");
//}
try {
// Creates a Json object from the message body
$body = json_decode($message['Body']);
} catch (\Exception $e) {
$body = false;
}
if ($body) {
$test = $body->{'test'};
$xml = $body->{'xml'};
process($test, $xml);
} else {
print(date('Y/m/d H:i:s')." Null or bad body in message id $messageId. Ignoring. \n");
}
$receive_request = $sqsClient->deleteMessage(array(
'QueueUrl' => env('SQS_PREFIX').'/order_items',
'ReceiptHandle' => $message['ReceiptHandle']
));
}
}
如何让 laravel 在没有事件的情况下收听?这篇文章提到我可以,但我不知道如何https://laravel.io/forum/02-03-2014-help-with-setting-up-a-queue-listener
【问题讨论】:
标签: laravel listener amazon-sqs