1. Introduction to rocketmq
1.1MQ introduction
MQ (Message Queue) Message Queue is a queue used to save message data
Queue: a type of data structure characterized by "first in first out"
1.2 what is a message
Service requests between servers
1.3MQ function
1. Application decoupling (asynchronous message sending)
MQ basic working mode
2. Rapid application change maintenance (asynchronous message sending)
3. Traffic clipping (asynchronous message sending)
1.4MQ advantages and disadvantages analysis
Advantages (functions)
shortcoming
1.5MQ product introduction
ActiveMQ: java language implementation, 10000 level data throughput, ms level processing speed, master-slave architecture and high maturity
RabbitMQ: erlang language implementation, 10000 level data throughput, us level processing speed, master-slave architecture,
RocketMQ: java language implementation, 100000 level data throughput, ms level processing speed, distributed architecture, powerful function and strong scalability
kafka: implemented in scala language, with 100000 level data throughput, ms level processing speed, distributed architecture, less functions and more applications for big data
RocketMQ is a very excellent middleware product opened by Alibaba. It was born out of another queue technology MetaQ of Alibaba and donated to Apache foundation. As an incubation technology, RocketMQ has become the top project of Apache foundation in just over a year. Moreover, it has been widely used in Alibaba and has withstood the pressure of the extreme scenario of double 11 for many times (on double 11 in 2017, the message volume of RocketMQ reached trillions and the peak TPS reached 56 million)
Address all shortcomings
2. Environment construction
2.1 basic concepts
2.2 installation
Download: https://www.apache.org/
3. message sending
3.1 development process of message sending and receiving
3.2 single producer and single consumer message sending
Import RocketMQ client coordinates
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
Producer
//1. Create an object Producer that sends messages DefaultMQProducer producer = new DefaultMQProducer("group1"); //2. Set the sending naming server address producer.setNamesrvAddr("192.168.175.130:9876"); //3.1 start sending service producer.start(); //4. Create a message object to send, specify topic and content body Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8")); //3.2 sending messages SendResult result = producer.send(msg); System.out.println("Return result:"+result); //5. Close the connection producer.shutdown();
Note: turn off the server firewall
systemctl stop firewalld.service
Consumer
//1. Create an object Comsumer to receive messages DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2. Set the accepted naming server address consumer.setNamesrvAddr("192.168.175.130:9876"); //3. Set the topic corresponding to the received message consumer.subscribe("topic1","*"); //3. Enable listening to receive messages consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //Traversal message for (MessageExt messageExt : list) { System.out.println("Received message:"+messageExt); System.out.println("news:"+new String(messageExt.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4. Start the service to receive messages consumer.start(); System.out.println("The accept message service is running");
3.3 single producer multi consumer message sending
consumer
//1. Create an object Comsumer to receive messages DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2. Set the accepted naming server address consumer.setNamesrvAddr("192.168.175.130:9876"); //3. Set the topic corresponding to the received message consumer.subscribe("topic1","*"); //Set the consumption mode of the current consumer (default mode: load balancing) // consumer.setMessageModel(MessageModel.CLUSTERING); //Set the current consumer's consumption mode to broadcast mode (all messages received by clients are the same) consumer.setMessageModel(MessageModel.BROADCASTING); //3. Enable listening to receive messages consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //Traversal message for (MessageExt messageExt : list) { System.out.println("Received message:"+messageExt); System.out.println("news:"+new String(messageExt.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //4. Start the service to receive messages consumer.start(); System.out.println("The accept message service is running");
Load balancing
Broadcast mode
3.4 multi producer and multi consumer message sending
Messages generated by multiple producers can be consumed by the same consumer or by multiple consumers
3.5 message category
3.5.1 synchronization message
Features: instant, important messages with receipt, such as SMS and notification (transfer succeeded)
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.175.130:9876"); producer.start(); for (int i=1;i<=5;i++) { Synchronous message sending Message msg = new Message("topic2",("Synchronization message: hello rocketmq"+i).getBytes("UTF-8")); SendResult send = producer.send(msg); System.out.println("Return results"+send); } //Add a sleep operation to ensure that the asynchronous message can be output after it is returned TimeUnit.SECONDS.sleep(10); producer.shutdown(); } }
3.5.2 asynchronous messages
Features: messages with weak immediacy but need receipt, such as some information in the order
(the callback processing result must be executed before the end of the producer process, otherwise the callback cannot be executed correctly)
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.175.130:9876"); producer.start(); for (int i=1;i<=5;i++) { //Asynchronous message sending Message msg = new Message("topic2",("Asynchronous message: hello rocketmq"+i).getBytes("UTF-8")); producer.send(msg,new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { System.out.println(throwable); } }); } //Add a sleep operation to ensure that the asynchronous message can be output after it is returned TimeUnit.SECONDS.sleep(10); producer.shutdown(); } }
3.5.3 single message
Features: messages without receipt, such as log messages
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.175.130:9876"); producer.start(); for (int i=1;i<=5;i++) { //One way message sending Message msg = new Message("topic2",("Single message: hello rocketmq"+i).getBytes("UTF-8")); producer.sendOneway(msg); } //Add a sleep operation to ensure that the asynchronous message can be output after it is returned TimeUnit.SECONDS.sleep(10); producer.shutdown(); } }
3.5.4 delay message
When sending a message, it is not directly sent to the message server, but arrives according to the set waiting time, which plays the role of buffer for delayed arrival
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.175.130:9876"); producer.start(); for (int i=1;i<=5;i++) { //Synchronous message sending Message msg = new Message("topic2",("Non delayed message: hello rocketmq"+i).getBytes("UTF-8")); //The effect of the current message delay setting msg.setDelayTimeLevel(3); SendResult send = producer.send(msg); System.out.println("Return results"+send); } producer.shutdown(); } }
Currently supported message times
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
3.5.5 batch messages
public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.175.130:9876"); producer.start(); //Create a collection to hold multiple messages List<Message> msgList = new ArrayList<Message>(); Message msg1 = new Message("topic3",("Batch messages: hello rocketmq111").getBytes("UTF-8")); Message msg2 = new Message("topic3",("Batch messages: hello rocketmq222").getBytes("UTF-8")); Message msg3 = new Message("topic3",("Batch messages: hello rocketmq333").getBytes("UTF-8")); msgList.add(msg1); msgList.add(msg2); msgList.add(msg3); //Batch message sending (the total number of messages sent each time shall not exceed 4M) //The total length of the message contains four pieces of information: topic,body, attributes of the message, and log (20 bytes) SendResult send = producer.send(msgList); System.out.println("Return results"+send); producer.shutdown(); } }
3.5.6 message filtering
Classification filtering
producer
Message msg = new Message("topic6","tag2", ("message filtering according to tag: Hello rocketmq 2") getBytes("UTF-8"));
consumer
//When receiving a message, in addition to setting the topic, you can also specify the received tag, * represents any tag
consumer.subscribe("topic6","tag1 || tag2");
Message filtering
producer
//Add properties to the message
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");
consumer
//The message selector is used to filter the corresponding attributes. The syntax format is SQL like syntax
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
Note: SQL filtering depends on the function support of the server. Add the corresponding function item in the broker configuration file and turn on the corresponding function
enablePropertyFilter=true
Start the server to enable the corresponding profile
sh mqbroker -n localhost:9876 -c ../conf/broker.conf
3.5.7 disordered message sequence
3.5.8 sequence messages
send message
//Set the message to enter the specified message queue
for(final Order order : orderList){
Message msg = new Message("orderTopic",order.toString().getBytes());
/ / specify the corresponding message queue selector when sending
SendResult result = producer.send(msg, new MessageQueueSelector() {
/ / set which message queue to use when sending the current message
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
/ / select different message queues according to the information sent
/ / select an object of the message queue according to the id, and return - > id to get the int value
int mqIndex = order.getId().hashCode() % list.size();
return list.get(mqIndex);
}
}, null);
System.out.println(result);
}
receive messages
//Use the single thread mode to get data from the message queue, and one thread binds to one message queue
consumer.registerMessageListener(new MessageListenerOrderly() {
/ / after using the MessageListenerOrderly interface, the processing of the message queue is served by one message queue and multiple threads
/ /, which is converted into a message queue and a thread service
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for(MessageExt msg : list){
System.out.println("message:" + new String(msg.getBody())));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
3.5.9 transaction messages
Transaction message status
Submission status: it is allowed to enter the queue. This message is no different from non transaction messages
Rollback status: it is not allowed to enter the queue. This message is equivalent to not sent
Intermediate status: the sending of the half message is completed, and no secondary status confirmation is performed for MQ
Note: transaction messages are only related to producers, not consumers
public static void main1(String[] args) throws Exception { //Producer used by transaction messages TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("192.168.175.130:9876"); //Add listening for local transactions producer.setTransactionListener(new TransactionListener() { //Normal transaction process @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { return LocalTransactionState.COMMIT_MESSAGE; } //Transaction compensation process @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { return null; } }); producer.start(); Message msg = new Message("topic7",("Transaction message: hello rocketmq").getBytes("UTF-8")); SendResult send = producer.sendMessageInTransaction(msg,null); System.out.println("Return results"+send); producer.shutdown(); }
public static void main2(String[] args) throws Exception { //Producer used by transaction messages TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("192.168.175.130:9876"); //Add listening for local transactions producer.setTransactionListener(new TransactionListener() { //Normal transaction process @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { return LocalTransactionState.ROLLBACK_MESSAGE; } //Transaction compensation process @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { return null; } }); producer.start(); Message msg = new Message("topic8",("Transaction message: hello rocketmq").getBytes("UTF-8")); SendResult send = producer.sendMessageInTransaction(msg,null); System.out.println("Return results"+send); producer.shutdown(); }
public static void main(String[] args) throws Exception { //Producer used by transaction messages TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("192.168.175.130:9876"); //Add listening for local transactions producer.setTransactionListener(new TransactionListener() { //Normal transaction process @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { return LocalTransactionState.UNKNOW; } //Transaction compensation process @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("Transaction compensation process execution"); return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); Message msg = new Message("topic9",("Transaction message: hello rocketmq").getBytes("UTF-8")); SendResult send = producer.sendMessageInTransaction(msg,null); System.out.println("Return results"+send); //The physical compensation process must ensure that the server is running, otherwise normal transaction compensation will not be carried out // producer.shutdown(); }
4. Cluster construction
4.1 rocketmq cluster classification
4.2 rocketmq cluster features
4.3 rocketmq cluster workflow
Step 1: start the NameServer, start listening, and wait for the broker and producer to connect with the consumer
Step 2: start the broker, connect all nameservers according to the configuration information, and maintain a long connection
Step 2: if there is existing data in the broker, NameServer will save the relationship between topic and broker
Step 3: the producer sends a message, connects to a NameServer, and establishes a long connection
Step 4: the producer sends a message
Step 5: producer selects a message queue in the topic of the broker (select from the list)
Step 6: the producer establishes a long connection with the broker to send messages
Step 7: producer sends a message
The workflow of comsumer is the same as that of producer
4.4 construction of dual master and dual slave clusters
5. Advanced features
5.1 message storage
Description: ACK (knowledge character)
be careful:
database
file system
5.2 efficient message storage and read-write mode
5.3 message storage structure
The MQ data storage area contains the following contents
5.4 disc brushing mechanism
Synchronous disk brushing: high security, low efficiency and slow speed (applicable to businesses with high requirements for data security)
Asynchronous disk brushing: low security, high efficiency and high speed (applicable to businesses requiring high data processing speed)
Configuration mode
#Disc brushing mode
#- ASYNC_FLUSH asynchronous brush disk
#- SYNC_FLUSH synchronous brush disc
flushDiskType=SYNC_FLUSH
5.5 high availability
nameserver
Message server
Message production
Message consumption
5.6 master slave data replication
synchronous copy
Asynchronous replication
collocation method
#Role of Broker
#- ASYNC_MASTER asynchronous replication master
#- SYNC_MASTER synchronous double write master
#- SLAVE
brokerRole=SYNC_MASTER
5.7 load balancing
Producer load balancing
Consumer load balancing
5.8 message retry
When the message fails to return the information of successful consumption after consumption, the message retry mechanism will be started
Message retry mechanism
Dead letter queue
When the message consumption retry reaches the specified number of times (16 by default), MQ will call the message that cannot be consumed normally dead letter message
Dead letter messages are not discarded directly, but are saved to a new queue called dead letter queue
Dead letter queue characteristics
Message characteristics in dead letter queue
Dead letter processing: in the monitoring platform, get the messageId of the dead letter by searching the dead letter, and then accurately consume the dead letter through the id
5.9 repeated consumption of messages
---------------------------------------------------------------------------------------------------------------------------------
Some of the contents exist in books, classes and online records. If they are the same, it is pure coincidence