We know that RocketMQ's NameServer is not strongly consistent, but ultimately consistent, that is, the client periodically obtains Broker information. If the Broker fails for a period of time, the client can't immediately sense it. How can RocketMQ achieve high availability of message sending? It can be expanded from the following three aspects:
- Retry mechanism
- Replace the queue in turn
- Avoiding failed brokers
1, Retry mechanism
When sending a message, there is such a code:
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; for (; times < timesTotal; times++) { ... }
If it is sent synchronously, it will retry 3 times by default, and the retry times can also be configured through retryTimesWhenSendFailed.
2, Replace the queue in turn
In the selectOneMessageQueue method, the index of the last used queue will be obtained from sendWhichQueue. This sendWhichQueue is a ThreadLocalIndex type, which has a ThreadLocal. The queue index is stored in this ThreadLocal. After obtaining the index, there will be a self increment action, and then obtain a new queue according to the new index. For example, there is a broker named broker-a, which has queues q1, q2, Q3 and Q4. If queue q1 was used last time, q2 will be selected as the new sending queue after the index increases. In this way, the load of the four queues will be consistent as much as possible. The code is as follows:
int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; //Get a queue in which the broker is recorded MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //Judge whether the broker is invalid if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { //The first acquisition will be empty; The obtained broker is valid and used in the last acquisition if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } }
Avoiding failed brokers
If the last used Broker fails or the delay exceeds a certain range, how does RocketMQ avoid it?
After sending a message every time, RocketMQ uses ConcurrentHashMap to record the corresponding relationship between the broker and the time spent sending the message. Before sending the message next time, it will judge whether the broker where the queue is located can be used. If it is available and used last time, it will be used directly, Here, let's focus on the situation that the obtained broker cannot be used. Let's assume that there are two brokers, broker-a and broker-b. each queue has four queues, Q1, Q2, Q3 and Q4:
Before sending, the value of sendWhichQueue is q1 of broker-a. if the message sending fails due to the abnormally large burst traffic of broker-a at this time, a retry will be triggered. According to the round robin mechanism, the next selected queue is q2 queue of broker-a. the message sending probability will still fail, that is, although it will be retried twice, it will be sent to the same Broker for processing, This process will appear unreliable, that is, it will still fail with a high probability. In fact, it doesn't make much sense to try again.
In order to solve this problem, RocketMQ introduces a fault avoidance mechanism. When the message is retried, it will try to avoid the last sent Broker. Returning to the above example, when the message is sent to the broker-a q1 queue, it returns the sending failure. When retrying, it will exclude all queues in broker-a first, that is, it will select the broker-b q1 queue this time to increase the success rate of message sending. RocketMQ provides two evasion strategies. This parameter is controlled by sendLatencyFaultEnable and can be intervened by the user. It indicates whether to enable the delay evasion mechanism. It is not enabled by default. The code for avoiding broker-a is as follows:
public String pickOneAtLeast() { final Enumeration<FaultItem> elements = this.faultItemTable.elements(); List<FaultItem> tmpList = new LinkedList<FaultItem>(); while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); tmpList.add(faultItem); } if (!tmpList.isEmpty()) { Collections.shuffle(tmpList); Collections.sort(tmpList); final int half = tmpList.size() / 2; if (half <= 0) { return tmpList.get(0).getName(); } else { final int i = this.whichItemWorst.getAndIncrement() % half; return tmpList.get(i).getName(); } } return null; }
Turn on the delay avoidance mechanism. Once the message fails to be sent, the broker-a will "pessimistically" think that the Broker will not be available in the next period of time, and all clients will not send messages to the Broker in a certain period of time in the future. This delay time is calculated jointly by notAvailableDuration and latencyMax. First, calculate the delay consumed by the message sending failure, and then the corresponding interval in latencyMax, that is, calculate the subscript in latencyMax, and then return the delay value corresponding to the same subscript of notAvailableDuration For example, if the latency requested last time exceeds 550Lms, back off 3000Lms; If it exceeds 1000L, retreat 60000L;
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; }
RocketMQ ensures the high availability of message sending through failure retry, changing queues in turn and avoiding failed broker s.