【Question title】: How to listen to an AWS SQS queue using Laravel
【Posted】: 2021-12-18 22:16:44
【Question】:

I want to listen to an SQS queue, and I used this example: https://github.com/docusign/connect-php-worker-aws

This is what I have:

use Aws\Sqs\SqsClient;

class UpdateOrderDispatchedAtListener
{
    private bool $restart = false;
    private array $checkLogQ = [];

    /**
     * Create the event listener.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Handle the event.
     *
     * @return void
     */
    public function handle()
    {
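        // Note: once the config is cached (php artisan config:cache) the .env file
        // is not loaded, so env() only sees system-level environment variables.
        // In a real app, read these values through config() instead.
        $sqsClient = new SqsClient(
            [
                'credentials' => ['key' => env('SQS_KEY'), 'secret' => env('SQS_SECRET')],
                'region' => env('SQS_REGION'),
                'version' => '2012-11-05',
            ]
        );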

        $this->restart = true;
        $this->listenForever($sqsClient);
    }

    private function listenForever($sqsClient)
    {
        while (true) {
            if ($this->restart) {
                print(date('Y/m/d H:i:s')." Starting queue worker\n");
                $this->restart = false;
                $this->startQueue($sqsClient);
            }
            sleep(5);
        }
    }

    /**
     * Maintain the array checkLogQ as a FIFO buffer with length 4.
     * When a new entry is added, remove oldest entry and shuffle.
     * @param $message
     */
    private function addCheckLog($message)
    {
        $length = 4;
        // If checkLogQ holds fewer than $length entries, append the message
        if (count($this->checkLogQ) < $length) {
            $this->checkLogQ[] = $message;
        } else {
            // The buffer is full: drop the oldest entry, then append the new one
            array_shift($this->checkLogQ);
            $this->checkLogQ[] = $message;
        }
    }

    /**
     * Prints all checkLogQ messages to the console
     */
    private function printCheckLogQ()
    {
        foreach ($this->checkLogQ as $message) {
            print($message);
        }
        $this->checkLogQ = []; // reset a variable to an empty array
    }

    /**
     * Receive and wait for messages from queue
     */
    private function startQueue($sqsClient)
    {
        try {
            while (true) {
                $this->addCheckLog(date('Y/m/d H:i:s')." Awaiting a message...\n");

                // Long-poll the queue: wait up to 20 seconds for up to 10 messages.
                // $receive_request holds the result of this call.

                $receive_request = $sqsClient->receiveMessage([
                    'QueueUrl' => env('SQS_PREFIX').'/order_items',
                    'WaitTimeSeconds' => 20,
                    'MaxNumberOfMessages' => 10,
                ]);
                // Count the amount of messages received
                $msgCount = 0;
                if ($receive_request->getPath('Messages') !== null) {
                    $msgCount = count($receive_request->getPath('Messages'));
                }
                $this->addCheckLog(date('Y/m/d H:i:s')." found $msgCount message(s)\n");
                // If at least one message has been received
                if ($msgCount != 0) {
                    $this->printCheckLogQ();
                    foreach ($receive_request->getPath('Messages') as $msg) {
                        $this->messageHandle($sqsClient, $msg, $receive_request);
                    }
                }
            }
        } catch (\Exception $e) {
            $this->printCheckLogQ();
            print(date('Y/m/d H:i:s')." Queue receive error: $e");
            sleep(5);
            $this->restart = true;
        }
    }

    /**
     * @param $sqsClient
     * @param $message
     * @param $receive_request
     */
    private function messageHandle($sqsClient, $message, $receive_request)
    {
        //if (env("DEBUG") === "true") {
        $messageId = $message['MessageId'];
        print(date('Y/m/d H:i:s')." Processing message id: $messageId \n");
        //}
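        try {
            // Decode the JSON message body. Without JSON_THROW_ON_ERROR,
            // json_decode() returns null on bad input instead of throwing.
            $body = json_decode($message['Body'], false, 512, JSON_THROW_ON_ERROR);
        } catch (\JsonException $e) {
            $body = false;
        }
        if (is_object($body)) {
            $test = $body->{'test'};
            $xml = $body->{'xml'};
            // process() is not defined in this class; it must be provided elsewhere.
            process($test, $xml);
        } else {
            print(date('Y/m/d H:i:s')." Null or bad body in message id $messageId. Ignoring. \n");
        }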

        // Delete the message so it is not delivered again
        $sqsClient->deleteMessage([
            'QueueUrl' => env('SQS_PREFIX').'/order_items',
            'ReceiptHandle' => $message['ReceiptHandle'],
        ]);
    }
}

How can I make Laravel listen without an event? This post mentions that I can, but I don't know how: https://laravel.io/forum/02-03-2014-help-with-setting-up-a-queue-listener

【Comments】:

    Tags: laravel listener amazon-sqs


    【Solution 1】:

    All AWS clients use the Guzzle client under the hood, so try listening for Guzzle events: https://laravel.com/docs/8.x/http-client#events
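On the "without an event" part of the question: one possible approach, not taken from the answer above, is to keep the receive/handle/delete cycle in plain PHP and call it from any long-running entry point. The sketch below assumes PHP 7.4+. The names `decodeBody` and `pollOnce` and the three callables are hypothetical, introduced only for illustration, and the in-memory array stands in for the real `receiveMessage` / `deleteMessage` calls so the snippet runs without AWS credentials.

```php
<?php

declare(strict_types=1);

/**
 * Decode an SQS message body as JSON.
 * Returns null (instead of throwing) when the body is not a JSON object/array.
 */
function decodeBody(string $raw): ?array
{
    try {
        $data = json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        return null;
    }

    return is_array($data) ? $data : null;
}

/**
 * Run one receive/handle/delete cycle and return the number of messages handled.
 *
 * $receive: fn(): array                 list of messages with 'Body' and 'ReceiptHandle'
 * $handle:  fn(array $body): void       hypothetical application callback
 * $delete:  fn(string $receipt): void   removes the message from the queue
 */
function pollOnce(callable $receive, callable $handle, callable $delete): int
{
    $handled = 0;
    foreach ($receive() as $message) {
        $body = decodeBody($message['Body'] ?? '');
        if ($body !== null) {
            $handle($body);
            $handled++;
        }
        // Mirror the code in the question: delete the message
        // whether or not its body was usable.
        $delete($message['ReceiptHandle']);
    }

    return $handled;
}

// Usage with in-memory stand-ins for the SQS calls (assumed sample data).
$queue = [
    ['Body' => '{"test":"1","xml":"<a/>"}', 'ReceiptHandle' => 'rh-1'],
    ['Body' => 'not json', 'ReceiptHandle' => 'rh-2'],
];
$seen = [];
$deleted = [];

$count = pollOnce(
    fn (): array => $queue,
    function (array $body) use (&$seen): void {
        $seen[] = $body['test'];
    },
    function (string $receipt) use (&$deleted): void {
        $deleted[] = $receipt;
    }
);

echo "handled {$count}, deleted " . count($deleted) . "\n"; // handled 1, deleted 2
```

In a Laravel app, `pollOnce` could be called in a loop from the `handle()` method of an Artisan command (for example one generated with `php artisan make:command`), with `$receive` and `$delete` wrapping `$sqsClient->receiveMessage(...)` and `$sqsClient->deleteMessage(...)`. Passing callables keeps the loop testable without touching SQS.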

    【Comments】:
