Using RedisTemplate as a message queue to insert data in batches.

In the early stage, the business volume in the production environment was small, so logs were committed to the database one at a time, and this worked without problems.

Later, as business concurrency grew, log writes hit the database so frequently that the database connection pool was often exhausted, until the program eventually crashed.

Because log writes do not require a real-time response, we considered asynchronous processing combined with batch submission.

To relieve JVM memory pressure, Redis is used as the message queue (the original project already integrates Redis, and the company does not want to introduce another MQ and increase maintenance costs).

So I found an article online about using Spring Boot's integrated RedisTemplate as a message queue, and adapted it with a few changes.

reference material: https://blog.csdn.net/qq_38553333/article/details/82833273

 

First, the Redis configuration class, RedisConfig.

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;


/**
 * Spring configuration that provides the {@link RedisTemplate} and the
 * {@link RedisMessageListenerContainer} beans.
 */
@Configuration
@EnableCaching // enables Spring's annotation-driven cache support
public class RedisConfig {
    /**
     * Builds a {@link RedisTemplate} whose keys and hash keys are stored as plain strings and
     * whose values and hash values are stored as JSON (the template's default would be JDK
     * serialization).
     *
     * @param factory connection factory the template obtains its Redis connections from
     * @return the fully initialised template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        ObjectMapper om = new ObjectMapper();
        // Serialize every property kind (fields, getters, setters) regardless of access modifier.
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // Embed type information for non-final types so values can be read back as their
        // concrete class; final types such as String and Integer are written without it.
        // NOTE(review): enableDefaultTyping is deprecated in newer Jackson releases and default
        // typing is risky for untrusted payloads - confirm the Jackson version before migrating.
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        // Parameterized instead of the raw type, so no unchecked warnings are produced.
        Jackson2JsonRedisSerializer<Object> jacksonSerializer =
                new Jackson2JsonRedisSerializer<>(Object.class);
        jacksonSerializer.setObjectMapper(om);

        // One instance is shared for keys and hash keys.
        StringRedisSerializer stringSerializer = new StringRedisSerializer();

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(stringSerializer);
        template.setValueSerializer(jacksonSerializer);
        template.setHashKeySerializer(stringSerializer);
        template.setHashValueSerializer(jacksonSerializer);
        template.afterPropertiesSet();

        return template;
    }

    /**
     * Creates the {@link RedisMessageListenerContainer} bound to the given connection factory.
     *
     * @param redisConnectionFactory connection factory used by the container
     * @return a container with no listeners registered yet
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        return container;
    }
}

  

Message entity

import com.alibaba.fastjson.JSON;
import lombok.Data;

import java.util.UUID;

/**
 * Envelope that is published on a topic. The business payload is carried as a JSON string in
 * {@code content}; accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class Message {
    /** Unique message id: a random UUID with the dashes removed. */
    private String id;
    /** Retry counter; set to 0 by the payload constructor. */
    private Integer retryCount;
    /** JSON representation of the payload object. */
    private String content;
    /** Status code; set to 0 by the payload constructor. */
    private Integer status;
    /** Topic this message is published on. */
    private String topic;

    /** Creates an empty message with all fields unset. */
    public Message() {
    }

    /**
     * Creates a new message for {@code topic} wrapping {@code object}.
     *
     * @param topic  topic the message belongs to
     * @param object payload; stored as its JSON string form
     */
    public Message(String topic, Object object) {
        this.topic = topic;
        this.content = JSON.toJSONString(object);
        this.id = newId();
        this.status = 0;
        this.retryCount = 0;
    }

    /** Returns a random UUID rendered without dashes. */
    private static String newId() {
        String uuid = UUID.randomUUID().toString();
        return uuid.replace("-", "");
    }
}

  

Redis subscription management uses the observer pattern.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Registry of the consumers subscribed to each topic, plus the per-consumer bookkeeping that is
 * written to Redis whenever a message is broadcast.
 *
 * <p>The registry is a plain {@link HashMap} and is not synchronized.
 * NOTE(review): this presumably relies on consumers being registered before messages are
 * published - confirm if consumers can be added or removed at runtime.
 */
@Component
public class TopicSubscriber {
    private final Map<String, Set<String>> subscriberMap = new HashMap<>();

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Registers {@code consumer} for {@code topic}.
     *
     * @param topic    topic name
     * @param consumer consumer name
     * @return {@code true} if the consumer was not registered for the topic before
     */
    public Boolean addConsumer(String topic, String consumer) {
        return subscriberMap.computeIfAbsent(topic, t -> new HashSet<>()).add(consumer);
    }

    /**
     * Removes {@code comsumer} from {@code topic}.
     *
     * @param topic    topic name
     * @param comsumer consumer name
     * @return {@code true} if the consumer was registered and has been removed; {@code false}
     *         if the topic is unknown or the consumer was not registered
     */
    public Boolean removeConsumer(String topic, String comsumer) {
        Set<String> consumers = subscriberMap.get(topic);
        return consumers != null && consumers.remove(comsumer);
    }

    /**
     * Records a pending delivery of message {@code id} for every consumer of {@code topic}.
     *
     * <p>For each consumer the key {@code topic_consumer_id} is set to the message id and the
     * topic name is pushed onto the list {@code topic_consumer}. A consumer is skipped when the
     * key {@code fail_topic_consumer_id} already exists.
     *
     * @param topic topic the message is published on
     * @param id    message id
     */
    public void broadcast(String topic, String id) {
        Set<String> consumers = subscriberMap.get(topic);
        if (consumers == null) {
            return;
        }
        for (String consumer : consumers) {
            String key = String.join("_", topic, consumer, id);
            // hasKey returns a nullable Boolean; compare explicitly instead of unboxing it.
            boolean alreadyFailed = Boolean.TRUE.equals(redisTemplate.hasKey("fail_" + key));
            if (!alreadyFailed) {
                redisTemplate.opsForValue().set(key, id);
                redisTemplate.opsForList().leftPush(topic + "_" + consumer, topic);
            }
        }
    }
}

  

Next, the Redis publisher.

import com.alibaba.fastjson.JSON;
import com.redis.mq.subscriber.TopicSubscriber;
import io.netty.util.CharsetUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class RedisPublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    TopicSubscriber subscriber;

    @PostConstruct
    public void init() throws Exception {
        // todo test thread
        /*new Thread(() -> {
            int count = 0;
            try {
                Thread.sleep(3000l);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            while (count < 14) {
                try {
                    Thread.sleep(100l);
                    Generate generate = new Generate();
                    generate.setIdNo("" + count);
                    this.publish("GenerateLog", generate);
                    count++;
                } catch (Exception e) {
                }
            }
        }).start();*/
    }
  
    public void publish(String topic, Object content) {  //News to redis
        Message message = new Message(topic, content);
        subscriber.broadcast(topic, message.getId());
        redisTemplate.getConnectionFactory().getConnection().publish(
                topic.getBytes(CharsetUtil.UTF_8), JSON.toJSONString(message).getBytes()
        );
    }
}

  

The Redis consumer only needs to implement the onMessage method of MessageListener. Generics are used here to make it easy to extend.

import com.alibaba.fastjson.JSON;
import com.cache.redis.mq.subscriber.TopicSubscriber;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import java.lang.reflect.ParameterizedType;
import java.util.concurrent.TimeUnit;

/**
 * Base class for Redis channel consumers. It decodes the received {@link Message} envelope,
 * converts its content to {@code T} and passes it to {@link #handler(Object)}.
 *
 * <p>The consumer name is the simple class name of the concrete listener. The payload type is
 * read from the first type argument of the concrete class's generic superclass, so that
 * superclass must be parameterized with the payload type first.
 *
 * @param <T> payload type handled by this listener
 */
public abstract class RedisListener<T> implements MessageListener {

    @Autowired
    protected RedisTemplate<String, Object> redisTemplate;

    @Autowired
    protected RedisMessageListenerContainer messageListenerContainer;

    @Autowired
    protected TopicSubscriber subscriber;

