PHP+RabbitMQ complete code for message queuing

Why RabbitMQ instead of ActiveMQ or RocketMQ?
First of all, from a business point of view, I do not require that 100% of messages be delivered, and I need to develop in combination with PHP. RabbitMQ has lower latency (microsecond level) than RocketMQ. As for ActiveMQ, it seems to have quite a few problems. RabbitMQ also has good client support for many languages, so I chose RabbitMQ.
First, install the RabbitMQ extension for PHP. Here we use php-amqp; other client libraries work in much the same way, with slight differences in usage.

PHP extension address: http://pecl.php.net/package/amqp
For details, please refer to the official tutorials: http://www.rabbitmq.com/getstarted.html
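
Before running any of the scripts in this article, it can help to confirm that the extension is actually loaded. The following is a minimal sketch; the helper name amqpExtensionStatus is hypothetical and is not part of php-amqp.

```php
<?php
// Hypothetical helper: reports whether the php-amqp extension is loaded.
function amqpExtensionStatus(): string
{
    if (!extension_loaded('amqp')) {
        return 'amqp extension is not loaded; install it from PECL and enable it in php.ini';
    }
    // phpversion() with an extension name returns that extension's version string.
    return 'amqp extension loaded, version ' . phpversion('amqp');
}

echo amqpExtensionStatus(), "\n";
```

Running `php -m` on the command line and looking for amqp in the list gives the same information.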

File overview
config.php configuration information

BaseMQ.php MQ base class

ProductMQ.php producer class

ConsumerMQ.php consumer class

Consumer2MQ.php second consumer (there can be several)

config.php

<?php
return [
 //Connection settings
 'host' => [
  'host' => '127.0.0.1',
  'port' => '5672',
  'login' => 'guest',
  'password' => 'guest',
  'vhost'=>'/',
 ],
 //Exchange name
 'exchange'=>'word',
 //Routing keys
 'routes' => [],
];
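
A typo in a key name in this file only shows up later as a connection error, so a small check at load time can save some debugging. The sketch below assumes the array layout shown above; the function name validateMqConfig is hypothetical and does not appear in the original classes.

```php
<?php
// Hypothetical helper: lists the keys that are missing from the config array.
// An empty result means every key read by the classes in this article is present.
function validateMqConfig(array $conf): array
{
    $errors = [];
    foreach (['host', 'port', 'login', 'password', 'vhost'] as $key) {
        if (!isset($conf['host'][$key]) || $conf['host'][$key] === '') {
            $errors[] = "missing host.$key";
        }
    }
    if (empty($conf['exchange'])) {
        $errors[] = 'missing exchange';
    }
    return $errors;
}

// Same values as config.php above, inlined so the sketch runs on its own.
$conf = [
    'host' => [
        'host' => '127.0.0.1',
        'port' => '5672',
        'login' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
    ],
    'exchange' => 'word',
    'routes' => [],
];

var_dump(validateMqConfig($conf));
```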

BaseMQ.php

<?php
/**
 * Created by PhpStorm.
 * User: pc
 * Date: 2019/07/13
 * Time: 14:11
 */

namespace MyObjSummary\rabbitMQ;

/** Base class that wraps the php-amqp objects:
 *  AMQPChannel
 *  AMQPConnection
 *  AMQPEnvelope
 *  AMQPExchange
 *  AMQPQueue
 * Class BaseMQ
 * @package MyObjSummary\rabbitMQ
 */
class BaseMQ
{
 /** MQ Channel
  * @var \AMQPChannel
  */
 public $AMQPChannel ;

 /** MQ Connection
  * @var \AMQPConnection
  */
 public $AMQPConnection ;

 /** MQ Envelope
  * @var \AMQPEnvelope
  */
 public $AMQPEnvelope ;

 /** MQ Exchange
  * @var \AMQPExchange
  */
 public $AMQPExchange ;

 /** MQ Queue
  * @var \AMQPQueue
  */
 public $AMQPQueue ;

 /** Connection settings (the 'host' section of config.php)
  * @var array
  */
 public $conf ;

 /** Exchange name
  * @var string
  */
 public $exchange ;

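 /** Loads the configuration and connects to the broker
  * BaseMQ constructor.
  * @throws \AMQPConnectionException
  */
 public function __construct()
 {
  //Resolve config.php relative to this file, not the current working directory
  $conf = require __DIR__.'/config.php' ;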
  if(!$conf)
   throw new \AMQPConnectionException('config error!');
  $this->conf  = $conf['host'] ;
  $this->exchange = $conf['exchange'] ;
  $this->AMQPConnection = new \AMQPConnection($this->conf);
  if (!$this->AMQPConnection->connect())
   throw new \AMQPConnectionException("Cannot connect to the broker!\n");
 }

 /**
  * Close the connection
  */
 public function close()
 {
  $this->AMQPConnection->disconnect();
 }

 /** Channel
  * @return \AMQPChannel
  * @throws \AMQPConnectionException
  */
 public function channel()
 {
  if(!$this->AMQPChannel) {
   $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
  }
  return $this->AMQPChannel;
 }

 /** Exchange
  * @return \AMQPExchange
  * @throws \AMQPConnectionException
  * @throws \AMQPExchangeException
  */
 public function exchange()
 {
  if(!$this->AMQPExchange) {
   $this->AMQPExchange = new \AMQPExchange($this->channel());
   $this->AMQPExchange->setName($this->exchange);
  }
  return $this->AMQPExchange ;
 }

 /** queue
  * @return \AMQPQueue
  * @throws \AMQPConnectionException
  * @throws \AMQPQueueException
  */
 public function queue()
 {
  if(!$this->AMQPQueue) {
   $this->AMQPQueue = new \AMQPQueue($this->channel());
  }
  return $this->AMQPQueue ;
 }

 /** Envelope
  * @return \AMQPEnvelope
  */
 public function envelope()
 {
  if(!$this->AMQPEnvelope) {
   $this->AMQPEnvelope = new \AMQPEnvelope();
  }
  return $this->AMQPEnvelope;
 }
}

ProductMQ.php

<?php
//Producer P
namespace MyObjSummary\rabbitMQ;
require 'BaseMQ.php';
class ProductMQ extends BaseMQ
{
 private $routes = ['hello','word']; //Routing keys

 /**
  * ProductMQ constructor.
  * @throws \AMQPConnectionException
  */
 public function __construct()
 {
  parent::__construct();
 }

 /** Only confirms that the message was sent successfully; it does not confirm that a consumer received it
  * @throws \AMQPChannelException
  * @throws \AMQPConnectionException
  * @throws \AMQPExchangeException
  */
 public function run()
 {
  //channel
  $channel = $this->channel();
  //Create exchange object
  $ex = $this->exchange();
  //Message content
  $message = 'product message '.rand(1,99999);
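  //Start transaction
  $channel->startTransaction();
  $allSent = true ;
  foreach ($this->routes as $route) {
   //Depending on the php-amqp version, publish() returns false or throws on failure
   $sent = $ex->publish($message, $route) !== false ;
   $allSent = $allSent && $sent ;
   echo "Send Message (".$route."): ".($sent ? 'ok' : 'failed')."\n";
  }
  if($allSent) {
   $channel->commitTransaction(); //Commit the transaction
  } else {
   $channel->rollbackTransaction(); //Discard everything published in this transaction
  }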
  $this->close();
  die ;
 }
}
try{
 (new ProductMQ())->run();
}catch (\Exception $exception){
 var_dump($exception->getMessage()) ;
}
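
The producer treats the publishes as all-or-nothing: the transaction is committed only when every routing key was published, and rolled back otherwise. The sketch below shows that control flow without a broker. The function publishAll and its three callables are hypothetical stand-ins for AMQPExchange::publish(), AMQPChannel::commitTransaction() and AMQPChannel::rollbackTransaction().

```php
<?php
// Sketch of all-or-nothing publishing. $publish, $commit and $rollback are
// stand-ins for the php-amqp calls, so this runs without RabbitMQ.
function publishAll(array $routes, string $message, callable $publish, callable $commit, callable $rollback): bool
{
    foreach ($routes as $route) {
        if ($publish($message, $route) === false) {
            $rollback();   // one failure discards every publish in the transaction
            return false;
        }
    }
    $commit();             // reached only when every publish succeeded
    return true;
}

// Stand-in broker: records the calls instead of talking to RabbitMQ.
$log = [];
$ok = publishAll(
    ['hello', 'word'],
    'product message 1',
    function ($message, $route) use (&$log) { $log[] = "publish:$route"; return true; },
    function () use (&$log) { $log[] = 'commit'; },
    function () use (&$log) { $log[] = 'rollback'; }
);

echo implode(', ', $log), "\n"; // publish:hello, publish:word, commit
```

Note that a commit or rollback is never followed by the other one: exactly one of the two ends the transaction.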

ConsumerMQ.php

<?php
//Consumer C
namespace MyObjSummary\rabbitMQ;
require 'BaseMQ.php';
class ConsumerMQ extends BaseMQ
{
 private $q_name = 'hello'; //Queue name
 private $route = 'hello'; //Routing key

 /**
  * ConsumerMQ constructor.
  * @throws \AMQPConnectionException
  */
 public function __construct()
 {
  parent::__construct();
 }

 /** Receive messages. If the consumer is stopped, messages that arrive in the meantime are delivered when it reconnects
  * @throws \AMQPChannelException
  * @throws \AMQPConnectionException
  * @throws \AMQPExchangeException
  * @throws \AMQPQueueException
  */
 public function run()
 {

  //Create the exchange
  $ex = $this->exchange();
  $ex->setType(AMQP_EX_TYPE_DIRECT); //direct type
  $ex->setFlags(AMQP_DURABLE); //Durable
  $ex->declareExchange(); //Declare the exchange so the queue can be bound to it

  //Create the queue
  $q = $this->queue();
  $q->setName($this->q_name);
  $q->setFlags(AMQP_DURABLE); //Durable
  echo "Message Total:".$q->declareQueue()."\n"; //declareQueue() returns the number of messages in the queue

  //Bind the queue to the exchange and specify the routing key
  echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";

  //Receive messages in blocking mode
  echo "Message:\n";
  while(True){
   $q->consume(function ($envelope,$queue){
    $msg = $envelope->getBody();
    echo $msg."\n"; //Process the message
    $queue->ack($envelope->getDeliveryTag()); //Send the ACK manually
   });
   //$q->consume('processMessage', AMQP_AUTOACK); //Automatic ACK
  }
  }
  $this->close();
 }
}
try{
 (new ConsumerMQ)->run();
}catch (\Exception $exception){
 var_dump($exception->getMessage()) ;
}
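
The producer publishes with the routing keys hello and word, but this consumer binds its queue with hello only, so it sees only half of the messages. The sketch below is an in-memory model of how a direct exchange matches routing keys; it does not talk to RabbitMQ, and the function name routeDirect is hypothetical.

```php
<?php
// In-memory model of direct-exchange routing; no broker involved.
// $bindings maps a queue name to the routing keys bound to it.
function routeDirect(array $bindings, string $routingKey): array
{
    $targets = [];
    foreach ($bindings as $queueName => $keys) {
        // A direct exchange delivers only on an exact routing-key match.
        if (in_array($routingKey, $keys, true)) {
            $targets[] = $queueName;
        }
    }
    return $targets;
}

// ConsumerMQ binds the queue "hello" with the routing key "hello".
$bindings = ['hello' => ['hello']];

foreach (['hello', 'word'] as $route) {
    $queues = routeDirect($bindings, $route);
    echo $route, ' -> ', $queues ? implode(', ', $queues) : '(no matching queue)', "\n";
}
```

A second consumer such as Consumer2MQ.php could presumably bind its own queue with the routing key word to pick up the remaining messages.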

Keywords: PHP RabbitMQ PhpStorm Laravel

Added by Liquid Fire on Sun, 03 Nov 2019 13:38:00 +0200