In the early days, the production environment's business volume was small, so logs were committed to the database one at a time, and that worked without problems.
Later, as business concurrency grew, log writes hit the database so frequently that the connection pool was constantly exhausted, eventually crashing the application.
Since logging does not require a real-time response, asynchronous writes with batch submission are a natural fit.
To relieve JVM memory pressure, Redis serves as the message queue (the project already integrates Redis, and the company did not want to introduce another MQ and add maintenance costs).
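Reduced to its essence, this is just a Redis list used as a buffer between the web threads and a writer thread. A minimal sketch, assuming a configured RedisTemplate<String, Object> (the full setup follows below); the key name "log_queue" is illustrative:

// producer side: enqueue a log entry (LPUSH)
redisTemplate.opsForList().leftPush("log_queue", logEntry);

// consumer side: blocking dequeue with a 2-second timeout (BRPOP);
// null means the queue stayed empty, which is the cue to flush a partial batch
Object entry = redisTemplate.opsForList().rightPop("log_queue", 2, TimeUnit.SECONDS);

The implementation below layers pub/sub notification and per-consumer lists on top of this idea so that several consumers can share one topic.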
I found an article online about integrating RedisTemplate with Spring Boot as a message queue and adapted it slightly.
Reference: https://blog.csdn.net/qq_38553333/article/details/82833273
First, the RedisConfig:
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching // enable annotation-driven caching
public class RedisConfig {

    /**
     * RedisTemplate configuration.
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        // Use Jackson2JsonRedisSerializer for values (the default is JDK serialization)
        Jackson2JsonRedisSerializer jacksonSerial = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        // Serialize all fields regardless of visibility (private and public alike)
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // Embed type info for non-final classes only; final classes such as String and Integer
        // would throw at runtime. (Deprecated since Jackson 2.10; activateDefaultTyping replaces it.)
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jacksonSerial.setObjectMapper(om);

        // Values as JSON, keys as plain strings
        template.setValueSerializer(jacksonSerial);
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(jacksonSerial);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        return container;
    }
}
The Message entity:
import com.alibaba.fastjson.JSON;
import lombok.Data;

import java.util.UUID;

@Data
public class Message {
    private String id;
    private Integer retryCount;
    private String content;
    private Integer status;
    private String topic;

    public Message() {
    }

    public Message(String topic, Object object) {
        this.id = UUID.randomUUID().toString().replace("-", "");
        this.retryCount = 0;
        this.content = JSON.toJSONString(object);
        this.status = 0;
        this.topic = topic;
    }
}
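For illustration only, this is roughly what a wrapped message looks like once fastjson serializes it (the topic string and field values here are made up):

Message m = new Message("RegisterLog", regLog);
String json = JSON.toJSONString(m);
// json now looks something like (fastjson orders fields alphabetically):
// {"content":"{...the regLog as JSON...}","id":"9f8b...","retryCount":0,"status":0,"topic":"RegisterLog"}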
Redis subscription management uses the observer pattern.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class TopicSubscriber {

    // topic -> names of the consumers subscribed to it
    // (ConcurrentHashMap, since listeners may register concurrently)
    private final Map<String, Set<String>> subscriberMap = new ConcurrentHashMap<>();

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // Observer pattern: consumer registration
    public Boolean addConsumer(String topic, String consumer) {
        Set<String> consumerList = subscriberMap.computeIfAbsent(topic, k -> new HashSet<>());
        return consumerList.add(consumer);
    }

    public Boolean removeConsumer(String topic, String consumer) {
        Set<String> consumerList = subscriberMap.get(topic);
        return consumerList != null && consumerList.remove(consumer);
    }

    // Message broadcast: for every consumer of the topic, record a pending marker
    // and push the topic onto that consumer's own list
    public void broadcast(String topic, String id) {
        Set<String> consumers = subscriberMap.get(topic);
        if (consumers == null) {
            return;
        }
        for (String consumer : consumers) {
            String key = String.join("_", topic, consumer, id);
            // Skip messages that previously failed for this consumer
            if (!redisTemplate.hasKey("fail_" + key)) {
                redisTemplate.opsForValue().set(key, id);
                redisTemplate.opsForList().leftPush(topic + "_" + consumer, topic);
            }
        }
    }
}
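A quick walkthrough of what broadcast leaves behind in Redis (the topic and consumer names are illustrative):

subscriber.addConsumer("RegisterLog", "RegisterLogHandler");
subscriber.broadcast("RegisterLog", "abc123");
// Redis now holds, for that consumer:
//   key  "RegisterLog_RegisterLogHandler_abc123"  ->  "abc123"        (pending marker)
//   list "RegisterLog_RegisterLogHandler"         ->  ["RegisterLog"] (the consumer's inbox)
// If a "fail_RegisterLog_RegisterLogHandler_abc123" key existed, the message would be skipped.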
Then the Redis publisher:
import com.alibaba.fastjson.JSON;
import com.cache.redis.mq.subscriber.TopicSubscriber;
import io.netty.util.CharsetUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class RedisPublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    TopicSubscriber subscriber;

    @PostConstruct
    public void init() throws Exception {
        // todo: a test thread used during development
        /*
        new Thread(() -> {
            int count = 0;
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            while (count < 14) {
                try {
                    Thread.sleep(100L);
                    Generate generate = new Generate();
                    generate.setIdNo("" + count);
                    this.publish("GenerateLog", generate);
                    count++;
                } catch (Exception e) {
                }
            }
        }).start();
        */
    }

    public void publish(String topic, Object content) {
        // Wrap the payload, record it for every subscriber, then PUBLISH to the channel
        Message message = new Message(topic, content);
        subscriber.broadcast(topic, message.getId());
        redisTemplate.getConnectionFactory().getConnection().publish(
                topic.getBytes(CharsetUtil.UTF_8),
                JSON.toJSONString(message).getBytes()
        );
    }
}
The Redis consumer just implements onMessage of MessageListener. For easy extension, generics are used here.
import com.alibaba.fastjson.JSON;
import com.cache.redis.mq.subscriber.TopicSubscriber;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import java.lang.reflect.ParameterizedType;
import java.util.concurrent.TimeUnit;

public abstract class RedisListener<T> implements MessageListener {

    @Autowired
    protected RedisTemplate<String, Object> redisTemplate;

    @Autowired
    protected RedisMessageListenerContainer messageListenerContainer;

    @Autowired
    protected TopicSubscriber subscriber;

    @Override
    public void onMessage(org.springframework.data.redis.connection.Message message, byte[] bytes) {
        String name = this.getClass().getSimpleName();
        String topic = new String(message.getChannel());
        String content = new String(message.getBody());
        Message m = JSON.parseObject(content, Message.class);
        String key = String.join("_", topic, name, m.getId());
        // Pop this consumer's own list; a hit means the broadcast reached us
        Object b = redisTemplate.opsForList().rightPop(topic + "_" + name);
        if (b != null && b.equals(m.getTopic())) {
            // Resolve the subclass's concrete type argument and deserialize the payload
            T t = JSON.parseObject(m.getContent(),
                    ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]);
            handler(t); // process the message
            // Expire the pending marker: consumed data is discarded via the Redis expire interface
            redisTemplate.expire(key, 1, TimeUnit.NANOSECONDS);
        } else {
            // todo retry; for now park the payload under a fail_ key
            redisTemplate.opsForValue().set("fail_" + key, content);
        }
    }

    protected abstract void handler(T t);
}
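To make the generics concrete: a direct subclass only binds T and implements handler; onMessage recovers the actual type argument via getGenericSuperclass(). A minimal sketch with a hypothetical OrderLog type:

// OrderLog is an illustrative POJO, not part of this project
public class OrderLogListener extends RedisListener<OrderLog> {
    @Override
    protected void handler(OrderLog orderLog) {
        // business processing for one consumed message goes here
    }
}

A subclass still has to register itself with the RedisMessageListenerContainer; the AbstractLogHandler below does exactly that in its @PostConstruct.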
At this point, the basic Redis MQ is in place. The concrete business logic and batch insertion follow.
First, add a LogHandler interface:
public interface LogHandler {
    void process();
}
Next, an abstract class that extends RedisListener and implements LogHandler. It buffers messages with the blocking put and poll of a LogStore queue (shown further below).
Because the project uses MyBatis-Plus and I did not want to hand-write a MyBatis foreach batch-insert statement, I took the lazy route: open a MyBatis SqlSession in BATCH executor mode, reuse the single-row insert as a prepared statement, and commit the whole batch at once.
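In isolation, that pattern looks like the sketch below; FooMapper, Foo, and rows are stand-ins for any single-row MyBatis mapper and its data, and factory is an injected SqlSessionFactory. The full handler that follows embeds the same idea:

try (SqlSession session = factory.openSession(ExecutorType.BATCH)) {
    FooMapper mapper = session.getMapper(FooMapper.class);
    for (Foo row : rows) {
        mapper.insert(row);   // queued into the JDBC batch, not executed yet
    }
    session.commit();         // flushes the batched statements, then commits
}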
import com.cache.redis.mq.RedisListener;
import com.server.log.store.LogStore;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.listener.ChannelTopic;

import javax.annotation.PostConstruct;
import java.lang.reflect.ParameterizedType;
import java.util.List;

@Slf4j
public abstract class AbstractLogHandler<T, M> extends RedisListener<T> implements LogHandler {

    @Autowired
    SqlSessionFactory factory;

    @PostConstruct
    public void addListener() {
        // Register with the listener container and the subscriber registry, then start consuming
        messageListenerContainer.addMessageListener(this, new ChannelTopic(getTopic()));
        subscriber.addConsumer(getTopic(), this.getClass().getSimpleName());
        process();
    }

    @Override
    protected void handler(T t) {
        // Blocks until the queue has room. A timeout could be added here to avoid blocking forever.
        getStore().put(t);
    }

    protected abstract String getTopic();

    protected abstract LogStore<T> getStore();

    protected void commit(List<T> data) {
        if (data == null || data.isEmpty()) {
            return;
        }
        SqlSession session = factory.openSession(ExecutorType.BATCH);
        try {
            M mapper = session.getMapper(
                    (Class<M>) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[1]);
            save(data, mapper);
            session.commit();
        } catch (Exception e) {
            log.error("topic {}: bulk write failed.", getTopic(), e);
            session.rollback();
        } finally {
            session.close();
        }
        data.clear();
    }

    protected abstract void save(List<T> data, M m);
}
The LogStore blocking queue:
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

@Slf4j
public class LogStore<T> {

    private static final Integer QUEUE_CAPACITY = 10000;

    private final BlockingQueue<T> logQueue;

    public LogStore() {
        this(QUEUE_CAPACITY);
    }

    public LogStore(int capacity) {
        this.logQueue = new LinkedBlockingQueue<>(capacity);
    }

    // Blocks when the queue is full
    public void put(T t) {
        try {
            logQueue.put(t);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            log.error("logStore put interrupted", e);
        }
    }

    // Waits up to the given number of seconds; returns null on timeout
    public T poll(long seconds) {
        try {
            return logQueue.poll(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
With that, the basic business code is mostly written. Now let's look at a concrete business handler.
For example, the registration log only needs to extend the abstract class AbstractLogHandler:
import com.common.constant.Constant;
import com.common.po.RegLog;
import com.dao.mapper.RegLogMapper;
import com.server.log.store.LogStore;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
public class RegisterLogHandler extends AbstractLogHandler<RegLog, RegLogMapper> {

    private final LogStore<RegLog> store = new LogStore<>();
    private String topic = Constant.TOPIC_REGISTER_LOG;
    // todo: make these configurable
    private final Integer batchSize = 300;
    private final Integer waitSeconds = 2;

    ExecutorService executor = Executors.newSingleThreadExecutor();

    @Override
    protected String getTopic() {
        return this.topic;
    }

    @Override
    protected LogStore<RegLog> getStore() {
        return this.store;
    }

    @Override
    public void process() {
        // Dedicated thread that drains the store and commits in batches
        executor.execute(() -> {
            List<RegLog> data = new ArrayList<>(batchSize);
            while (true) {
                RegLog generate = this.store.poll(waitSeconds);
                if (generate != null) {
                    if (data.size() >= batchSize) {
                        commit(data);
                    }
                    data.add(generate);
                } else {
                    // Queue idle: flush the tail data even if the batch is not full
                    if (data.size() > 0) {
                        commit(data);
                    }
                }
            }
        });
    }

    @Override
    protected void save(List<RegLog> data, RegLogMapper mapper) {
        data.forEach(o -> {
            if (o.getRegNo() == null) {
                o.setRegNo(UUID.randomUUID().toString());
            }
            // Single-row insert from MyBatis-Plus; no foreach XML needed. The statements
            // pile up in the BATCH session and are committed together by commit().
            mapper.insert(o);
        });
    }
}
The call site:
@Autowired
protected RedisPublisher publisher;

publisher.publish(Constant.TOPIC_REGISTER_LOG, log);
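For context, a hypothetical caller might look like this (RegisterService and User are illustrative names, not part of the original project):

@Service
public class RegisterService {

    @Autowired
    private RedisPublisher publisher;

    public void register(User user) {
        // ... registration business logic ...
        RegLog log = new RegLog();
        // fill the log fields from the user/request
        publisher.publish(Constant.TOPIC_REGISTER_LOG, log); // returns immediately; the DB write happens asynchronously in batches
    }
}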