    /**
     * Handles one message received from Redis.
     *
     * <p>An entry is popped from the list {@code topic_consumerName}. If it equals the message's
     * topic, the content is parsed and handled and the key {@code topic_consumerName_id} is
     * deleted; otherwise the raw message is stored under {@code fail_topic_consumerName_id}.
     *
     * @param message the Redis message (channel and body)
     * @param bytes   the subscription pattern; not used
     * @throws IllegalArgumentException if the body cannot be parsed into a {@link Message}
     */
    @Override
    public void onMessage(org.springframework.data.redis.connection.Message message, byte[] bytes) {
        String name = this.getClass().getSimpleName();
        // Decode explicitly as UTF-8 instead of using the platform default charset.
        String topic = new String(message.getChannel(), java.nio.charset.StandardCharsets.UTF_8);
        String content = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);

        Message m = JSON.parseObject(content, Message.class);
        if (m == null) {
            // Fail with a descriptive error instead of a NullPointerException further down.
            throw new IllegalArgumentException("Unparseable message body on topic " + topic);
        }
        String key = String.join("_", topic, name, m.getId());

        Object b = redisTemplate.opsForList().rightPop(topic + "_" + name);
        if (b != null && b.equals(m.getTopic())) {
            ParameterizedType superType = (ParameterizedType) this.getClass().getGenericSuperclass();
            T t = JSON.parseObject(m.getContent(), superType.getActualTypeArguments()[0]);
            handler(t);  // Processing redis messages.
            // The message has been consumed: delete its bookkeeping key directly rather than
            // setting a 1-nanosecond expiry on it.
            redisTemplate.delete(key);
        } else {
            // todo retry
            redisTemplate.opsForValue().set("fail_" + key, content);
        }
    }

    /**
     * Processes one decoded payload.
     *
     * @param t payload parsed from the message content
     */
    protected abstract void handler(T t);
}

 

At this point the basic Redis-backed message queue is essentially complete. The specific business logic and the batch insertion follow.

First, add a LogHandler interface.

/**
 * Contract for components that process queued log entries.
 */
public interface LogHandler {

    /**
     * Starts or performs the handler's log processing. What that involves is defined by the
     * implementing class.
     */
    void process();
}

 

Write an abstract class that extends RedisListener and implements LogHandler. Messages received from Redis are handed over to a blocking queue through its put and poll operations.

Because the project uses MyBatis-Plus and I did not want to write a MyBatis foreach batch insert statement, I took a shortcut here and used a MyBatis SqlSession in batch mode directly: single statements are precompiled and then committed as one batch.

import com.cache.redis.mq.RedisListener;
import com.server.log.store.LogStore;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.listener.ChannelTopic;

import javax.annotation.PostConstruct;
import java.lang.reflect.ParameterizedType;
import java.util.List;

@Slf4j
public abstract class AbstractLogHandler<T, M> extends RedisListener<T> implements LogHandler {

    @Autowired
    SqlSessionFactory factory;

    @PostConstruct
    public void addListener() {
        messageListenerContainer.addMessageListener(this, new ChannelTopic(getTopic()));
        subscriber.addConsumer(getTopic(), this.getClass().getSimpleName());
        process();
    }

    @Override
    protected void handler(T t) {
        getStore().put(t);  //Block until new writes can be made. You can actually add a timeout here. Avoid blocking all the time.
    }

    protected abstract String getTopic();

    protected abstract LogStore<T> getStore();

