Contents
Thread pool - Executors default thread pool
Thread pool - ThreadPoolExecutor
Thread pool parameter - reject policy
AtomicInteger - memory parsing
AtomicInteger - source code analysis
Pessimistic lock and optimistic lock
Concurrency tool class - Hashtable
Concurrency tool class - basic use of ConcurrentHashMap
ConcurrentHashMap1.7 principle
ConcurrentHashMap1.8 principle
Concurrency tool class - CountDownLatch
Concurrency tool class - Semaphore
Thread state
How to verify the status of threads?
Look up the Thread.State nested class in the API documentation: it defines the six states a thread can be in inside the virtual machine.
- NEW: a thread that has not yet started is in this state.
- RUNNABLE: a thread that is executing in the Java virtual machine is in this state.
- BLOCKED: a thread that is blocked waiting for a monitor lock is in this state.
- WAITING: a thread that is waiting indefinitely for another thread to perform a particular action is in this state.
- TIMED_WAITING: a thread that is waiting, for up to a specified time, for another thread to perform an action is in this state.
- TERMINATED: a thread that has exited is in this state.
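What puts a thread into each of the six states?
NEW: the thread object has been created, but start() has not been called
RUNNABLE: start() has been called
BLOCKED: the thread cannot get the lock object
WAITING: wait() has been called
TIMED_WAITING: sleep() has been called
TERMINATED: all of the thread's code has been executed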
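The states can be checked directly with Thread.getState(). The following is a minimal sketch rather than a definitive test: the class name ThreadStateDemo and the 500 ms sleep are illustrative assumptions, and only three of the six states are observed.

```java
class ThreadStateDemo {
    // Returns the states seen before start(), while the thread sleeps, and after it ends.
    static Thread.State[] observeStates() {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(500); // a sleeping thread is TIMED_WAITING
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread.State beforeStart = t.getState(); // created but not started
        t.start();
        try {
            Thread.State whileSleeping = t.getState();
            // Poll until the thread has reached sleep() (or has already finished).
            while (whileSleeping != Thread.State.TIMED_WAITING
                    && whileSleeping != Thread.State.TERMINATED) {
                Thread.sleep(5);
                whileSleeping = t.getState();
            }
            t.join(); // wait until the thread has exited
            return new Thread.State[] { beforeStart, whileSleeping, t.getState() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        for (Thread.State state : observeStates()) {
            System.out.println(state);
        }
    }
}
```

On a machine that is not heavily loaded this is expected to print NEW, TIMED_WAITING and TERMINATED in that order.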
Thread pool - Fundamentals
How do we solve the problem in the story of eating a meal and then smashing the bowl?
1. Find a cabinet to put the bowl. At this time, the cabinet is empty
2. For the first meal, buy a bowl
3. Put the bowl in the cabinet after eating
4. For the second meal, you don't need to buy a bowl. Take it directly from the cabinet
5. After eating, put the bowl back in the cabinet again
Did our earlier multithreaded code have a similar problem? -> Yes, an efficiency problem
1. A thread has to be created for every task
2. Once the task has run, the thread dies
How to solve it?
1. Create a pool (a thread pool), which is empty at first
2. When a task needs to be executed, a thread object is created (or an idle one is taken from the pool). After the task is executed, the thread object is returned to the pool
3. After all tasks are completed, shut down the thread pool
A thread pool is a form of multithreaded processing in which tasks are placed in a queue while they wait to be processed. The pool creates threads and keeps idle ones around (in Java they are created on demand as tasks arrive, unless the pool is told to pre-start them). The program hands a task to the pool, and the pool assigns a thread to execute it. After execution the thread does not die; it returns to the pool, becomes idle again, and waits for the next task.
Simply put, a thread pool is a collection of threads.
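To make the reuse visible, the sketch below runs many small tasks on a small pool and records which threads executed them. The class name ThreadReuseDemo and the numbers (2 threads, 20 tasks) are assumptions chosen for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadReuseDemo {
    // Runs taskCount small tasks on a pool of poolSize threads and
    // returns the names of the distinct threads that executed them.
    static Set<String> runTasks(int poolSize, int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.submit(() -> {
                threadNames.add(Thread.currentThread().getName());
            });
        }
        pool.shutdown(); // no new tasks; already submitted tasks still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames;
    }

    public static void main(String[] args) {
        Set<String> names = runTasks(2, 20);
        System.out.println("20 tasks ran on " + names.size() + " thread(s): " + names);
    }
}
```

However many tasks are submitted, the set never holds more than poolSize names, because finished threads go back to the pool instead of dying.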
Thread pool - Executors default thread pool
How to use the default thread pool of Executors?
1. Create a pool (a thread pool), which is empty at first -> use a static method of Executors -> check the API
2. When a task needs to be executed, a thread object is created; after the task is executed, the thread object is returned to the pool -> submit();
3. After all tasks are completed, shut down the thread pool -> shutdown();
How does the Executors class create a thread pool?
1. newCachedThreadPool();
Creates a thread pool that creates new threads as needed, up to Integer.MAX_VALUE threads
2. newFixedThreadPool(int nThreads);
Creates a thread pool that reuses a fixed number of threads
Create thread pool: newCachedThreadPool()
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo01 {
    public static void main(String[] args) throws InterruptedException {
        // Create a thread pool service object to control the thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();

        // Submit a task; the pool creates a thread to run it
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " is executing");
        });

        Thread.sleep(1000);

        // Submit a second task; the first thread is idle by now and can be reused
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " is executing");
        });

        // Shut down the thread pool
        executorService.shutdown();

        // A fixed-size pool, printed to show its initial state
        // (renamed: the original declared executorService twice, which does not compile)
        ExecutorService fixedPool = Executors.newFixedThreadPool(2);
        System.out.println(fixedPool);
        fixedPool.shutdown();
    }
}
Create a thread pool with a specified number of threads: newFixedThreadPool()
How does the Executors class create a thread pool?
1. newCachedThreadPool(); the pool is empty by default and can hold up to Integer.MAX_VALUE threads
2. newFixedThreadPool(int nThreads);
How can we check how many threads the pool currently holds?
1. Use the debugger: the workers field holds the pool's threads, and its initial size is 0
2. Call the getPoolSize() method of the ThreadPoolExecutor object
Code example:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ThreadPoolDemo02 {
    public static void main(String[] args) {
        // Create the thread pool service object
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // Cast to ThreadPoolExecutor to get access to getPoolSize()
        ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;
        System.out.println(executor.getPoolSize()); // 0

        // Submit task 1 (starts thread 1)
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " is executing");
        });
        // Submit task 2 (starts thread 2)
        executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + " is executing");
        });
        System.out.println(executor.getPoolSize()); // 2

        // Release resources
        executorService.shutdown();
    }
}
Thread pool - ThreadPoolExecutor
With the methods above, Java creates the thread pool object for us. What if we want to create one ourselves?
Follow the source code of the two methods above:
1. newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
2. newFixedThreadPool(int nThreads);
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
Both methods create a ThreadPoolExecutor under the hood, so that is the class to use when we create a pool ourselves.
An analogy with restaurant staffing:
1. Number of permanent employees = number of core threads
2. Maximum number of employees in the restaurant = maximum number of threads
3. How long a temporary employee may be idle before being dismissed (value) = idle time (value)
4. How long a temporary employee may be idle before being dismissed (unit) = idle time (unit)
5. Customers waiting in line = task queue
6. Where and how staff are hired = how threads are created (thread factory)
7. What to do when too many customers are queuing = rejection policy, applied when there are too many tasks to execute
ThreadPoolExecutor pool = new ThreadPoolExecutor(
    number of core threads,        // cannot be less than 0
    maximum number of threads,     // must be greater than 0, and maximum >= number of core threads
    maximum idle thread lifetime,  // cannot be less than 0
    time unit,                     // unit of the idle lifetime
    task queue,                    // cannot be null
    thread factory,                // cannot be null
    rejection policy               // cannot be null
);
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo03 {
    public static void main(String[] args) {
        // Create the thread pool
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3,                                    // number of core threads
                5,                                    // maximum number of threads
                2,                                    // maximum idle thread lifetime
                TimeUnit.SECONDS,                     // in seconds
                new ArrayBlockingQueue<>(10),         // task queue (a blocking queue)
                Executors.defaultThreadFactory(),     // default thread factory
                new ThreadPoolExecutor.AbortPolicy()  // rejection policy (abort)
        );

        // Submit tasks
        pool.submit(new MyRunnable());
        pool.submit(new MyRunnable());

        // Release resources
        pool.shutdown();
    }
}

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " executed");
    }
}
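How the parameters interact is easier to see when the tasks are held in place. The sketch below is an illustration under assumptions: the class name PoolGrowthDemo and the sizes (2 core threads, 4 maximum, queue of 3) are made up for the example, and a CountDownLatch blocks every task so that the pool state can be read. ThreadPoolExecutor fills the core threads first, then the queue, and only then starts extra threads up to the maximum.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    // Submits `tasks` blocked tasks (at most 7 fit) and returns
    // {pool size, queue size} observed while all of them are still blocked.
    static int[] poolAndQueueSize(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 2, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker thread busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (int n : new int[] { 1, 5, 7 }) {
            int[] r = poolAndQueueSize(n);
            System.out.println(n + " tasks -> pool size " + r[0] + ", queued " + r[1]);
        }
    }
}
```

With 5 tasks the pool still has only its 2 core threads and 3 tasks wait in the queue; the 6th and 7th tasks are what push the pool up to 4 threads.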
Thread pool parameter - reject policy
1. When is a task rejected?
When the number of submitted, unfinished tasks > maximum number of threads + task queue capacity
2. How is it rejected?
2.1 ThreadPoolExecutor.AbortPolicy(); discards the task and throws an exception (default policy)
2.2 ThreadPoolExecutor.DiscardPolicy(); discards the task without throwing an exception (not recommended)
2.3 ThreadPoolExecutor.DiscardOldestPolicy(); discards the task that has waited longest in the queue and adds the current task to the queue
2.4 ThreadPoolExecutor.CallerRunsPolicy(); calls the task's run() directly in the submitting thread, bypassing the thread pool
Example of default policy code:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo04 {
    public static void main(String[] args) {
        // Create the thread pool
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                    // number of core threads
                5,                                    // maximum number of threads
                2,                                    // maximum idle thread lifetime
                TimeUnit.SECONDS,                     // in seconds
                new ArrayBlockingQueue<>(10),         // task queue: tasks wait here until a thread is idle, then are taken from the queue and executed
                Executors.defaultThreadFactory(),     // default factory: creates thread objects in the default way
                new ThreadPoolExecutor.AbortPolicy()  // rejection policy (default): reject when submitted tasks > maximum number of threads + task queue capacity
        );

        // AbortPolicy discards the task and throws an exception (default policy).
        // If submitted tasks > maximum number of threads + task queue capacity (5 + 10 here), an exception is thrown.
        // MyRunnable is the class defined alongside ThreadPoolDemo03.
        for (int i = 1; i <= 16; i++) {
            pool.submit(new MyRunnable()); // may throw java.util.concurrent.RejectedExecutionException
        }

        // Release resources
        pool.shutdown();
    }
}
Non-default task rejection policies:
ThreadPoolExecutor.DiscardPolicy(); discards the task without throwing an exception (not recommended)
Only (maximum number of threads + task queue capacity) tasks produce output, and no error is reported
ThreadPoolExecutor.DiscardOldestPolicy(); discards the task that has waited longest in the queue and adds the current task to the queue
Only (maximum number of threads + task queue capacity) tasks produce output, and the last task submitted is among them
ThreadPoolExecutor.CallerRunsPolicy(); calls the task's run() directly in the submitting thread, bypassing the thread pool
The pool produces output for only (maximum number of threads + task queue capacity) tasks; the main method's own thread executes the remaining tasks for us
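The four policies can be compared side by side with a deliberately tiny pool. This is a sketch under stated assumptions: the class name RejectionPolicyDemo and the sizes (1 core thread, 2 maximum, queue of 1, so 3 tasks fit) are illustrative, execute() is used instead of submit(), and tasks that run inside the pool are blocked on a latch so that the pool is guaranteed to be full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectionPolicyDemo {
    // Returns {tasks run by pool threads, tasks run by the caller, exceptions thrown}.
    static int[] run(RejectedExecutionHandler handler, int submitted) {
        Thread caller = Thread.currentThread();
        AtomicInteger inPool = new AtomicInteger();
        AtomicInteger onCaller = new AtomicInteger();
        int exceptions = 0;
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 2, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                handler);
        for (int i = 0; i < submitted; i++) {
            try {
                pool.execute(() -> {
                    if (Thread.currentThread() == caller) {
                        onCaller.incrementAndGet(); // CallerRunsPolicy ran it here
                        return;
                    }
                    try {
                        release.await(); // keep the pool full
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    inPool.incrementAndGet();
                });
            } catch (RejectedExecutionException e) {
                exceptions++; // AbortPolicy
            }
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { inPool.get(), onCaller.get(), exceptions };
    }

    public static void main(String[] args) {
        int[] abort = run(new ThreadPoolExecutor.AbortPolicy(), 5);
        int[] callerRuns = run(new ThreadPoolExecutor.CallerRunsPolicy(), 5);
        System.out.println("AbortPolicy: " + abort[0] + " ran, " + abort[2] + " exceptions");
        System.out.println("CallerRunsPolicy: " + callerRuns[0] + " ran in the pool, "
                + callerRuns[1] + " on the caller");
    }
}
```

With 5 submissions and room for 3, every policy lets 3 tasks run in the pool; the policies differ only in what happens to the other 2.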
volatile
When a shared variable is declared volatile, any modification to it is written to main memory immediately, and other threads that read it afterwards get the new value from main memory.
In other words, the volatile keyword forces a modified value to be written to main memory right away. A detailed explanation is given later.
class Test {
    public static void main(String[] args) {
        // Create the thread objects
        Girl g = new Girl();
        g.setName("girl");
        Boy b = new Boy();
        b.setName("boy");
        // Start the threads
        g.start();
        b.start();
    }
}

class Money {
    // Solved with the volatile keyword (applied to the shared data):
    // each thread must re-read the shared data before every use,
    // so its own copy of the variable is kept up to date
    public static volatile int money = 100000;
}

class Boy extends Thread {
    @Override
    public void run() {
        try {
            Thread.sleep(10);
            // Modify the amount
            Money.money = 90000;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Girl extends Thread {
    @Override
    public void run() {
        // Spin until another thread changes the amount
        while (Money.money == 100000) {
        }
        System.out.println("Amount changed, loop stopped");
    }
}
The volatile keyword cannot guarantee atomicity (a synchronized lock can);
it only guarantees that the copy of the variable in the current thread's working memory is up to date.
Code example: 100 threads each hand out 100 ice creams:
class Test {
    public static void main(String[] args) {
        // Create the MyAtomThread object
        MyAtomThread mt = new MyAtomThread();
        // Start 100 threads that share it
        for (int i = 1; i <= 100; i++) {
            Thread t = new Thread(mt);
            t.start();
        }
    }
}

class MyAtomThread implements Runnable {
    private int count = 0; // number of ice creams handed out

    @Override
    public void run() {
        // Hand out 100 ice creams
        for (int i = 1; i <= 100; i++) {
            /*
             The result of this code may be wrong!
             Problem analysis: count++ is not an atomic operation, and the thread
             can lose the CPU between any two of its steps:
             1. Read the shared data into this thread's working memory
             2. Modify the copy of the variable in this thread's working memory
             3. Write the value of the copy back to the shared data
             */
            //Thread.sleep(10);
            count++;
            System.out.println(Thread.currentThread().getName() + " handed out ice cream no. " + count);
        }
    }
}
Atomicity
Atomicity means that a group of one or more operations either all succeed or all fail: they "live and die together". Even when multiple threads run at the same time, once an atomic operation has started it cannot be disturbed by other threads. Let's look at which operations are atomic and which are not, to get an intuitive impression:
int a = 10; //1
a++;        //2
int b = a;  //3
a = a + 1;  //4
Of the four statements above, only statement 1 is an atomic operation: it assigns 10 to the variable a in the thread's working memory.
Statement 2 (a++) actually consists of three operations: 1. read the value of variable a; 2. add one to it; 3. assign the result back to variable a. Together these three operations do not form an atomic operation.
By the same analysis, statements 3 and 4 are not atomic either.
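The effect of a non-atomic increment can be measured. In the sketch below (the class name LostUpdateDemo and the thread counts are illustrative assumptions) the same number of increments is applied to a plain counter and to one protected by synchronized. The plain counter may lose updates, so its result is only guaranteed to be at most the expected total, while the synchronized one always reaches it.

```java
class LostUpdateDemo {
    private int unsafeCount = 0;
    private int safeCount = 0;

    // count++ is read, add, write: another thread can interleave between the steps.
    void unsafeIncrement() {
        unsafeCount++;
    }

    // The lock makes the three steps behave as one atomic operation.
    synchronized void safeIncrement() {
        safeCount++;
    }

    // Returns {unsafe total, safe total} after threads * perThread increments of each counter.
    static int[] run(int threads, int perThread) {
        LostUpdateDemo demo = new LostUpdateDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    demo.unsafeIncrement();
                }
                for (int i = 0; i < perThread; i++) {
                    demo.safeIncrement();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { demo.unsafeCount, demo.safeCount };
    }

    public static void main(String[] args) {
        int[] totals = run(100, 100);
        System.out.println("without lock: " + totals[0] + " (may be less than 10000)");
        System.out.println("with synchronized: " + totals[1]);
    }
}
```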
Atomicity - AtomicInteger
Since JDK 1.5, the java.util.concurrent.atomic package has been available
It provides classes that support atomic operations on values of various types
AtomicInteger common methods
Constructors:
public AtomicInteger(); creates an AtomicInteger with an initial value of 0
public AtomicInteger(int initialValue); creates an AtomicInteger with the given initial value
Member methods:
int get(); gets the value
int getAndIncrement(); atomically adds 1 to the current value and returns the value before the increment
int incrementAndGet(); atomically adds 1 to the current value and returns the value after the increment
int addAndGet(int data); atomically adds the parameter to the current value and returns the result
int getAndSet(int value); atomically sets the value to the parameter and returns the old value
import java.util.concurrent.atomic.AtomicInteger;

class Demo01_Constructor {
    public static void main(String[] args) {
        // No-argument constructor: public AtomicInteger();
        AtomicInteger ac = new AtomicInteger();
        System.out.println(ac); // 0

        // Constructor with a parameter: public AtomicInteger(int initialValue);
        AtomicInteger ac2 = new AtomicInteger(10);
        System.out.println(ac2); // 10
    }
}

class Demo02_Method {
    public static void main(String[] args) {
        // Create the object
        AtomicInteger ac = new AtomicInteger(10);

        // int get(); gets the value
        System.out.println(ac.get()); // 10

        // int getAndIncrement(); atomically adds 1 and returns the value before the increment
        System.out.println(ac.getAndIncrement()); // returns the old value 10
        System.out.println(ac.get());             // 10 + 1 = 11

        // int incrementAndGet(); atomically adds 1 and returns the value after the increment
        System.out.println(ac.incrementAndGet()); // 11 + 1 = 12

        // int addAndGet(int data); atomically adds the parameter and returns the result
        System.out.println(ac.addAndGet(20)); // 12 + 20 = 32

        // int getAndSet(int value); atomically sets the value to the parameter and returns the old value
        System.out.println(ac.getAndSet(50)); // returns the old value 32
        System.out.println(ac.get());         // now set to 50
    }
}
AtomicInteger - memory parsing
First, we implement the code with AtomicInteger:
import java.util.concurrent.atomic.AtomicInteger;

class Test {
    public static void main(String[] args) {
        MyAtomThread mt = new MyAtomThread();
        for (int i = 1; i <= 100; i++) {
            Thread t = new Thread(mt);
            t.start();
        }
    }
}

class MyAtomThread implements Runnable {
    // AtomicInteger (efficient and thread-safe)
    AtomicInteger ac = new AtomicInteger(); // or pass 0 explicitly

    @Override
    public void run() {
        for (int i = 1; i <= 100; i++) {
            // count++; // wrong data, because count++ is not atomic
            // int incrementAndGet(); atomically adds 1 and returns the value after the increment
            int count = ac.incrementAndGet();
            System.out.println(Thread.currentThread().getName() + " handed out ice cream no. " + count);
        }
    }
}

AtomicInteger principle: CAS + spin
Three pieces of data are involved (memory value V, old value A, value to write B)
When old value A == memory value V, no other thread has interfered while the current thread was working, so the modification is allowed: V is changed to B
When old value A != memory value V, another thread has interfered while the current thread was working, so the modification is not allowed; the thread spins instead
Spin: read the latest memory value V again and repeat the check above

Simple understanding:
When modifying shared data, record the old value before the modification
If the current memory value is the same as the old value, no other thread has touched the memory value, so it can be modified
If the current memory value differs from the old value, another thread has touched the memory value, so it cannot be modified
Get the latest memory value again and repeat the steps above (spin)
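The CAS + spin steps can be written out by hand with AtomicInteger.compareAndSet. This is a sketch of the idea only, not the JDK's implementation; the class name CasSpinDemo and the helper casIncrement are hypothetical names introduced for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasSpinDemo {
    // A hand-written increment built on compareAndSet, for illustration.
    static int casIncrement(AtomicInteger value) {
        while (true) {                       // spin
            int oldValue = value.get();      // old value A: what we last saw in memory
            int newValue = oldValue + 1;     // value to write B
            // Succeeds only if the memory value V still equals A.
            if (value.compareAndSet(oldValue, newValue)) {
                return newValue;
            }
            // Another thread changed V in the meantime: read again and retry.
        }
    }

    // Runs `threads` threads that each call casIncrement `perThread` times.
    static int runThreads(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    casIncrement(counter);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runThreads(100, 100)); // 10000: no increment is lost
    }
}
```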
AtomicInteger - source code analysis
Source code analysis:
public AtomicInteger(int initialValue) {
    value = initialValue;
}

// Increment first, then return the result after the increment
public final int incrementAndGet() {
    // this:  the current AtomicInteger object
    // VALUE: the memory offset of the value field inside that object
    // 1:     increment by one
    // + 1:   getAndAddInt returns the old value, so adding 1 gives the result after the increment
    return U.getAndAddInt(this, VALUE, 1) + 1;
}

@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
    // o + offset: where the memory value lives
    // v:          the old value
    // v + delta:  the modified value
    int v;
    // do-while: spin, re-reading the old value each time
    do {
        v = getIntVolatile(o, offset);
        // Loop condition: weakCompareAndSetInt compares the value in memory with the old value
        // Case 1: they are equal, so the value is modified and true is returned, ending the loop
        // Case 2: they are not equal, so nothing is modified and false is returned; the loop continues (spin)
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}
Pessimistic lock and optimistic lock
The difference between synchronized and CAS:
1. Similarities:
In a multithreaded environment, both can guarantee the safety of shared data
2. Differences:
synchronized takes the pessimistic view: every time data is read, someone else may modify it, so it locks before every access to the shared data (pessimistic lock)
CAS takes the optimistic view: every time data is read, it assumes nobody else will modify it, so it does not lock. Only when modifying the shared data does it check whether someone else has changed it (optimistic lock)
If someone has changed it, get the latest value again
If nobody has changed it, modify the value of the shared data directly
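The two styles can be put side by side on a check-then-modify operation. The sketch below is an illustration under assumptions: the classes PessimisticAccount and OptimisticAccount and the bank-account scenario are hypothetical and not from the original notes. Ten threads each try to withdraw 30 from a balance of 100, so exactly three withdrawals can succeed under either strategy.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;

class LockStylesDemo {
    // Pessimistic: take the lock first, then check and modify.
    static class PessimisticAccount {
        private int balance;
        PessimisticAccount(int balance) { this.balance = balance; }
        synchronized boolean withdraw(int amount) {
            if (balance < amount) {
                return false;
            }
            balance -= amount;
            return true;
        }
    }

    // Optimistic: no lock; at write time, check whether someone else changed the value.
    static class OptimisticAccount {
        private final AtomicInteger balance;
        OptimisticAccount(int balance) { this.balance = new AtomicInteger(balance); }
        boolean withdraw(int amount) {
            while (true) {
                int seen = balance.get();
                if (seen < amount) {
                    return false;
                }
                if (balance.compareAndSet(seen, seen - amount)) {
                    return true;
                }
                // Someone else modified the balance: get the latest value and retry.
            }
        }
    }

    // Lets `threads` threads each attempt one withdrawal and counts the successes.
    static int successfulWithdrawals(IntPredicate withdraw, int threads, int amount) {
        AtomicInteger successes = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                if (withdraw.test(amount)) {
                    successes.incrementAndGet();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(successfulWithdrawals(new PessimisticAccount(100)::withdraw, 10, 30)); // 3
        System.out.println(successfulWithdrawals(new OptimisticAccount(100)::withdraw, 10, 30));  // 3
    }
}
```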
Concurrency tool class - Hashtable
HashMap is not thread-safe, and problems arise when it is used in a multithreaded environment
To ensure thread safety we can use Hashtable, but Hashtable is inefficient
Under the hood, Hashtable is a hash table structure made up of an array + linked lists
The default array length of Hashtable is 11 and the load factor is 0.75, giving a resize threshold of 8 entries (HashMap, by comparison, defaults to a length of 16 and a threshold of 12)
A linked list comes into play when the position where the current element is to be stored already holds elements; the keys are then compared first
If a key is the same, no new node is stored (the existing entry's value is replaced)
If not, the old element is hung under the new element to form a linked list (hash bucket)
Reasons for the low efficiency of Hashtable:
Looking at the underlying code, we find that it uses synchronized, a pessimistic lock, and the whole table is locked on every operation
Concurrency tool class - basic use of ConcurrentHashMap
HashMap: not thread-safe
Hashtable: thread-safe but inefficient (the underlying pessimistic lock locks the whole table)
ConcurrentHashMap: thread-safe and efficient (the underlying implementation differs between JDK 7 and JDK 8, analyzed in the sections that follow)
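A minimal sketch of basic ConcurrentHashMap use follows. The class name ConcurrentHashMapBasics and the key names are illustrative assumptions; put, get, putIfAbsent and merge are standard methods of the class, and merge performs its read-modify-write atomically per key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentHashMapBasics {
    // Several threads count occurrences of four keys in one shared map.
    static Map<String, Integer> countConcurrently(int threads, int perThread) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    // Atomic "add 1 to the count for this key".
                    counts.merge("key" + (i % 4), 1, Integer::sum);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("a", "1");
        map.putIfAbsent("a", "2");        // key already present, value stays "1"
        System.out.println(map.get("a")); // 1
        System.out.println(countConcurrently(8, 1000));
    }
}
```

With 8 threads and 1000 iterations each, every one of the four keys ends up with a count of 2000; the same loop on a plain HashMap gives no such guarantee.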
ConcurrentHashMap1.7 principle
When a ConcurrentHashMap object is created:
A large array of length 16 is created, with a load factor of 0.75 (Segment[])
A small array of length 2 is created and its address is assigned to index 0; the other index positions are null (HashEntry[])
The array at index 0 serves as a template: when new elements are added, small arrays of length 2 are created from it
When an element is added, its position in the large array is calculated from the hash value of the key
If that position is null, a small array is created from the template
After creation, a second hash determines the position in the small array. Since the array is new, the position is null and the element is stored directly
If that position is not null, a second hash determines the position in the small array
If the small array needs to be expanded, it is doubled in size (stored at index 1)
If no expansion is needed, it checks whether the current index position of the small array is null
If it is null, there is no element there, and the element is stored directly
If it is not null, the keys are compared with the equals method
If they are the same, no new node is stored
Otherwise, the old element is hung under the new element to form a linked list (hash bucket)
To sum up, if the large array Segment[] is completely filled, the result is a large 16 * 16 hash table
Why is it efficient?
Because each operation locks only one small table (a small HashEntry[] array) and never the large table
So in JDK 1.7 and earlier, with the default settings, at most 16 threads can modify the map at the same time
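The idea of locking only one small table can be sketched as lock striping. The class below is a simplified illustration and not the JDK 1.7 code: StripedMapSketch is a hypothetical name, each of the 16 segments is an ordinary HashMap guarded by synchronized, and resizing and the template array are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StripedMapSketch<K, V> {
    private static final int SEGMENTS = 16;
    private final List<Map<K, V>> segments = new ArrayList<>();

    StripedMapSketch() {
        for (int i = 0; i < SEGMENTS; i++) {
            segments.add(new HashMap<>());
        }
    }

    // First hash: pick the segment (the position in the "large array").
    private Map<K, V> segmentFor(Object key) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        return segments.get(h & (SEGMENTS - 1));
    }

    V put(K key, V value) {
        Map<K, V> segment = segmentFor(key);
        synchronized (segment) { // only this one small table is locked
            return segment.put(key, value);
        }
    }

    V get(Object key) {
        Map<K, V> segment = segmentFor(key);
        synchronized (segment) {
            return segment.get(key);
        }
    }

    int size() {
        int total = 0;
        for (Map<K, V> segment : segments) {
            synchronized (segment) {
                total += segment.size();
            }
        }
        return total;
    }

    // Fills one map from several threads with distinct keys and returns its size.
    static int fillConcurrently(int threads, int perThread) {
        StripedMapSketch<Integer, Integer> map = new StripedMapSketch<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    map.put(base + i, i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println(fillConcurrently(8, 1000)); // 8000
    }
}
```

Two threads block each other only when their keys fall into the same segment, which is why up to 16 writers can proceed at once in this design.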
ConcurrentHashMap1.8 principle
Related IDE shortcut keys:
Alt + 7: show all methods of the class
Ctrl + Alt + Shift + U: show the inheritance structure of the class
How ConcurrentHashMap works under the hood in JDK 1.8:
Structure: hash table (array + linked list + red-black tree)
Thread safety: CAS mechanism + synchronized code blocks
1. Creating a ConcurrentHashMap object with the no-argument constructor does nothing (check the no-argument constructor and the parent class's no-argument constructor)
2. The hash table is created (initTable method) the first time an element is added (when the put method is called)
Calculate the index position where the current element should be stored
If it is null, there is no element there, and the node is added to the array with the CAS algorithm
If it is not null, there are elements there: the latest node address at that index is read with volatile semantics and the new element is hung under it to form a linked list. When the length of the linked list reaches 8, it is converted into a red-black tree (provided the array length is at least 64; otherwise the array is resized instead)
3. Each operation uses the head node of the linked list or tree as the lock object, together with a synchronized (pessimistic) lock, to keep multithreaded operations on the collection safe
Concurrency tool class - CountDownLatch
CountDownLatch is a synchronization counter. When its count is reduced to 0, all threads waiting on it are released. One use is releasing many threads at the same moment, so that simulated concurrent requests are realistic.
CountDownLatch application scenario
Let one thread wait until other threads have finished before it continues
CountDownLatch related methods
1. CountDownLatch(int count); (constructor) count is the number of threads to wait for
2. public void await(); makes the calling thread wait
3. public void countDown(); signals that the current thread has finished
Case: implement in code a mother who waits for her three children to finish eating dumplings and then clears away the dishes and chopsticks.
Mother thread:
    wait: await();
    sout("Clearing away the dishes and chopsticks");
Child threads (x3):
    constructor with a CountDownLatch parameter
    sout(name + " is eating dumplings");
    say you have finished: countDown();
Test class:
    start the 4 threads
    create a CountDownLatch object and pass it to the 4 threads
Code example:
import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {
    public static void main(String[] args) {
        // Create the CountDownLatch object and pass it to the 4 threads
        CountDownLatch countDownLatch = new CountDownLatch(3); // wait for three (child) threads
        // Start the 4 threads
        Mother mother = new Mother(countDownLatch);
        mother.start();
        Child01 c1 = new Child01(countDownLatch);
        Child02 c2 = new Child02(countDownLatch);
        Child03 c3 = new Child03(countDownLatch);
        c1.start();
        c2.start();
        c3.start();
    }
}

// Child 1
class Child01 extends Thread {
    private CountDownLatch countDownLatch;

    // Constructor with a parameter
    public Child01(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        // Print the progress
        for (int i = 1; i < 10; i++) {
            System.out.println(getName() + " is eating dumpling no. " + i);
        }
        // Say you have finished
        countDownLatch.countDown();
    }
}

// Child 2
class Child02 extends Thread {
    private CountDownLatch countDownLatch;

    // Constructor with a parameter
    public Child02(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        // Print the progress
        for (int i = 1; i < 15; i++) {
            System.out.println(getName() + " is eating dumpling no. " + i);
        }
        // Say you have finished
        countDownLatch.countDown();
    }
}

// Child 3
class Child03 extends Thread {
    private CountDownLatch countDownLatch;

    // Constructor with a parameter
    public Child03(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        // Print the progress
        for (int i = 1; i < 20; i++) {
            System.out.println(getName() + " is eating dumpling no. " + i);
        }
        // Say you have finished
        countDownLatch.countDown();
    }
}

// Mother
class Mother extends Thread {
    private CountDownLatch countDownLatch;

    // Constructor with a parameter
    public Mother(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        // Wait until all three children have called countDown()
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Print the result
        System.out.println("Mother is clearing away the dishes and chopsticks~");
    }
}
Concurrency tool class - Semaphore
Typically used to limit the number of threads that can access certain resources (physical or logical)
It can also be compared to a pass: it is issued to one or more threads, handed back at the exit (or after execution), and then issued to the next thread or threads
Semaphore constructors
Semaphore(int permits);
Creates a Semaphore with the given number of permits and a nonfair fairness setting
Semaphore(int permits, boolean fair);
Creates a Semaphore with the given number of permits and the given fairness setting
Semaphore usage, step by step
1. Someone needs to manage the road - create a Semaphore object
2. When a car comes in, issue a pass - acquire(); (issue)
3. When a car goes out, take back the pass - release(); (take back)
4. If all the passes have been issued, other cars have to wait - handled automatically
Code example (multithreading by implementing the Runnable interface):
import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) {
        // Create the MyRunnable object
        MyRunnable mr = new MyRunnable();
        // Start many threads that share it
        for (int i = 1; i <= 50; i++) {
            new Thread(mr).start();
        }
    }
}

class MyRunnable implements Runnable {
    // 1. Someone needs to manage the road - create a Semaphore object
    Semaphore semaphore = new Semaphore(2); // at most 2 cars at a time

    @Override
    public void run() {
        try {
            // 2. When a car comes in, issue a pass - acquire();
            semaphore.acquire();
            System.out.println("Got a pass, car is in!");
            // 3. When a car goes out, take back the pass - release();
            semaphore.release();
            System.out.println("Returned the pass, car is out!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
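That the number of permits really caps concurrency can be checked by counting how many threads are inside the guarded section at once. This is a sketch under assumptions: the class name SemaphoreLimitDemo, the 5 ms of simulated work and the thread counts are illustrative. Unlike the example above, the permit is released in a finally block so that it is returned even if the work fails.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreLimitDemo {
    // Returns the highest number of threads seen inside the guarded section at once.
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                try {
                    semaphore.acquire(); // issue a pass
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no pass was issued, so nothing to return
                }
                try {
                    int now = inside.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    Thread.sleep(5); // simulated work while holding the pass
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release(); // take back the pass
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("max cars on the road at once: " + maxConcurrent(2, 20));
    }
}
```

The reported maximum never exceeds the number of permits, however many threads are started.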