JUC
1. Summary
The difference between threads and processes
A process is the basic unit of resource allocation: every running program is a process.
A thread is the basic unit of CPU scheduling; threads in the same process share that process's memory.
1.1 JUC
JUC is short for the java.util.concurrent toolkit, which was introduced in JDK 1.5
1.2 Thread State
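New, Ready, Running, Blocked, Dead. This is the classic five-state model; Java's own Thread.State enum names the states NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING and TERMINATED.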
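As a small sketch of how thread states surface in Java, the following reads Thread.getState() before start() and after join(). The names ThreadStateDemo and observe are illustrative only; the intermediate states depend on scheduling and are deliberately not captured.

```java
class ThreadStateDemo {
    /** Returns the state seen before start() and the state seen after join(). */
    static Thread.State[] observe() throws InterruptedException {
        Thread t = new Thread(() -> { });
        Thread.State before = t.getState();   // not started yet: NEW
        t.start();
        t.join();                             // wait for the thread to finish
        Thread.State after = t.getState();    // finished: TERMINATED
        return new Thread.State[] { before, after };
    }

    public static void main(String[] args) throws InterruptedException {
        for (Thread.State s : observe()) {
            System.out.println(s);
        }
    }
}
```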
1.3 Differences between wait and sleep
sleep must be given a duration; wait can be called with or without a timeout
sleep is a static method of the Thread class; wait is a method of Object and must be called while holding the object's monitor, i.e. inside a synchronized block or method
sleep does not release the locks the thread holds; wait releases the lock on the object it is called on
1.4 Daemon and non-daemon threads
A daemon thread is a background thread; the JVM exits once all user (non-daemon) threads have finished, without waiting for daemon threads
Non-daemon threads: ordinary user threads
2. Lock interface
2.1 Synchronized
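synchronized can be applied to a code block or to a method
The synchronized modifier is not inherited: an overriding method is synchronized only if it declares the modifier itself
Method: for an instance method the lock is this; for a static method the lock is the Class object of the class
Code block: the lock is the monitor object given in the parentheses, e.g. synchronized(this) locks the current instance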
class Ticket {
    // Number of tickets remaining
    private int number = 30;

    // Sell one ticket; synchronized, so only one thread sells at a time
    public synchronized void sale() {
        // Check: is there a ticket left?
        if (number > 0) {
            System.out.println(Thread.currentThread().getName() + " :" + (number--) + " " + number);
        }
    }
}
2.2 Lock
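The differences between Lock and synchronized:
synchronized releases the lock automatically when the block or method exits; a Lock must be released manually, normally in a finally block
synchronized is a JVM-level keyword; Lock is an API-level interface in java.util.concurrent.locks
synchronized is always an unfair lock; a Lock such as ReentrantLock can be created as fair or unfair
A thread blocked on synchronized cannot be interrupted; a thread waiting for a Lock can be (lockInterruptibly), and Condition objects allow specific waiting threads to be woken precisely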
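One practical consequence of these differences is that a thread can give up waiting for a Lock, which synchronized does not allow. The sketch below is illustrative only (TryLockDemo and contendedTryLock are made-up names): the main thread holds a ReentrantLock while a second thread calls tryLock with a 100 ms timeout and fails.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    /** Returns whether a second thread acquired the lock while the caller held it. */
    static boolean contendedTryLock() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                try {
                    // Give up after 100 ms instead of blocking indefinitely.
                    if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
                        try {
                            acquired[0] = true;
                        } finally {
                            lock.unlock();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            other.start();
            other.join();   // the caller still holds the lock while the other thread tries
        } finally {
            lock.unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("second thread got the lock: " + contendedTryLock());
    }
}
```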
2.3 Lock interface
public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
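1. lock
Acquires the lock. If the lock is held by another thread, the calling thread waits
The lock must be released manually
Lock lock = new ReentrantLock();
lock.lock();
try {
    // critical section
} catch (Exception e) {
    // handle the exception
} finally {
    // always release the lock, even if the critical section throws
    lock.unlock();
}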
2. newCondition
synchronized uses wait() and notify() to implement waiting and notification
Lock implements waiting and notification through the Condition object returned by newCondition()
await() corresponds to wait()
signal() corresponds to notify()
2.4 ReentrantLock
A reentrant lock: the thread that already holds the lock can acquire it again without blocking
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Airmal {
    private int number;
    public Lock lock = new ReentrantLock();
    public Condition condition = lock.newCondition();

    public void incrnumber() {
        lock.lock();
        try {
            while (number != 0) {
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            condition.signalAll();
        } catch (Exception e) {
        } finally {
            lock.unlock();
        }
    }

    public void decrenumber() {
        lock.lock();
        try {
            while (number == 0) {
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            condition.signalAll();
        } catch (Exception e) {
        } finally {
            lock.unlock();
        }
    }
}

public class ProdeCustomerLockDemo {
    public static void main(String[] ags) {
        Airmal airmal = new Airmal();
        new Thread(() -> { for (int i = 1; i <= 10; i++) { airmal.incrnumber(); } }, "A").start();
        new Thread(() -> { for (int i = 1; i <= 10; i++) { airmal.decrenumber(); } }, "B").start();
        new Thread(() -> { for (int i = 1; i <= 10; i++) { airmal.incrnumber(); } }, "C").start();
        new Thread(() -> { for (int i = 1; i <= 10; i++) { airmal.decrenumber(); } }, "D").start();
    }
}
Precise wake-up
Three threads A, B and C start and must run in turn in the order A, B, C: A prints 5 times, B prints 10 times, C prints 15 times, and then the cycle repeats
Idea: one lock with several Conditions (one lock, multiple keys); a flag variable decides whose turn it is
// Precise control with Condition
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Three threads A, B, C execute in sequence A-B-C.
 * A prints 5 times, B prints 10 times, C prints 15 times.
 */
class Airmals {
    private int number = 1; // Flag: 1 = A's turn, 2 = B's turn, 3 = C's turn
    private Lock lock = new ReentrantLock();
    Condition c1 = lock.newCondition();
    Condition c2 = lock.newCondition();
    Condition c3 = lock.newCondition();

    public void pring5() {
        lock.lock();
        try {
            while (number != 1) {
                c1.await();
            }
            for (int i = 1; i <= 5; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i);
            }
            number = 2;
            c2.signal(); // Wake up B
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void pring10() {
        lock.lock();
        try {
            while (number != 2) {
                c2.await();
            }
            for (int i = 1; i <= 10; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i);
            }
            number = 3;
            c3.signal(); // Wake up C
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void pring15() {
        lock.lock();
        try {
            while (number != 3) {
                c3.await();
            }
            for (int i = 1; i <= 15; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + i);
            }
            number = 1;
            c1.signal(); // Wake up A
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class ConditionDemo {
    public static void main(String[] args) {
        Airmals airmals = new Airmals();
        // Each thread runs the same number of rounds; otherwise the thread
        // with extra rounds would wait forever for a turn that never comes.
        new Thread(() -> { for (int i = 1; i <= 10; i++) { airmals.pring5(); } }, "A").start();
        new Thread(() -> { for (int i = 1; i <= 10; i++) { airmals.pring10(); } }, "B").start();
        new Thread(() -> { for (int i = 1; i <= 10; i++) { airmals.pring15(); } }, "C").start();
    }
}
2.5 ReadWriteLock
ReadWriteLock is an interface; its implementation class is ReentrantReadWriteLock
ReentrantReadWriteLock offers many methods, but the two main ones are readLock() and writeLock(), which return the read lock and the write lock.
It addresses the inefficiency of using one exclusive Lock for both reads and writes: with a ReadWriteLock, reads can proceed concurrently, while a read and a write (or two writes) cannot hold the lock at the same time
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Multiple threads operate on one resource
// Read-read can coexist
// Read-write cannot coexist
// Write-write cannot coexist
class MyLock {
    private Map<String, Object> mylock = new HashMap<>();
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    public void put(String key, Object value) throws InterruptedException {
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "\t" + "---Write data");
            // Sleep long enough for the exclusive behaviour to be visible in the output
            TimeUnit.MILLISECONDS.sleep(300);
            mylock.put(key, value);
            System.out.println(Thread.currentThread().getName() + "\t" + "---Write data successfully");
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    public void get(String key) throws InterruptedException {
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "\t" + "----Read data");
            TimeUnit.MILLISECONDS.sleep(300);
            mylock.get(key);
            System.out.println(Thread.currentThread().getName() + "\t" + "---Read data successfully");
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}

public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyLock myLock = new MyLock();
        for (int i = 1; i <= 3; i++) {
            final String temp = String.valueOf(i);
            // Write operation
            new Thread(() -> {
                try {
                    myLock.put(temp, new Object());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
        for (int i = 1; i <= 3; i++) {
            final String temp = String.valueOf(i);
            // Read operation
            new Thread(() -> {
                try {
                    myLock.get(temp);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}
1 - Write data
1 - Successful writing of data
2 - Write data
2 - Successful writing of data
3 - Write data
3 - Successful writing of data
1 - Read data
2 - Read data
3 - Read Data
1 - Successful reading of data
2 - Successful reading of data
3 - Successful reading of data
3. Communication between threads
Communication models: shared memory and message passing
3.1 synchronized scheme
High cohesion and low coupling
/**
 * Producer-consumer demo
 * Thread programming:
 * 1. High cohesion, low coupling: threads operate on a resource class
 * 2. Judge / work / notify
 * 3. Guard against spurious wake-ups (wait in a while loop, not an if)
 */
class Airmant {
    private int number = 0;

    public synchronized void increnumber() throws InterruptedException {
        // judge
        while (number != 0) {
            this.wait();
        }
        // work
        number++;
        System.out.println(Thread.currentThread().getName() + "\t" + number);
        // notify
        this.notifyAll();
    }

    public synchronized void decrennumber() throws InterruptedException {
        // judge
        while (number == 0) {
            this.wait();
        }
        // work
        number--;
        System.out.println(Thread.currentThread().getName() + "\t" + number);
        // notify
        this.notifyAll();
    }
}

public class ProdeCustomerDemo {
    public static void main(String[] ags) {
        Airmant airmant = new Airmant(); // High cohesion: the resource class owns its synchronization
        new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    airmant.increnumber();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();
        new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    airmant.decrennumber();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "B").start();
    }
}
3.2 Lock scheme
The Lock-based version is the Airmal / ProdeCustomerLockDemo example already listed in section 2.4.
4. Thread safety of collections
java.util.ConcurrentModificationException
public class NotSafeDemo {
    /**
     * Multiple threads modify the collection at the same time
     * @param args
     */
    public static void main(String[] args) {
        // ArrayList is not thread-safe: concurrent add + iteration can throw
        // ConcurrentModificationException
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString());
                list.forEach(System.out::println);
            }, "thread" + i).start();
        }
    }
}
4.1 List
Underlying structure: an ArrayList is backed by an Object[] array
The default capacity is 10. In JDK 1.7 the array of 10 is allocated at construction; in JDK 1.8 it is allocated lazily on the first add
Each expansion grows the capacity to 1.5 times the original
Expansion copies the elements with Arrays.copyOf(), which calls System.arraycopy()
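The 1.5x growth can be worked through with plain arithmetic. The sketch below does not inspect a real ArrayList (its capacity is not exposed); it only mirrors the growth formula old + (old >> 1), and the class name ArrayListGrowthSketch is illustrative.

```java
class ArrayListGrowthSketch {
    /** New capacity after one expansion: old capacity plus half of it (integer arithmetic). */
    static int grow(int oldCapacity) {
        return oldCapacity + (oldCapacity >> 1);
    }

    public static void main(String[] args) {
        int capacity = 10; // default capacity
        for (int i = 0; i < 4; i++) {
            int next = grow(capacity);
            System.out.println(capacity + " -> " + next);
            capacity = next;
        }
        // prints 10 -> 15, 15 -> 22, 22 -> 33, 33 -> 49
    }
}
```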
Solution
Vector
public class nosaleCollection { public static void main(String[]ags){ /* List <String>list=new ArrayList();*/ List <String>list=new Vector<>(); for(int i=1;i<=30;i++){ new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(list); },String.valueOf(i)).start(); } } }
Collections.synchronizedList
public class nosaleCollection { public static void main(String[]ags){ List<String>list= Collections.synchronizedList(new ArrayList<>()); for(int i=1;i<=30;i++){ new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(list); },String.valueOf(i)).start(); } } }
CopyOnWriteArrayList (copy-on-write)
Uses read-write separation and guarantees eventual consistency. On a write, a new array is created, the existing data is copied into it, the element is added to the new array, and the list's internal reference is then pointed at the new array
Advantages: reads take no lock, so read concurrency is high
Disadvantages: every write creates a new array, which can cause frequent GC
A read that overlaps a write may see the old data: strong consistency is not guaranteed, only eventual consistency
public class nosaleCollection { public static void main(String[]ags){ List<String> list=new CopyOnWriteArrayList<>(); for(int i=1;i<=30;i++){ new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(list); },String.valueOf(i)).start(); } } }
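The snapshot behaviour of copy-on-write can be seen in a single thread. In this sketch (CowSnapshotDemo is an illustrative name) the list is modified while it is being iterated: an ArrayList would throw ConcurrentModificationException here, whereas the CopyOnWriteArrayList iterator keeps walking the array that existed when iteration began.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CowSnapshotDemo {
    /** Returns {elements seen by the iterator, final list size}. */
    static int[] run() {
        List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
        int seen = 0;
        for (String s : list) {
            list.add(s + "!"); // each write copies the array; the iterator keeps its old snapshot
            seen++;
        }
        return new int[] { seen, list.size() };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("seen=" + r[0] + " size=" + r[1]); // seen=3 size=6
    }
}
```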
4.2 Map
The default capacity of HashMap is 16, and each expansion doubles it, e.g. from 16 to 32
Solution
Collections.synchronizedMap
ConcurrentHashMap
Hashtable
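As a minimal sketch of one of these options, the example below has several threads update the same key of a ConcurrentHashMap. It relies on merge being atomic for ConcurrentHashMap, so no increment is lost; the class and method names are illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentMapDemo {
    /** Each of `threads` workers increments the same counter `perThread` times. */
    static int countConcurrently(int threads, int perThread) throws InterruptedException {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counts.merge("hits", 1, Integer::sum); // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counts.get("hits");
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countConcurrently(8, 1000)); // 8000
    }
}
```

With a plain HashMap the same loop could lose updates or corrupt the map.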
4.3 Set
HashSet is backed by a HashMap
HashSet's add(e) takes a single argument while HashMap's put takes (key, value): add calls map.put(e, PRESENT), where the element is the key and PRESENT is a shared dummy Object used as the value
Solution
Collections.synchronizedSet
public class nosaleCollection { public static void main(String[]ags){ Set <String>set=Collections.synchronizedSet(new HashSet<>()); for (int i=1;i<=30;i++){ new Thread(()->{ set.add(UUID.randomUUID().toString().substring(0,8)); },String.valueOf(i)).start(); } } private static void nosaftList() { List<String> list=new CopyOnWriteArrayList<>(); for(int i=1;i<=30;i++){ new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(list); },String.valueOf(i)).start(); } } }
CopyOnWriteArraySet
public class nosaleCollection { public static void main(String[]ags){ Set <String>set=new CopyOnWriteArraySet<>(); for (int i=1;i<=30;i++){ new Thread(()->{ set.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(set); },String.valueOf(i)).start(); } } private static void nosaftList() { List<String> list=new CopyOnWriteArrayList<>(); for(int i=1;i<=30;i++){ new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(list); },String.valueOf(i)).start(); } } }
5. Multithreaded problem
class Phone {
    public static synchronized void sendSMS() throws Exception {
        // Pause for 4 seconds
        TimeUnit.SECONDS.sleep(4);
        System.out.println("------sendSMS");
    }

    public synchronized void sendEmail() throws Exception {
        System.out.println(" ---- sendEmail");
    }

    public void getHello() {
        System.out.println(" --- getHello");
    }
}
1. Standard access: is SMS or email printed first? sendSMS, then sendEmail
2. sendSMS pauses for 4 seconds: SMS or email first? sendSMS, then sendEmail
3. An ordinary (unsynchronized) getHello method is added: SMS or hello first? getHello, then sendSMS
4. Two phones: SMS or email first? sendEmail, then sendSMS
5. Two static synchronized methods, one phone: SMS or email first? sendSMS, then sendEmail
6. Two static synchronized methods, two phones: SMS or email first? sendSMS, then sendEmail
7. One static synchronized method and one ordinary synchronized method, one phone: SMS or email first? sendEmail, then sendSMS
8. One static synchronized method and one ordinary synchronized method, two phones: SMS or email first? sendEmail, then sendSMS
The lock used by synchronized takes one of the following three forms.
For ordinary synchronized methods, the lock is the current instance object.
For static synchronized methods, the lock is the Class object of the current class.
For synchronized blocks, the lock is the object given in the synchronized parentheses
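These three rules can be checked directly with Thread.holdsLock, which reports whether the current thread owns a given object's monitor. The class below is an illustrative sketch, not part of the Phone example.

```java
class LockOwnerDemo {
    /** Instance synchronized method: the monitor is this. */
    synchronized boolean instanceLockIsThis() {
        return Thread.holdsLock(this);
    }

    /** Static synchronized method: the monitor is the Class object. */
    static synchronized boolean staticLockIsClass() {
        return Thread.holdsLock(LockOwnerDemo.class);
    }

    /** Synchronized block: the monitor is the object in the parentheses, not this. */
    boolean blockLockIsArgument(Object monitor) {
        synchronized (monitor) {
            return Thread.holdsLock(monitor) && !Thread.holdsLock(this);
        }
    }

    public static void main(String[] args) {
        LockOwnerDemo demo = new LockOwnerDemo();
        System.out.println(demo.instanceLockIsThis());
        System.out.println(staticLockIsClass());
        System.out.println(demo.blockLockIsArgument(new Object()));
    }
}
```

Because the instance lock and the Class lock are different objects, a static synchronized method and an ordinary synchronized method never block each other, which explains scenarios 7 and 8.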
6. Callable
6.1 Ways to create a thread
- Extend Thread
- Implement the Runnable interface
- Implement the Callable interface
- Use a thread pool
6.2 Callable interface
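import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Callable: a task interface whose call() method returns a value
class MyThread implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("======Callable==========");
        return 1024;
    }
}

public class CallbackDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // FutureTask adapts a Callable so that a Thread can run it
        FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());
        new Thread(futureTask, "a").start();
        // get() blocks until call() has returned
        Integer task = futureTask.get();
        System.out.println(task);
    }
}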
6.3 Future Interface
public boolean cancel(boolean mayInterruptIfRunning): attempts to stop the task. If the task has not started, it will never run. If it has already started, the running thread is interrupted only when mayInterruptIfRunning is true
public V get() throws InterruptedException, ExecutionException: returns the result of the task. If the task has completed, the result is returned immediately; otherwise the call blocks until the task completes
public boolean isDone(): returns true if the task has completed, otherwise false
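A short sketch of these three methods, using FutureTask as the Future implementation. The class and method names are illustrative; the first method runs a task and reads its result, the second cancels a task before it has started.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureMethodsDemo {
    /** Runs a task on another thread and waits for its result with get(). */
    static int runAndGet() throws InterruptedException, ExecutionException {
        FutureTask<Integer> task = new FutureTask<>(() -> 6 * 7);
        new Thread(task).start();
        int result = task.get();          // blocks until the task has completed
        if (!task.isDone()) {
            throw new IllegalStateException("get() returned before completion");
        }
        return result;
    }

    /** Cancels a task that was never started; it will then never run. */
    static boolean cancelBeforeStart() {
        FutureTask<Integer> task = new FutureTask<>(() -> 1);
        boolean cancelled = task.cancel(false);
        task.run();                       // no effect: the task is already cancelled
        return cancelled && task.isCancelled() && task.isDone();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndGet());          // 42
        System.out.println(cancelBeforeStart());  // true
    }
}
```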
6.4 FutureTask
When the main thread needs a time-consuming operation performed but should not be blocked by it, the work can be handed to a FutureTask to complete in the background
- The main thread retrieves the result from the Future when it needs it
- Used mostly for time-consuming calculations
7. Three JUC auxiliary classes
7.1 CountDownLatch
Count-down latch
A CountDownLatch is constructed with an initial counter value
countDown() decrements the counter by 1; await() blocks until the counter reaches 0, after which the statements following await() execute
A thread that calls await() on the CountDownLatch blocks
Each call to countDown() decrements the counter by 1
When the counter reaches 0, the threads blocked in await() are woken
// main closes the door only after six people have left
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "\t Leave Classroom");
                countDownLatch.countDown();
            }, String.valueOf(i)).start();
        }
        countDownLatch.await();
        System.out.println("main close");
    }
}
7.2 CyclicBarrier
Cyclic barrier
As the name suggests, a CyclicBarrier is a barrier that can be used repeatedly. The first constructor parameter is the number of parties (threads) the barrier waits for. Each call to cyclicBarrier.await() adds one to the number of waiting parties; once the target number is reached, the optional barrier action runs and the statements after cyclicBarrier.await() execute in every waiting thread.
public class CyclicBrrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{ System.out.println("*****Call Dragon"); }); for (int i=1;i<=7;i++){ final Integer temp=i; new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t Collected"+temp+"Dragon bead"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } },String.valueOf(i)).start(); } } }
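The "cyclic" part means the same barrier can be reused once it has tripped. As an illustrative sketch (class and method names are made up), the example below sends a fixed group of threads through one barrier for several rounds and counts how often the barrier action runs: once per round.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {
    /** Returns how many times the barrier action ran. */
    static int run(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the barrier resets automatically after each trip
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3, 2)); // 2
    }
}
```

A CountDownLatch, by contrast, cannot be reset once its counter has reached 0.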
7.3 Semaphore
The argument passed to Semaphore's constructor is the number of permits, which can be viewed as the maximum number of threads allowed to use the resource at the same time. A semaphore initialized with a single permit behaves like a mutual-exclusion lock. Use the acquire method to obtain a permit, and the release method to return it
Scenario: 6 cars competing for 3 parking spaces
public class SemaphoreDemo {
    public static void main(String[] args) {
        // Simulated resource class with 3 empty parking spaces
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire(); // permits - 1
                    System.out.println(Thread.currentThread().getName() + "\t Grab parking space");
                    TimeUnit.SECONDS.sleep(4);
                    System.out.println(Thread.currentThread().getName() + "\t Leave parking");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release(); // permits + 1
                }
            }, String.valueOf(i)).start();
        }
    }
}
8. Read and write locks
Read/write locks suit shared resources that are read often and written rarely.
Reads do not need to exclude one another
ReentrantReadWriteLock provides two locks: the read lock is a shared lock and the write lock is an exclusive lock.
Read lock: can be held together with other read locks only
Write lock: excludes all other read locks and write locks
The example is the MyLock / ReadWriteLockDemo code already listed in section 2.5.
9. Blocking queues
9.1 BlockingQueue
A queue is first in, first out (FIFO)
A blocking queue adds blocking behaviour: taking an element from an empty queue blocks the calling thread, which gives up the CPU
Adding an element to a full queue likewise blocks
Blocked threads are woken automatically once the operation can proceed
Method type | Throws exception | Special value | Blocks | Times out |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e,time,unit) |
Remove | remove() | poll() | take() | poll(time,unit) |
Examine | element() | peek() | Not available | Not available |
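Behaviour | Description |
---|---|
Throws exception | When the blocking queue is full, add throws IllegalStateException. When the blocking queue is empty, remove throws NoSuchElementException |
Special value | The insert method offer returns true on success and false otherwise; the remove method poll returns the head element, or null if the queue is empty |
Blocks | When the queue is full, put blocks the producer thread until space is available or the thread is interrupted; when the queue is empty, take blocks the consumer thread |
Times out | When the queue is full, offer(e,time,unit) blocks the producer for at most the given time and then gives up; poll(time,unit) does the same for a consumer on an empty queue |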
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Blocking Queue
 */
public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        // Group 1: methods that throw exceptions
        // System.out.println(blockingQueue.add("a"));
        // System.out.println(blockingQueue.add("b"));
        // System.out.println(blockingQueue.add("c"));
        // System.out.println(blockingQueue.element());
        // System.out.println(blockingQueue.add("x"));    // queue full: throws
        // System.out.println(blockingQueue.remove());
        // System.out.println(blockingQueue.remove());
        // System.out.println(blockingQueue.remove());
        // System.out.println(blockingQueue.remove());    // queue empty: throws

        // Group 2: methods that return special values
        // System.out.println(blockingQueue.offer("a"));
        // System.out.println(blockingQueue.offer("b"));
        // System.out.println(blockingQueue.offer("c"));
        // System.out.println(blockingQueue.offer("x"));  // queue full: false
        // System.out.println(blockingQueue.poll());
        // System.out.println(blockingQueue.poll());
        // System.out.println(blockingQueue.poll());
        // System.out.println(blockingQueue.poll());      // queue empty: null

        // Group 3: methods that block
        // blockingQueue.put("a");
        // blockingQueue.put("b");
        // blockingQueue.put("c");
        // // blockingQueue.put("x");                     // queue full: blocks
        // System.out.println(blockingQueue.take());
        // System.out.println(blockingQueue.take());
        // System.out.println(blockingQueue.take());
        // System.out.println(blockingQueue.take());      // queue empty: blocks

        // Group 4: methods that time out
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        // Queue full: waits up to 3 seconds, then returns false
        System.out.println(blockingQueue.offer("a", 3L, TimeUnit.SECONDS));
    }
}
9.2 Kinds
1. ArrayBlockingQueue
A bounded blocking queue backed by an array
Maintains a fixed-length array
Keeps two integer variables marking the positions of the queue's head and tail in the array
Producers putting data and consumers taking data share a single lock, so the two cannot run in parallel; LinkedBlockingQueue uses separate locks
Inserting and removing elements does not create additional objects, whereas LinkedBlockingQueue allocates a Node object for each element
2. LinkedBlockingQueue
A blocking queue backed by a linked list
The default capacity is Integer.MAX_VALUE
The queue consists of linked nodes
LinkedBlockingQueue handles concurrent access efficiently because it uses separate locks for producers and consumers to control data synchronization.
3.PriorityBlockingQueue
An unbounded blocking queue that orders elements by priority
Priority is determined by a Comparator object (or by the elements' natural ordering)
Never blocks producers; blocks consumers only when there is no data to consume
Because the queue is unbounded, producers must not outpace consumers for long, or the queue will eventually exhaust memory
4.SynchronousQueue
A blocking queue that does not store elements: each put must wait for a matching take, so elements are handed over one at a time
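The hand-off behaviour can be sketched as follows (HandoffDemo is an illustrative name). A non-blocking offer fails while no consumer is waiting, because the queue has no capacity to hold the element; a blocking put succeeds once a consumer thread calls take.

```java
import java.util.concurrent.SynchronousQueue;

class HandoffDemo {
    /** Returns {result of offer with no consumer waiting, element received by the consumer}. */
    static String[] run() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        boolean offered = queue.offer("early"); // nobody is waiting in take(): returns false

        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();     // waits for a producer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        queue.put("hello");                     // blocks until the consumer takes the element
        consumer.join();
        return new String[] { String.valueOf(offered), received[0] };
    }

    public static void main(String[] args) throws InterruptedException {
        String[] r = run();
        System.out.println(r[0] + " " + r[1]);  // false hello
    }
}
```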
5. DelayQueue
An unbounded blocking queue of delayed elements, implemented with a priority queue
An element in a DelayQueue can only be retrieved from a queue if its specified delay time is reached. A DelayQueue is a queue with no size limit, so the operation to insert data into the queue (producer) will never be blocked, and only the operation to get data (consumer) will be blocked.
6. LinkedTransferQueue
An unbounded blocking queue backed by a linked list
Works in a pre-emptive mode
When a consumer asks for an element and the queue is not empty, it takes the element
If the queue is empty, the consumer enqueues a node whose element is null and waits. When a producer arrives and finds this null node, it does not enqueue its element; it fills the element into the waiting node and wakes the waiting thread, which then takes the element.
7. LinkedBlockingDeque
A double-ended blocking queue backed by a linked list
9.3 Summary
- In multithreading, blocking means that a thread is suspended under certain conditions and is automatically woken once the conditions are met
- Why do we need BlockingQueue? Before the concurrent package was released, programmers in a multithreaded environment had to control these details themselves while also attending to efficiency and thread safety, which added considerable complexity to programs. With BlockingQueue we no longer need to care when to block a thread or when to wake it, because BlockingQueue handles all of that for us
10. Thread pool
A pool that holds reusable threads
Threads do not have to be created and destroyed for every task, which reduces resource consumption
Faster response
Easier thread management
10.1 Core Parameters
- corePoolSize: number of core threads
- maximumPoolSize: maximum number of threads
- keepAliveTime: how long an idle thread is kept alive
- unit: time unit of keepAliveTime
- workQueue: blocking queue that holds submitted tasks
- threadFactory: factory that creates the threads
- handler: rejection policy
10.2 Principle
When a task is submitted and the current number of threads < corePoolSize, a core thread is created to run it
When the number of threads >= corePoolSize and the blocking queue is not full, the task is put into the blocking queue
When the blocking queue is full and the number of threads < maximumPoolSize, a non-core thread is created to run the task
When the blocking queue is full and the number of threads has reached maximumPoolSize, the rejection policy is applied
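The four steps can be observed with a deliberately tiny pool. The sketch below assumes corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1 (values chosen only for illustration, as is the class name) and submits four tasks that all block on a latch, logging pool size / queue size after each submission.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    static String run() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        // Every task blocks until the latch opens, so no thread becomes free.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.append(pool.getPoolSize()).append('/')
                       .append(pool.getQueue().size()).append(' ');
                } catch (RejectedExecutionException e) {
                    log.append("rejected"); // queue full and pool at maximum size
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // 1/0 1/1 2/1 rejected
    }
}
```

Task 1 starts the core thread, task 2 is queued, task 3 finds the queue full and starts a non-core thread, and task 4 is rejected.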
10.3 Rejection Policy
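AbortPolicy: rejects the task by throwing RejectedExecutionException; the thread pool's default rejection policy
CallerRunsPolicy: hands the task back to the caller, which runs it in its own thread
DiscardPolicy: silently discards the task
DiscardOldestPolicy: discards the oldest task waiting in the queue, then retries the submission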
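As a sketch of one of these policies, the example below saturates a single-thread pool with a one-slot queue (illustrative sizes and names) and then submits a third task under CallerRunsPolicy. The task records the name of the thread that ran it, which turns out to be the submitting thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {
    /** Returns true if the rejected task ran on the submitting thread. */
    static boolean run() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // fills the queue
            // Pool is saturated: the policy runs this task synchronously in the caller.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return Thread.currentThread().getName().equals(ranOn[0]);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // true
    }
}
```

Running the task in the caller slows down submission, which acts as a simple form of back-pressure.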
10.4 Executors Tool Class
1.newCachedThreadPool
Role: creates a cacheable thread pool. Idle threads are reclaimed when the pool is larger than needed, and a new thread is created when no idle thread is available
- The number of threads is not fixed and can grow up to Integer.MAX_VALUE
- Idle threads in the pool are reused, and reclaimed after the keep-alive time
- A new thread is created when no thread in the pool is available
/** * Cacheable Thread Pool * @return */ public static ExecutorService newCachedThreadPool(){ /** * corePoolSize Number of core threads in the thread pool * maximumPoolSize Maximum number of threads to accommodate * keepAliveTime Idle Thread Lifetime * unit Unit of time for survival * workQueue Store queues submitted but not executed * threadFactory Factory class for creating threads: can be omitted * handler Rejection policy after waiting for the queue to be full: can be omitted */ return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); }
Scenario: suitable when the pool must be able to grow without limit, the server load is light, tasks are short, and there are many of them
2.newFixedThreadPool
Creates a thread pool that reuses a fixed number of threads
The number of threads in the pool is fixed, which makes it easy to control concurrency
* Threads are reused and persist until the pool is explicitly shut down
* When more tasks are submitted than there are threads, the extra tasks wait in the queue
/** * Fixed-length thread pool * @return */ public static ExecutorService newFixedThreadPool(){ /** * corePoolSize Number of core threads in the thread pool * maximumPoolSize Maximum number of threads to accommodate * keepAliveTime Idle Thread Lifetime * unit Unit of time for survival * workQueue Store queues submitted but not executed * threadFactory Factory class for creating threads: can be omitted * handler Rejection policy after waiting for the queue to be full: can be omitted */ return new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); }
Scenario: suitable when the number of threads the business needs can be predicted, or when the server is heavily loaded and the number of threads must be strictly limited
3.newSingleThreadExecutor
Creates an Executor that uses a single worker thread
Feature: at most one thread runs in the pool; tasks submitted later wait in the queue
/** * Single Thread Pool * @return */ public static ExecutorService newSingleThreadExecutor(){ /** * corePoolSize Number of core threads in the thread pool * maximumPoolSize Maximum number of threads to accommodate * keepAliveTime Idle Thread Lifetime * unit Unit of time for survival * workQueue Store queues submitted but not executed * threadFactory Factory class for creating threads: can be omitted * handler Rejection policy after waiting for the queue to be full: can be omitted */ return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); }
Scenario: suitable when tasks must run sequentially and no more than one thread may be active at any time
public class ThreadPoolDemo1 {
    /**
     * Three ticket windows in the railway station, 10 users buy tickets
     * @param args
     */
    public static void main(String[] args) {
        // Fixed pool: 3 threads = 3 windows
        ExecutorService threadService = new ThreadPoolExecutor(3, 3, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        try {
            // Ten people buy tickets
            for (int i = 1; i <= 10; i++) {
                threadService.execute(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + " window, start selling tickets");
                        Thread.sleep(5000);
                        System.out.println(Thread.currentThread().getName() + " window, ticket sold");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Shut down once the submitted tasks have completed
            threadService.shutdown();
        }
    }
}
10.5 Thread Creation Method
Thread pools should not be created through Executors: the LinkedBlockingQueue behind FixedThreadPool and SingleThreadExecutor has a maximum length of Integer.MAX_VALUE, so queued tasks can pile up until an OOM occurs, and the maximum number of threads for CachedThreadPool is Integer.MAX_VALUE, so it can create enough threads to overload the CPU
10.6 Manual Creation
Create the pool directly with new ThreadPoolExecutor(...), specifying every parameter explicitly
11. Fork/Join
fork splits a large task into smaller ones
join merges the results of the subtasks
1. Fork
1. ForkJoinTask
First, create a ForkJoin task. This class provides the mechanism for executing fork and join within a task. Normally we do not extend the ForkJoinTask class directly; we extend one of its subclasses. The Fork/Join framework provides two subclasses:
a. RecursiveAction: for tasks that do not return a result
b. RecursiveTask: for tasks that return a result
2. ForkJoinPool
ForkJoinPool: a ForkJoinTask must be executed through a ForkJoinPool
3. RecursiveTask:
After extending it, a task can invoke itself recursively
2. Examples
import java.util.concurrent.RecursiveTask;

/**
 * Recursive Accumulation
 */
public class TaskExample extends RecursiveTask<Long> {
    private int start;
    private int end;
    private long sum;

    /**
     * Constructor
     * @param start first number of the range (inclusive)
     * @param end   last number of the range (inclusive)
     */
    public TaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /**
     * The main computation performed by this task.
     *
     * @return the result of the computation
     */
    @Override
    protected Long compute() {
        System.out.println("task" + start + "=========" + end + " Accumulation Start");
        // Ranges of at most 100 numbers are summed directly; larger ranges are split
        if (end - start <= 100) {
            for (int i = start; i <= end; i++) {
                // accumulation
                sum += i;
            }
        } else {
            // Divide into 2 pieces
            int middle = start + 100;
            // Recursive call, divided into two small tasks
            TaskExample taskExample1 = new TaskExample(start, middle);
            TaskExample taskExample2 = new TaskExample(middle + 1, end);
            // Execution: asynchronous
            taskExample1.fork();
            taskExample2.fork();
            // Block until both results are available, then combine them
            sum = taskExample1.join() + taskExample2.join();
        }
        // Return the accumulated sum
        return sum;
    }
}
public class ForkJoinPoolDemo {
    /**
     * Generate a calculation task, calculate 1+2+3....+1000
     * @param args
     */
    public static void main(String[] args) {
        // Define the task
        TaskExample taskExample = new TaskExample(1, 1000);
        // Define the executing pool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // Submit the task for execution
        ForkJoinTask<Long> result = forkJoinPool.submit(taskExample);
        // Output the result
        try {
            System.out.println(result.get());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            forkJoinPool.shutdown();
        }
    }
}