1. Introduction to MQ
1. MQ purpose
1. Current limiting and peak clipping
mq can temporarily store the excessive requests of the system so that the system can process them slowly later, so as to avoid the loss of requests or the collapse of the system.
2. Asynchronous decoupling
The synchronous call between services is changed to MQ asynchronous call. One party sends a message and the other party processes it after receiving the message.
3. Data collection
Distributed systems will generate massive data, such as business logs, monitoring data, user behavior, etc. Collect and summarize these data in real time or batch, and then analyze these data streams with big data. Such data collection through MQ is the best choice.
2. Common MQ comparison
JMS: Java Message Service. Several message types are defined, including simple text (TextMessage), serializable object (ObjectMessage), attribute set (MapMessage), byte stream (BytesMessage), original value stream (StreamMessage), and payload free message (Message)
AMQP: Advanced Message Queuing Protocol. The protocol specifies the exchange switch, queue message queue and binding binding, and provides routing rules between the switch and queue.
3. Introduction to rocketmq
RocketMQ is a unified messaging engine and lightweight data processing platform. It is Alibaba's open source message middleware. On November 28, 2016, you, ah, donated it to apache Software Foundation and became an incubation project of apache.
4. Basic concepts of rocketmq
1. Message
The physical carrier transmitted by the message system is the smallest unit of production and consumption. Each message must belong to a Topic
2. topic
Represents a collection of messages. Each topic contains several messages. Each message can only belong to one topic. It is the basic unit for MQ message subscription.
A producer can send messages of multiple topics at the same time, while a consumer can only consume one Topic.
3. tag
The label set for messages is used to distinguish different types of messages under the same topic. Messages from the same business unit have different labels under the same subject according to the business. Tags can effectively maintain the clarity and consistency of the code and optimize RocketMQ queries. Consumers can consume different sub themes according to different tag s to achieve better expansion.
Topic is the primary classification of messages, and Tag is the secondary classification of messages.
4. queue
The physical entity that stores the message. A topic can contain multiple queues. Messages of the topic are stored in each queue. The queue of a topic is also called the partition of messages in a topic.
Messages in a Topic's Queue can only be consumed by one consumer in a consumer group; Messages in a Queue do not allow multiple consumers in the same consumer group to consume at the same time.
There is also a concept of fragmentation. In RocketMQ, sharding refers to the Broker that stores the corresponding Topic. A corresponding number of partitions, namely queues, will be created in each partition, and the size of each Queue is the same.
5. Message identification
Each message in RocketMQ has a unique MessageId and can carry a key with a business ID for message query. However, it should be noted that there are two messageids: a MessageId (msgId) will be automatically generated when the producer sends () message, and a MessageId (offsetMsgId) will also be automatically generated by the Broker when the message arrives at the Broker. msgId, offsetMsgId and key are all called message IDs.
msgId: generated by the producer side. The generation rule is: producerIp + process PID + hashCode of ClassLoader of messageclientidsetter + current time + AutomicInteger auto increment counter
offsetMsgId: generated by the broker side. The generation rule is: brokerIp + offset of physical partition (offset in queue)
key: the unique identifier of the business specified by the user
5. System architecture
1. Producer
Message producer, responsible for producing messages. Producer selects the corresponding Broker cluster queue for message delivery through the load balancing module of MQ. In the process of message delivery, it supports fast failure and low delay.
Producers appear as Producer groups. A Producer group is a collection of producers that send messages of the same Topic type. A Producer group can send messages of multiple topics at the same time.
2. Consumer
Consumers. Consumers appear in the form of Consumer groups. A Consumer group is a collection of consumers of the same type who consume messages of the same Topic. In terms of message consumption, the Consumer group makes it very easy to realize load balancing (evenly allocate different queues of a Topic to different consumers of the same Consumer group, and note that it is not message load balancing) and fault tolerance (when a Consumer hangs up, other consumers in the group can continue to consume the queues consumed by the original Consumer).
The number of consumers in the Consumer group is less than or equal to the number of queue s subscribing to Topic. If it is exceeded, the extra consumers will not be able to consume the message.
A Topic type message can be consumed by multiple consumer groups.
A consumer group can only consume one Topic type message, and cannot consume multiple Topic type messages at the same time; Consumers in a consumer group must subscribe to the same Topic.
3. nameServer
It is a registry for Broker and Topic routing, and supports the dynamic registration and discovery of brokers. The main functions include:
(1) broker management: receive the registration information and heartbeat of the broker cluster
(2) Routing information management: each NameServer stores the entire routing information of the broker cluster and the queue information for client queries. Producer and Consumer can obtain the routing information of the broker through NameServer for message delivery and consumption.
Supplement: client nameserver selection strategy. Client refers to producers and consumers
When configuring, the client must write the address of the nameserver cluster. When selecting, a random number is generated, and then the index is obtained by taking a modulus with the number of nameserver nodes, and then connect. If the connection fails, the round robin strategy will be implemented. That is, random is used first, and polling is used after failure.
4. broker
Act as a message relay role, responsible for storing and forwarding messages. Store the messages produced by the Producer and prepare for the messages pulled by the Consumer. There are also metadata related to the message, including Consumer group progress offset, topic, queue, etc.
The function diagram of broker is as follows:
ClientManager: client management, which is responsible for receiving and resolving client (Producer/ consumer) requests
Store Service: responsible for storage. API interface is provided to process message storage to physical hard disk and message query
HA Service: highly available. Provides message synchronization between Master Broker and Slave Broker.
Index Service: Index Service. The messages delivered to the Broker are indexed according to the specific Message Key, and a quick query according to the MessageKey is also provided.
Note that the Broker cluster scheme is a master-slave architecture. A master can contain multiple slave. The master handles reading and writing, and the slave is responsible for backing up the data of the master. The corresponding relationship between master and slave is determined by specifying the same broker name and different Broker IDs. The BrokerId is 0, the identity is master, and the non-0 identity is slave. Each Broker establishes long-term connections with all nodes in the NameServer cluster and regularly registers Topic information to all nameservers.
6. Workflow
1. Start nameServer, listen to the specified port and wait for the connection of broker, producer and consumer
2. Start the broker. The broker establishes a long connection with all nodes in the nameserver cluster, and then sends heartbeat packets to the nameserver every 30 s
3. To create a Topic before or when sending a message, you need to write the routing relationship between the Topic and the Broker to the NameServer
4. The producer establishes a long link with a node in the nameServer cluster and obtains routing information from the nameServer, that is, the mapping relationship between the Queue of the sent Topic and the Broker(IP + Port). Then select a Queue according to the algorithm, establish a long connection with the Broker where the Queue is located and send messages. Producer will cache the routing information locally and update it from nameServer every 30 s.
5. Consumer is similar to Producer, except that it also sends heartbeat to broker every 30 s to ensure survival.
2. Stand alone installation
reference resources: https://rocketmq.apache.org/docs/quick-start/
1. Download the source code and compile it
(1) Download source code
(2) After decompression, use mvn to compile and install
mvn -Prelease-all -DskipTests clean install -U
(3) After unpacking the installation to the target directory, you will see an executable file generating% rocketmq-all-4.9.2% \ distribution \ target \ rocketmq-4.9.2 zip
2. Start
We transfer the file to the linux server and start it according to the tutorial. Of course, it can also be started under windows. For convenience, we take linux as an example
(1) Decompress
unzip ./rocketmq-4.9.2.zip
(2) Start the nameserver (if it is started by a virtual machine, you may need to modify the JVM parameters under startup, and modify bin/runserver.sh)
nohup sh bin/mqnamesrv &
(3) Start the broker (if it is started by a virtual machine, you may need to modify the JVM parameters to be started under, and modify bin/runbroker.sh)
nohup sh bin/mqbroker -n localhost:9876 &
jps view started java related processes
[root@localhost rocketmq-4.9.2]# jps -l 8981 org.apache.rocketmq.namesrv.NamesrvStartup 9093 sun.tools.jps.Jps 9017 org.apache.rocketmq.broker.BrokerStartup
(4) Test sending and receiving messages
-Setting environment variables
export NAMESRV_ADDR=localhost:9876
-Send message
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
The relevant logs printed on the final console are as follows:
SendResult [sendStatus=SEND_OK, msgId=7F00000123B70194FA3E0C205C4903E4, offsetMsgId=C0A80D8F00002A9F000000000008C5B6, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost, queueId=3], queueOffset=749] SendResult [sendStatus=SEND_OK, msgId=7F00000123B70194FA3E0C205C4A03E5, offsetMsgId=C0A80D8F00002A9F000000000008C676, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost, queueId=0], queueOffset=749] SendResult [sendStatus=SEND_OK, msgId=7F00000123B70194FA3E0C205C4B03E6, offsetMsgId=C0A80D8F00002A9F000000000008C736, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost, queueId=1], queueOffset=749] SendResult [sendStatus=SEND_OK, msgId=7F00000123B70194FA3E0C205C4D03E7, offsetMsgId=C0A80D8F00002A9F000000000008C7F6, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost, queueId=2], queueOffset=749] . . .
-Receive message
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
The final console logs are as follows:
ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=localhost, queueId=3, storeSize=192, queueOffset=720, sysFlag=0, bornTimestamp=1641216646960, bornHost=/192.168.13.143:49262, storeTimestamp=1641216646963, storeHost=/192.168.13.143:10911, msgId=C0A80D8F00002A9F0000000000086EB6, commitLogOffset=552630, bodyCRC=1191992521, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1641216703248, UNIQ_KEY=7F00000123B70194FA3E0C205AB00370, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 56, 48], transactionId='null'}]] ConsumeMessageThread_20 Receive New Messages: [MessageExt [brokerName=localhost, queueId=3, storeSize=192, queueOffset=719, sysFlag=0, bornTimestamp=1641216646951, bornHost=/192.168.13.143:49262, storeTimestamp=1641216646953, storeHost=/192.168.13.143:10911, msgId=C0A80D8F00002A9F0000000000086BB6, commitLogOffset=551862, bodyCRC=704111923, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1641216703248, UNIQ_KEY=7F00000123B70194FA3E0C205AA6036C, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 55, 54], transactionId='null'}]] . . .
(5) Shut down the Server
The bin/mqshutdown command is used to close name server or broker
[root@localhost rocketmq-4.9.2]# sh bin/mqshutdown broker The mqbroker(9017) is running... Send shutdown request to mqbroker(9017) OK [root@localhost rocketmq-4.9.2]# sh bin/mqshutdown namesrv The mqnamesrv(8981) is running... Send shutdown request to mqnamesrv(8981) OK [2]+ Exit 143 nohup sh bin/mqbroker -n localhost:9876
3. Install Visual web interface
The visual interface can run on windows or linux. You only need to access the nameserver
1. Download
https://github.com/apache/rocketmq-externals/tags Download rocketmq-console-1.0.0
2. Modify the configuration
Download a Springboot source code project and modify the application Properties file, modify server Port and rocketmq config. Namesrvaddr address, as follows:
server.contextPath= server.port=7000 #spring.application.index=true spring.application.name=rocketmq-console spring.http.encoding.charset=UTF-8 spring.http.encoding.enabled=true spring.http.encoding.force=true logging.config=classpath:logback.xml #if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876 rocketmq.config.namesrvAddr=192.168.13.143:9876 #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true rocketmq.config.isVIPChannel= #rocketmq-console's data path:dashboard/monitor rocketmq.config.dataPath=/tmp/rocketmq-console/data #set it false if you don't want use dashboard.default true rocketmq.config.enableDashBoardCollect=true
3. Perform packaging
Package in the source pom Directory:
mvn clean package -DskipTests=true
4. Just run the jar package generated under the target directory
java -jar rocketmq-console-ng-1.0.0.jar
5. After startup, you can access it through the browser
To view a message:
So far, you have simply completed the stand-alone installation of rocketMQ.