HikariCP Source Code Analysis: ConcurrentBag (Part I)

Welcome to follow my blog for synced updates: Fengshan hospital

Source code version 2.4.5-SNAPSHOT

Hello! Today let's analyze ConcurrentBag, the core of HikariCP and the most important class for managing its connection pool. As the name suggests, it is a concurrent container, and its excellent performance is a large part of the secret behind HikariCP outperforming other connection pools.

Code overview

Let's look at the code first. Note that this is not all of the code; the less important parts are omitted. I added very detailed comments, so readers who are not interested in the detailed explanation can read the code directly. This part took me several nights to write, so please bear with it:

//Available-connection synchronizer, used to signal between threads that connections have become available; synchronizer.currentSequence() returns its current value
//It is essentially a counter: it is incremented whenever a connection is created (added) or returned to the pool, but it is not decremented when a connection is borrowed, so it only ever increases
//When a thread borrows a connection, it is used to check whether a connection was added or returned in the meantime. See the borrow method for details
private final QueuedSequenceSynchronizer synchronizer;
//sharedList saves all connections
private final CopyOnWriteArrayList<T> sharedList;
//threadList holds references to some of the connections in sharedList (the ones the current thread has used and returned)
private final ThreadLocal<List<Object>> threadList;
//A reference to HikariPool to request the creation of a new connection
private final IBagStateListener listener;
//The number of threads currently waiting to get a connection
private final AtomicInteger waiters;
//Mark whether the connection pool is closed
private volatile boolean closed;


/**
 * Borrows a connection from the pool. If no connection is available, waits up to the given timeout
 *
 * @param timeout  how long to wait before giving up
 * @param timeUnit Time unit
 * @return a borrowed instance from the bag or null if a timeout occurs
 * @throws InterruptedException if interrupted while waiting
 */
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
   //①
   //Try to get from ThreadLocal first
   List<Object> list = threadList.get();
   if (weakThreadLocals && list == null) {
      //If the thread-local list is null, initialize it to prevent a NullPointerException below
      list = new ArrayList<>(16);
      threadList.set(list);
   }
   //②
   //If there are connections in ThreadLocal, iterate over them and try to claim one
   //Iterate from back to front: the most recently returned connection is the most likely to still be idle, while older entries may already have been stolen by other threads
   for (int i = list.size() - 1; i >= 0; i--) {
      final Object entry = list.remove(i);
      @SuppressWarnings("unchecked") final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
      if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
         return bagEntry;
      }
   }
   //③
   //If no connection was obtained from ThreadLocal, scan the sharedList, giving up once the timeout expires
   //ThreadLocal only holds references to connections this thread has used before, so other idle connections may still be in the pool; that is why the shared list must be scanned
   //See the implementation of requite(final T bagEntry): a returned connection is placed in ThreadLocal
   timeout = timeUnit.toNanos(timeout);
   Future<Boolean> addItemFuture = null;
   //Record the start time of obtaining connections from the connection pool, which is used later
   final long startScan = System.nanoTime();
   final long originTimeout = timeout;
   long startSeq;
   //Increase the thread counter waiting for connection by 1
   waiters.incrementAndGet();
   try {
      do {
         // scan the shared list
         do {
            //④
             //Current value of the sequence counter; it increases whenever a connection is added to or returned to the pool
            startSeq = synchronizer.currentSequence();
            for (T bagEntry : sharedList) {
               if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                  // if we might have stolen another thread's new connection, restart the add...
                  //⑤
                   //If waiters is greater than 1, other threads besides the current one are also waiting for an idle connection
                   //addItemFuture == null means the current thread has not requested a new connection yet but has already obtained one, so it may have taken a connection created for another waiting thread. This is what HikariCP calls "stealing" another thread's connection, so the current thread requests a new connection to compensate the others
                  if (waiters.get() > 1 && addItemFuture == null) {
                      //Request the asynchronous creation of a new connection
                     listener.addBagItem();
                  }
                  return bagEntry;
               }
            }
          } while (startSeq < synchronizer.currentSequence()); //If the sequence advanced during the scan (a connection was added or returned), scan the list again
         //⑥
          //After scanning the pool once (or several times, if connections were added or returned during a scan) without obtaining an idle connection, request the creation of a new connection, unless a request made by this thread is still pending
         if (addItemFuture == null || addItemFuture.isDone()) {
            addItemFuture = listener.addBagItem();
         }
          //Remaining timeout = original timeout - (current time - startScan), i.e. the timeout minus the total time already spent trying to obtain a connection from the shared list
         timeout = originTimeout - (System.nanoTime() - startScan);
      } while (timeout > 10_000L && synchronizer.waitUntilSequenceExceeded(startSeq, timeout)); //③
       //⑦
       //The loop condition works as follows:
       //1. The remaining timeout must be greater than 10_000 nanoseconds (10 microseconds)
       //2. waitUntilSequenceExceeded(startSeq, timeout) blocks until the sequence exceeds startSeq, i.e. a connection was added or returned after the last scan began, and then returns true; if the timeout elapses first, it returns false
       //The loop repeats only when both conditions hold; otherwise it exits and borrow returns null
       //While the thread is blocked, the only thing that can change is the sequence, which advances when a connection is added (for example the one requested at ⑥) or returned
   } finally {
      waiters.decrementAndGet();
   }

   return null;
}

/**
 * Returns a borrowed connection to the pool
 * Connections that are borrowed but never returned through this method cause memory leaks
 *
 * @param bagEntry the value to return to the bag
 * @throws NullPointerException  if value is null
 * @throws IllegalStateException if the requited value was not borrowed from the bag
 */
public void requite(final T bagEntry) {
   //⑧
   //lazySet does not guarantee that other threads see the connection's idle state immediately; it is an ordered, delayed write
   //This is an optimization: a plain volatile write would publish the change immediately but needs a more expensive memory barrier. lazySet skips that barrier, so the new state may become visible slightly later, at a lower cost
   bagEntry.lazySet(STATE_NOT_IN_USE);

   //⑨
   //Put the connection back into threadLocal
   final List<Object> threadLocalList = threadList.get();
   if (threadLocalList != null) {
      threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
   }
   //Notify the waiting thread that a connection is available
   synchronizer.signal();
}

/**
 * Add a connection to the connection pool
 * New connections are added to the sharedList; threadList only holds references to some of the connections in the sharedList
 *
 * @param bagEntry an object to add to the bag
 */
public void add(final T bagEntry) {
   if (closed) {
      LOGGER.info("ConcurrentBag has been closed, ignoring add()");
      throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
   }
   //⑩
   sharedList.add(bagEntry);
   synchronizer.signal();
}

/**
 * Remove a connection from the connection pool
 * This method may only be used on connections obtained from <code>borrow(long, TimeUnit)</code> or <code>reserve(T)</code>
 * That is, it can only remove connections that are in use or reserved
 *
 * @param bagEntry the value to remove
 * @return true if the entry was removed, false otherwise
 * @throws IllegalStateException if an attempt is made to remove an object
 *                               from the bag that was not borrowed or reserved first
 */
public boolean remove(final T bagEntry) {
   //⑪
   //Try to move the entry from IN_USE or RESERVED to REMOVED. If both attempts fail (and the bag is not closed), the entry is idle or already removed, so log a warning and return false
   //In other words, the state is checked first: idle connections and connections already marked as removed cannot be removed here
   if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
      LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
      return false;
   }
   //If the state change succeeded (or the bag is closed), remove the connection from sharedList
   final boolean removed = sharedList.remove(bagEntry);
   if (!removed && !closed) {
      LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
   }

   // synchronizer.signal();
   return removed;
}

The code above shows ConcurrentBag's member variables and its four most important methods. The member variables are explained in the code comments.
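The synchronizer field is probably the least familiar piece. Below is a minimal sketch, assuming only the three operations the code above calls on it: currentSequence, signal and waitUntilSequenceExceeded. SimpleSequenceSynchronizer is a hypothetical stand-in built on a plain Java monitor (wait/notifyAll); it is not HikariCP's implementation, it only illustrates the contract of a counter that never decreases plus a timed wait until it moves past a value the waiter saw earlier.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified stand-in for HikariCP's QueuedSequenceSynchronizer.
 * signal() bumps a monotonically increasing sequence and wakes waiters;
 * waitUntilSequenceExceeded() blocks until the sequence passes a given value
 * or the timeout elapses. Built on a plain monitor, not HikariCP's code.
 */
final class SimpleSequenceSynchronizer {
    private long sequence; // guarded by "this"

    /** Returns the current sequence value. */
    synchronized long currentSequence() {
        return sequence;
    }

    /** Advances the sequence (e.g. a connection was added or returned) and wakes all waiters. */
    synchronized void signal() {
        sequence++;
        notifyAll();
    }

    /**
     * Waits until the sequence is greater than startSeq.
     * @return true if the sequence was exceeded, false if nanosTimeout elapsed first
     */
    synchronized boolean waitUntilSequenceExceeded(long startSeq, long nanosTimeout)
            throws InterruptedException {
        final long deadline = System.nanoTime() + nanosTimeout;
        // Loop to tolerate spurious wakeups and wakeups that did not move us past startSeq
        while (sequence <= startSeq) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            // Timed Object.wait on this monitor, with the timeout given in nanoseconds
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleSequenceSynchronizer sync = new SimpleSequenceSynchronizer();
        long start = sync.currentSequence();
        // Nobody signals, so this wait times out
        System.out.println(sync.waitUntilSequenceExceeded(start, TimeUnit.MILLISECONDS.toNanos(50))); // false

        // Another thread "returns a connection" shortly after we start waiting
        new Thread(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            sync.signal();
        }).start();
        System.out.println(sync.waitUntilSequenceExceeded(start, TimeUnit.SECONDS.toNanos(5))); // true
    }
}
```

Because borrow records startSeq before scanning sharedList, a connection returned after the scan started has already moved the sequence past startSeq, so the wait returns true immediately and the scan is repeated instead of the signal being lost.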

borrow method

The borrow method is arguably the core method of all of HikariCP. It is the method that is ultimately called whenever we get a connection from the pool, and this is where all the secrets lie. Let's analyze it:

①ThreadLocal

//①
//Try to get from ThreadLocal first
List<Object> list = threadList.get();
if (weakThreadLocals && list == null) {
  //If the thread-local list is null, initialize it to prevent a NullPointerException below
  list = new ArrayList<>(16);
  threadList.set(list);
}

threadList is a member variable of ConcurrentBag, declared as private final ThreadLocal<List<Object>> threadList;. As a ThreadLocal, it gives each thread its own List, which holds the connections the current thread has used. Note that I said "used", not all connections. There is another member variable, private final CopyOnWriteArrayList<T> sharedList;, which is where all connections are actually stored. It is a CopyOnWriteArrayList: every write (such as add or remove) copies the underlying array, modifies the copy, and then switches the internal reference to the new array, instead of modifying the current array in place. Writes are still serialized by an internal lock, but reads and iteration need no locking at all; each iterator works on a snapshot of the array and never throws ConcurrentModificationException. This is a classic space-for-time trade-off, and it pays off when reads vastly outnumber writes, as in a connection pool, where connections are borrowed far more often than they are added or removed.
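A short demonstration of the snapshot behavior described above, using only the standard java.util.concurrent.CopyOnWriteArrayList. CowSnapshotDemo and the connection names are made up for illustration; the point is that a scan like the one at ④ can safely run while the list is being modified.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class CowSnapshotDemo {
    /** Iterates the list, appending one element per visited element; returns how many were visited. */
    static int iterateWhileAdding(List<String> list) {
        int visited = 0;
        for (String conn : list) {
            // Each add copies the backing array. This loop keeps iterating the
            // snapshot taken when it started, so it neither sees the new elements
            // nor throws ConcurrentModificationException.
            list.add(conn + "-copy");
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> shared = new CopyOnWriteArrayList<>(Arrays.asList("conn-1", "conn-2"));
        int visited = iterateWhileAdding(shared);
        System.out.println("visited " + visited + ", size now " + shared.size()); // visited 2, size now 4
    }
}
```

With a plain ArrayList the same loop would throw ConcurrentModificationException on the second iteration; that lock-free, modification-tolerant iteration is what the pool gets in exchange for copying the array on every write.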

When a connection is returned to the pool, HikariCP puts a reference to it directly into the returning thread's ThreadLocal list. At this point some readers may ask: all connections are stored in sharedList, so when a user borrows a connection, shouldn't it be removed from sharedList and added back when it is returned? Why does the returned connection go into ThreadLocal instead?

First, to be clear, HikariCP does not remove connections from sharedList. Why? If a connection were removed from sharedList when a user borrowed it, it would effectively leave HikariCP's management. How could HikariCP manage it afterwards? For example, if the connection reaches the end of its lifetime while the user has taken it away, how would the pool close it? So no connection may ever escape the pool's control; not a single one can go missing. In fact, sharedList only holds references to the database connections, and it is visible to all threads. Any thread may also keep its own references to connections, but it must still go through the borrow method and follow its procedure before using one.
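A minimal sketch of the idea that borrowing means changing a state flag rather than moving the connection out of sharedList. PoolEntry is a hypothetical class: the state names mirror the constants used in ConcurrentBag, but the numeric values are my assumption. It mirrors the compareAndSet and lazySet calls the code above makes on bagEntry, here backed by a java.util.concurrent.atomic.AtomicInteger.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical, simplified pool entry. Ownership is expressed by an atomic
 * state flag, so the entry never has to leave the shared list.
 * State names mirror ConcurrentBag's; the numeric values are assumptions.
 */
final class PoolEntry {
    static final int STATE_NOT_IN_USE = 0;
    static final int STATE_IN_USE = 1;
    static final int STATE_REMOVED = -1;
    static final int STATE_RESERVED = -2;

    private final AtomicInteger state = new AtomicInteger(STATE_NOT_IN_USE);

    /** Borrow: succeeds for exactly one caller while the entry is idle. */
    boolean tryBorrow() {
        return state.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE);
    }

    /** Return: an ordered (lazy) write of the idle state, like requite(). */
    void release() {
        state.lazySet(STATE_NOT_IN_USE);
    }

    /** Retire: only an in-use or reserved entry may be marked removed, like remove(). */
    boolean tryRemove() {
        return state.compareAndSet(STATE_IN_USE, STATE_REMOVED)
            || state.compareAndSet(STATE_RESERVED, STATE_REMOVED);
    }

    int state() {
        return state.get();
    }

    public static void main(String[] args) {
        PoolEntry entry = new PoolEntry();
        System.out.println(entry.tryBorrow()); // true: idle -> in use
        System.out.println(entry.tryBorrow()); // false: a second borrower cannot take it
        entry.release();
        System.out.println(entry.tryRemove()); // false: idle entries cannot be removed
        System.out.println(entry.tryBorrow()); // true
        System.out.println(entry.tryRemove()); // true: in use -> removed
    }
}
```

Since compareAndSet succeeds for only one caller, two threads that both hold a reference to the same entry can never both "own" it, which is what lets every thread keep references freely while the pool stays in control.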

Why put it in the thread's own threadList?

Because it makes the next borrow more convenient and may improve performance. Each thread looks in its own thread-local list first, which greatly reduces the chance of contention. Often the time between returning a connection and borrowing again is very short, so that connection is likely to still be idle. Only when none of the thread's local connections is usable does it fall back to HikariCP's sharedList.

Here is an everyday example. Suppose you own one store in a car-rental chain, and there is a central warehouse from which every store picks up the cars it rents out. At first, every time you rent out a car you fetch it from the warehouse, and every time a customer returns one you take it straight back. After a while you realize this doesn't work: it wastes time, and since every store does the same, the owners are so busy fetching cars that they have to queue at the warehouse. So you decide to keep returned cars at your store first; the next time a customer wants a car, you hand one over directly instead of going to the warehouse, which is much more convenient. Only when your store has no cars do you go to the central warehouse. The other stores start doing the same: each uses its own cars first and only then goes to the warehouse, and business booms. One day your store has no cars, so you go to the warehouse, and the keeper says: "There are no cars here, but the Tiantongyuan store has an idle one. Go get it there." So you borrow a car from the Tiantongyuan store, and in this way the stores end up borrowing cars from one another.

The analogy may not be perfect, and I can't think of a better everyday example right now, but that's the idea. HikariCP works the same way. When a user returns a connection to the pool, a reference to it is put directly into the thread's local threadList. When the user wants to borrow a connection, the thread first checks whether it has a local one and prefers it. Only when it has no local connection, or none of them is available, does it look in HikariCP's shared pool. Unlike the car example, however, the thread only keeps a local reference to a connection that still lives in sharedList. Even though the thread still holds that reference, the connection may well have been borrowed through sharedList by another thread. This is what HikariCP calls connection stealing between threads. Therefore, even when a thread finds a connection in its local threadList, it must check its state to see whether it is actually available.
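To make the lookup order and the stealing case concrete, here is a heavily simplified sketch. MiniBag and its Entry class are hypothetical: they keep only the thread-local-first lookup from ②, the shared-list scan from ④, and the requite step that caches returned entries. Waiting, timeouts, the sequence synchronizer, weak references and the compensation for stolen connections are all left out, and a plain int state (0 idle, 1 in use) stands in for HikariCP's state constants.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, stripped-down bag: thread-local list first, then the shared list. */
final class MiniBag {
    static final class Entry {
        final String name;
        final AtomicInteger state = new AtomicInteger(0); // 0 = idle, 1 = in use
        Entry(String name) { this.name = name; }
    }

    private final CopyOnWriteArrayList<Entry> sharedList = new CopyOnWriteArrayList<>();
    private final ThreadLocal<List<Entry>> threadList = ThreadLocal.withInitial(ArrayList::new);

    void add(Entry e) {
        sharedList.add(e);
    }

    Entry borrow() {
        List<Entry> local = threadList.get();
        // Newest first: the most recently returned entry is the most likely to still be idle
        for (int i = local.size() - 1; i >= 0; i--) {
            Entry e = local.remove(i);
            // The cached reference may be stale: another thread may have borrowed ("stolen") it
            if (e.state.compareAndSet(0, 1)) {
                return e;
            }
        }
        // Fall back to scanning every entry in the shared list
        for (Entry e : sharedList) {
            if (e.state.compareAndSet(0, 1)) {
                return e;
            }
        }
        return null; // the real borrow() would request a new entry and wait here
    }

    void requite(Entry e) {
        e.state.lazySet(0);
        threadList.get().add(e); // remember it for this thread's next borrow
    }

    public static void main(String[] args) throws InterruptedException {
        MiniBag bag = new MiniBag();
        Entry a = new Entry("a");
        Entry b = new Entry("b");
        bag.add(a);
        bag.add(b);

        Entry first = bag.borrow();             // "a", found in the shared list
        bag.requite(first);                     // now also cached in main's thread-local list
        System.out.println(bag.borrow().name);  // a (served from the thread-local list)

        bag.requite(a);
        // Another thread takes "a" through the shared list while main still caches it
        Thread other = new Thread(() -> bag.borrow());
        other.start();
        other.join();
        // main's cached reference fails the CAS, so it falls back to the shared list
        System.out.println(bag.borrow().name);  // b
    }
}
```

The second printed line shows why the state check at ② is required: main still holds a reference to "a" in its thread-local list, but another thread borrowed it through sharedList, so the stale reference is discarded and the scan moves on.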

We have strayed quite far from the code, so let's get back to it. The code at ① first gets the list of connections from the thread-local threadList and then checks whether it is null. If it is, a new list is created and stored, because the code below uses it and would otherwise throw a NullPointerException. Note the extra condition weakThreadLocals in the null check. This flag indicates whether the entries in threadList are held as weak references (see the WeakReference wrapping in borrow and requite). In that mode the ThreadLocal is created without an initial value, so the first get() on a thread returns null and the list has to be created here. In the non-weak mode, the ThreadLocal is given a FastList as its initial value when ConcurrentBag is constructed, so there is no need to worry about null. So when are weak references used? When HikariCP runs inside a container, because strong references may cause memory leaks when the container redeploys the application. For details, see issue #39.
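The unwrap step at ② and the matching wrap in requite can be isolated as below. This is a hypothetical sketch of only that step: when weakThreadLocals is true, each thread-local item is a java.lang.ref.WeakReference, so get() can return null once the connection has been garbage collected, and such items are simply skipped. The cleared item is simulated with a WeakReference to null, since real GC timing is not deterministic.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of how thread-local items are wrapped and unwrapped when weakThreadLocals is true. */
final class WeakLocalListDemo {
    /** Mirrors the wrapping done when a connection is returned. */
    static Object wrap(Object entry, boolean weakThreadLocals) {
        return weakThreadLocals ? new WeakReference<>(entry) : entry;
    }

    /** Mirrors the unwrapping in borrow; may return null if the referent was collected. */
    @SuppressWarnings("unchecked")
    static <T> T unwrap(Object stored, boolean weakThreadLocals) {
        return weakThreadLocals ? ((WeakReference<T>) stored).get() : (T) stored;
    }

    public static void main(String[] args) {
        boolean weakThreadLocals = true;
        List<Object> local = new ArrayList<>();
        String conn = "conn-1";                      // stays strongly reachable
        local.add(wrap(conn, weakThreadLocals));
        local.add(new WeakReference<String>(null));  // simulates an item already cleared by GC

        // Back to front, as in borrow()
        for (int i = local.size() - 1; i >= 0; i--) {
            String entry = unwrap(local.remove(i), weakThreadLocals);
            System.out.println(entry == null ? "cleared, skipped" : "found " + entry);
        }
        // prints "cleared, skipped" then "found conn-1"
    }
}
```

This is why the check in borrow reads bagEntry != null before attempting compareAndSet: in weak mode, a cached reference can disappear between being stored and being looked up.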


Added by glueater on Mon, 07 Mar 2022 21:58:36 +0200