生产者
<?php $exchangeName = "sendMessage"; $queueName = "sendMessageQueue"; $routeKey = "sendMessageKey"; $message = "hello rabbitmq message!"; $connection = new AMQPConnection([\'host\'=>\'127.0.0.1\',\'port\'=>\'5672\',\'vhost\'=>\'/\',\'login\'=>\'guest\',\'password\'=>\'guest\']); $connection->connect() or die(\'can not connect broker server !\'); $channel = new AMQPChannel($connection); //$channel->setPerfetchCount(1); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); $exchange->declareExchange(); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); $queue->bind($exchangeName, $routeKey); for($i=0;$i<100;$i++) { $exchange->publish($message, $routeKey); } $connection->disconnect();
消费者
<?php $exchangeName = \'sendMessage\'; $queueName="sendMessageQueue"; $routeKey = \'sendMessageKey\'; $connection = new AMQPConnection([\'host\'=>\'127.0.0.1\',\'port\'=>\'5672\',\'vhost\'=>\'/\',\'login\'=>\'guest\',\'password\'=>\'guest\']); $connection->connect() or die(\'can not connect broker server !\'); $channel = new AMQPChannel($connection); //$channel->setPerfetchCount(1); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); $exchange->declareExchange(); $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); $queue->bind($exchangeName, $routeKey); echo "message ........"; while(true){ $queue->consume(\'dealMsg\'); } $connection->disconnect(); function dealMsg($event, $q){ $msg = $event->getBody(); echo $msg."\r\n"; $q->ack($event->getDeliveryTag()); }