Java mainstream framework - (14) spring elastic search

1. Introduction to RocketMQ

1.1 MQ introduction

MQ (Message Queue) is a queue used to store message data

Queue: a data structure characterized by "first in, first out" (FIFO)
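
As a minimal sketch of the first-in-first-out behavior, the JDK's ArrayDeque can stand in for a message queue. The class name FifoDemo and the message strings are made up for illustration and are not part of any MQ product.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class FifoDemo {
    // Drains the queue and returns the messages in the order they come out.
    static List<String> drain(Queue<String> queue) {
        List<String> out = new ArrayList<>();
        String msg;
        while ((msg = queue.poll()) != null) {
            out.add(msg);
        }
        return out;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        // Messages are appended at the tail...
        queue.offer("create order");
        queue.offer("deduct stock");
        queue.offer("send notification");
        // ...and taken from the head, so they leave in the order they arrived.
        System.out.println(drain(queue));
    }
}
```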

1.2 What is a message

A message is a service request sent between servers

Original architecture:
A function on server A can only be completed by calling modules B and C
Microservice architecture:
Server A sends server B the operation to be performed (treated as a message)
Server A sends server C the operation to be performed (treated as a message)

1.3 MQ functions

1. Application decoupling (asynchronous message sending)

MQ basic working mode

2. Fast application change and maintenance (asynchronous message sending)

3. Traffic peak shaving (asynchronous message sending)

1.4 MQ advantages and disadvantages

Advantages (functions)

Application decoupling
Fast application change and maintenance
Traffic peak shaving

Disadvantages

Reduced system availability
Increased system complexity
Problems introduced by the asynchronous message mechanism:
Message ordering
Message loss
Message consistency
Repeated message consumption

1.5 MQ product introduction

ActiveMQ: implemented in Java, throughput in the tens of thousands, ms-level processing speed, master-slave architecture, high maturity

RabbitMQ: implemented in Erlang, throughput in the tens of thousands, μs-level processing speed, master-slave architecture

RocketMQ: implemented in Java, throughput in the hundreds of thousands, ms-level processing speed, distributed architecture, rich functionality and strong scalability

Kafka: implemented in Scala, throughput in the hundreds of thousands, ms-level processing speed, distributed architecture, fewer functions, used mostly for big data

RocketMQ is an excellent middleware product open-sourced by Alibaba. It grew out of MetaQ, another Alibaba queue technology, and was donated to the Apache Foundation. After entering incubation, RocketMQ became an Apache top-level project in just over a year. It is widely used inside Alibaba and has withstood the extreme load of Double 11 many times (on Double 11 in 2017, RocketMQ's message volume reached the trillions and peak TPS reached 56 million)

It addresses all of the shortcomings listed in 1.4

2. Environment setup

2.1 Basic concepts

2.2 Installation

Download: https://www.apache.org/

 

 

3. Message sending

3.1 Development process of message sending and receiving

1. Who sends?
2. Send to whom?
3. How to send?
4. What to send?
5. What is the result of sending?
6. Clean up

3.2 Single producer and single consumer message sending

Import the RocketMQ client Maven coordinates

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

Producer

//1. Create an object Producer that sends messages
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2. Set the sending naming server address
producer.setNamesrvAddr("192.168.175.130:9876");
//3.1 start sending service
producer.start();
//4. Create a message object to send, specify topic and content body
Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8"));
//3.2 sending messages
SendResult result = producer.send(msg);
System.out.println("Return result:"+result);
//5. Close the connection
producer.shutdown();

Note: turn off the server firewall

systemctl stop firewalld.service

Consumer

//1. Create a consumer object that receives messages
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2. Set the name server address
consumer.setNamesrvAddr("192.168.175.130:9876");
//3. Subscribe to the topic to receive; "*" matches any tag
consumer.subscribe("topic1","*");
//4. Register a listener to receive messages
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        //Iterate over the received messages
        for (MessageExt messageExt : list) {
            System.out.println("Received message:"+messageExt);
            System.out.println("Message body:"+new String(messageExt.getBody()));
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
//5. Start the service to receive messages
consumer.start();
System.out.println("The message receiving service is running");

3.3 Single producer and multiple consumer message sending

Consumer

//1. Create a consumer object that receives messages
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2. Set the name server address
consumer.setNamesrvAddr("192.168.175.130:9876");
//3. Subscribe to the topic to receive; "*" matches any tag
consumer.subscribe("topic1","*");

//Set the consumption mode of the current consumer (default mode: load balancing)
//consumer.setMessageModel(MessageModel.CLUSTERING);
//Set the consumption mode of the current consumer to broadcast mode (every client receives the same messages)
consumer.setMessageModel(MessageModel.BROADCASTING);

//4. Register a listener to receive messages
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        //Iterate over the received messages
        for (MessageExt messageExt : list) {
            System.out.println("Received message:"+messageExt);
            System.out.println("Message body:"+new String(messageExt.getBody()));
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
//5. Start the service to receive messages
consumer.start();
System.out.println("The message receiving service is running");

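Load balancing mode (CLUSTERING): the messages are shared among the consumers of the group, and each message is consumed by only one of them

Broadcast mode (BROADCASTING): every consumer receives all of the messages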

3.4 Multiple producer and multiple consumer message sending

Messages generated by multiple producers can be consumed by the same consumer or by multiple consumers

3.5 Message categories

3.5.1 Synchronous messages

Features: time-sensitive, important messages that require a receipt, such as SMS and notifications (for example "transfer succeeded")

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        producer.start();
        for (int i=1;i<=5;i++) {
            //Synchronous message sending: send() blocks until the broker returns a result
            Message msg = new Message("topic2",("Synchronous message: hello rocketmq"+i).getBytes("UTF-8"));
            SendResult send = producer.send(msg);
            System.out.println("Return result:"+send);
        }
        producer.shutdown();
    }
}

3.5.2 Asynchronous messages

Features: messages that are less time-sensitive but still need a receipt, such as some of the information in an order

(The callback must be given time to run before the producer process ends; otherwise it cannot execute correctly)

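import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {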
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        producer.start();
        for (int i=1;i<=5;i++) {
            //Asynchronous message sending
            Message msg = new Message("topic2",("Asynchronous message: hello rocketmq"+i).getBytes("UTF-8"));
            producer.send(msg,new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println(throwable);
                }
            });
        }
        //Sleep so that the asynchronous callbacks have time to run before the producer shuts down
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}

3.5.3 One-way messages

Features: messages that do not need a receipt, such as log messages

import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        producer.start();
        for (int i=1;i<=5;i++) {
            //One-way message sending: no result is returned
            Message msg = new Message("topic2",("One-way message: hello rocketmq"+i).getBytes("UTF-8"));
            producer.sendOneway(msg);
        }
        //Sleep briefly to give the one-way messages time to go out before shutdown
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}

3.5.4 Delayed messages

A delayed message is not delivered to consumers right away; it only becomes available after the configured waiting time, which acts as a buffer

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        producer.start();
        for (int i=1;i<=5;i++) {
            Message msg = new Message("topic2",("Delayed message: hello rocketmq"+i).getBytes("UTF-8"));
            //Set the delay level of the current message (level 3 is 10s in the list of supported times)
            msg.setDelayTimeLevel(3);

            SendResult send = producer.send(msg);
            System.out.println("Return result:"+send);
        }
        producer.shutdown();
    }
}

Currently supported delay times

Seconds: 1, 5, 10, 30
Minutes: 1~10, 20, 30
Hours: 1, 2

Delay levels 1 to 18, in order:
1s  5s  10s  30s  1m  2m  3m  4m  5m  6m  7m  8m  9m  10m  20m  30m  1h  2h
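
To make the relationship between the number passed to setDelayTimeLevel and the actual delay easier to see, here is a small plain-Java sketch that looks a level up in the list above. The class DelayLevels and its method toSeconds are hypothetical helpers written for this note; they are not part of the RocketMQ client.

```java
import java.util.concurrent.TimeUnit;

class DelayLevels {
    // Delay table copied from the list above; index 0 corresponds to level 1.
    private static final String[] LEVELS = {
        "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
        "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h"
    };

    // Converts a delay level (1..18) to its delay in seconds.
    static long toSeconds(int level) {
        if (level < 1 || level > LEVELS.length) {
            throw new IllegalArgumentException("level must be 1.." + LEVELS.length);
        }
        String entry = LEVELS[level - 1];
        long amount = Long.parseLong(entry.substring(0, entry.length() - 1));
        switch (entry.charAt(entry.length() - 1)) {
            case 's': return amount;
            case 'm': return TimeUnit.MINUTES.toSeconds(amount);
            case 'h': return TimeUnit.HOURS.toSeconds(amount);
            default: throw new IllegalStateException("unknown unit in " + entry);
        }
    }

    public static void main(String[] args) {
        // Level 3 is the value used in the producer example.
        System.out.println("level 3 -> " + toSeconds(3) + "s");
    }
}
```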

3.5.5 Batch messages

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        producer.start();
        //Create a collection to hold multiple messages
        List<Message> msgList = new ArrayList<Message>();
        Message msg1 = new Message("topic3",("Batch messages: hello rocketmq111").getBytes("UTF-8"));
        Message msg2 = new Message("topic3",("Batch messages: hello rocketmq222").getBytes("UTF-8"));
        Message msg3 = new Message("topic3",("Batch messages: hello rocketmq333").getBytes("UTF-8"));
        msgList.add(msg1);
        msgList.add(msg2);
        msgList.add(msg3);

        //Batch message sending (the total size of the messages in one batch must not exceed 4 MB)
        //The size of a message counts four parts: topic, body, message attributes, and log overhead (20 bytes)
        SendResult send = producer.send(msgList);

        System.out.println("Return result:"+send);
        producer.shutdown();
    }
}
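
When a list may exceed the 4 MB limit, one option is to cut it into smaller batches before sending. The following is only a sketch of that idea in plain Java: SimpleMessage is a hypothetical stand-in for the real Message class, and sizeOf follows the four-part size rule from the comment above (topic, body, attributes, 20 bytes of log overhead) as an estimate.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BatchSplitter {
    // Hypothetical stand-in for a message: topic, body and user properties.
    static class SimpleMessage {
        final String topic;
        final byte[] body;
        final Map<String, String> properties = new HashMap<>();

        SimpleMessage(String topic, String text) {
            this.topic = topic;
            this.body = text.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Estimated size: topic + body + properties + 20 bytes of log overhead.
    static int sizeOf(SimpleMessage msg) {
        int size = msg.topic.length() + msg.body.length + 20;
        for (Map.Entry<String, String> e : msg.properties.entrySet()) {
            size += e.getKey().length() + e.getValue().length();
        }
        return size;
    }

    // Groups messages into batches whose estimated size stays within the limit.
    // A single message larger than the limit ends up alone in its own batch.
    static List<List<SimpleMessage>> split(List<SimpleMessage> messages, int limitBytes) {
        List<List<SimpleMessage>> batches = new ArrayList<>();
        List<SimpleMessage> current = new ArrayList<>();
        int currentSize = 0;
        for (SimpleMessage msg : messages) {
            int size = sizeOf(msg);
            if (!current.isEmpty() && currentSize + size > limitBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(msg);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<SimpleMessage> messages = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            messages.add(new SimpleMessage("topic3", "Batch message " + i));
        }
        // A tiny limit is used here only to make the splitting visible.
        System.out.println(split(messages, 100).size() + " batches");
    }
}
```

With the real client, each resulting batch would be passed to producer.send separately; the limit would be 4 * 1024 * 1024 rather than the tiny value used in main.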

3.5.6 Message filtering

Filtering by tag

Producer

Message msg = new Message("topic6","tag2",("Message filtered by tag: hello rocketmq 2").getBytes("UTF-8"));

Consumer

//When receiving a message, in addition to setting the topic, you can also specify the received tag, * represents any tag
consumer.subscribe("topic6","tag1 || tag2");

Filtering by attribute (SQL)

Producer

//Add properties to the message
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");

Consumer

//Use a message selector to filter on the message attributes; the syntax is SQL-like
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));

Note: SQL filtering depends on support on the broker side. Add the following item to the broker configuration file to turn the feature on

enablePropertyFilter=true

Start the broker with that configuration file so the setting takes effect

sh mqbroker -n localhost:9876 -c ../conf/broker.conf

3.5.7 Out-of-order messages

3.5.8 Ordered messages

Send messages

//Route each message to a specific message queue
for(final Order order : orderList){
    Message msg = new Message("orderTopic",order.toString().getBytes());
    //Specify a message queue selector when sending
    SendResult result = producer.send(msg, new MessageQueueSelector() {
        //Decide which message queue the current message is sent to
        @Override
        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
            //Choose the queue from the order id, so messages of the same order go to the same queue
            //Math.floorMod keeps the index non-negative even if hashCode() is negative
            int mqIndex = Math.floorMod(order.getId().hashCode(), list.size());
            return list.get(mqIndex);
        }
    }, null);
    System.out.println(result);
}

Receive messages

//Consume in single-thread mode: one thread is bound to one message queue
consumer.registerMessageListener(new MessageListenerOrderly() {
    //With MessageListenerOrderly, a message queue is no longer served by multiple threads;
    //each message queue is served by a single thread
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        for(MessageExt msg : list){
            System.out.println("message:" + new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
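
The queue selection in the sending code can be tried out without a broker. The sketch below isolates just the index calculation; QueueIndexDemo and selectIndex are hypothetical names, and the order ids are made up. One detail worth noting is that hashCode() may be negative, in which case a plain % would produce a negative index, so the sketch uses Math.floorMod.

```java
class QueueIndexDemo {
    // Maps an order id to a queue index in [0, queueCount).
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    static int selectIndex(Object orderId, int queueCount) {
        return Math.floorMod(orderId.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4;
        String[] orderIds = {"order-1", "order-2", "order-1", "order-3"};
        for (String id : orderIds) {
            // The same order id always lands on the same queue,
            // which is what keeps the messages of one order in sequence.
            System.out.println(id + " -> queue " + selectIndex(id, queueCount));
        }
    }
}
```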

3.5.9 Transaction messages

Transaction message states

Commit state: the message is allowed to enter the queue. It is then no different from a non-transaction message

Rollback state: the message is not allowed to enter the queue. It is equivalent to never having been sent

Intermediate state: the half message has been sent, but the producer has not yet given MQ the second-phase confirmation of its state

Note: transaction messages concern only the producer, not the consumer

public static void main1(String[] args) throws Exception {
        //Producer used by transaction messages TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        //Add listening for local transactions
        producer.setTransactionListener(new TransactionListener() {
            //Normal transaction process
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
            //Transaction compensation process
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                return null;
            }
        });
        producer.start();

        Message msg = new Message("topic7",("Transaction message: hello rocketmq").getBytes("UTF-8"));
        SendResult send = producer.sendMessageInTransaction(msg,null);
        System.out.println("Return results"+send);
        producer.shutdown();
    }
public static void main2(String[] args) throws Exception {
        //Producer used by transaction messages TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        //Add listening for local transactions
        producer.setTransactionListener(new TransactionListener() {
            //Normal transaction process
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            //Transaction compensation process
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                return null;
            }
        });
        producer.start();

        Message msg = new Message("topic8",("Transaction message: hello rocketmq").getBytes("UTF-8"));
        SendResult send = producer.sendMessageInTransaction(msg,null);
        System.out.println("Return results"+send);
        producer.shutdown();
    }
public static void main(String[] args) throws Exception {
        //Producer used by transaction messages TransactionMQProducer
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.175.130:9876");
        //Add listening for local transactions
        producer.setTransactionListener(new TransactionListener() {
            //Normal transaction process
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                return LocalTransactionState.UNKNOW;
            }
            //Transaction compensation process
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("Transaction compensation process execution");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();

        Message msg = new Message("topic9",("Transaction message: hello rocketmq").getBytes("UTF-8"));
        SendResult send = producer.sendMessageInTransaction(msg,null);
        System.out.println("Return results"+send);
        //Transaction compensation (the check-back) only happens while the producer is still running, so the producer is not shut down here
//        producer.shutdown();
    }
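
The three main methods above correspond to the three states. As a rough model of how they lead to a final outcome, the following plain-Java sketch resolves a half message from the local transaction result and, if that is unknown, from the check-back. TransactionFlowDemo, its State enum and the maxChecks parameter are assumptions made for this illustration and do not mirror the broker's internal implementation.

```java
import java.util.function.Supplier;

class TransactionFlowDemo {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Resolves the final fate of a half message.
    // The local transaction result is used first; while it is UNKNOWN,
    // the check-back is asked again, up to maxChecks times.
    static boolean isDelivered(State localResult, Supplier<State> checkBack, int maxChecks) {
        State state = localResult;
        for (int i = 0; state == State.UNKNOWN && i < maxChecks; i++) {
            state = checkBack.get();
        }
        // Only a committed message becomes visible to consumers.
        return state == State.COMMIT;
    }

    public static void main(String[] args) {
        // Like main1: committed directly, the check-back is never consulted.
        System.out.println(isDelivered(State.COMMIT, () -> State.UNKNOWN, 3));
        // Like main2: rolled back directly.
        System.out.println(isDelivered(State.ROLLBACK, () -> State.COMMIT, 3));
        // Like main: unknown at first, committed by the compensation process.
        System.out.println(isDelivered(State.UNKNOWN, () -> State.COMMIT, 3));
    }
}
```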

4. Cluster setup

4.1 RocketMQ cluster classification

4.2 RocketMQ cluster features

4.3 RocketMQ cluster workflow

Step 1: start the NameServer, which begins listening and waits for brokers, producers and consumers to connect

Step 2: start the broker, which connects to all NameServers according to its configuration and keeps long-lived connections to them

Step 2 (continued): if the broker already holds data, the NameServer saves the relationship between topics and brokers

Step 3: before sending messages, the producer connects to one NameServer and establishes a long-lived connection

Step 4: the producer sends a message

Step 4.1: if the topic exists, the NameServer assigns it directly
Step 4.2: if the topic does not exist, the NameServer creates the relationship between the topic and a broker and then assigns it

Step 5: the producer selects a message queue of the topic on the broker (chosen from the list)

Step 6: the producer establishes a long-lived connection with the broker in order to send messages

Step 7: the producer sends the message

The workflow of the consumer is the same as that of the producer

4.4 Setting up a dual-master, dual-slave cluster

5. Advanced features

5.1 Message storage

① The message producer sends a message to MQ
② MQ returns an ACK to the producer
③ MQ pushes the message to the corresponding consumer
④ The message consumer returns an ACK to MQ

Note: ACK (Acknowledge character)

 

① The message producer sends a message to MQ
② When MQ receives the message, it persists it, that is, stores the message
③ MQ returns an ACK to the producer
④ MQ pushes the message to the corresponding consumer
⑤ The message consumer returns an ACK to MQ
⑥ MQ removes the message

Note:

① If in step ⑤ the message consumer returns an ACK within the specified time, MQ considers the message successfully consumed and executes ⑥
② If in step ⑤ MQ does not receive an ACK from the message consumer within the specified time, MQ considers the consumption failed and executes ④⑤⑥ again
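
The store, push, ACK, remove cycle described above can be sketched in a few lines of plain Java. This is a simplified in-memory model, not how any MQ is actually implemented: AckFlowDemo is a hypothetical class, the consumer is modeled as a Predicate that returns true when it acknowledges in time, and maxPushes is an assumed cap added so the loop always ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class AckFlowDemo {
    // Persists a message, pushes it until the consumer returns an ACK, then removes it.
    // Returns the number of pushes needed, or -1 if no ACK arrived within maxPushes.
    static int deliver(List<String> store, String message,
                       Predicate<String> consumer, int maxPushes) {
        store.add(message);                       // step 2: persist the message
        for (int push = 1; push <= maxPushes; push++) {
            if (consumer.test(message)) {         // steps 4 and 5: push and wait for the ACK
                store.remove(message);            // step 6: remove the message
                return push;
            }
        }
        return -1;                                // never acknowledged: the message stays stored
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>();
        int[] attempts = {0};
        // This consumer fails twice and acknowledges the third push.
        int pushes = deliver(store, "order-created", m -> ++attempts[0] >= 3, 5);
        System.out.println("pushes=" + pushes + ", stored=" + store.size());
    }
}
```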

 

Database

ActiveMQ
Disadvantage: the database bottleneck becomes the MQ bottleneck

File system

RocketMQ/Kafka/RabbitMQ
Solution: store data using a message disk-flush mechanism
Disadvantage: hard disk damage cannot be avoided

 

5.2 Efficient message storage and read/write

 

5.3 Message storage structure

The MQ data storage area contains the following contents

Message data storage area
    topic
    queueId
    message
Consumption logical queue
    minOffset
    maxOffset
    consumerOffset
Indexes
    key index
    creation time index
    ......

 

5.4 Disk flush mechanism

Synchronous disk flush: high safety, low efficiency, slow (suitable for businesses with high data-safety requirements)

Asynchronous disk flush: low safety, high efficiency, fast (suitable for businesses that need high data processing speed)

Configuration

#Disk flush mode
#- ASYNC_FLUSH asynchronous disk flush
#- SYNC_FLUSH synchronous disk flush
flushDiskType=SYNC_FLUSH

5.5 High availability

NameServer

Stateless + full server registration

Message server

Master-slave architecture (2M-2S)

Message production

The producer binds the same topic to multiple broker groups, so that after one master goes down the other masters can still receive messages normally

Message consumption

RocketMQ itself decides, based on the load on the master, whether the master serves message reads; when the master is busy, reading automatically switches to the slave

5.6 Master-slave data replication

Synchronous replication

After receiving a message, the master copies it to the slave and only then reports to the producer that the write succeeded
Advantages: data is safe, no data is lost, and recovery after a failure is easy
Disadvantages: data throughput is affected and overall performance is lower

Asynchronous replication

After receiving a message, the master immediately reports to the producer that the write succeeded; once the messages reach a certain amount, they are copied to the slave asynchronously
Advantages: high data throughput, low operation latency, high performance
Disadvantages: data is less safe; if the master fails, the data written between the last synchronization and the failure is lost

Configuration

#Role of the broker
#- ASYNC_MASTER asynchronous replication master
#- SYNC_MASTER synchronous double-write master
#- SLAVE
brokerRole=SYNC_MASTER

5.7 Load balancing

Producer load balancing

Implemented internally: load is balanced across the message queues of the same topic on the different brokers in the cluster

 

 

Consumer load balancing

Average allocation
Cyclic average allocation
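
The difference between the two consumer strategies named above is easiest to see with numbers. The sketch below is a simplified illustration and not the client library's code: QueueAllocationDemo and its methods are hypothetical, and queues and consumers are represented only by their indexes.

```java
import java.util.ArrayList;
import java.util.List;

class QueueAllocationDemo {
    // Average allocation: each consumer gets a contiguous block of queues;
    // the first (queueCount % consumerCount) consumers get one extra queue.
    static List<Integer> average(int queueCount, int consumerCount, int consumerIndex) {
        int mod = queueCount % consumerCount;
        int size = queueCount / consumerCount + (consumerIndex < mod ? 1 : 0);
        int start = consumerIndex < mod
                ? consumerIndex * size
                : consumerIndex * size + mod;
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }

    // Cyclic average allocation: queues are dealt out one by one in turn.
    static List<Integer> cyclic(int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> result = new ArrayList<>();
        for (int q = consumerIndex; q < queueCount; q += consumerCount) {
            result.add(q);
        }
        return result;
    }

    public static void main(String[] args) {
        int queues = 8;
        int consumers = 3;
        for (int c = 0; c < consumers; c++) {
            System.out.println("consumer " + c
                    + " average=" + average(queues, consumers, c)
                    + " cyclic=" + cyclic(queues, consumers, c));
        }
    }
}
```

Both strategies give every consumer the same number of queues (differing by at most one); they differ only in which queues end up together.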

5.8 Message retry

When a consumer has consumed a message but does not return a consumption-success status, the message retry mechanism starts

Message retry mechanism

Ordered messages
When a consumer fails to consume a message, RocketMQ automatically retries the message (at 1-second intervals)
Note: message consumption in the application is blocked during this time, so the consumption of ordered messages should be monitored to avoid blocking
Unordered messages

Dead letter queue

When the number of consumption retries reaches the specified limit (16 by default), MQ treats the message that cannot be consumed normally as a dead letter message

Dead letter messages are not discarded directly; they are saved to a new queue called the dead letter queue

Dead letter queue characteristics

It belongs to a group (Group ID), not to a topic, nor to a single consumer
One dead letter queue can contain dead letter messages from multiple topics in the same group
The dead letter queue is not initialized by default; it is initialized when the first dead letter appears

Characteristics of messages in the dead letter queue

They will not be consumed again
Messages in the dead letter queue are valid for 3 days, after which they are cleared

Dead letter handling: on the monitoring platform, look up the dead letter to obtain its messageId, and then re-consume exactly that message by id
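
A small in-memory model can make the retry limit and the per-group dead letter queue concrete. The sketch below is written under stated assumptions: DeadLetterDemo is a hypothetical class, the consumer is modeled as a Predicate that returns true on successful consumption, retry intervals are ignored, and MAX_RETRIES uses the default of 16 quoted above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class DeadLetterDemo {
    static final int MAX_RETRIES = 16; // default retry count quoted in the text

    // One dead letter queue per consumer group, created lazily on the first dead letter.
    final Map<String, List<String>> deadLetterQueues = new HashMap<>();

    // Tries the first delivery plus up to MAX_RETRIES retries.
    // Returns true if the consumer eventually succeeded; otherwise the
    // message is moved to the group's dead letter queue and false is returned.
    boolean consume(String group, String message, Predicate<String> consumer) {
        for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            if (consumer.test(message)) {
                return true;
            }
        }
        deadLetterQueues.computeIfAbsent(group, g -> new ArrayList<>()).add(message);
        return false;
    }

    public static void main(String[] args) {
        DeadLetterDemo mq = new DeadLetterDemo();
        mq.consume("group1", "ok", m -> true);       // consumed normally
        mq.consume("group1", "broken", m -> false);  // always fails, becomes a dead letter
        System.out.println(mq.deadLetterQueues);
    }
}
```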

5.9 Repeated consumption of messages

 

---------------------------------------------------------------------------------------------------------------------------------

Some of the contents exist in books, classes and online records. If they are the same, it is pure coincidence

Keywords: Java ElasticSearch Spring

Added by musclehead on Sun, 20 Feb 2022 20:29:52 +0200