The author has written many articles on spring data redis and lettuce:
- The new monitoring method of Redis connection pool is not poked ~ I'll add a little more seasoning
- The spring data redis connection leaks. I'm stupid
- Spring data redis dynamically switches data sources
- Millions of QPS in spring data redis are under too much pressure. The connection fails. I'm stupid
Recently, in private letters and messages, netizens mentioned that spring data redis and lettuce are used together. After capturing the package, the pipeline does not take effect. How can this configuration take effect?
Firstly, in the above article, we analyzed the basic principle of spring data redis + lattice. In this environment, the internal connections used by RedisTemplate include:
- asyncSharedConn: can be blank. If connection sharing is enabled, it is not blank. It is enabled by default; Redis connections shared by all letticconnections are actually the same connection for each letticconnection; It is used to execute simple commands. Because of the single processing thread feature of Netty client and redis, it is also fast to share the same connection. If connection sharing is not enabled, this field is empty. Use asyncDedicatedConn to execute the command.
- asyncDedicatedConn: private connection. If you need to maintain a session, execute a transaction, and use the Pipeline command and fixed connection, you must use this asyncDedicatedConn to execute the Redis command.
execute(RedisCallback). The process is:

For executePipelined(RedisCallback), if it is used correctly, it will be executed using the asyncDedicatedConn private connection. So how to use it correctly?
Redis calls need to be made using the callback connection. redisTemplate cannot be used directly. Otherwise, the pipeline will not take effect:
Pipeline effective:
List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { connection.get("test".getBytes()); connection.get("test2".getBytes()); return null; } });
Pipeline does not take effect:
List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { redisTemplate.opsForValue().get("test"); redisTemplate.opsForValue().get("test2"); return null; } });
In this way, we can ensure that the API layer uses Pipeline correctly, but under the default configuration, the underlying layer still does not execute Pipeline. What's the matter?
Redis Pipeline is similar to AutoFlushCommands in lattice
Redis Pipeline is a batch operation in redis. It can assemble a group of redis commands, transmit them to redis at one time and return the result set, which greatly reduces the RTT time required for individual transmission of commands (including the time for redis client, redis server switching, system calling, sending and receiving data, and network transmission time).
If the original command is sent as follows:
Client -> Server: INCR X\r\n Server -> Client: 1 Client -> Server: INCR X\r\n Server -> Client: 2 Client -> Server: INCR X\r\n Server -> Client: 3 Client -> Server: INCR X\r\n Server -> Client: 4
After using PIPELINE, the command is sent like this
Client -> Server: INCR X\r\nINCR X\r\nINCR X\r\nINCR X\r\n Server -> Client: 1\r\n2\r\n3\r\n4
We can see that its principle is that the client first splices all commands together, then caches them locally, and then sends them to the server uniformly. After the server executes all commands, it responds uniformly.
The lettue connection has an AutoFlushCommands configuration, which means that the commands executed on the connection are sent to the server. The default is false, that is, a command is sent to the server after receiving it. If the configuration is false, all commands will be cached. When manually calling flush commands, the cached commands will be sent to the server, which actually implements Pipeline.
Configure spring data redis + lattice to use Pipeline
Spring data redis from 2.3 Since version 0, lettue is also compatible with Pipeline configuration. Refer to:
- DATAREDIS-1011 - Allow configuration of Lettuce pipelining flush behavior
- https://github.com/spring-projects/spring-data-redis/issues/1581
We can configure it as follows:
@Bean public BeanPostProcessor lettuceConnectionFactoryBeanProcessor() { return new BeanPostProcessor() { @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { //After the Bean letticconnectionfactory is initialized, set the pipelining flushpolicy to flushOnClose if (bean instanceof LettuceConnectionFactory) { LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) bean; lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose()); } return bean; } }; }
Let's look at the source code of pipelining flushpolicy to know the meaning of flushOnClose:
public interface PipeliningFlushPolicy { //In fact, each command is sent directly to Redis Server by default static PipeliningFlushPolicy flushEachCommand() { return FlushEachCommand.INSTANCE; } //When the connection is closed, send the command to Redis together static PipeliningFlushPolicy flushOnClose() { return FlushOnClose.INSTANCE; } //Manually set the number of commands that will be sent to Redis after the connection is closed static PipeliningFlushPolicy buffered(int bufferSize) { return () -> new BufferedFlushing(bufferSize); } }
These three classes also implement the PipeliningFlushState interface:
public interface PipeliningFlushState { //For executePipelined, connection. Is called at the beginning openPipeline(); When you open the pipeline, this method will be called void onOpen(StatefulConnection<?, ?> connection); //This method is called for each command in executePipelined void onCommand(StatefulConnection<?, ?> connection); //Connection. Is called at the end of executePipelined This method will be called in closepipeline() void onClose(StatefulConnection<?, ?> connection); }
By default, each command is sent directly to Redis Server. The implementation is: in fact, nothing is done in the method.
private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState { INSTANCE; @Override public PipeliningFlushState newPipeline() { return INSTANCE; } @Override public void onOpen(StatefulConnection<?, ?> connection) {} @Override public void onCommand(StatefulConnection<?, ?> connection) {} @Override public void onClose(StatefulConnection<?, ?> connection) {} }
For flush onclose:
private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState { INSTANCE; @Override public PipeliningFlushState newPipeline() { return INSTANCE; } @Override public void onOpen(StatefulConnection<?, ?> connection) { //First, configure the connected AutoFlushCommands to false, so that the commands will not be sent to Redis immediately connection.setAutoFlushCommands(false); } @Override public void onCommand(StatefulConnection<?, ?> connection) { //Do nothing when ordered } @Override public void onClose(StatefulConnection<?, ?> connection) { //Send all commands when pipeline is closed connection.flushCommands(); //Restore the default configuration so that if the connection is returned to the connection pool, subsequent use will not be affected connection.setAutoFlushCommands(true); } }
For buffered:
private static class BufferedFlushing implements PipeliningFlushState { private final AtomicLong commands = new AtomicLong(); private final int flushAfter; public BufferedFlushing(int flushAfter) { this.flushAfter = flushAfter; } @Override public void onOpen(StatefulConnection<?, ?> connection) { //First, configure the connected AutoFlushCommands to false, so that the commands will not be sent to Redis immediately connection.setAutoFlushCommands(false); } @Override public void onCommand(StatefulConnection<?, ?> connection) { //If the number of commands reaches the specified number, it will be sent to Redis if (commands.incrementAndGet() % flushAfter == 0) { connection.flushCommands(); } } @Override public void onClose(StatefulConnection<?, ?> connection) { //Send all commands when pipeline is closed connection.flushCommands(); //Restore the default configuration so that if the connection is returned to the connection pool, subsequent use will not be affected connection.setAutoFlushCommands(true); } }