    protected void commit(List<T> data) {
        if (data == null || data.isEmpty()) return;
        SqlSession session = factory.openSession(ExecutorType.BATCH);
        try {
            M mapper = session.getMapper(
                    (Class<M>) (((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[1])
            );
            save(data, mapper);
            session.commit();
        } catch (Exception e) {
            log.error(String.format("topic %s Data bulk write failed.{}", getTopic()), e);
            session.rollback();
        }finally {
            session.close();
        }

        data.forEach(o -> o = null);
        data.clear();
    }

    protected abstract void save(List<T> data, M m);
}

  

LogStore: a wrapper around a blocking queue.

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Bounded in-memory buffer for log entries, backed by a {@link LinkedBlockingQueue}.
 *
 * @param <T> type of the buffered entries
 */
@Slf4j
public class LogStore<T> {

    /** Capacity used by the no-argument constructor. */
    private static final int QUEUE_CAPACITY = 10000;

    private final BlockingQueue<T> logQueue;

    /** Creates a store with the default capacity. */
    public LogStore() {
        this(QUEUE_CAPACITY);
    }

    /**
     * Creates a store with the given capacity.
     *
     * @param capacity maximum number of buffered entries
     */
    public LogStore(int capacity) {
        this.logQueue = new LinkedBlockingQueue<>(capacity);
    }

    /**
     * Adds an entry, blocking while the store is full.
     *
     * <p>If the calling thread is interrupted while waiting, the entry is not added, a warning
     * is logged and the thread's interrupt status is restored.
     *
     * @param t entry to add
     */
    public void put(T t) {
        try {
            logQueue.put(t);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still see the interruption.
            Thread.currentThread().interrupt();
            log.warn("logStore put interrupted, element was not enqueued", e);
        }
    }

    /**
     * Takes the next entry, waiting up to {@code seconds} for one to arrive.
     *
     * <p>If the calling thread is interrupted while waiting, {@code null} is returned and the
     * interrupt status is restored; callers that poll in a loop should check it.
     *
     * @param seconds maximum time to wait, in seconds
     * @return the next entry, or {@code null} on timeout or interruption
     */
    public T poll(long seconds) {
        try {
            return logQueue.poll(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

  

At this point the shared infrastructure code is essentially complete. Next, let's see how to write a concrete business handler class.

For example, for our registration log we only need to extend the abstract class AbstractLogHandler.

import comcommon.constant.Constant;
import com.common.po.RegLog;
import com.dao.mapper.RegLogMapper;
import com.server.log.store.LogStore;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Log handler for registration logs: buffers {@link RegLog} entries received on the
 * registration-log topic and inserts them in batches through {@link RegLogMapper}.
 */
@Component
public class RegisterLogHandler extends AbstractLogHandler<RegLog, RegLogMapper> {
    private final LogStore<RegLog> store = new LogStore<>();
    private String topic = Constant.TOPIC_REGISTER_LOG;
    // todo configurable
    /** Number of buffered entries that triggers a batch commit. */
    private final Integer batchSize = 300;
    /** Seconds to wait for a new entry before a partial batch is flushed. */
    private final Integer waitSeconds = 2;

    /** Single worker thread that drains the store. */
    ExecutorService executor = Executors.newSingleThreadExecutor();

    @Override
    protected String getTopic() {
        return this.topic;
    }

    @Override
    protected LogStore<RegLog> getStore() {
        return this.store;
    }

    /**
     * Starts the worker that drains the store and commits the entries in batches.
     *
     * <p>A batch is committed as soon as it holds {@code batchSize} entries, or when no new
     * entry arrives within {@code waitSeconds} and the batch is not empty. The worker runs until
     * its thread is interrupted and then commits whatever is still in the batch.
     */
    @Override
    public void process() {
        executor.execute(() -> {
            List<RegLog> data = new ArrayList<>(batchSize);
            while (!Thread.currentThread().isInterrupted()) {
                RegLog generate = this.store.poll(waitSeconds);
                if (generate != null) {
                    data.add(generate);
                    // Flush as soon as the batch is full rather than waiting for the next entry.
                    if (data.size() >= batchSize) {
                        commit(data);
                    }
                } else if (!data.isEmpty()) {
                    // Nothing arrived within the wait time: flush the partial batch.
                    commit(data);
                }
            }
            // Worker is stopping: do not drop entries that were already taken from the store.
            if (!data.isEmpty()) {
                commit(data);
            }
        });
    }

    /**
     * Inserts every entry through the mapper, assigning a random registration number to
     * entries that have none. The surrounding batch session is committed by the caller, so no
     * commit happens here.
     *
     * @param data   entries to insert
     * @param mapper mapper bound to the batch session
     */
    @Override
    protected void save(List<RegLog> data, RegLogMapper mapper) {
        for (RegLog entry : data) {
            if (entry.getRegNo() == null) {
                entry.setRegNo(UUID.randomUUID().toString());
            }
            // Single-row insert; the rows are sent together when the batch session commits.
            mapper.insert(entry);
        }
    }
}

  

Call:

// Inject the publisher into the class that produces the log entries.
@Autowired
    protected RedisPublisher publisher;


// Publish the log object on the registration-log topic.
publisher.publish(Constant.TOPIC_REGISTER_LOG, log);

Keywords: Java Redis JSON Session

Added by blyz on Fri, 22 May 2020 09:40:09 +0300