DynamicTp: a lightweight, dynamically adjustable and monitorable thread pool based on a configuration center

Hello, I'm a programming fox. Today we'll talk about a practical topic: building a dynamic, monitorable thread pool. The address of the open source project, DynamicTp, is at the end of the article.

Preface

Anyone with some Java programming experience knows that much of the essence of Java lies in the juc (java.util.concurrent) package, the masterpiece of the well-known Doug Lea. To a certain extent, a programmer's Java level can be judged by how well they have mastered the technologies in the juc package, which is why it is one of the topics that almost always comes up in interviews.

The juc package mainly includes:

1. Atomic classes (AtomicXXX)

2. Locks (XXXLock)

3. Thread synchronization classes (AQS, CountDownLatch, CyclicBarrier, Semaphore, Exchanger)

4. Task executor classes (the Executor hierarchy, including today's protagonist, ThreadPoolExecutor)

5. Concurrent collection classes (ConcurrentXXX, CopyOnWriteXXX)

6. Blocking queue classes (the BlockingQueue hierarchy)

7. Future-related classes

8. Other auxiliary tools

In multi-threaded programming these classes are essential skills. They help us write high-quality, high-performance code with fewer bugs. They are also among the more difficult parts of Java, so we need to persevere, apply what we have learned, and come to understand them through use.

The list above only outlines the functional categories in the juc package. This article focuses on the dynamic, monitorable thread pool, so it does not go into the details of each category; those can be covered separately in the future. Readers should have some experience with ThreadPoolExecutor before reading this article, otherwise parts of it may be confusing.

If you are not familiar with ThreadPoolExecutor, it is recommended to read the following two articles first:

https://www.javadoop.com/post/java-thread-pool
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

Background

Do you have the following pain points when using ThreadPoolExecutor?

1. A ThreadPoolExecutor is created in the code, but it is unclear what values are appropriate for the core parameters

2. Parameter values are set based on experience, and after going live it is troublesome to adjust them because that means changing the code and restarting the service

3. To developers the thread pool is a black box, and its running state cannot be observed until a problem occurs

If you have these pain points, the dynamic thread pool introduced in this article may help you.

If you have read the source code of ThreadPoolExecutor, you may know that it already provides setter methods that modify the corresponding values at runtime. These methods include:

public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);
public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
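
As a minimal sketch of what these setters allow, the following example resizes a running pool using only the JDK. The class name RuntimeResizeDemo and the helper resize are made up for illustration and are not part of DynamicTp. One detail worth noting: on recent JDKs both setCorePoolSize and setMaximumPoolSize throw IllegalArgumentException if the core size would exceed the maximum size, so the order of the two calls matters.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative demo class; not part of DynamicTp.
final class RuntimeResizeDemo {

    // Builds a small pool with fixed initial parameters (core 2, max 4).
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
    }

    // Applies new sizes in an order that never leaves core > max in between.
    // Assumes the caller passes core <= max.
    static void resize(ThreadPoolExecutor pool, int core, int max) {
        if (core > pool.getMaximumPoolSize()) {
            // Growing: raise the maximum first, then the core size.
            pool.setMaximumPoolSize(max);
            pool.setCorePoolSize(core);
        } else {
            // Shrinking or equal: lower the core size first, then the maximum.
            pool.setCorePoolSize(core);
            pool.setMaximumPoolSize(max);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        resize(pool, 6, 8);                           // takes effect without a restart
        pool.setKeepAliveTime(30, TimeUnit.SECONDS);  // also adjustable at runtime
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 6/8
        pool.shutdown();
    }
}
```

The queue capacity is absent from this list of setters, which is why a dynamic thread pool needs a special queue type if the capacity should also be adjustable.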

Most Internet projects today are deployed as microservices and have their own service governance system. Among the microservice components, the distributed configuration center is responsible for modifying configuration dynamically and making it take effect in real time. Can we combine the configuration center with these setters to adjust thread pool parameters at runtime? The answer is yes. The configuration center is also usually highly available, so you don't have to worry much about configuration pushes failing, and relying on it reduces the difficulty and workload of developing a dynamic thread pool component.

To sum up, the background is as follows:

  • Universality: in Java development, the thread pool is a basic tool that the vast majority of developers reach for when they want to improve system performance

  • Uncertainty: a project may create many thread pools, some IO intensive and some CPU intensive, and the right parameters for them are hard to determine up front; a mechanism is needed to adjust the parameters dynamically at runtime

  • Lack of visibility: the metrics of a running thread pool are generally not observable; a monitoring and alarm mechanism is needed so that developers can see the running state of the thread pool and react in time

  • High availability: configuration changes need to be pushed to the client in time, which requires a highly available configuration management and push service; the configuration center is a component that most Internet systems already use, so it is a natural fit

  • Lower development effort and easier integration

Brief introduction

Based on the configuration center, we extend ThreadPoolExecutor so that the parameters of a running thread pool can be modified dynamically and take effect in real time, and so that the running state of the thread pool is monitored in real time. When a configured alarm policy is triggered, the alarm message is pushed to an office platform (DingTalk, enterprise WeChat, etc.). Alarm dimensions include queue capacity, thread pool liveness, rejection triggers, and so on. In addition, thread pool metrics are collected periodically for visualization on a monitoring platform. This lets us see the load of the thread pool at any time and adjust it in time, before problems affect the online business.

    |  __ \                            (_) |__   __|
    | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __  
    | |  | | | | | '_ \ / _` | '_ ` _ \| |/ __| | '_ \ 
    | |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
    |_____/ \__, |_| |_|\__,_|_| |_| |_|_|\___|_| .__/ 
             __/ |                              | |    
            |___/                               |_|    
     :: Dynamic Thread Pool :: 

Features

  • Follows the Meituan thread pool practice: thread pool parameters are managed dynamically, with monitoring and alarm functions added

  • Based on the Spring framework; currently only Spring Boot projects are supported. It is lightweight and works out of the box once the starter is added

  • Based on the configuration center, thread pool parameters are adjusted dynamically and take effect in real time. Mainstream configuration centers are integrated: Nacos and Apollo are supported by default, and an SPI interface is provided for custom extensions

  • The built-in notification and alarm function offers several alarm dimensions (configuration change notification, liveness alarm, capacity threshold alarm and rejection policy trigger alarm). Enterprise WeChat and DingTalk are supported by default, and an SPI interface is provided for custom extensions

  • The built-in thread pool metrics collection supports three methods by default: MicroMeter, JsonLog log output and Endpoint. Other methods can be added through the SPI interface

Architecture design

It is mainly divided into four modules

  • Configuration change listening module:

    1. Listens to the specified configuration file of a specific configuration center (Nacos and Apollo are implemented by default); other implementations can be added through the internally provided SPI interface

    2. Parses the content of the configuration file. Parsing of yml and properties files is built in; other formats can be added through the internally provided SPI interface

    3. Notifies the thread pool management module to refresh

  • Thread pool management module:

    1. When the service starts, it pulls the configuration from the configuration center, creates the thread pool instances and registers them in the internal thread pool registry

    2. When the listening module detects a configuration change, it passes the change to the management module, which refreshes the thread pool parameters

    3. In business code, the thread pool instance is obtained by its thread pool name through the getExecutor() method

  • Monitoring module:

    Collects and outputs monitoring metrics. The following three methods are provided by default, and other implementations can be added through the internal SPI interface

    1. JSON log output to disk (the default)

    2. MicroMeter collection, which requires adding the MicroMeter-related dependencies

    3. An exposed Endpoint, which can be accessed over HTTP

  • Notification and alarm module:

    Connects to office platforms to deliver notifications and alarms. DingTalk and enterprise WeChat are implemented by default; other implementations can be added through the internal SPI interface. The notification and alarm types are as follows

    1. Thread pool parameter change notification

    2. Alarm when the blocking queue usage reaches the configured threshold

    3. Alarm when the thread pool liveness reaches the configured threshold

    4. Alarm when the rejection policy is triggered
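
How the configuration change module and the thread pool management module cooperate can be sketched with plain JDK classes. The following is only an illustration under stated assumptions, not DynamicTp's implementation: the class PoolRegistrySketch, the refresh method and the key format "<poolName>.corePoolSize" are all hypothetical, and a real listener would receive the text from Nacos or Apollo rather than from a string literal.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical registry for illustration; this is not DynamicTp's DtpRegistry.
final class PoolRegistrySketch {

    private static final Map<String, ThreadPoolExecutor> POOLS = new ConcurrentHashMap<>();

    // Management step 1: register a pool under its name at startup.
    static void register(String name, ThreadPoolExecutor pool) {
        POOLS.put(name, pool);
    }

    // Management step 3: business code looks a pool up by name.
    static ThreadPoolExecutor getExecutor(String name) {
        ThreadPoolExecutor pool = POOLS.get(name);
        if (pool == null) {
            throw new IllegalArgumentException("No thread pool registered as " + name);
        }
        return pool;
    }

    // Management step 2: parse the pushed config text and apply it to every pool.
    // The properties key format used here is an assumption made for this sketch.
    static void refresh(String configText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        POOLS.forEach((name, pool) -> {
            int core = intProp(props, name + ".corePoolSize", pool.getCorePoolSize());
            int max = intProp(props, name + ".maximumPoolSize", pool.getMaximumPoolSize());
            // Order the two setters so that core never exceeds max in between.
            if (core > pool.getMaximumPoolSize()) {
                pool.setMaximumPoolSize(max);
                pool.setCorePoolSize(core);
            } else {
                pool.setCorePoolSize(core);
                pool.setMaximumPoolSize(max);
            }
        });
    }

    // Reads an int property, keeping the current value when the key is absent.
    private static int intProp(Properties props, String key, int fallback) {
        String value = props.getProperty(key);
        return value == null ? fallback : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        register("demo", new ThreadPoolExecutor(
                6, 8, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200)));
        refresh("demo.corePoolSize=4\ndemo.maximumPoolSize=8\n"); // simulated config push
        ThreadPoolExecutor pool = getExecutor("demo");
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 4/8
        pool.shutdown();
    }
}
```

Keeping the registry keyed by pool name is what lets a single pushed configuration file address many pools at once.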

                                                    

Usage

  • Maven dependency

    <dependency>
        <groupId>io.github.lyh200</groupId>
        <artifactId>dynamic-tp-spring-cloud-starter</artifactId>
        <version>1.0.2-RELEASE</version>
    </dependency>

  • Thread pool configuration

    spring:
      dynamic:
        tp:
          enabled: true
          enabledBanner: true        # Whether to print the banner; default true
          enabledCollect: false      # Whether to collect monitoring metrics; default false
          collectorType: logging     # Metrics collector type (JsonLog | MicroMeter); default logging
          logPath: /home             # Path for monitoring log data; default ${user.home}/logs
          monitorInterval: 5         # Monitoring interval (alarm checks and metrics collection); default 5s
          nacos:                     # Nacos configuration; if omitted, a default is used (rule: name-dev.yml)
            dataId: dynamic-tp-demo-dev.yml
            group: DEFAULT_GROUP
          apollo:                    # Apollo configuration; if omitted, the first namespace is used
            namespace: dynamic-tp-demo-dev.yml
          configType: yml            # Configuration file type
          platforms:                 # Notification and alarm platform configuration
            - platform: wechat
              urlKey: 3a7500-1287-4bd-a798-c5c3d8b69c  # Replace with your own key
              receivers: test1,test2                   # Recipients' enterprise WeChat names
            - platform: ding
              urlKey: f80dad441fcd655438f4a08dcd6a     # Replace with your own key
              secret: SECb5441fa6f375d5b9d21           # Replace with your own secret; may be omitted when not using sign mode
              receivers: 15810119805                   # DingTalk account mobile number
          executors:                                   #Dynamic thread pool configuration
            - threadPoolName: dynamic-tp-test-1
              corePoolSize: 6
              maximumPoolSize: 8
              queueCapacity: 200
              queueType: VariableLinkedBlockingQueue   # Task queue type; see the QueueTypeEnum enumeration class in the source code
              rejectedHandlerType: CallerRunsPolicy    # Rejection policy; see the RejectedTypeEnum enumeration class
              keepAliveTime: 50
              allowCoreThreadTimeOut: false
              threadNamePrefix: test           # Thread name prefix
              notifyItems:                     # Alarm items; if omitted, all are configured automatically (change notification, capacity alarm, liveness alarm, reject alarm)
                - type: capacity               # Alarm item type; see the NotifyTypeEnum enumeration class
                  enabled: true
                  threshold: 80                # Alarm threshold
                  platforms: [ding,wechat]     # Optional; if omitted, all platforms configured under platforms above are used
                  interval: 120                # Alarm interval (unit: s)
                - type: change
                  enabled: true
                - type: liveness
                  enabled: true
                  threshold: 80
                - type: reject
                  enabled: true
                  threshold: 1
  • Creation in code: thread pools defined this way are registered automatically when the service starts

    @Configuration
    public class DtpConfig {

        @Bean
        public DtpExecutor demo1Executor() {
            return DtpCreator.createDynamicFast("demo1-executor");
        }

        @Bean
        public ThreadPoolExecutor demo2Executor() {
            return ThreadPoolBuilder.newBuilder()
                    .threadPoolName("demo2-executor")
                    .corePoolSize(8)
                    .maximumPoolSize(16)
                    .keepAliveTime(50)
                    .allowCoreThreadTimeOut(true)
                    .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
                    .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName())
                    .buildDynamic();
        }
    }
  • Calling code: obtain the executor by its thread pool name

    public static void main(String[] args) {
        DtpExecutor dtpExecutor = DtpRegistry.getExecutor("dynamic-tp-test-1");
        dtpExecutor.execute(() -> System.out.println("test"));
    }

Notes

  1. Parameters set in the configuration file override the parameters set when the pool is created in code

  2. Only the VariableLinkedBlockingQueue type allows the capacity of the blocking queue to be modified

    It is similar to LinkedBlockingQueue, but its capacity field is not final and can therefore be modified.

    VariableLinkedBlockingQueue is based on the implementation in RabbitMQ

  3. After startup, log output like the following shows that the integration succeeded

    |  __ \                            (_) |__   __|   
    | |  | |_   _ _ __   __ _ _ __ ___  _  ___| |_ __  
    | |  | | | | | '_ \ / _` | '_ ` _ \| |/ __| | '_ \ 
    | |__| | |_| | | | | (_| | | | | | | | (__| | |_) |
    |_____/ \__, |_| |_|\__,_|_| |_| |_|_|\___|_| .__/ 
             __/ |                              | |    
            |___/                               |_|    
     :: Dynamic Thread Pool :: 
    
    DynamicTp register, executor: DtpMainPropWrapper(dtpName=dynamic-tp-test-1, corePoolSize=6, maxPoolSize=8, keepAliveTime=50, queueType=VariableLinkedBlockingQueue, queueCapacity=200, rejectType=RejectedCountableCallerRunsPolicy, allowCoreThreadTimeOut=false)
  4. When the configuration changes, a notification message is pushed with the changed fields highlighted

    DynamicTp [dynamic-tp-test-1] refresh end, changed keys: [corePoolSize, queueCapacity], corePoolSize: [6 => 4], maxPoolSize: [8 => 8], queueType: [VariableLinkedBlockingQueue => VariableLinkedBlockingQueue], queueCapacity: [200 => 2000], keepAliveTime: [50s => 50s], rejectedType: [CallerRunsPolicy => CallerRunsPolicy], allowsCoreThreadTimeOut: [false => false]
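
Note 2 above can be illustrated with a much simpler stand-in. The sketch below is not VariableLinkedBlockingQueue and does not follow the RabbitMQ code; it is a hypothetical class, SoftCapacityQueue, that wraps an unbounded LinkedBlockingQueue with an adjustable limit, only to show why a non-final capacity makes the queue size tunable at runtime. Because the size check and the insert are not atomic, the limit here is a soft one, and only offer(E) is guarded (that is the method ThreadPoolExecutor.execute uses to enqueue tasks).

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical simplified queue; NOT the real VariableLinkedBlockingQueue.
final class SoftCapacityQueue<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 1L;

    // Not final, so it can be changed while the queue is in use.
    private volatile int capacity;

    SoftCapacityQueue(int capacity) {
        super(); // the backing queue itself is unbounded
        setCapacity(capacity);
    }

    void setCapacity(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Rejects the element when the current limit is reached.
    // The check and the insert are not atomic, so this is a soft limit.
    @Override
    public boolean offer(E e) {
        return size() < capacity && super.offer(e);
    }

    @Override
    public int remainingCapacity() {
        return Math.max(0, capacity - size());
    }

    public static void main(String[] args) {
        SoftCapacityQueue<String> queue = new SoftCapacityQueue<>(1);
        boolean first = queue.offer("a");   // accepted, queue now full
        boolean second = queue.offer("b");  // rejected at capacity 1
        queue.setCapacity(2);               // capacity raised at runtime
        boolean third = queue.offer("b");   // accepted after the change
        System.out.println(first + " " + second + " " + third); // prints true false true
    }
}
```

With a standard LinkedBlockingQueue the capacity is fixed at construction, so the same effect would require replacing the queue, which ThreadPoolExecutor does not support on a running pool.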

Alarms

When an alarm threshold is reached, the corresponding alarm message (liveness, capacity, rejection) is pushed with the relevant fields highlighted

When the configuration changes, a notification message is pushed with the changed fields highlighted
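
The threshold checks behind the liveness and capacity alarms can be sketched as follows. This is an assumption about how such a check might be written, not DynamicTp's actual code: the class AlarmCheckSketch, the formulas (active threads divided by maximum pool size, queued tasks divided by total queue capacity) and the "value >= threshold" comparison are all illustrative. The interval parameter mirrors the interval setting from the configuration example, which spaces out repeated alarms.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical alarm check for illustration; formulas are assumptions.
final class AlarmCheckSketch {

    // Last time (in milliseconds) an alarm was sent, per alarm key.
    private final Map<String, Long> lastSentMillis = new HashMap<>();

    // Liveness as a percentage: active threads relative to the maximum pool size.
    static int liveness(int activeCount, int maximumPoolSize) {
        return (int) (activeCount * 100L / maximumPoolSize);
    }

    // Queue usage as a percentage: queued tasks relative to total capacity.
    static int capacityUsage(int queueSize, int remainingCapacity) {
        long total = (long) queueSize + remainingCapacity;
        return total == 0 ? 0 : (int) (queueSize * 100L / total);
    }

    // Returns true when the value has reached the threshold and at least
    // intervalSeconds have passed since the last alarm for the same key.
    synchronized boolean shouldSend(String key, int value, int threshold,
                                    long intervalSeconds, long nowMillis) {
        if (value < threshold) {
            return false;
        }
        Long last = lastSentMillis.get(key);
        if (last != null && nowMillis - last < intervalSeconds * 1000) {
            return false; // still inside the quiet interval
        }
        lastSentMillis.put(key, nowMillis);
        return true;
    }

    public static void main(String[] args) {
        AlarmCheckSketch check = new AlarmCheckSketch();
        // 160 queued tasks in a queue of capacity 200 is 80% usage.
        int usage = capacityUsage(160, 40);
        System.out.println(usage);                                                      // prints 80
        System.out.println(check.shouldSend("demo:capacity", usage, 80, 120, 0L));      // prints true
        System.out.println(check.shouldSend("demo:capacity", usage, 80, 120, 60_000L)); // prints false
        System.out.println(check.shouldSend("demo:capacity", usage, 80, 120, 120_000L)); // prints true
    }
}
```

In a real monitor the inputs would come from getActiveCount(), getMaximumPoolSize(), getQueue().size() and getQueue().remainingCapacity() on the pool, sampled once per monitoring interval.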

Monitoring log

The collection type for monitoring metrics is configured through the collectorType attribute. The default is logging

  • MicroMeter: with the relevant MicroMeter dependencies added, metrics are collected to the corresponding platform (e.g. Prometheus, InfluxDb...)

  • Logging: metrics are collected periodically and written to disk in JSON log format, at ${logPath}/dynamictp/${appName}.monitor.log

  • Endpoint: an endpoint (dynamic-tp) is exposed and can be requested over HTTP
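
To make the Logging option more concrete, here is a minimal sketch of turning a pool's state into one JSON log line using only getters that ThreadPoolExecutor really provides. The class PoolMetricsSketch and the JSON field names are assumptions made for this example; the actual format DynamicTp writes may differ, and the pool name is not JSON-escaped here for brevity.

```java
import java.util.Locale;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical metrics formatter; field names are illustrative only.
final class PoolMetricsSketch {

    // Takes a snapshot of the pool and renders it as a single JSON line.
    static String toJsonLine(String poolName, ThreadPoolExecutor pool) {
        return String.format(Locale.ROOT,
                "{\"poolName\":\"%s\",\"corePoolSize\":%d,\"maximumPoolSize\":%d,"
                        + "\"poolSize\":%d,\"activeCount\":%d,\"queueSize\":%d,"
                        + "\"queueRemainingCapacity\":%d,\"completedTaskCount\":%d}",
                poolName,
                pool.getCorePoolSize(),
                pool.getMaximumPoolSize(),
                pool.getPoolSize(),
                pool.getActiveCount(),
                pool.getQueue().size(),
                pool.getQueue().remainingCapacity(),
                pool.getCompletedTaskCount());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                6, 8, 50, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
        // A real collector would call this on a schedule and append to a log file.
        System.out.println(toJsonLine("dynamic-tp-test-1", pool));
        pool.shutdown();
    }
}
```

One line per sample keeps the file easy to ship to a log platform, where each line can be parsed independently.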

Project address

Gitee address: https://gitee.com/yanhom/dynamic-tp-spring-cloud-starter

GitHub address: https://github.com/lyh200/dynamic-tp-spring-cloud-starter

Contact me

If you have any ideas or suggestions about the project, you can add me on WeChat to discuss them, or open an issue so we can improve the project together

Keywords: Java thread pool threadpool

Added by bgomillion on Mon, 17 Jan 2022 06:28:14 +0200