PHP+RabbitMQ complete code for message queuing

Why RabbitMq instead of ActiveMq or RocketMq?
First of all, from a business point of view, I do not require 100% acceptance rate of messages, and I need to develop in combination with php. RabbitMq has a lower latency (subtle level) than RocketMq. As for ActiveMq, it seems that there are many problems. RabbitMq has good support for various languages, so RabbitMq is chosen.
First, install RabbitMQ corresponding to PHP. Here we use PHP ﹣ AMQP to implement different extension methods with slight differences

PHP extension address:
 For details, please refer to the official website 

config.php configuration information

BaseMQ.php MQ base class

ProductMQ.php producer class

ConsumerMQ.php consumer class

Consumer2MQ.php consumer 2 (multiple possible)


return [
 //To configure
 'host' => [
  'host' => '',
  'port' => '5672',
  'login' => 'guest',
  'password' => 'guest',
 'routes' => [],


 * Created by PhpStorm.
 * User: pc
 * Date: 2019/07/13
 * Time: 14:11

namespace MyObjSummary\rabbitMQ;

/** Member
 *  AMQPChannel
 *  AMQPConnection
 *  AMQPEnvelope
 *  AMQPExchange
 *  AMQPQueue
 * Class BaseMQ
 * @package MyObjSummary\rabbitMQ
class BaseMQ
 /** MQ Channel
  * @var \AMQPChannel
 public $AMQPChannel ;

 /** MQ Link
  * @var \AMQPConnection
 public $AMQPConnection ;

 /** MQ Envelope
  * @var \AMQPEnvelope
 public $AMQPEnvelope ;

 /** MQ Exchange
  * @var \AMQPExchange
 public $AMQPExchange ;

 /** MQ Queue
  * @var \AMQPQueue
 public $AMQPQueue ;

 /** conf
  * @var
 public $conf ;

 /** exchange
  * @var
 public $exchange ;

 /** link
  * BaseMQ constructor.
  * @throws \AMQPConnectionException
 public function __construct()
  $conf = require 'config.php' ;
   throw new \AMQPConnectionException('config error!');
  $this->conf  = $conf['host'] ;
  $this->exchange = $conf['exchange'] ;
  $this->AMQPConnection = new \AMQPConnection($this->conf);
  if (!$this->AMQPConnection->connect())
   throw new \AMQPConnectionException("Cannot connect to the broker!\n");

  * close link
 public function close()

 /** Channel
  * @return \AMQPChannel
  * @throws \AMQPConnectionException
 public function channel()
  if(!$this->AMQPChannel) {
   $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
  return $this->AMQPChannel;

 /** Exchange
  * @return \AMQPExchange
  * @throws \AMQPConnectionException
  * @throws \AMQPExchangeException
 public function exchange()
  if(!$this->AMQPExchange) {
   $this->AMQPExchange = new \AMQPExchange($this->channel());
  return $this->AMQPExchange ;

 /** queue
  * @return \AMQPQueue
  * @throws \AMQPConnectionException
  * @throws \AMQPQueueException
 public function queue()
  if(!$this->AMQPQueue) {
   $this->AMQPQueue = new \AMQPQueue($this->channel());
  return $this->AMQPQueue ;

 /** Envelope
  * @return \AMQPEnvelope
 public function envelope()
  if(!$this->AMQPEnvelope) {
   $this->AMQPEnvelope = new \AMQPEnvelope();
  return $this->AMQPEnvelope;


//Producer P
namespace MyObjSummary\rabbitMQ;
require 'BaseMQ.php';
class ProductMQ extends BaseMQ
 private $routes = ['hello','word']; //Routing key

  * ProductMQ constructor.
  * @throws \AMQPConnectionException
 public function __construct()

 /** Only control whether the sending is successful and the consumer does not receive it
  * @throws \AMQPChannelException
  * @throws \AMQPConnectionException
  * @throws \AMQPExchangeException
 public function run()
  $channel = $this->channel();
  //Create switch object
  $ex = $this->exchange();
  //Message content
  $message = 'product message '.rand(1,99999);
  //Start business
  $sendEd = true ;
  foreach ($this->routes as $route) {
   $sendEd = $ex->publish($message, $route) ;
   echo "Send Message:".$sendEd."\n";
  if(!$sendEd) {
  $channel->commitTransaction(); //Submission of affairs
  die ;
 (new ProductMQ())->run();
}catch (\Exception $exception){
 var_dump($exception->getMessage()) ;


//Consumer C
namespace MyObjSummary\rabbitMQ;
require 'BaseMQ.php';
class ConsumerMQ extends BaseMQ
 private $q_name = 'hello'; //Team name
 private $route = 'hello'; //Routing key

  * ConsumerMQ constructor.
  * @throws \AMQPConnectionException
 public function __construct()

 /** Accept message if the reconnect is terminated, there will be a message
  * @throws \AMQPChannelException
  * @throws \AMQPConnectionException
  * @throws \AMQPExchangeException
  * @throws \AMQPQueueException
 public function run()

  //Create switch
  $ex = $this->exchange();
  $ex->setType(AMQP_EX_TYPE_DIRECT); //direct type
  $ex->setFlags(AMQP_DURABLE); //Persistence
  //echo "Exchange Status:".$ex->declare()."\n";

  //Create queue
  $q = $this->queue();
  $q->setFlags(AMQP_DURABLE); //Persistence
  //echo "Message Total:".$q->declareQueue()."\n";

  //Bind switch and queue, and specify routing key
  echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";

  //Blocking mode receive message
  echo "Message:\n";
   $q->consume(function ($envelope,$queue){
    $msg = $envelope->getBody();
    echo $msg."\n"; //Processing message
    $queue->ack($envelope->getDeliveryTag()); //Send ACK response manually
   //$Q - > consume ('processmessage ', AMQP ﹐ autoack); / / Auto ACK reply
 (new ConsumerMQ)->run();
}catch (\Exception $exception){
 var_dump($exception->getMessage()) ;

phper always encounters some problems and bottlenecks when it is advanced. There is no sense of direction when it writes too much business code. It needs to increase salary but I don't know where to start to improve. I collated some data about this, including but not limited to: distributed architecture, high scalability, high performance, high concurrency, server performance tuning, TP6, laravel, YII2, Redis, Swoole, Kafka, Mysql optimization , shell script, Docker, microservice, Nginx and other advanced dry goods can be shared for free Please poke here.

Keywords: PHP RabbitMQ PhpStorm Laravel

Added by Liquid Fire on Sun, 03 Nov 2019 13:38:00 +0200