Why RabbitMq instead of ActiveMq or RocketMq?
First of all, from a business point of view, I do not require 100% acceptance rate of messages, and I need to develop in combination with php. RabbitMq has a lower latency (subtle level) than RocketMq. As for ActiveMq, it seems that there are many problems. RabbitMq has good support for various languages, so RabbitMq is chosen.
First, install RabbitMQ corresponding to PHP. Here we use PHP ﹣ AMQP to implement different extension methods with slight differences
PHP extension address: http://pecl.php.net/package/amqp For details, please refer to the official website http://www.rabbitmq.com/getstarted.html
introduce
config.php configuration information
BaseMQ.php MQ base class
ProductMQ.php producer class
ConsumerMQ.php consumer class
Consumer2MQ.php consumer 2 (multiple possible)
config.php
<?php return [ //To configure 'host' => [ 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/', ], //Switch 'exchange'=>'word', //Route 'routes' => [], ];
BaseMQ.php
<?php /** * Created by PhpStorm. * User: pc * Date: 2019/07/13 * Time: 14:11 */ namespace MyObjSummary\rabbitMQ; /** Member * AMQPChannel * AMQPConnection * AMQPEnvelope * AMQPExchange * AMQPQueue * Class BaseMQ * @package MyObjSummary\rabbitMQ */ class BaseMQ { /** MQ Channel * @var \AMQPChannel */ public $AMQPChannel ; /** MQ Link * @var \AMQPConnection */ public $AMQPConnection ; /** MQ Envelope * @var \AMQPEnvelope */ public $AMQPEnvelope ; /** MQ Exchange * @var \AMQPExchange */ public $AMQPExchange ; /** MQ Queue * @var \AMQPQueue */ public $AMQPQueue ; /** conf * @var */ public $conf ; /** exchange * @var */ public $exchange ; /** link * BaseMQ constructor. * @throws \AMQPConnectionException */ public function __construct() { $conf = require 'config.php' ; if(!$conf) throw new \AMQPConnectionException('config error!'); $this->conf = $conf['host'] ; $this->exchange = $conf['exchange'] ; $this->AMQPConnection = new \AMQPConnection($this->conf); if (!$this->AMQPConnection->connect()) throw new \AMQPConnectionException("Cannot connect to the broker!\n"); } /** * close link */ public function close() { $this->AMQPConnection->disconnect(); } /** Channel * @return \AMQPChannel * @throws \AMQPConnectionException */ public function channel() { if(!$this->AMQPChannel) { $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection); } return $this->AMQPChannel; } /** Exchange * @return \AMQPExchange * @throws \AMQPConnectionException * @throws \AMQPExchangeException */ public function exchange() { if(!$this->AMQPExchange) { $this->AMQPExchange = new \AMQPExchange($this->channel()); $this->AMQPExchange->setName($this->exchange); } return $this->AMQPExchange ; } /** queue * @return \AMQPQueue * @throws \AMQPConnectionException * @throws \AMQPQueueException */ public function queue() { if(!$this->AMQPQueue) { $this->AMQPQueue = new \AMQPQueue($this->channel()); } return $this->AMQPQueue ; } /** Envelope * @return \AMQPEnvelope */ public function envelope() { if(!$this->AMQPEnvelope) { $this->AMQPEnvelope = new \AMQPEnvelope(); } return $this->AMQPEnvelope; } }
ProductMQ.php
<?php //Producer P namespace MyObjSummary\rabbitMQ; require 'BaseMQ.php'; class ProductMQ extends BaseMQ { private $routes = ['hello','word']; //Routing key /** * ProductMQ constructor. * @throws \AMQPConnectionException */ public function __construct() { parent::__construct(); } /** Only control whether the sending is successful and the consumer does not receive it * @throws \AMQPChannelException * @throws \AMQPConnectionException * @throws \AMQPExchangeException */ public function run() { //channel $channel = $this->channel(); //Create switch object $ex = $this->exchange(); //Message content $message = 'product message '.rand(1,99999); //Start business $channel->startTransaction(); $sendEd = true ; foreach ($this->routes as $route) { $sendEd = $ex->publish($message, $route) ; echo "Send Message:".$sendEd."\n"; } if(!$sendEd) { $channel->rollbackTransaction(); } $channel->commitTransaction(); //Submission of affairs $this->close(); die ; } } try{ (new ProductMQ())->run(); }catch (\Exception $exception){ var_dump($exception->getMessage()) ; }
ConsumerMQ.php
<?php //Consumer C namespace MyObjSummary\rabbitMQ; require 'BaseMQ.php'; class ConsumerMQ extends BaseMQ { private $q_name = 'hello'; //Team name private $route = 'hello'; //Routing key /** * ConsumerMQ constructor. * @throws \AMQPConnectionException */ public function __construct() { parent::__construct(); } /** Accept message if the reconnect is terminated, there will be a message * @throws \AMQPChannelException * @throws \AMQPConnectionException * @throws \AMQPExchangeException * @throws \AMQPQueueException */ public function run() { //Create switch $ex = $this->exchange(); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct type $ex->setFlags(AMQP_DURABLE); //Persistence //echo "Exchange Status:".$ex->declare()."\n"; //Create queue $q = $this->queue(); //var_dump($q->declare());exit(); $q->setName($this->q_name); $q->setFlags(AMQP_DURABLE); //Persistence //echo "Message Total:".$q->declareQueue()."\n"; //Bind switch and queue, and specify routing key echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n"; //Blocking mode receive message echo "Message:\n"; while(True){ $q->consume(function ($envelope,$queue){ $msg = $envelope->getBody(); echo $msg."\n"; //Processing message $queue->ack($envelope->getDeliveryTag()); //Send ACK response manually }); //$Q - > consume ('processmessage ', AMQP ﹐ autoack); / / Auto ACK reply } $this->close(); } } try{ (new ConsumerMQ)->run(); }catch (\Exception $exception){ var_dump($exception->getMessage()) ; }
phper always encounters some problems and bottlenecks when it is advanced. There is no sense of direction when it writes too much business code. It needs to increase salary but I don't know where to start to improve. I collated some data about this, including but not limited to: distributed architecture, high scalability, high performance, high concurrency, server performance tuning, TP6, laravel, YII2, Redis, Swoole, Kafka, Mysql optimization , shell script, Docker, microservice, Nginx and other advanced dry goods can be shared for free Please poke here.