Lettuce Source Code Analysis 1 [Synchronous Operation]

1, Foreword

  Recently, work required me to understand some of the implementation principles of Lettuce. As the table of contents of the official documentation shows, Lettuce implements a rich set of features. It supports three execution modes (synchronous blocking, Future-based asynchronous, and reactive) and supports connection pooling through the third-party component commons-pool2, although the official documentation does not recommend using a connection pool. The underlying communication is built on Netty. With such a rich feature set, the source code is correspondingly complex, and it is genuinely hard to follow on a first read.

  This article reads through and analyzes the source code behind Lettuce's synchronous blocking mode. Later articles will cover the source code of the other execution modes.

  Before writing this article, I benchmarked several client components: JedisPool, Redisson and Lettuce (in synchronous mode, both with and without a connection pool). The results surprised me. Lettuce in synchronous mode performed best; when Lettuce used a connection pool, its performance dropped by about 6%. Redisson performed worst. I cannot rule out that I used the libraries incorrectly (I wrapped the Redis operations in a RESTful interface and load-tested that HTTP interface), so I will continue to verify these results.

2, Several core classes of Lettuce

  The main functional logic of Lettuce lives in a few core classes, chiefly RedisURI, RedisClient, StatefulRedisConnectionImpl and RedisCommands.

First, here is the basic usage example from the official website:

// Create a RedisClient instance pointing at the Redis server on localhost (default port 6379)
RedisClient client = RedisClient.create("redis://localhost");          

// Based on the above client, open a single node connection
StatefulRedisConnection<String, String> connection = client.connect(); 

// Get command API for synchronous execution
RedisCommands<String, String> commands = connection.sync();            

// Execute GET command
String value = commands.get("foo");                                    

...
// Close the connection. Lettuce connections are designed to be long-lived, so this is normally done only when the application shuts down
connection.close();

// Close the client instance and release its threads and other resources. This is also normally done only when the application shuts down
client.shutdown();

(1) RedisURI

  RedisURI defines the host, port, timeout (60 s by default), database index and authentication details of the Redis server to connect to.
The source code of RedisURI is relatively simple: most of its methods just set member fields.

  • Example 1 (use host and port, and set the default timeout to 20 seconds):

    RedisClient client = RedisClient.create(RedisURI.create("localhost", 6379));
    client.setDefaultTimeout(Duration.ofSeconds(20));
    
    // ...
    
    client.shutdown();
  • Example 2 (build a client using RedisURI):

    RedisURI redisUri = RedisURI.Builder.redis("localhost")
                                  .withPassword("authentication")
                                  .withDatabase(2)
                                  .build();
    RedisClient client = RedisClient.create(redisUri);
    
    // ...
    
    client.shutdown();

    See the official documentation for other details.

(2) RedisClient

  RedisClient extends AbstractRedisClient. It is a scalable, thread-safe client that supports the three execution modes mentioned above: synchronous, asynchronous and reactive. Multiple threads share one TCP connection.
  In addition, RedisClusterClient is the client for Redis Cluster, and MasterSlave (MasterReplica from 5.2 onward) is the client for master-slave and Sentinel deployments.
  RedisClient is an expensive resource. Creating an instance initializes the Netty-based communication infrastructure, including thread pools such as Netty's EventLoopGroup. Therefore, reuse RedisClient instances wherever possible, or have several clients share the same ClientResources instance.

In fact, RedisClient holds a reference to a ClientResources instance. When a RedisClient is created without externally supplied resources, the constructor of its parent class AbstractRedisClient creates a DefaultClientResources (the default implementation of ClientResources), and the Netty initialization happens in the DefaultClientResources constructor.

  • Constructor of RedisClient

    // Constructor of RedisClient
    protected RedisClient(ClientResources clientResources, RedisURI redisURI) {
        // Call the constructor of the parent class
        super(clientResources);

        assertNotNull(redisURI);

        this.redisURI = redisURI;
        setDefaultTimeout(redisURI.getTimeout());
    }
  • Fields and constructor of AbstractRedisClient

    public abstract class AbstractRedisClient {
    
      protected static final PooledByteBufAllocator BUF_ALLOCATOR = PooledByteBufAllocator.DEFAULT;
    
      protected static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClient.class);
    
      protected final Map<Class<? extends EventLoopGroup>, EventLoopGroup> eventLoopGroups = new ConcurrentHashMap<>(2);
    
      protected final ConnectionEvents connectionEvents = new ConnectionEvents();
    
      protected final Set<Closeable> closeableResources = ConcurrentHashMap.newKeySet();
    
      protected final EventExecutorGroup genericWorkerPool;
    
      protected final HashedWheelTimer timer;
    
      protected final ChannelGroup channels;
      // Reference to the ClientResources instance
      protected final ClientResources clientResources;
    
      protected volatile ClientOptions clientOptions = ClientOptions.builder().build();
    
      protected Duration timeout = RedisURI.DEFAULT_TIMEOUT_DURATION;
    
      private final boolean sharedResources;
    
      private final AtomicBoolean shutdown = new AtomicBoolean();
    
      /**
       * Create a new instance with client resources.
       *
       * @param clientResources the client resources. If {@code null}, the client will create a new dedicated instance of client
       *        resources and keep track of them.
       */
      protected AbstractRedisClient(ClientResources clientResources) {
    
          if (clientResources == null) {
              sharedResources = false;
              // Create an instance object of ClientResources
              this.clientResources = DefaultClientResources.create();
          } else {
              sharedResources = true;
              // Use an external ClientResources instance object
              this.clientResources = clientResources;
          }
    
          genericWorkerPool = this.clientResources.eventExecutorGroup();
          channels = new DefaultChannelGroup(genericWorkerPool.next());
          timer = (HashedWheelTimer) this.clientResources.timer();
      }
    ...
    }
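
The sharedResources flag in the constructor above records who owns the resources. A minimal, JDK-only sketch of that ownership pattern follows. SketchResources and SketchClient are hypothetical stand-ins for ClientResources and AbstractRedisClient, and the shutdown rule shown (release only what the client created itself) is an assumption drawn from the constructor's Javadoc, not a copy of Lettuce's code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for ClientResources: owns an expensive thread pool.
class SketchResources {
    private final ExecutorService workers = Executors.newFixedThreadPool(2);

    void shutdown() {
        workers.shutdown();
    }

    boolean isShutdown() {
        return workers.isShutdown();
    }
}

// Hypothetical stand-in for AbstractRedisClient.
class SketchClient {
    private final SketchResources resources;
    private final boolean sharedResources;

    SketchClient(SketchResources external) {
        if (external == null) {
            // No resources supplied: create a dedicated instance owned by this client.
            this.sharedResources = false;
            this.resources = new SketchResources();
        } else {
            // Resources supplied by the caller: the caller remains responsible for them.
            this.sharedResources = true;
            this.resources = external;
        }
    }

    SketchResources resources() {
        return resources;
    }

    void shutdown() {
        // Assumption: only dedicated (non-shared) resources are released here.
        if (!sharedResources) {
            resources.shutdown();
        }
    }
}
```

With this split, several clients can be built on one SketchResources instance, and shutting down one client leaves the shared thread pool available to the others.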

(3) StatefulRedisConnectionImpl

  StatefulRedisConnectionImpl is a thread-safe Redis connection. It maintains a single TCP connection underneath, and multiple threads share one connection object. A ConnectionWatchdog (which extends Netty's ChannelInboundHandlerAdapter) monitors the connection and reconnects when it drops.

  • Excerpt from the ConnectionWatchdog source code:

    @ChannelHandler.Sharable
    public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
    ...
      @Override
      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    
          logger.debug("{} channelInactive()", logPrefix());
          if (!armed) {
              logger.debug("{} ConnectionWatchdog not armed", logPrefix());
              return;
          }
    
          channel = null;
    
          if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
              // Reconnect when the channel is unavailable or inactive
              scheduleReconnect();
          } else {
              logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);
          }
    
          super.channelInactive(ctx);
      }
    ...
    }
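
The branch structure of channelInactive() can be sketched without Netty. The following is a minimal, JDK-only sketch: WatchdogSketch, its connector callback and the capped doubling delay are illustrative assumptions, not Lettuce's ConnectionWatchdog or its actual reconnect delay policy. It also leaves out the listenOnChannelInactive and isReconnectSuspended checks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified watchdog; not Lettuce's ConnectionWatchdog.
class WatchdogSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "watchdog-sketch");
        t.setDaemon(true); // do not keep the JVM alive for this demo
        return t;
    });
    private final BooleanSupplier connector; // returns true once the connection is back
    private final CountDownLatch reconnected = new CountDownLatch(1);
    private final AtomicInteger attempts = new AtomicInteger();
    private volatile boolean armed;

    WatchdogSketch(BooleanSupplier connector) {
        this.connector = connector;
    }

    void arm() {
        armed = true;
    }

    // Counterpart of channelInactive(): returns whether a reconnect was scheduled.
    boolean channelInactive() {
        if (!armed) {
            return false; // watchdog not armed, nothing to do
        }
        scheduleReconnect();
        return true;
    }

    private void scheduleReconnect() {
        int attempt = attempts.getAndIncrement();
        // Assumed policy: delay doubles per attempt (1, 2, 4, ... ms), capped at 64 ms.
        long delayMillis = 1L << Math.min(attempt, 6);
        scheduler.schedule(() -> {
            if (connector.getAsBoolean()) {
                reconnected.countDown();
            } else {
                scheduleReconnect(); // still down, try again later
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Waits until a reconnect attempt succeeded or the timeout elapsed.
    boolean awaitReconnected(long timeoutMillis) {
        try {
            return reconnected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int attempts() {
        return attempts.get();
    }
}
```

The point of the sketch is the shape of the logic: an inactive channel is ignored until the watchdog is armed, and after that each failed attempt schedules the next one instead of blocking a thread.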

  A StatefulRedisConnectionImpl instance is created when RedisClient's connect method is called:

// Based on the above client, open a single node connection
StatefulRedisConnection<String, String> connection = client.connect();

The creation of a StatefulRedisConnectionImpl roughly proceeds as follows:

RedisClient.connect
--> RedisClient.connectStandaloneAsync
    [
    --> new DefaultEndpoint() [creates a closeFuture, a CompletableFuture object]
    --> RedisClient.newStatefulRedisConnection
        [StatefulRedisConnectionImpl extends RedisChannelHandler;
         its constructor initializes four components:
         this.codec = codec;
         this.async = newRedisAsyncCommandsImpl();
         this.sync = newRedisSyncCommandsImpl(); // Proxy object
         this.reactive = newRedisReactiveCommandsImpl();
        ]
    --> connectStatefulAsync, which builds a new ConnectionFuture
        [
        --> initialize the CommandHandler (extends ChannelDuplexHandler, a Netty class)
        --> RedisClient.getConnectionBuilder: build a new ConnectionBuilder
        --> AbstractRedisClient.connectionBuilder: set up the Netty bootstrap
        --> connectionBuilder.connection: set the StatefulRedisConnectionImpl as the connection attribute of the ConnectionBuilder
        --> AbstractRedisClient.initializeChannelAsync: create the Netty channel and the new socketAddressFuture and channelReadyFuture, both of type CompletableFuture
        --> create the future (of type DefaultConnectionFuture) that asynchronously yields the new StatefulRedisConnectionImpl object
        ]
    --> return the future (of type DefaultConnectionFuture)
    ]
--> AbstractRedisClient.getConnection [calls connectionFuture.get() to obtain the StatefulRedisConnectionImpl connection object]
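
The overall shape of this flow is "build everything asynchronously, then block once at the end". A minimal sketch of that shape with plain CompletableFuture follows. ConnectSketch and the string placeholders are hypothetical; no real sockets or Lettuce classes are involved, and join() stands in for the connectionFuture.get() call to keep the example free of checked exceptions.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in; none of these names are Lettuce classes.
class ConnectSketch {

    // Builds the "connection" through a chain of asynchronous stages.
    static CompletableFuture<String> connectAsync(String host, int port) {
        // Stage 1: resolve the socket address.
        CompletableFuture<String> socketAddressFuture =
                CompletableFuture.supplyAsync(() -> host + ":" + port);
        // Stage 2: the channel becomes ready once the address is known.
        CompletableFuture<String> channelReadyFuture =
                socketAddressFuture.thenApply(address -> "channel(" + address + ")");
        // Stage 3: the connection object is available once the channel is ready.
        return channelReadyFuture.thenApply(channel -> "connection[" + channel + "]");
    }

    // Synchronous entry point: block exactly once, on the final future.
    static String connect(String host, int port) {
        return connectAsync(host, port).join();
    }
}
```

Calling ConnectSketch.connect("localhost", 6379) blocks the caller only in the last step, which mirrors how getConnection waits on the connection future.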

(4) RedisCommand

  A Redis command object holds an output (a CommandOutput object that stores what the Redis server returns), the arguments (the command content to send to the Redis server) and a status (marking where the command is in its lifecycle: initialized, completed or cancelled).

Taking the get operation from the official example above, the Command instance looks as follows (the values in the comments were captured while debugging). Note that the StatusOutput and the key/value pair in the captured buffer look like they come from a SET command rather than a GET:

Command(ProtocolKeyword type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
        LettuceAssert.notNull(type, "Command type must not be null");
        this.type = type; // GET
        this.output = output; // StatusOutput
        this.args = args; // Actual sending content: [buffer = $8thread-2 $23this is thread-2]
    }
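
The args buffer holds the arguments in the Redis wire protocol (RESP), where each bulk string is written as $ followed by its byte length; that is why the key thread-2 shows up as $8thread-2 in the debug output. A minimal sketch of encoding a whole command this way follows. RespSketch is a hypothetical helper, not Lettuce's CommandArgs.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that encodes a command as a RESP array of bulk strings.
class RespSketch {

    static String encode(String... parts) {
        StringBuilder sb = new StringBuilder();
        // Array header: number of elements (command name plus arguments).
        sb.append('*').append(parts.length).append("\r\n");
        for (String part : parts) {
            // Bulk string: byte length, CRLF, payload, CRLF.
            int byteLength = part.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append("\r\n").append(part).append("\r\n");
        }
        return sb.toString();
    }
}
```

For example, RespSketch.encode("GET", "foo") yields *2, $3, GET, $3, foo, each terminated by CRLF.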

3, Synchronous execution core source code

  Synchronous execution code example:

// Get command API for synchronous execution
RedisCommands<String, String> commands = connection.sync(); 

String value = commands.get("foo");           

(1) Proxy object

  The connection.sync() method returns the value of the sync field of the StatefulRedisConnectionImpl object. The field is set in the constructor:

    /**
     * Initialize a new connection.
     *
     * @param writer the channel writer
     * @param codec Codec used to encode/decode keys and values.
     * @param timeout Maximum time to wait for a response.
     */
    public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {

        super(writer, timeout);

        this.codec = codec;
        this.async = newRedisAsyncCommandsImpl();
        this.sync = newRedisSyncCommandsImpl();
        this.reactive = newRedisReactiveCommandsImpl();
    }

The newRedisSyncCommandsImpl method returns a proxy object. The source code is as follows:

    /**
     * Create a new instance of {@link RedisCommands}. Can be overriden to extend.
     *
     * @return a new instance.
     */
    protected RedisCommands<K, V> newRedisSyncCommandsImpl() {
        return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class);
    }

syncHandler source code:

protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) {
        FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection<?, ?>) this, asyncApi, interfaces);
        return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
    }
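
The idea behind syncHandler, a JDK dynamic proxy that forwards each synchronous call to the asynchronous API and then blocks on the returned future, can be sketched with the JDK alone. AsyncKv, SyncKv and SyncProxySketch are hypothetical names for illustration. Unlike FutureSyncInvocationHandler, this sketch looks up the target method on every call and has no special handling for transactions.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical asynchronous API: every command returns a future.
class AsyncKv {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    public CompletableFuture<String> set(String key, String value) {
        return CompletableFuture.supplyAsync(() -> {
            store.put(key, value);
            return "OK";
        });
    }

    public CompletableFuture<String> get(String key) {
        return CompletableFuture.supplyAsync(() -> store.get(key));
    }
}

// Hypothetical synchronous view with the same method names and parameters.
interface SyncKv {
    String set(String key, String value);

    String get(String key);
}

class SyncProxySketch implements InvocationHandler {
    private final Object asyncApi;
    private final long timeoutMillis;

    SyncProxySketch(Object asyncApi, long timeoutMillis) {
        this.asyncApi = asyncApi;
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Find the async method with the same name and parameter types.
        Method target = asyncApi.getClass().getMethod(method.getName(), method.getParameterTypes());
        try {
            Object result = target.invoke(asyncApi, args);
            if (result instanceof Future<?>) {
                // Block the calling thread until the async result arrives.
                return ((Future<?>) result).get(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            return result;
        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        }
    }

    @SuppressWarnings("unchecked")
    static <T> T wrap(Object asyncApi, Class<T> syncInterface, long timeoutMillis) {
        return (T) Proxy.newProxyInstance(syncInterface.getClassLoader(),
                new Class<?>[] { syncInterface }, new SyncProxySketch(asyncApi, timeoutMillis));
    }
}
```

A caller would write SyncKv sync = SyncProxySketch.wrap(new AsyncKv(), SyncKv.class, 1000) and then call sync.get("foo") as if it were an ordinary blocking method.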

(2) The process of executing a synchronous operation

  When commands.get("foo") is executed, the handleInvocation method of the proxy's invocation handler runs and invokes the corresponding method on the asynchronous API. The call chain is as follows:

io.lettuce.core.internal.AbstractInvocationHandler.invoke
  -> FutureSyncInvocationHandler.handleInvocation [Object result = targetMethod.invoke(asyncApi, args);]
    -> AbstractRedisAsyncCommands.get [returns an object of type RedisFuture]
    -> RedisCommandBuilder.get [builds the Redis command object]
    -> AbstractRedisAsyncCommands.dispatch() [new AsyncCommand<>(cmd): wraps the ordinary Command object in an AsyncCommand]
    -> StatefulRedisConnectionImpl.dispatch()
    -> StatefulRedisConnectionImpl.preProcessCommand [checks whether the command: 1. performs authentication; 2. selects a database; 3. switches to read-only mode; 4. switches to read-write mode; 5. is DISCARD; 6. is EXEC; 7. is MULTI]
    -> RedisChannelHandler.dispatch() [checks whether debug or tracingEnabled (default false) is on]
    -> DefaultEndpoint.write()
    -> DefaultEndpoint.writeToChannelAndFlush()
    -> DefaultEndpoint.channelWriteAndFlush() [hands the command to Netty and registers a retry listener]
    -> [Netty part] AbstractChannel.writeAndFlush
  -> LettuceFutures.awaitOrCancel(command, timeout, TimeUnit.NANOSECONDS); // Wait for Redis to return data

Source code of handleInvocation method of FutureSyncInvocationHandler:

    @Override
    @SuppressWarnings("unchecked")
    protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {

        try {

            Method targetMethod = this.translator.get(method);
            Object result = targetMethod.invoke(asyncApi, args);

            if (result instanceof RedisFuture<?>) {

                RedisFuture<?> command = (RedisFuture<?>) result;

                if (!isTxControlMethod(method.getName(), args) && isTransactionActive(connection)) {
                    return null;
                }

                long timeout = getTimeoutNs(command);

                return LettuceFutures.awaitOrCancel(command, timeout, TimeUnit.NANOSECONDS);
            }

            return result;
        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        }
    }

The core of synchronous execution is this method:

LettuceFutures.awaitOrCancel(command, timeout, TimeUnit.NANOSECONDS);

LettuceFutures.awaitOrCancel source code:

    /**
     * Wait until futures are complete or the supplied timeout is reached. Commands are canceled if the timeout is reached but
     * the command is not finished. A {@code timeout} value of zero or less indicates to not time out.
     *
     * @param cmd command to wait for.
     * @param timeout maximum time to wait for futures to complete.
     * @param unit unit of time for the timeout.
     * @param <T> Result type.
     * @return Result of the command.
     */
    public static <T> T awaitOrCancel(RedisFuture<T> cmd, long timeout, TimeUnit unit) {

        try {
            if (timeout > 0 && !cmd.await(timeout, unit)) {
                cmd.cancel(true);
                throw ExceptionFactory.createTimeoutException(Duration.ofNanos(unit.toNanos(timeout)));
            }
            // Get the data returned by redis and return
            return cmd.get();
        } catch (RuntimeException e) {
            throw e;
        } catch (ExecutionException e) {

            if (e.getCause() instanceof RedisCommandExecutionException) {
                throw ExceptionFactory.createExecutionException(e.getCause().getMessage(), e.getCause());
            }

            if (e.getCause() instanceof RedisCommandTimeoutException) {
                throw new RedisCommandTimeoutException(e.getCause());
            }

            throw new RedisException(e.getCause());
        } catch (InterruptedException e) {

            Thread.currentThread().interrupt();
            throw new RedisCommandInterruptedException(e);
        } catch (Exception e) {
            throw ExceptionFactory.createExecutionException(null, e);
        }
    }
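
The same wait, cancel on timeout, then translate exceptions pattern can be tried out on a plain CompletableFuture. The sketch below is an approximation: AwaitSketch is a hypothetical name, and IllegalStateException stands in for Lettuce's own exception types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper mirroring the structure of awaitOrCancel with JDK types only.
class AwaitSketch {

    static <T> T awaitOrCancel(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        try {
            if (timeout > 0) {
                // Bounded wait: throws TimeoutException if the result is late.
                return future.get(timeout, unit);
            }
            // A timeout of zero or less means "wait indefinitely".
            return future.get();
        } catch (TimeoutException e) {
            // Give up on the command so it does not complete unnoticed later.
            future.cancel(true);
            throw new IllegalStateException("Timed out after " + unit.toMillis(timeout) + " ms", e);
        } catch (ExecutionException e) {
            // Unwrap the failure that completed the future.
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

A future that never completes makes the call fail after the timeout and leaves the future cancelled, which is the behavior the synchronous API depends on to avoid blocking a caller forever.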

The thread stack of a normal synchronous call makes this flow clearer (the stack screenshot is not reproduced here).

  CompletableFuture is used extensively throughout Lettuce, so the code jumps around a lot and is not easy to read. Even synchronous execution alone took me a long time to work through 😤. Still, I will make time to read the source code of the asynchronous and reactive modes.

Keywords: Java Redis lettuce

Added by excence on Fri, 11 Feb 2022 23:25:31 +0200