Deploying a mini message queue based on Redis 6.0

Technical research background

Our R&D team is at the start-up stage of the company. There is no mature operations and maintenance system yet, and we lack the capacity to build and maintain the mature MQ products commonly seen on the market. Still, we want a lightweight message system for the R&D team, so we carried out technical research in this area.

After this research, we decided to build the messaging system on Redis.
Specific reasons for this choice:
  • The team already runs Redis services and has some operations experience with them, which saves technical cost.
  • There are many technical articles in the industry on how to build a messaging system on Redis.
  • The overall throughput of the current system is not high. The main purpose of introducing a message system is to decouple the systems from each other.

To make it easy for readers to learn this from 0 to 1, I will start with the environment setup.

Construction of basic environment

Build a simple message queue system based on Redis 6.0.6.
Environment deployment:

docker run -p 6379:6379 --name redis_6_0_6 -d redis:6.0.6 
  • Parameter interpretation:
    -d       run the container in the background
    -p       port mapping (host:container)
    --name   container name

If the image is not available locally, you can pull it first with the following command:

docker pull redis:6.0.6 

Once the basic Redis environment is ready, the next step is to develop a message queue component on top of Redis's built-in features.

Below I introduce three different ways to implement a lightweight message queue.

Implementation of message queue based on the List structure

This approach is relatively simple. It relies on Redis's List structure: the sender writes messages to the left end of the list (LPUSH), and the consumer reads them from the right end (RPOP), which gives first-in, first-out order.

package org.idea.mq.redis.framework.mq.list;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.MsgWrapper;
import org.idea.mq.redis.framework.mq.IMQTemplate;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @Author linhao
 * @Date created in 3:09 2022 / 2 / 7 pm
 */
@Component
public class RedisListMQTemplate implements IMQTemplate {
    @Resource
    private IRedisService iRedisService;
    @Override
    public boolean send(MsgWrapper msgWrapper) {
        try {
            String json = JSON.toJSONString(msgWrapper.getMsgInfo());
            iRedisService.lpush(msgWrapper.getTopic(),json);
            return true;
        }catch (Exception e){
            e.printStackTrace();
        }
        return false;
    }
}

Questions to consider

There are several questions to consider:

How can multiple services subscribe to the same message?

I suggest naming each queue key as the system's project name prefix + business ID.
For example, suppose the user system needs to tell downstream systems that a member has been upgraded. It can write this message to the List named user-service:member-upgrade-list. If the order system wants to receive messages from the user system, it reads from the same Redis key, user-service:member-upgrade-list.
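As a small illustration of this naming convention, here is a minimal sketch of a key builder. The class `TopicNames` and its `of` method are hypothetical names of mine and are not part of the project; the only thing taken from the text is the "project prefix + business ID" rule.

```java
// Hypothetical helper (not part of the project): builds queue keys
// in the form "<projectName>:<businessId>".
final class TopicNames {
    private TopicNames() {
    }

    static String of(String projectName, String businessId) {
        // Reject empty parts so that malformed keys such as ":x" never reach Redis
        if (projectName == null || projectName.isEmpty()
                || businessId == null || businessId.isEmpty()) {
            throw new IllegalArgumentException("projectName and businessId are required");
        }
        return projectName + ":" + businessId;
    }
}
```

Producer and consumer would both call something like `TopicNames.of("user-service", "member-upgrade-list")`, so the key is spelled in exactly one place.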

How to implement the message listening mechanism?

Messages in the List can be fetched by polling, as in the following example code:

/**
 * Obtain data by polling
 *
 * @param msgWrapper
 */
private void pollingGet(MsgWrapper msgWrapper) {
    while (true) {
        String value = iRedisService.rpop(msgWrapper.getTopic());
        if (!StringUtils.isEmpty(value)) {
            System.out.println(value);
        }
        //Reduce access pressure and sleep regularly for a period of time
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            //Restore the interrupt flag and stop polling so the thread can shut down
            Thread.currentThread().interrupt();
            return;
        }
    }
}

However, polling wastes resources while the queue is empty, so you can use Redis's blocking pop command (BRPOP) instead. For example, the following method listens for incoming messages:

/**
 * Get data by blocking
 */
private void blockGet(MsgWrapper msgWrapper) {
    while (true) {
        List<String> values = iRedisService.brpop(msgWrapper.getTopic());
        if (!CollectionUtils.isEmpty(values)) {
            values.forEach(value -> {
                System.out.println(value);
            });
        }
    }
}

How to ensure the reliable transmission of messages?

When designing a message queue, the reliability of messages matters a great deal. If an exception occurs after a message has been handed to the consumer, we want the message to be delivered again.
For this scenario we can use the BRPOPLPUSH command, which pops an element from one list and pushes it onto a backup list in a single step. Even if consumption of the popped message fails, a copy of it remains in the backup queue. Because the pop and the write to the backup queue both happen inside Redis, callers can treat them as one atomic operation.
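To make the backup-queue idea concrete, here is a minimal in-memory sketch of the pattern. It is not Redis client code: two `ArrayDeque` instances stand in for the main list and the backup list, and the class `ReliableListQueueModel` with all of its method names is my own assumption. The acknowledge step models removing the item from the backup list (for example with LREM), and the requeue step is one possible recovery policy that the article does not prescribe.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// In-memory model of the "pop and copy to a backup list" pattern (illustrative only).
final class ReliableListQueueModel {
    private final Deque<String> main = new ArrayDeque<>();
    private final Deque<String> backup = new ArrayDeque<>();

    // Like LPUSH: the producer writes to the left end of the main list.
    synchronized void lpush(String value) {
        main.addFirst(value);
    }

    // Like (B)RPOPLPUSH main backup, without the blocking part: move the right-most
    // element of main to the left end of backup and return it, or null if main is empty.
    synchronized String popToBackup() {
        String value = main.pollLast();
        if (value != null) {
            backup.addFirst(value);
        }
        return value;
    }

    // Like LREM backup 1 value: called only after the message was processed successfully.
    synchronized boolean ack(String value) {
        return backup.removeFirstOccurrence(value);
    }

    // Assumed recovery policy: put every unacknowledged message back on the right end
    // of main, oldest last, so the oldest one is consumed first again.
    synchronized int requeueUnacked() {
        int moved = 0;
        String value;
        while ((value = backup.pollFirst()) != null) {
            main.addLast(value);
            moved++;
        }
        return moved;
    }

    synchronized int backupSize() {
        return backup.size();
    }
}
```

A consumer would call `popToBackup()`, process the message, and call `ack(...)` only on success. Anything still in the backup list after a crash can be requeued by a recovery job.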

Can broadcast mode be supported?

No. An element popped from a List is returned to exactly one client connection, so a List cannot be used to implement broadcast.

Implementation of message queue based on publish/subscribe

Redis provides a built-in publish/subscribe feature. With the SUBSCRIBE and PUBLISH commands we can publish messages and notify subscribers.
The biggest difference between SUBSCRIBE/PUBLISH and the List structure lies in the transmission mode:
  • List is point-to-point (P2P) transmission
  • SUBSCRIBE/PUBLISH broadcasts each message to all current subscribers
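The difference between the two transmission modes can be sketched with a small in-memory model. This is only an illustration, not Redis client code: `DeliveryModes`, `ListQueue` and `Channel` are names I made up. The one Redis detail borrowed here is that PUBLISH replies with the number of clients that received the message, which the model mimics with its return value.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// In-memory comparison of the two delivery modes (illustrative only).
final class DeliveryModes {
    private DeliveryModes() {
    }

    // Point-to-point: each message is handed to exactly one caller of rpop().
    static final class ListQueue {
        private final Deque<String> items = new ArrayDeque<>();

        void lpush(String message) {
            items.addFirst(message);
        }

        String rpop() {
            return items.pollLast(); // null when the list is empty
        }
    }

    // Broadcast: every subscriber registered at publish time receives the message.
    static final class Channel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        // Returns how many subscribers received the message.
        // With zero subscribers the message is simply gone; nothing is stored.
        int publish(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
            return subscribers.size();
        }
    }
}
```

With the `ListQueue`, a second consumer calling `rpop()` after the first gets nothing. With the `Channel`, both subscribers see the same message, but a message published before anyone subscribed is never seen by anybody.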

Example code for the publish side:

@Override
public boolean publish(String channel, String content) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.publish(channel, content);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

Code for the subscribe side:

@Override
public boolean subscribe(JedisPubSub jedisPubSub, String... channel) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.subscribe(jedisPubSub, channel);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

Because the subscribe call blocks the calling thread, the listener runs it on an additional thread:

@Component
public class RedisSubscribeMQListener implements IMQListener {
    @Resource
    private IRedisService iRedisService;
    class TestChannel extends JedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
            super.onMessage(channel, message);
            System.out.println("channel " + channel + " Message received:" + message);
        }
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
                    channel, subscribedChannels));
        }
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
                    channel, subscribedChannels));
        }
    }
    //Subscribe to the topic's channel on a dedicated thread, because subscribe blocks
    @Override
    public void onMessageReach(MsgWrapper msgWrapper) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                iRedisService.subscribe(new TestChannel(), msgWrapper.getTopic());
            }
        });
        thread.start();
    }
}

Note that a JedisPubSub object must be passed in when subscribing. Its callback methods define how a message is handled after it is received.

Questions to consider

How to ensure the reliable transmission of messages?

Messages sent through SUBSCRIBE/PUBLISH are not persisted. If the network is interrupted or Redis goes down, messages are lost, and there is no good mechanism for consuming a message again. Reliability is therefore poor.

Implementation of message queue based on Stream

The Stream type introduced in Redis 5.0 is designed for implementing a typical message queue. It provides message persistence and master-replica replication, lets any client access the data at any time, and remembers each client's read position, which helps ensure that messages are not lost. The Stream type covers almost everything a message queue needs, including but not limited to:

  • Sequential generation of message IDs
  • Message traversal
  • Blocking and non-blocking reading of messages
  • Consumption of messages by consumer groups
  • Handling of unacknowledged (pending) messages
  • Message queue monitoring
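As an example of the first item, auto-generated Stream IDs have the form `<millisecondsTime>-<sequenceNumber>` and always increase. The following is a minimal sketch of that rule in plain Java, assuming an injectable clock so it can be exercised without Redis. The class name `StreamIdGenerator` is my own, and this is a model of the documented behavior, not Redis's actual implementation.

```java
import java.util.function.LongSupplier;

// Model of how auto-generated stream IDs stay strictly increasing (illustrative only).
final class StreamIdGenerator {
    private final LongSupplier clockMillis;
    private long lastMs = -1;
    private long lastSeq = 0;

    StreamIdGenerator(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    synchronized String next() {
        long now = clockMillis.getAsLong();
        if (now > lastMs) {
            // A new millisecond: the sequence restarts at 0
            lastMs = now;
            lastSeq = 0;
        } else {
            // Same millisecond, or the clock moved backwards:
            // keep the last timestamp and bump the sequence instead
            lastSeq++;
        }
        return lastMs + "-" + lastSeq;
    }
}
```

In production one would pass `System::currentTimeMillis` as the clock; a fake clock makes the "same millisecond" and "clock went backwards" branches easy to check.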

Basic introductory material on Stream is not covered here. Interested readers can read this article:

https://xie.infoq.cn/article/cdb47caddc5ff49dc09ea58cd

In the following part, we go straight to hands-on work with Redis Stream.

Encapsulate message listening function

First, define the listener interface:

public interface RedisStreamMQListener {
    /**
     * Process normal messages
     */
    HandlerResult handleMsg(StreamEntry streamEntry);
}

Next is a listener implementation based on this interface:

package org.idea.mq.redis.framework.listener;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.config.StreamListener;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.idea.mq.redis.framework.utils.PayMsg;
import redis.clients.jedis.StreamEntry;
import javax.annotation.Resource;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;
/**
 * @Author linhao
 * @Date created in 10:07 2022 / 2 / 9 pm
 */
@StreamListener(streamName = "order-service:order-payed-stream", groupName = "order-service-group", consumerName = "user-service-consumer")
public class OrderPayedListener implements RedisStreamMQListener {
    @Resource
    private IRedisService iRedisService;
    @Override
    public HandlerResult handleMsg(StreamEntry streamEntry) {
        Map<String, String> map = streamEntry.getFields();
        String json = map.get("json");
        PayMsg payMsg = JSON.parseObject(json, PayMsg.class);
        System.out.println("pending payMsg is : " + payMsg);
        return SUCCESS;
    }
}

Custom message annotation

package org.idea.mq.redis.framework.config;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
/**
 * @Author linhao
 * @Date created in 10:04 2022 / 2 / 9 pm
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface StreamListener {
    String streamName() default "";
    String groupName() default "";
    String consumerName() default "";
}

The code uses a custom @StreamListener annotation. Because it is itself annotated with @Component, any class carrying it is registered as a bean in the Spring container.
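The annotation's attributes are read back at runtime through reflection, which is possible only because of `RetentionPolicy.RUNTIME`. Here is a minimal Spring-free sketch of that lookup. `AnnotationLookupSketch`, `DemoStreamListener` and the two listener classes are hypothetical stand-ins so the example compiles on its own; the attribute names mirror the ones in the article.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Spring-free sketch of reading listener metadata from a class-level annotation.
final class AnnotationLookupSketch {
    private AnnotationLookupSketch() {
    }

    // Stand-in for the article's @StreamListener (without @Component)
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoStreamListener {
        String streamName() default "";
        String groupName() default "";
        String consumerName() default "";
    }

    @DemoStreamListener(streamName = "order-service:order-payed-stream",
            groupName = "order-service-group", consumerName = "user-service-consumer")
    static final class AnnotatedListener {
    }

    static final class PlainListener {
    }

    // Returns "stream/group/consumer", or null when the class carries no annotation.
    static String describe(Class<?> type) {
        DemoStreamListener meta = type.getAnnotation(DemoStreamListener.class);
        if (meta == null) {
            return null;
        }
        return meta.streamName() + "/" + meta.groupName() + "/" + meta.consumerName();
    }
}
```

Note the null check: `getAnnotation` returns null for a class without the annotation. In a Spring application where beans may be proxied, `AnnotationUtils.findAnnotation` is probably the safer lookup, since a generated proxy class does not necessarily carry the annotation itself.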

To wire these listeners up automatically, a configuration class is needed. The code is as follows:

package org.idea.mq.redis.framework.config;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.StreamPendingEntry;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;
/**
 * @Author linhao
 * @Date created in 3:25 2022 / 2 / 7 pm
 */
@Configuration
public class StreamListenerConfiguration implements ApplicationListener<ApplicationReadyEvent> {
    @Resource
    private ApplicationContext applicationContext;
    @Resource
    private IRedisService iRedisService;
    private static Logger logger = LoggerFactory.getLogger(StreamListenerConfiguration.class);
    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        Map<String, RedisStreamMQListener> beanMap = applicationContext.getBeansOfType(RedisStreamMQListener.class);
        beanMap.values().forEach(redisStreamMQListener -> {
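            StreamListener streamListener = redisStreamMQListener.getClass().getAnnotation(StreamListener.class);
            //Skip beans that implement the interface but carry no annotation
            if (streamListener == null) {
                logger.warn("{} has no @StreamListener annotation, skipped", redisStreamMQListener);
                return;
            }
            ListenerInitWrapper listenerInitWrapper = new ListenerInitWrapper(streamListener.streamName(), streamListener.groupName(), streamListener.consumerName());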
            Thread handleThread = new Thread(new CoreMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
            Thread pendingHandleThread = new Thread(new PendingMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
            handleThread.start();
            pendingHandleThread.start();
            logger.info("{} loaded successfully", redisStreamMQListener);
        });
    }
    class PendingMsgHandlerThread implements Runnable {
        private ListenerInitWrapper listenerInitWrapper;
        private RedisStreamMQListener redisStreamMQListener;
        private IRedisService iRedisService;

        public PendingMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
            this.redisStreamMQListener = redisStreamMQListener;
            this.listenerInitWrapper = listenerInitWrapper;
            this.iRedisService = iRedisService;
        }
        @Override
        public void run() {
            String startId = "0-0";
            while (true) {
                List<StreamPendingEntry> streamConsumersInfos = iRedisService.xpending(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId), 1);
                //If the collection is not empty, the listening behavior will be triggered
                if (!CollectionUtils.isEmpty(streamConsumersInfos)) {
                    for (StreamPendingEntry streamConsumersInfo : streamConsumersInfos) {
                        StreamEntryID streamEntryID = streamConsumersInfo.getID();
                        //XREADGROUP with an explicit ID returns this consumer's pending entries whose ID is greater than it,
                        //so read from an ID just before the current pending entry
                        String streamIdStr = streamEntryID.toString();
                        String[] items = streamIdStr.split("-");
                        long timestamp = Long.parseLong(items[0]) - 1;
                        String beforeId = timestamp + "-0";
                        List<Map.Entry<String, List<StreamEntry>>> result = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(beforeId), 1, listenerInitWrapper.getConsumerName());
                        for (Map.Entry<String, List<StreamEntry>> streamInfo : result) {
                            List<StreamEntry> streamEntries = streamInfo.getValue();
                            for (StreamEntry streamEntry : streamEntries) {
                                try {
                                    //Business processing
                                    HandlerResult handlerResult = redisStreamMQListener.handleMsg(streamEntry);
                                    if (SUCCESS.equals(handlerResult)) {
                                        startId = streamEntryID.toString();
                                        iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId));
                                    }
                                } catch (Exception e) {
                                    logger.error("[PendingMsgHandlerThread] e is ", e);
                                }
                            }
                        }
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    //Restore the interrupt flag and stop the loop so the thread can shut down
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
    class CoreMsgHandlerThread implements Runnable {
        private ListenerInitWrapper listenerInitWrapper;
        private RedisStreamMQListener redisStreamMQListener;
        private IRedisService iRedisService;
        public CoreMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
            this.redisStreamMQListener = redisStreamMQListener;
            this.listenerInitWrapper = listenerInitWrapper;
            this.iRedisService = iRedisService;
        }
        @Override
        public void run() {
            while (true) {
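                List<Map.Entry<String, List<StreamEntry>>> streamConsumersInfos = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), StreamEntryID.UNRECEIVED_ENTRY, 1, listenerInitWrapper.getConsumerName());
                //Defensive guard: nothing to process if the read returned no entries
                if (CollectionUtils.isEmpty(streamConsumersInfos)) {
                    continue;
                }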
                for (Map.Entry<String, List<StreamEntry>> streamInfo : streamConsumersInfos) {
                    List<StreamEntry> streamEntries = streamInfo.getValue();
                    for (StreamEntry streamEntry : streamEntries) {
                        //Business processing
                        try {
                            HandlerResult result = redisStreamMQListener.handleMsg(streamEntry);
                            if (SUCCESS.equals(result)) {
                                iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), streamEntry.getID());
                            }
                        } catch (Exception e) {
                            logger.error("[CoreMsgHandlerThread] e is ", e);
                        }
                    }
                }
            }
        }
    }
}

The principle is as follows: once the Spring container has started, the configuration class receives the ApplicationReadyEvent published by Spring and, for each listener, starts two background threads to process the stream data in Redis.

Encapsulate related message publishing functions

Sending a message is relatively simple: the data is written directly into the stream through Redis.

package org.idea.mq.redis.framework.producer;
/**
 * @Author linhao
 * @Date created in 12:23 2022 / 2 / 10 pm
 */
public interface IStreamProducer {
    /**
     * Specify the streamName publishing message
     * @param streamName
     * @param json
     */
    void sendMsg(String streamName, String json);
}

The message transmission format is written into the stream inside redis in the form of json string.

package org.idea.mq.redis.framework.producer;


import org.idea.mq.redis.framework.redis.IRedisService;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;


/**
 * @Author linhao
 * @Date created in 12:19 2022 / 2 / 10 pm
 */
public class StreamProducer implements IStreamProducer{

    @Resource
    private IRedisService iRedisService;

    @Override
    public void sendMsg(String streamName,String json){
        Map<String,String> map = new HashMap<>();
        map.put("json",json);
        iRedisService.xAdd(streamName,map);
    }

}

Note that the underlying write uses the ID that Redis generates automatically. The code is as follows:

@Override
public boolean xAdd(String streamName, Map<String, String> stringMap) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, stringMap);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

So that other teams can use it easily, the whole component can be packaged as a Spring Boot starter.

Component testing

Point to point sending test

Set up two microservice projects, user-service and order-service. Deploy two nodes of user-service, both belonging to user-service-group, and two nodes of order-service, both belonging to order-service-group.
Then have the two microservice clusters publish the messages that the other one subscribes to, and check that the messages are received normally and that only one node in the same group receives each message.

Broadcast transmission test

Using the user-service module built earlier, deploy four nodes that subscribe to the same stream, but give each node a different groupName. Then publish a message and check that all four nodes receive it.
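Both tests rely on the same consumer-group rule: each group keeps its own read position, so an entry is delivered once per group, to whichever consumer in that group reads first. The following in-memory sketch models only that rule, with no pending list and no acknowledgements. `ConsumerGroupModel` and its methods are hypothetical names of mine, and the comments that mention Redis commands are rough analogies rather than exact equivalents.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory model of one stream with consumer groups (not a Redis client).
final class ConsumerGroupModel {
    private final List<String> entries = new ArrayList<>();
    // Each group has its own cursor: the index of the next entry it has not delivered yet
    private final Map<String, Integer> cursorByGroup = new HashMap<>();

    synchronized void xadd(String message) {
        entries.add(message);
    }

    // Roughly "XGROUP CREATE <stream> <group> 0": the group starts at the first entry.
    synchronized void createGroup(String group) {
        cursorByGroup.putIfAbsent(group, 0);
    }

    // Roughly "XREADGROUP GROUP <group> <consumer> COUNT 1 STREAMS <stream> >".
    // The consumer name only identifies the caller; the group's cursor decides
    // which entry comes next. Returns null when the group has seen everything.
    synchronized String readNext(String group, String consumer) {
        Integer cursor = cursorByGroup.get(group);
        if (cursor == null) {
            throw new IllegalStateException("unknown group: " + group);
        }
        if (cursor >= entries.size()) {
            return null;
        }
        cursorByGroup.put(group, cursor + 1);
        return entries.get(cursor);
    }
}
```

Two nodes in the same group compete for each entry (point-to-point), while nodes in different groups each get their own copy (broadcast).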

A test template for these cases already exists in the project. Interested readers can look at the MQ redis test module.

Questions to consider

Why does the same StreamName require two consumer threads?

One thread reads new entries from the Stream. Every entry delivered to a consumer group is recorded in the group's pending list until it is acknowledged. If the business logic succeeds, the thread sends an ACK to confirm that the message has been consumed. If an exception occurs during processing, no ACK is sent and the message stays in the pending list. The second thread is dedicated to processing the entries in the pending list. If consuming a pending message fails again, it is retried by regular polling.
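The bookkeeping behind the pending list can be sketched in a few lines. This in-memory model is only an illustration under my own naming (`PendingEntriesModel` and its methods are hypothetical): an entry becomes pending when it is delivered, leaves the list when it is acknowledged, and carries a delivery counter that grows on each redelivery.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory model of a consumer group's pending list (illustrative only).
final class PendingEntriesModel {
    // entry ID -> number of deliveries so far, kept in first-delivery order
    private final Map<String, Integer> pending = new LinkedHashMap<>();

    // Called whenever an entry is handed to a consumer, first time or again.
    synchronized void onDelivered(String id) {
        pending.merge(id, 1, Integer::sum);
    }

    // Like XACK: returns true if the entry was pending and is now removed.
    synchronized boolean ack(String id) {
        return pending.remove(id) != null;
    }

    // ID of the oldest entry still waiting for an ACK, or null if there is none.
    synchronized String oldestPending() {
        return pending.isEmpty() ? null : pending.keySet().iterator().next();
    }

    synchronized int deliveryCount(String id) {
        return pending.getOrDefault(id, 0);
    }
}
```

In these terms, the first thread performs `onDelivered` followed by `ack` on success, and the second thread keeps asking for `oldestPending()` and retrying it.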

Is delayed retry supported?

Not yet; this is a known shortcoming of the current design. When message consumption fails, the message goes into polling. In serious cases, a message that always fails is retried in an endless loop and keeps blocking. Redelivering failed messages at intervals of 1, 3, 5... minutes, as RocketMQ does, is not implemented for the time being. Interested readers can make simple modifications based on the existing code.
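As a starting point for such a modification, here is a minimal sketch of the 1, 3, 5... minute schedule. It assumes the retry thread knows how many times an entry has been delivered and how long it has been idle (the extended form of XPENDING reports both). `RetryBackoff`, its methods and the limit of 5 attempts are my own assumptions, not part of the project.

```java
import java.time.Duration;

// Hypothetical retry schedule: wait 1, 3, 5, ... minutes before each redelivery.
final class RetryBackoff {
    // Assumed limit; after this many deliveries the message should be parked elsewhere
    static final int MAX_ATTEMPTS = 5;

    private RetryBackoff() {
    }

    // Delay before retrying a message that has already been delivered `attempt` times (1-based).
    static Duration delayFor(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        return Duration.ofMinutes(2L * attempt - 1);
    }

    // A pending entry is due once it has been idle for at least its delay.
    static boolean isDue(int deliveryCount, Duration idle) {
        return idle.compareTo(delayFor(deliveryCount)) >= 0;
    }

    // Stops the endless retry loop described above.
    static boolean shouldGiveUp(int deliveryCount) {
        return deliveryCount > MAX_ATTEMPTS;
    }
}
```

The pending-message thread could skip entries for which `isDue` is false and move entries for which `shouldGiveUp` is true to a separate dead-letter stream, so one poisonous message no longer blocks the rest.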

Complete code for this article

https://gitee.com/IdeaHome_admin/mq-framework

Keywords: Java Database Redis Distribution Middleware

Added by Hellusius on Sun, 13 Feb 2022 16:37:18 +0200