Online course: Dark Horse Programmer, Java concurrency.
Chapter 8 Sharing Model: Tools
8.1 thread pool
Main purposes
- Reduce resource consumption: reuse already-created threads to avoid the cost of repeatedly creating and destroying threads
- Improve response speed: when a task arrives, it can be executed immediately without waiting for a thread to be created
- Improve thread manageability: threads are a scarce resource
  - Unlimited creation consumes system resources and reduces system stability
  - A thread pool allows unified allocation, tuning, and monitoring
1. Custom thread pool
Steps:
- Custom reject policy interface
```java
@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}
```
- Custom task queue
```java
// Assumes a logger named `log` (e.g. Lombok @Slf4j) and imports from
// java.util (ArrayDeque, Deque) and java.util.concurrent (TimeUnit, locks.*)
class BlockingQueue<T> {
    // 1. Task queue
    private Deque<T> queue = new ArrayDeque<>();
    // 2. Lock
    private ReentrantLock lock = new ReentrantLock();
    // 3. Producer condition variable (waits while the queue is full)
    private Condition fullWaitSet = lock.newCondition();
    // 4. Consumer condition variable (waits while the queue is empty)
    private Condition emptyWaitSet = lock.newCondition();
    // 5. Capacity
    private int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    // Blocking take
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // Take with timeout; returns null when the timeout elapses
    // (needed by the thread pool's workers below)
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    // awaitNanos returns the remaining wait time, so the timeout is not reset
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // Blocking put
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                try {
                    log.debug("Waiting to join the task queue {} ...", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("Joined task queue {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    // Put with timeout
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    log.debug("Waiting to join the task queue {} ...", task);
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("Joined task queue {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Put that falls back to the reject policy when the queue is full
    // (needed by ThreadPool.execute below)
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if (queue.size() == capacity) {
                rejectPolicy.reject(this, task);
            } else {
                log.debug("Joined task queue {}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}
```
- Custom thread pool
```java
class ThreadPool {
    // Task queue
    private BlockingQueue<Runnable> taskQueue;
    // Worker thread collection
    private HashSet<Worker> workers = new HashSet<>();
    // Number of core threads
    private int coreSize;
    // Timeout when getting a task
    private long timeout;
    private TimeUnit timeUnit;
    // Reject policy
    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,
                      int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    // Task execution method
    public void execute(Runnable task) {
        synchronized (workers) {
            // When the number of workers has not reached coreSize,
            // hand the task directly to a new worker
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.debug("newly added worker {}, {}", worker, task);
                workers.add(worker);
                worker.start();
            } else {
                // Otherwise put the task into the queue; when the queue is full,
                // the reject policy decides between:
                // 1) waiting forever: taskQueue.put(task)
                // 2) waiting with a timeout
                // 3) letting the caller abandon the task
                // 4) letting the caller throw an exception
                // 5) letting the caller run the task itself
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 1) If the initial task is not null, execute it
            // 2) When it is done, keep taking tasks from the task queue and executing them
            // while (task != null || (task = taskQueue.take()) != null) {
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    log.debug("Executing... {}", task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker removed {}", this);
                workers.remove(this);
            }
        }
    }
}
```
- test
```java
public static void main(String[] args) {
    ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,
            (queue, task) -> {
                // 1) Wait forever:               queue.put(task);
                // 2) Wait with a timeout:        queue.offer(task, 1500, TimeUnit.MILLISECONDS);
                // 3) Caller abandons the task:   log.debug("give up {}", task);
                // 4) Caller throws an exception: throw new RuntimeException("task execution failed " + task);
                // 5) Caller runs the task itself:
                task.run();
            });
    for (int i = 0; i < 4; i++) {
        int j = i;
        threadPool.execute(() -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("{}", j);
        });
    }
}
```
2. ThreadPoolExecutor
1. Thread pool status
ThreadPoolExecutor uses a 32-bit int in which
- the upper 3 bits represent the thread pool state
- the lower 29 bits represent the number of threads
The thread pool state and the thread count are packed together into one atomic integer, ctl, so that both can be updated with a single CAS operation:
```java
// c is the old value; ctlOf returns the new value
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)));

// rs is the upper 3 bits (thread pool state), wc is the lower 29 bits (thread count);
// ctlOf merges them into one int
private static int ctlOf(int rs, int wc) { return rs | wc; }
```
The thread pool states in ThreadPoolExecutor are as follows:

State name | Upper 3 bits | Description |
---|---|---|
RUNNING | 111 | Accepts new tasks and processes tasks in the queue |
SHUTDOWN | 000 | Does not accept new tasks, but processes the remaining tasks in the blocking queue |
STOP | 001 | Interrupts running tasks and discards the tasks in the blocking queue |
TIDYING | 010 | All tasks have finished and the active thread count is 0; about to enter termination |
TERMINATED | 011 | Terminal state |
2. Construction method
```java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
```
Constructor parameters (a construction sketch follows the list)
- corePoolSize: number of core threads
- maximumPoolSize: maximum number of threads
  - maximumPoolSize - corePoolSize = number of emergency (non-core) threads
- keepAliveTime: maximum idle time before an emergency thread is reclaimed
- unit: time unit for keepAliveTime
- workQueue: blocking queue that holds waiting tasks
  - bounded blocking queue: ArrayBlockingQueue
  - unbounded blocking queue: LinkedBlockingQueue
  - hand-off queue holding at most one element: SynchronousQueue
  - priority queue: PriorityBlockingQueue
- threadFactory: thread factory (e.g., for naming threads)
- handler: reject policy
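A minimal sketch tying the seven parameters together; the specific bounds, queue size, and CallerRunsPolicy are illustrative assumptions, not recommendations:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolDemo {
    public static void main(String[] args) {
        AtomicInteger threadNo = new AtomicInteger(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize (4 - 2 = 2 emergency threads)
                60, TimeUnit.SECONDS,                 // idle emergency threads reclaimed after 60 s
                new ArrayBlockingQueue<>(10),         // bounded work queue
                r -> new Thread(r, "pool-" + threadNo.getAndIncrement()), // names the threads
                new ThreadPoolExecutor.CallerRunsPolicy() // reject policy: caller runs the task
        );
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```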
3. newFixedThreadPool
```java
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
```
newFixedThreadPool is a static factory method of the Executors class (a utility class of the Executor framework) that creates a fixed-size thread pool. It delegates to the full-parameter constructor shown earlier, creating a fixed number of threads with the default thread factory and reject policy.
- Core thread count == maximum thread count (no emergency threads are ever created), so no keep-alive timeout is needed
- The blocking queue is unbounded (LinkedBlockingQueue), so any number of tasks can be queued
- Suitable when the task volume is known and the tasks are relatively time-consuming
4. newCachedThreadPool
```java
ExecutorService executorService = Executors.newCachedThreadPool();

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
```
- The core thread count is 0, the maximum thread count is Integer.MAX_VALUE, and idle emergency threads are kept alive for 60 s, which means
  - all threads are emergency threads (reclaimed after 60 s with no task)
  - emergency threads can be created without limit
- The queue is a SynchronousQueue, whose characteristic is that a put blocks until another thread takes the element (a hand-off)
- Overall, the number of threads grows with the number of tasks, with no upper bound; when tasks finish, threads are released after one minute of idleness
- Suitable for dense bursts of tasks where each task executes quickly
5. newSingleThreadExecutor
```java
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(1, 1,
                               0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>()));
}
```
6. Submit tasks
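This heading has no notes; as a sketch, the main ExecutorService submission methods are execute (no result), submit (returns a Future), invokeAll, and invokeAny:

```java
import java.util.List;
import java.util.concurrent.*;

public class SubmitDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // execute: fire and forget, no result
        pool.execute(() -> System.out.println("execute"));

        // submit: returns a Future carrying the task's result
        Future<String> f = pool.submit(() -> "submit result");
        System.out.println(f.get());

        // invokeAll: runs every task, returns one Future per task
        List<Callable<Integer>> tasks = List.of(() -> 1, () -> 2);
        for (Future<Integer> fu : pool.invokeAll(tasks)) {
            System.out.println(fu.get());
        }

        // invokeAny: returns the result of the first task to finish, cancels the rest
        System.out.println(pool.invokeAny(tasks));

        pool.shutdown();
    }
}
```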
7. Close the thread pool
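A sketch of the standard shutdown sequence: shutdown() stops new submissions but finishes queued tasks, while shutdownNow() also interrupts running tasks and returns the unstarted ones:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("task"));

        pool.shutdown();                        // no new tasks; queued tasks still run
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            List<Runnable> unstarted = pool.shutdownNow(); // interrupt workers, drain the queue
            System.out.println("Unstarted tasks: " + unstarted.size());
        }
    }
}
```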
Asynchronous Mode: Worker Thread
Definition: let a limited number of worker threads take turns processing an unlimited number of tasks asynchronously; a thread pool is a typical embodiment of this pattern.
Starvation: if the same fixed-size pool runs tasks that submit sub-tasks back into the pool and wait for their results, all threads can end up waiting and none is left to run the sub-tasks; the remedy is to use separate pools for different kinds of tasks.
Pool size
- CPU-intensive operations: pool size = CPU cores + 1 (the extra thread keeps the CPU busy if another thread is briefly paused, e.g. by a page fault)
- I/O-intensive operations, empirical formula (a worked example follows):
  threads = number of cores × expected CPU utilization × total time / CPU compute time
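A worked example with assumed numbers: on a 4-core machine, targeting 100% CPU utilization, with tasks that take 100 ms in total of which 50 ms is CPU compute time:

threads = 4 × 100% × 100 ms / 50 ms = 8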
8. Task scheduling thread pool
Before the scheduled thread pool existed, java.util.Timer could be used for timing. Timer's advantage is that it is simple and easy to use, but all of its tasks are scheduled by the same thread, so they execute serially: only one task can run at a time, and a delay or exception in one task affects all subsequent tasks.
A scheduled thread pool can delay and repeat task execution without one task's exception affecting the others.
- scheduleAtFixedRate: the period is measured start-to-start; if a task runs longer than the period, the next run starts as soon as the previous one finishes
- scheduleWithFixedDelay: the delay is measured from the end of one run to the start of the next (see the sketch below)
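A minimal sketch contrasting the two scheduling methods; the delays and periods are illustrative:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleDemo {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        // at fixed rate: the 1 s period is measured start-to-start; if a run takes
        // longer than the period, the next run starts immediately after it
        pool.scheduleAtFixedRate(
                () -> System.out.println("at fixed rate"), 0, 1, TimeUnit.SECONDS);

        // with fixed delay: 1 s is measured from the END of a run to the next start
        pool.scheduleWithFixedDelay(
                () -> System.out.println("with fixed delay"), 0, 1, TimeUnit.SECONDS);
    }
}
```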
9. Correctly handling task exceptions
- Catch the exception yourself inside the task
- Instead of a bare Runnable, use submit() and a Future: the exception is captured in the Future and rethrown when get() is called (see the sketch below)
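A sketch of both approaches; note that Future.get() rethrows the task's exception wrapped in an ExecutionException:

```java
import java.util.concurrent.*;

public class TaskExceptionDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);

        // 1) Catch the exception yourself inside the task
        pool.execute(() -> {
            try {
                int i = 1 / 0;
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // 2) Use submit(): the exception is stored in the Future and
        //    rethrown by get(), wrapped in an ExecutionException
        Future<Integer> f = pool.submit(() -> 1 / 0);
        try {
            f.get();
        } catch (ExecutionException e) {
            System.out.println("task failed: " + e.getCause());
        }
        pool.shutdown();
    }
}
```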
Application: timed tasks
10. Tomcat thread pool
- LimitLatch is used for rate limiting; it controls the maximum number of connections
- Acceptor is only responsible for [receiving new socket connections]
- Poller is only responsible for listening to whether the socket channel has [readable I/O events]
- Once a readable event occurs, it wraps the socket in a task object (socketProcessor) and submits it to the Executor thread pool for processing
- Executor: the worker thread in the thread pool is ultimately responsible for [processing requests]
Different threads, different division of labor
Tomcat's thread pool extends ThreadPoolExecutor, with slightly different behavior:
- if the total number of threads reaches maximumPoolSize
  - it does not immediately throw RejectedExecutionException
  - instead it tries once more to put the task into the queue, and only throws the exception if that also fails
connector configuration
- acceptorThreadCount
- pollerThreadCount
- minSpareThreads
- maxThreads
- executor
executor thread configuration
- threadPriority
- daemon
- minSpareThreads
- maxThreads
- maxIdleTime
- maxQueueSize
- prestartminSpareThreads
3. Fork/Join
1 Concept
Based on the divide-and-conquer design idea: a large task is split recursively into smaller sub-tasks that can run in parallel, and their results are combined.
2 use
Tasks submitted to a Fork/Join pool need to extend RecursiveTask<V> (returns a result) or RecursiveAction (no result), as in the sketch below.
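A minimal sketch: summing 1..n by splitting the range in half until it is small enough to compute directly (the threshold of 1000 is an illustrative assumption):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums the integers in [from, to] by divide and conquer
class SumTask extends RecursiveTask<Long> {
    private final int from, to;

    SumTask(int from, int to) { this.from = from; this.to = to; }

    @Override
    protected Long compute() {
        if (to - from <= 1000) {              // small enough: compute directly
            long sum = 0;
            for (int i = from; i <= to; i++) sum += i;
            return sum;
        }
        int mid = (from + to) / 2;            // otherwise split in half
        SumTask left = new SumTask(from, mid);
        SumTask right = new SumTask(mid + 1, to);
        left.fork();                          // run the left half asynchronously
        return right.compute() + left.join(); // compute the right half here, then join
    }

    public static void main(String[] args) {
        System.out.println(new ForkJoinPool().invoke(new SumTask(1, 1_000_000)));
        // prints 500000500000
    }
}
```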
8.2 java.util.concurrent (J.U.C)
1. AQS AbstractQueuedSynchronizer
Summary
AQS (AbstractQueuedSynchronizer) is a framework for building blocking locks and related synchronizer tools.
Characteristics:
- an int state attribute represents the resource state, updated atomically via CAS
- a built-in FIFO wait queue holds threads blocked on acquiring the state, similar to Monitor's EntryList
- condition variables implement waiting and wake-up, supporting multiple condition variables like Monitor's WaitSet
2. ReentrantLock
1. Implementation principle of unfair lock
Unfair: when acquiring the lock, the AQS wait queue is not checked first, so a newly arriving thread can grab the lock ahead of queued threads
Lock and unlock process
2. Reentrancy principle
Each reentrant lock acquisition increments the state count, and each unlock decrements it; the lock is fully released only when the count returns to 0
3. Interruptibility principle
- Non-interruptible mode
- Interruptible mode
4. Implementation principle of fair lock
Fair lock: when acquiring, it first checks whether the AQS wait queue has waiting predecessors; if so, the thread queues up instead of competing for the lock
5. Implementation principle of condition variables
Each condition variable actually corresponds to a waiting queue, and its implementation class is ConditionObject
await process
Suppose Thread-0 holds the lock and calls await: it enters ConditionObject's addConditionWaiter flow, which creates a new Node with waitStatus -2 (Node.CONDITION), associates Thread-0 with it, and appends it to the tail of the condition's wait queue
3. Read-write lock
3.1 ReentrantReadWriteLock
When reads greatly outnumber writes, a read-write lock lets read-read access run concurrently and improves performance. It is similar to select ... from ... lock in share mode in a database.
Below, a data container class protects reads with the read lock in its read() method and writes with the write lock in its write() method:
```java
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();

public Object read() {
    log.debug("Acquire read lock...");
    r.lock();
    try {
        log.debug("read");
        sleep(1);
        return data;
    } finally {
        log.debug("Release read lock...");
        r.unlock();
    }
}

public void write() {
    log.debug("Acquire write lock...");
    w.lock();
    try {
        log.debug("write...");
        sleep(1);
    } finally {
        log.debug("Release write lock...");
        w.unlock();
    }
}
```
Read-read access is concurrent; read-write (and write-write) access is mutually exclusive
Application: cache
Limitations of a naive read-write-locked cache:
- because a read can complete faster than a concurrent write, the database and the cache can become inconsistent
- suitable only for read-heavy workloads; if writes are frequent, the implementation above performs poorly
- cache capacity is not considered
- cache expiration is not considered
- works only on a single machine
- concurrency is low: there is currently only one lock for the whole cache
- the update method is simplistic and crude
Read write lock principle
The read lock and the write lock use one and the same Sync synchronizer, so they share the same wait queue and the same state variable, and state updates remain atomic
Example: t1 calls w.lock, then t2 calls r.lock
- t1 locks successfully. Compared with ReentrantLock there is nothing special, except that the write lock occupies the lower 16 bits of state while the read-lock count uses the upper 16 bits
- t2 executes r.lock and enters the read lock's sync.acquireShared(1) flow, which first enters tryAcquireShared(1); because the write lock is occupied, tryAcquireShared returns -1, indicating failure
- The return value of tryAcquireShared means:
  - -1 means failure
  - 0 means success, but subsequent nodes will not be woken up
  - a positive number means success, and the value is how many subsequent nodes need to be woken up; the read-write lock returns 1
- t2 then enters sync.doAcquireShared(1), where addWaiter is called to add a node; the difference is that the node is created in Node.SHARED mode instead of Node.EXCLUSIVE mode. Note that t2 is still running at this point
3.2 StampedLock
Introduced to further optimize read performance. Its characteristic is that acquiring either the read lock or the write lock must be used together with a [stamp].
Optimistic read: StampedLock supports a tryOptimisticRead() method; after reading, the stamp must be validated. If validation passes, no write happened in the meantime and the data can be used safely; if it fails, the read lock must be acquired and the data read again. See the sketch below.
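A minimal sketch of the optimistic-read pattern just described; the int data field is an illustrative assumption:

```java
import java.util.concurrent.locks.StampedLock;

class StampedContainer {
    private final StampedLock lock = new StampedLock();
    private int data;

    public int read() {
        long stamp = lock.tryOptimisticRead(); // optimistic read: no locking at all
        int value = data;
        if (lock.validate(stamp)) {            // no write happened in the meantime
            return value;
        }
        stamp = lock.readLock();               // validation failed: take a real read lock
        try {
            return data;
        } finally {
            lock.unlockRead(stamp);
        }
    }

    public void write(int newData) {
        long stamp = lock.writeLock();
        try {
            data = newData;
        } finally {
            lock.unlockWrite(stamp);
        }
    }
}
```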
4. Semaphore
Semaphore applications
- Rate limiting: during peak load, request threads beyond the limit block, and proceed only as permits are released. Note that it is only suitable for limiting the number of threads on a single machine, and it limits the number of threads, not the number of resources
- A simple connection pool built with Semaphore performs better and is more readable than the wait/notify implementation from the [sharing model] chapter (see the sketch below)
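A minimal rate-limiting sketch: at most 3 of the 10 threads run the guarded section at once; the thread count, permit count, and sleep are illustrative:

```java
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);       // 3 permits
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();              // block until a permit is free
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;                           // never acquired, so do not release
                }
                try {
                    System.out.println(Thread.currentThread().getName() + " running");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();              // hand the permit back
                }
            }).start();
        }
    }
}
```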
5. CountDownLatch
Used for synchronization between threads: one or more threads wait until a countdown performed by other threads reaches zero.
The constructor initializes the count; await() blocks until the count reaches zero; countDown() decrements the count by one.
Compared with join:
- CountDownLatch is a higher-level API than join
- using join directly is more cumbersome (and join cannot be used at all when tasks run on pool threads, which never terminate)
Application: waiting for multiple threads to be ready (see the sketch below)
Application: waiting for multiple remote calls to finish
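A minimal sketch of "wait for multiple threads to be ready": the main thread blocks on await() until three workers each call countDown():

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3); // count starts at 3
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " ready");
                latch.countDown();                    // count - 1
            });
        }
        latch.await();                                // blocks until the count reaches 0
        System.out.println("all ready, go");
        pool.shutdown();
    }
}
```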
6. CyclicBarrier
Note ⚠️: the main difference between CyclicBarrier and CountDownLatch is that a CyclicBarrier can be reused; a CyclicBarrier works like a bus that departs only when it is full
- a CountDownLatch object cannot be reused; a new one has to be created for each round
- CyclicBarrier is the reusable counterpart (see the sketch below)
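A minimal sketch of the reuse the note emphasizes: the same barrier gates two rounds of two tasks, and the barrier action runs once per full round (the pool size matches the party count on purpose):

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BarrierDemo {
    public static void main(String[] args) {
        // The barrier action runs each time 2 parties arrive; the barrier then resets
        CyclicBarrier barrier = new CyclicBarrier(2, () -> System.out.println("bus departs"));
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int round = 0; round < 2; round++) {     // same barrier, reused across rounds
            for (int i = 0; i < 2; i++) {
                pool.execute(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + " arrived");
                        barrier.await();              // wait until 2 parties have arrived
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
        pool.shutdown();
    }
}
```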
7. Thread-safe collections
Three categories:
- legacy thread-safe collections (e.g., Hashtable, Vector)
- thread-safe collections created by decoration (Collections.synchronizedXxx)
- J.U.C thread-safe collections
Looking at java.util.concurrent.*, the thread-safe collections follow a naming pattern built from three keywords: Blocking, CopyOnWrite, and Concurrent
- Blocking: most implementations are lock-based and provide blocking methods
- CopyOnWrite: these containers are relatively expensive to modify (each write copies the underlying array)
- Concurrent:
  - many internal operations use CAS optimization and can generally provide high throughput
  - weak consistency
    - traversal is weakly consistent: if the container is modified during iteration, the iterator can still continue traversing, but the content it sees is old
    - size is weakly consistent: the size operation may not be 100% accurate
    - reads are weakly consistent
By contrast, if a non-thread-safe container is modified during traversal, the fail-fast mechanism makes the traversal fail immediately with a ConcurrentModificationException instead of continuing. The sketch below contrasts the two behaviors.
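A minimal sketch: modifying an ArrayList during iteration fails fast, while a CopyOnWriteArrayList iterator keeps traversing its old snapshot:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class IterationDemo {
    public static void main(String[] args) {
        List<Integer> unsafe = new ArrayList<>(List.of(1, 2, 3));
        try {
            for (Integer i : unsafe) {
                unsafe.add(4);          // fail-fast: throws on the next iterator step
            }
        } catch (Exception e) {
            System.out.println(e);      // java.util.ConcurrentModificationException
        }

        List<Integer> safe = new CopyOnWriteArrayList<>(List.of(1, 2, 3));
        for (Integer i : safe) {
            safe.add(4);                // weakly consistent: iterator sees the old snapshot
            System.out.println(i);      // prints 1, 2, 3 only
        }
    }
}
```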
8. ConcurrentHashMap
1. JDK7 HashMap concurrent dead chain (concurrent resizing can turn a bucket's linked list into a cycle, causing an infinite loop)
2. JDK8 ConcurrentHashMap
3. JDK7 ConcurrentHashMap
The map is divided into Segments, each holding a portion of the key-value entries.
It maintains a segment array, and each Segment corresponds to one lock.
- Advantage: threads accessing different Segments do not actually conflict, similar in effect to the JDK8 design
- Disadvantage: the segment array defaults to size 16; this capacity cannot be changed after it is specified at initialization, and it is not lazily initialized