Application scenario
Why use it? What's the advantage? This should be put at the beginning to say that only when you know what it is and what it is suitable for, can you better integrate it with your own projects. Where to use it and where to learn it? Learning it doesn't mean learning it doesn't mean we won't. We should usually consider more such questions: what project functions can be combined with xx technology? How about this xx technology in this business scenario? Instead of "what can I do after learning this xx technology? The company doesn't use it now, and it's useless after learning it". It must be painful to learn xx technology with such a mood.
We all know that some time-consuming operations are not done first, but buried first, and then processed asynchronously. In this way, users can not feel the time-consuming operations such as sending e-mails and texting. Because the buried point is over, the operation is over, and the consumption queue is done on the server. It is mainly used in SMS or email notification, accessing the third-party interface to subscribe to messages, and some seconds killing activities in the mall can be completed in combination with queues.
Beanstalkd introduction
Beanstalkd is a high-performance, lightweight distributed memory queue, C code, typical memcached like design, protocol and usage are the same style, so users who have used memcached will feel beanstalkd is familiar.
The original design intention of beanstalkd is to execute more time-consuming requests asynchronously under high concurrent network requests, return results in time, and reduce the response delay of requests.
Ubuntu installation
sudo apt-get install beanstalkd
configuration file
vim /etc/default/beanstalkd
View status
service beanstalkd status # command echoing # root@:/www/server/php/72/etc# service beanstalkd status ● beanstalkd.service - Simple, fast work queue Loaded: loaded (/lib/systemd/system/beanstalkd.service; enabled; vendor preset: enabled) Active: active (running) since Tue 2018-10-16 10:42:28 CST; 6 days ago Docs: man:beanstalkd(1) Main PID: 7033 (beanstalkd) Tasks: 1 (limit: 4634) CGroup: /system.slice/beanstalkd.service └─7033 /usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd Oct 16 10:42:28 ip-10-93-2-137 systemd[1]: Started Simple, fast work queue.
Configure connectivity + persistence
ip allows all connections with 0.0.0.0. The connection is constrained by the configuration of security groups or firewalls. The - b parameter is released (there is no persistence by default). The queue messages in memory can land on the hard disk binlog for persistence, and the queue messages can be read again after power failure.
vim /etc/default/beanstalkd BEANSTALKD_LISTEN_ADDR=0.0.0.0 BEANSTALKD_LISTEN_PORT=11300 BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"
beanstalkd task status
management tool
I have personally tested many beanstalkd tools that can be found on the Internet. These two tools are my favorite, one is command line, the other is web.
Command line: https://github.com/src-d/beanstool
web interface: https://github.com/ptrofimov/beanstalk_console
Programming language client
PHP client
https://packagist.org/packages/pda/pheanstalk
composer require pda/pheanstalk
Write job
<?php //Create queue message require_once('./vendor/autoload.php'); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); $tubeName = 'email_list'; $jobData = [ 'email' => '123456@163.com', 'message' => 'Hello World !!', 'dtime' => date('Y-m-d H:i:s'), ]; $pheanstalk->useTube( $tubeName)->put( json_encode( $jobData ) );
Consumption job
<?php ini_set('default_socket_timeout', 86400*7); ini_set( 'memory_limit', '256M' ); // Consumption queue message require_once('./vendor/autoload.php'); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); $tubeName = 'email_list'; while ( true ) { // Get queue information, reserve block get $job = $pheanstalk->watch( $tubeName )->ignore( 'default' )->reserve(); if ( $job !== false ) { $data = $job->getData(); /* TODO Logical operation */ /* Finish processing, delete job */ $pheanstalk->delete( $job ); } }
The default socket timeout parameter must be added. php is generally 60s by default. If you don't set it in the code, the default is 60s. If no job is generated within 60s, the script will report socket error. What I wrote is 7-day timeout. You can adjust it according to the business. Remember to configure it. There are many consumer s searched on the Internet None of the scripts have this configured and can't be used in the production environment at all. This is the result of my own practice.
As for whether or not the while true is a dead loop, I'll tell you clearly that it's a dead loop, but it won't be executed as it consumes performance all the time. It will block here in reserve, and it will not go down until a message is generated, so you can use it safely. In my project code, I use the method call method itself to implement the loop.
This is the code for reference:
public function watchJob() { $job = $this->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve(); if ( $job !== false ) { $job_data = $job->getData(); $this->subscribe( $job_data ); $this->pheanstalk->delete( $job ); /* Continue to Watch next job */ $this->watchJob(); } else { $this->log->error( 'reserve false', 'reserve false' ); } }
Monitoring beanstalkd status
<?php //Monitoring service status require_once('./vendor/autoload.php'); use Pheanstalk\Pheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); $isAlive = $pheanstalk->getConnection()->isServiceListening(); var_dump( $isAlive );
You can cooperate with email to make an alarm email. The script will execute every minute. If the status is false, send an email alarm to the administrator.
Some related orders
View the memory usage of beanstalkd service
top -u beanstalkd
Run the consumer script in the background
nohup php googlehome_subscribe.php &
View the run time of consumer script
ps -A -opid,stime,etime,args | grep consumer.php
Restart the consumer script manually
ps auxf|grep 'googlehome_subscribe.php'|grep -v grep|awk '{print $2}'|xargs kill -9 nohup php googlehome_subscribe.php &
Some summary
php should open the error log to facilitate the collection of the user script crash log. If the script runs out of some fatal errors, it must be fixed in time, because once there is an error, it will hang up, which will affect the availability of your script. After the later stability, you can use the supervisor process management procedure to control the script life cycle.
Some network request operations must try to catch all errors. Once no catch arrives, the script will crash. I used Guzzle to make network requests. Here are some errors of my catch. The code snippet is for reference.
try { /* TODO: Logical operation */ } catch ( ClientException $e ) { $results['mid'] = $this->mid; $results['code'] = $e->getResponse()->getStatusCode(); $results['reason'] = $e->getResponse()->getReasonPhrase(); $this->log->error( 'properties-changed ClientException', $results ); } catch ( ServerException $e ) { $results['mid'] = $this->mid; $results['code'] = $e->getResponse()->getStatusCode(); $results['reason'] = $e->getResponse()->getReasonPhrase(); $this->log->error( 'properties-changed ServerException', $results ); } catch ( ConnectException $e ) { $results['mid'] = $this->mid; $this->log->error( 'properties-changed ConnectException', $results ); }
After a job is consumed, it must be deleted. If it is not deleted for a long time, the php client will return false because of the timeout error "deadline" and "soon". Therefore, after processing the task, you must remember to delete it. Unlike kafka, beanstalkd requires developers to delete the job themselves.