JUC common classes
It refers to the class under the import java.util.concurrent package, which is usually used to solve the problem of multi-threaded coordination
- lock and derived ReentrantLock
- Security classes of various containers: CopyOnWriteArrayList, ConcurrentHashMap
- Unsafe collection to safe collection: Collections.synchronizedLis()
- ...
Producer and consumer questions (interview)
- If the producer is not empty, inform the consumer to consume
- When the goods are consumed, inform the consumers to consume
- Tug of war between producers and consumers
Using wait and notify
//producer class Producer { final Produce produce; public Producer(Produce produce) { this.produce = produce; } //production public void produce() { for (int i = 0; i < 100; i++) { produce.add(); } } } //consumer class Consumer { final Produce produce; public Consumer(Produce produce) { this.produce = produce; } //consumption public void consume() { for (int i = 0; i < 1000; i++) { produce.dec(); } } } //product class Produce { Queue<Integer> data = new LinkedList<>(); int count = 0; public synchronized void add(){ while (data.size()>=10){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } data.offer(count); System.out.println("Products produced:" + count++); notifyAll(); } public synchronized void dec(){ while (data.size()==0){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Products consumed:" + data.poll()); notifyAll(); } }
Condition
- It is usually created through lock.newCondition
//product class Produce { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); Queue<Integer> data = new LinkedList<>(); int count = 0; public void add() { lock.lock(); try { while (data.size() >= 10) { try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } data.offer(count); System.out.println("Products produced:" + count++); condition.signalAll(); } finally { lock.unlock(); } } public void dec() { lock.lock(); try { while (data.size() == 0) { try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Products consumed:" + data.poll()); condition.signalAll(); } finally { lock.unlock(); } } }
- Multiple conditions can realize accurate wake-up
package com.hzy.juc.pc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class C { public static void main(String[] args) { Data3 data3=new Data3(); new Thread(()->{ for (int i = 0; i < 10; i++) { data3.printA(); } },"A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { data3.printB(); } },"B").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { data3.printC(); } },"C").start(); } } class Data3{ private Lock lock=new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); private int number=1; //1a 2b 3c public void printA(){ lock.lock(); try { while (number!=1) condition1.await(); { } System.out.println(Thread.currentThread().getName()+"=>AAAAAA"); number=2; condition2.signal();//Wake up the designated person to work } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB(){ lock.lock(); try { while (number!=2) condition2.await(); { } System.out.println(Thread.currentThread().getName()+"=>BBBBBB"); number=3; condition3.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC(){ lock.lock(); try { while (number!=3) condition3.await(); { } System.out.println(Thread.currentThread().getName()+"=>CCCCCC"); number=1; condition1.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
Counter
Countdownlatch
- Thread safety
- Multiple threads countDown, execute downward for 0await
public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { //The total number is 6. You can use it when you have to perform tasks! CountDownLatch countDownLatch=new CountDownLatch(6); for (int i = 1; i <= 6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+"go out"); countDownLatch.countDown(); },String.valueOf(i)).start(); } countDownLatch.await();//Wait for the counter to return to 0 before executing downward System.out.println("Close Door"); } }
Cyclicbarrier (addition counter)
public class CyclicBarrierDemo { public static void main(String[] args) { /** * Gather 7 dragon balls to summon the Dragon * Thread calling Dragon Ball * */ CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{ System.out.println("Summon dragon successfully!"); }); for (int i = 1; i <= 7 ; i++) { final int temp=i; new Thread(()->{ System.out.println(Thread.currentThread().getName()+"Collected"+temp+"Dragon Ball"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
Semaphore
Multiple semaphores can be used alternately by multiple threads
Mutually exclusive use of multiple shared resources! Concurrent flow restriction, control the maximum number of threads!
acquire(); Get, if it is full, wait until it is released!
release(); Release will release the current semaphore by + 1, and then wake up the waiting thread
//Up to three simultaneous visits Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 10; i++) { new Thread(()->{ try { //Get the license, if not, wait semaphore.acquire(); //Release the license in one second System.out.println(Thread.currentThread().getName()+"In execution"); TimeUnit.SECONDS.sleep(1); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
Read write lock
- Read is a shared lock and write is an exclusive lock
- It is divided into writeLock().lock() and readLock().lock();
private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
queue
Blocking queue
- FIFO (first in first out)
- Full block, empty block
- Scenario: concurrent processing, thread pool
- add&remove
- offer&poll
ArrayBlockingQueue blockingQueue=new ArrayBlockingQueue(int num);
Synchronization queue
- Only one can be put in, and you must wait for it to be taken out (alternate printing)
public class SynchronousQueueDemo { public static void main(String[] args) { BlockingQueue<String> synchronousQueue= new SynchronousQueue(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"=>"+"put 1"); synchronousQueue.put("1"); System.out.println(Thread.currentThread().getName()+"=>"+"put 2"); synchronousQueue.put("2"); System.out.println(Thread.currentThread().getName()+"=>"+"put 3"); synchronousQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } },"T1").start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"T2").start(); } }
Thread pool (key)
Pooling Technology: prepare some resources in advance and return them to me after use (no recycling)
- Save thread consumption
- Increase the desired speed
- And management
There is an article in the concurrent programming section of Alibaba Development Manual: thread pools are not allowed to be created using Executors, but through the way of ThreadPoolExecutor and the source code to analyze the reasons for the prohibition.
After reading the Executors source code, I found that it was just written parameters.
-
newFixedThreadPool
Function: this method returns a thread pool with a fixed number of threads. The number of threads can be customized. The maximum number of threads in the thread pool created by this method is equal to the number of core threads. If the newly submitted task has no idle thread to process, it will be put into the blocking queue.
Disadvantages: the blocking queue used by the thread pool is LinkedBlockingQueue: linked list blocking queue. The default capacity is Integer.MAX_VALUE: if the capacity is too large, a large number of tasks may be stacked, resulting in oom (memory overflow)
-
newSingleThreadExecutor
Function: this method creates a thread pool with only one thread. If there is no idle thread to process the submitted task, it will be put into the blocking queue
Disadvantages: the blocking queue used by the thread pool is LinkedBlockingQueue: linked list blocking queue. The default capacity is Integer.MAX_VALUE. If the capacity is too large, a large number of tasks may be accumulated, resulting in oom (memory overflow)
-
newCachedThreadPool
Function: this method returns a thread pool that can adjust the number of threads according to the actual demand. If the submitted task has no idle thread processing, a new thread will be created to process the task. If a thread is idle for more than 60 seconds, it will be destroyed
Disadvantages: the maximum number of threads allowed to be created in this thread pool is Integer.MAX_VALUE, which may create a large number of threads, resulting in OOM (memory overflow)
-
newScheduleThreadPool
Function: this method can create a thread pool with custom core thread capacity, and the thread pool supports timed and periodic task execution.
Disadvantages: the maximum number of threads allowed to be created in this thread pool is Integer.MAX_VALUE, which may create a large number of threads, resulting in OOM (memory overflow)
FixedThreadPool
FixedThreadPool is a shared unbounded queue that reuses a fixed number of threads
- It is a fixed size thread pool;
- Both corePoolSize and maximunPoolSize are the number of threads nThreads set by the user;
- keepAliveTime is 0, which means that once there are redundant idle threads, they will be stopped immediately; however, keepAliveTime here is invalid;
- The blocking queue adopts LinkedBlockingQueue, which is an unbounded queue;
- Because the blocking queue is an unbounded queue, it is never possible to reject a task;
- Due to the unbounded queue, the actual number of threads will always be maintained at nThreads, so the maximumPoolSize and keepAliveTime will be invalid.
SingleThreadExecutor
- It will only create a worker thread to process the task;
- The blocking queue used is LinkedBlockingQueue;
CachedThreadPool
- It is a thread pool that can be expanded infinitely;
- It is more suitable for processing tasks with relatively small execution time;
- corePoolSize is 0 and maximumPoolSize is infinite, which means that the number of threads can be infinite;
- keepAliveTime is 60S, which means that the thread will be killed if the idle time exceeds 60S;
- Synchronous queue is used to load waiting tasks. This blocking queue has no storage space, which means that as long as a request arrives, a working thread must be found to process it. If there is no idle thread at present, a new thread will be created.
Scheduledthreadpool (deferred task)
Scheduledthreadpool. Schedule (() - > system. Out. Println ("hello"), int time, timeunit. Minutes);: time print hello in minutes
- It receives tasks of type scheduledfuturetask. There are two ways to submit tasks:
- scheduledAtFixedRate
- scheduledWithFixedDelay
- Parameters received by scheduledfuturetask:
- Time: the time when the task started
- sequenceNumber: sequence number of the task
- period: the time interval of task execution
- It uses DelayQueue to store waiting tasks
- DelayQueue internally encapsulates a PriorityQueue, which will be sorted according to the time sequence. If the time is the same, it will be sorted according to the sequenceNumber;
- DelayQueue is also an unbounded queue;
- Execution process of worker thread:
- The worker thread will fetch the expired task from the DelayQueue to execute;
- After execution, reset the expiration time of the task and put it back into the DelayQueue again
ForkJoin
ForkJoin executes tasks in parallel in JDK1.7! Improve efficiency, big data! (split big tasks into small tasks)
public class Main { public static void main(String[] args) throws Exception { // Create an array of 2000 random numbers: long[] array = new long[2000]; long expectedSum = 0; for (int i = 0; i < array.length; i++) { array[i] = random(); expectedSum += array[i]; } System.out.println("Expected sum: " + expectedSum); // fork/join: ForkJoinTask<Long> task = new SumTask(array, 0, array.length); long startTime = System.currentTimeMillis(); Long result = ForkJoinPool.commonPool().invoke(task); long endTime = System.currentTimeMillis(); System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms."); } static Random random = new Random(0); static long random() { return random.nextInt(10000); } } class SumTask extends RecursiveTask<Long> { static final int THRESHOLD = 500; long[] array; int start; int end; SumTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @Override protected Long compute() { if (end - start <= THRESHOLD) { // If the task is small enough, calculate directly: long sum = 0; for (int i = start; i < end; i++) { sum += this.array[i]; // Deliberately slow down the calculation speed: try { Thread.sleep(1); } catch (InterruptedException e) { } } return sum; } // The task is too big and divided into two: int middle = (end + start) / 2; System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end)); SumTask subtask1 = new SumTask(this.array, start, middle); SumTask subtask2 = new SumTask(this.array, middle, end); invokeAll(subtask1, subtask2); Long subresult1 = subtask1.join(); Long subresult2 = subtask2.join(); Long result = subresult1 + subresult2; System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result); return result; } }
Future
- Ajax like
- Model the outcome of a future event
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"supplyAsync"+":Integer"); return 1024; }); System.out.println(completableFuture.whenComplete((t, u) -> { //t: Normal return result //u: Error message System.out.println(t + "\t" + u); }).exceptionally((e) -> { e.getMessage();//Print error message return 233;// You can get the return result of the error }).get());
CompletableFuture
Introduced by Java 8 and optimized for the Future, the callback object can be passed in. When the task ends, the method of an object will be called back automatically and can be executed serially. An asynchronous task can be created by using a static method:
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello"); return "hello"; }); CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println("world"); return "world"; }); CompletableFuture<Void> completableFuture = CompletableFuture.allOf(completableFuture1, completableFuture2); completableFuture.thenAccept(c -> { System.out.println("The task has been completed"); });