RabbitMQ message reliability mainly includes two aspects: on the one hand, it realizes the retry mechanism of consumption (retry through @ Retryable, you can set the retry times and retry frequency, but ensure idempotence); on the other hand, it realizes the reliable delivery of message producers (pay attention to the idempotence of consumption orders). The following mainly discusses the reliable message delivery realized by producers.
The sending process of rabbitTemplate is as follows:
1 send data and return (do not confirm that rabbitmq server has successfully received)
2 asynchronously receive ack confirmation information returned from rabbitmq
3 call the confirmCallback function after receiving ack. Note that there is no original message in confirmCallback, so it is impossible to call replay in this function. ConfirmCallback has only one notification function. In this case, if we cut off the connection at any time in the 2 and 3 steps, we can not confirm whether the data has been successfully sent out. Thus causing the problem of data loss.
There is only one perfect solution: using rabbitmq's transaction mechanism. In this case, however, rabbitmq is extremely inefficient, with hundreds of message s processed per second. It's really not desirable.
The second solution is to use the synchronous sending mechanism, that is, the client sends data, rabbitmq returns ack after receiving it, and the send function returns after receiving ack. The code is similar to this:
establish channel send message wait for ack(or overtime) close channel Return success or fail
Similarly, it is inefficient because the connection must be re established every time a message is sent.
Based on the above analysis, we use a new way to avoid data loss.
Based on rabbitTemplate asynchronous confirmation
1 cache the sent message s in redis
2 delete the confirmed message locally through confirmCallback or confirmed ack
3. Scan the local message regularly. If it is not confirmed after a certain time, it will be retransmitted
Of course, this solution also has some problems: imagine this scenario. rabbitmq receives a message. When sending an ack acknowledgement, the network is disconnected, resulting in the client not receiving the ack and resending the message. (compared with the lost message, it is much better to resend the message. We can achieve idempotence on the consumer side). The automatic retry code is as follows:
package cn.chinotan.service.reliabletransmission; /** * @program: test * @description: rabbitMq constant * @author: xingcheng * @create: 2018-08-12 12:30 **/ public class MyConstant { public static final String MY_EXCHANGE = "my_exchange"; public static final String ERROR_EXCHANGE = "error_exchange"; public static final String MY_QUEUE_THREE = "my_queue_three"; public final static String KEY_PREFIX = "test:rabbitMq:"; /** * consumer Wait time after failure (mils) */ public static final int ONE_MINUTE = 1 * 60 * 1000; /** * MQ Message retry time */ public static final int RETRY_TIME_INTERVAL = ONE_MINUTE; /** * MQ Message validity time */ public static final int VALID_TIME = ONE_MINUTE; }
package cn.chinotan.service.reliabletransmission; import java.io.Serializable; /** * @program: test * @description: Packaging message * @author: xingcheng * @create: 2018-09-24 15:32 **/ public class MessageWithTime implements Serializable { private String id; private long time; private String message; public String getId() { return id; } public void setId(String id) { this.id = id; } public long getTime() { return time; } public void setTime(long time) { this.time = time; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
package cn.chinotan.service.reliabletransmission; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbitMQ to configure */ @Configuration public class ReliableRabbitConfig { @Bean public DirectExchange myExchange() { return new DirectExchange(MyConstant.MY_EXCHANGE, true, false); } @Bean public Queue myQueueOne() { return new Queue(MyConstant.MY_QUEUE_THREE, true); } @Bean public Binding queueOneBinding() { return BindingBuilder.bind(myQueueOne()).to(myExchange()).withQueueName(); } }
package cn.chinotan.service.reliabletransmission; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import java.util.Map; import java.util.UUID; /** * @program: test * @description: rabbitService * @author: xingcheng * @create: 2018-09-24 14:28 **/ @Service public class RabbitMQService { Logger logger = LoggerFactory.getLogger(RabbitMQService.class); @Autowired StringRedisTemplate redisTemplate; @Autowired private RabbitTemplate rabbitTemplate; public Boolean send(String exchange, String routingKey, Object message) { try { String key = StringUtils.join(MyConstant.KEY_PREFIX, UUID.randomUUID().toString().replace("-", "").toLowerCase()); // Save the message, time and id to the redis cache before sending MessageWithTime messageWithTime = new MessageWithTime(); messageWithTime.setId(key); messageWithTime.setMessage(JSONObject.toJSONString(message)); messageWithTime.setTime(System.currentTimeMillis()); redisTemplate.opsForValue().set(key, JSONObject.toJSONString(messageWithTime)); // Asynchronous callback notification rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { logger.info("message send success--id:[{}]", correlationData.getId()); // After sending successfully, delete the redis cache redisTemplate.delete(correlationData.getId()); } else { // After sending failed, print the log and try again logger.error("message send fail--id:[{}]", correlationData.getId()); } }); CorrelationData correlationData = new CorrelationData(key); rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData); } catch (Exception e) { logger.error("Sending message exception{}", e); return false; } return true; } Boolean send(String exchange, String routingKey, MessageWithTime message) { try { // Asynchronous callback notification rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { logger.info("message send success--id:[{}]", correlationData.getId()); // After sending successfully, delete the redis cache redisTemplate.delete(correlationData.getId()); } else { // After sending failed, print the log and try again logger.error("message send fail--id:[{}]", correlationData.getId()); } }); CorrelationData correlationData = new CorrelationData(message.getId()); Map map = JSON.parseObject(message.getMessage(), Map.class); rabbitTemplate.convertAndSend(exchange, routingKey, map, correlationData); } catch (Exception e) { logger.error("Sending message exception{}", e); return false; } return true; } }
package cn.chinotan.service.reliabletransmission; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Map; /** * producer */ @Service public class ReliableProducr { private static final Logger LOGGER = LoggerFactory.getLogger(ReliableProducr.class); @Autowired private RabbitMQService rabbitMQService; public Boolean send(Map msg) { return rabbitMQService.send(MyConstant.MY_EXCHANGE, MyConstant.MY_QUEUE_THREE, msg); } public Boolean send(MessageWithTime msg) { return rabbitMQService.send(MyConstant.MY_EXCHANGE, MyConstant.MY_QUEUE_THREE, msg); } }
package cn.chinotan.service.reliabletransmission; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.context.WebApplicationContext; import org.springframework.web.context.support.WebApplicationContextUtils; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import javax.servlet.annotation.WebListener; /** * @program: test * @description: Reliable delivery listener * @author: xingcheng * @create: 2018-09-24 16:05 **/ @WebListener public class ReliableTransContextListener implements ServletContextListener { Logger logger = LoggerFactory.getLogger(ReliableTransContextListener.class); private WebApplicationContext springContext; @Override public void contextInitialized(ServletContextEvent sce) { logger.info("ReliableTransContextListener init start..........."); springContext = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext()); if (springContext != null) { RetryCache retryCache = (RetryCache) springContext.getBean("retryCache"); new Thread(() -> retryCache.startRetry()).start(); } } @Override public void contextDestroyed(ServletContextEvent sce) { } }
package cn.chinotan.service.reliabletransmission; import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.util.List; import java.util.Set; /** * @program: test * @description: Cache retry * @author: xingcheng * @create: 2018-09-24 16:12 **/ @Component("retryCache") public class RetryCache { private boolean stop = false; Logger logger = LoggerFactory.getLogger(RetryCache.class); @Autowired private ReliableProducr producr; @Autowired private StringRedisTemplate redisTemplate; private final String STAR = "*"; public void startRetry() { while (!stop) { try { Thread.sleep(MyConstant.RETRY_TIME_INTERVAL); } catch (InterruptedException e) { e.printStackTrace(); } long now = System.currentTimeMillis(); Set<String> keys = redisTemplate.keys(StringUtils.join(MyConstant.KEY_PREFIX, STAR)); if (keys != null && !keys.isEmpty()) { List<String> list = redisTemplate.opsForValue().multiGet(keys); list.forEach(value -> { MessageWithTime messageWithTime = JSON.parseObject(value, MessageWithTime.class); if (null != messageWithTime) { if (messageWithTime.getTime() + 3 * MyConstant.VALID_TIME < now) { logger.error("send message {} failed after 3 min ", messageWithTime); redisTemplate.delete(messageWithTime.getId()); } else if (messageWithTime.getTime() + MyConstant.VALID_TIME < now) { Boolean send = producr.send(messageWithTime); logger.info("Redelivery message"); if (!send) { logger.error("retry send message failed {}", messageWithTime); } } } }); } } } }
package cn.chinotan.service.reliabletransmission; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; /** * queueThree consumer */ @Component public class MyQueueThreeConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueThreeConsumer.class); /** * Consumers are idempotent * * @param content */ @RabbitListener(queues = MyConstant.MY_QUEUE_THREE) @RabbitHandler public void process(Map content) { LOGGER.info("consumer, queueThree Start execution {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); LOGGER.info("consumer, queueThree Consumption content:[{}]", JSON.toJSONString(content)); } }
import cn.chinotan.service.reliabletransmission.MyConstant; import cn.chinotan.service.reliabletransmission.RabbitMQService; import cn.chinotan.service.reliabletransmission.ReliableProducr; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; import java.util.Map; /** * @program: test * @description: Reliable delivery test * @author: xingcheng * @create: 2018-09-24 15:57 **/ @RunWith(SpringRunner.class) @SpringBootTest(classes = MyApplication.class) public class ReliableTransmissionTest { @Autowired private ReliableProducr producr; @Autowired private RabbitMQService rabbitMQService; /** * Normal condition test * @throws Exception */ @Test public void reliableTransmissionTest() throws Exception { Map<String, String> map = new HashMap<>(); map.put("name", "xingheng"); producr.send(map); } /** * Abnormal condition test * @throws Exception */ @Test public void reliableTransmissionFailTest() throws Exception { Map<String, String> map = new HashMap<>(); map.put("name", "xingheng"); rabbitMQService.send(MyConstant.ERROR_EXCHANGE, MyConstant.MY_QUEUE_THREE, map); } }
matters needing attention:
1. Enable publisher confirmation in configuration, like this:
spring: rabbitmq: publisher-confirms: true
2. If you want to test the abnormal condition, you only need to send the message to a nonexistent switch
3. Pay attention to the idempotence of the consumer side
Simple test results:
data:image/s3,"s3://crabby-images/a48aa/a48aa281c3cfdb6aee3155be2b9358804766014c" alt=""
data:image/s3,"s3://crabby-images/b78eb/b78eb83de1a44671265b664061dca5772b1f8b05" alt=""
After retrying once, it will be sent to the correct switch, and the transmission is successful
data:image/s3,"s3://crabby-images/7b3c2/7b3c2a111fd1bbbbc39df41459bb3c955022d3e4" alt=""