JUC Knowledge Point-Complete Set
What is juc?
java.util.concurrent | |
---|---|
java.util.concurrent.locks | Lock-related API s |
java.util.concurrent.atomic | Atomic operation api |
Thread Safe and Flexible |
JUC classification summary?
lock | ReentrantLock, ReadWriteLock, StampedLock, Wait for Condition notification, Block wake-up LockSupport |
---|---|
Atoms | Basic Type, Array Type, Reference Type, Field Type [AtomicLong] [AtomicLongArray] [AtomicReference] [AtomicReference] |
Blocking Queue | Non-blocking queue ConcurrentLinkedDeque, blocking queue [BlockingDeque, AbstractQueue], ArrayBlockingQueue, ConcurrentLinkedQueue, synchronous queue, delayed queue, priority PriorityQueue |
Thread Pool | [Single, Cacheable, Fixed Size, Cycle], Custom Thread Pool, [forkjoin poll] Divide and conquer, Work Stealing, future callable |
Concurrent Collection | CopyOnWriteArrayList,CopyOnWriteArraySet,ConcurrentHashMap |
Concurrent Tool Class | Semaphore, Current Limiting Signal, Starter Gun: CountDownLatch, Cyclic Barrier, ThreadLocalRandom |
Three Core Ideas of JUC
AQS | AbstractQueuedSynchronizer Abstract Synchronizer juc Lock Core Ideas |
---|---|
CAS | CompareAndSwap compares and replaces Atomic updates without locking, does not require cpu context switching, and wastes cpu performance |
volatile | Visibility, main memory data, consistent with the number of internal copies of threads |
Lock
[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-iBWRYdfE-1621751537636) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-2021110.png)]
Pessimistic Lock: When fetching data, worry about data being modified by other threads, so pessimistic add a lock, pessimistic lock. All-in-one approach to reduce performance. Thread security. ReentrantLock Optimistic lock: When fetching data, do not worry about the data being modified by other threads, so do not lock, wait for updates to check if the data is modified. If the check data is modified, the update fails. Don't switch contexts, waste cpu. [CAS] Shared lock: Read lock, multiple threads can hold thread locks. Can be considered unlocked Exclusive locks: Mutual exclusive locks, where a thread holds a lock. Fair Lock: First come, first get, order get. Reduced performance. Unfair Lock: No need, luck provides performance, Spin lock: CAS Thought: No spin, no waste cpu,No blocking. Non-spin lock: Causes the thread to wait. Interruptible lock: Send signal, interrupt acquisition lock, increase flexibility. Non-interruptable lock:The acquisition of a lock cannot be interrupted, and the lock will be released all the time. Re-lockable: Acquires locks multiple times, avoiding deadlocks to some extent Non-Re-Lockable: Locks cannot be acquired multiple times, but only once.
ReentrantLock
Characteristic
- Lock timeout supported
- Support interruptible
- Supports fair locks, unfair locks
- Get the result of the lock
- ConditionWaiting for Notification
- Reentrant locks
API
lock | Acquire locks |
---|---|
unlock | Release lock |
boolean trylock() | Attempting to acquire a lock |
boolean trylock(time,unit) | Acquire locks within a limited time |
lockInterruptibly() | Corresponding interruption |
newCondition() | Waiting for Notification Queue |
Re-lockable
Non-reentrant: Threads can only acquire one lock. Deadlock is prone. sun.lock Reentrable: Threads can acquire the same lock multiple times. Avoid deadlocks
package com.company; import sun.misc.Lock; /** * Non-reentrant lock */ public class NoLockDemo { public static void main(String[] args) { Lock lock=new Lock(); try { lock.lock(); System.out.println("First Acquisition Lock"); lock.lock(); System.out.println("Acquire lock again"); } catch (InterruptedException e) { e.printStackTrace(); } } }
package com.company; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Re-lockable */ public class NoLockDemo { public static void main(String[] args) { Lock lock=new ReentrantLock(); try { lock.lock(); System.out.println("First Acquisition Lock"); lock.lock(); System.out.println("Acquire lock again"); } catch (Exception e) { e.printStackTrace(); } } }
Reentrainable Lock Principle
A counter is maintained internally. One lock acquisition adds one. Release the lock once, subtract it once.
How do I get the lock?
Namely AQS A variable is maintained internally state Equal to 0 means no lock, equal to 1 means acquired lock, greater than 1 means re-lockable.
[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-pyTraeHg-16211537638) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-202107042.png)]
Fair-unfair locks
Fair Lock: care about queues, order, first come first. Cooking Unfair Locks: Do not care about order, do not care about queue conditions. Bandit
Case Demonstration
package com.company; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Fair lock construction method true: * * The unfair lock construction method false defaults to unfair locks: * */ public class ImpartialLockTest { //The default construction method is an unfair lock, a false fair lock, and a true is a fair lock Lock lock =new ReentrantLock(true); public void getLocks() { lock.lock();//Unfair does not queue, fair locks queue System.out.println(Thread.currentThread().getName()+" Acquire locks"); lock.unlock(); } public static void main(String[] args) { //Synchronize resources ImpartialLockTest tt=new ImpartialLockTest(); new Thread(()->{ tt.getLocks(); },"thread A").start(); new Thread(()->{ tt.getLocks(); },"thread B").start(); } }
principle
Unfair Lock: Acquire locks first to pass CAS,If it fails try to acquire the lock, try to acquire the lock (unlock, acquire the lock, check if the current thread is its own) If it succeeds, end, otherwise wrap yourself up as a Node Nodes are placed in the blocking queue and the blocking is released. Fair Lock: Determine if there are any waiting nodes in the blocked queue first. If it does, it wraps itself up as Node Node to wait (reentrant check) or pass CAS Update acquisition lock.
[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-zCUSUIeA-16211537642) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-20249.png)]
Interrupt Get Lock Thread
lockInterruptibly() throws InterruptedException //Thread.interrupt()
Attempt to acquire lock&acquire lock timeout
boolean tryLock :Attempt to acquire a lock and get a successful return true,Otherwise return false,Don't wait boolean tryLock(time,unit),Get a successful return within a specified time true,Otherwise return false,wait for
Cases
package com.company; import java.util.concurrent.locks.ReentrantLock; public class MyTryLock { public ReentrantLock lock = new ReentrantLock(); public void serviceMethod(){ try { if (lock.tryLock()) { // if (lock.tryLock(3,TimeUnit.SECONDS)) { if (lock.tryLock(3, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + "Acquire Lock "); Thread.sleep(4000); } else { System.out.println(Thread.currentThread().getName() + "Lock not acquired"); } } catch (Exception e) { e.printStackTrace(); }finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } public static void main(String[] args) { //Synchronize resources MyTryLock myTryLock = new MyTryLock(); Runnable runnable = new Runnable() { @Override public void run() { myTryLock.serviceMethod(); } }; Thread aa = new Thread(runnable, "aa"); aa.start(); Thread bb = new Thread(runnable, "bb"); bb.start(); } }
Get wait notification, wait for queue instance Condition
Condition newCondition() replace object Inside wait and notify() More flexible use of wait for notification in multi-threaded scenarios.
ReentrantReadWriteLock Read-Write Lock
Reasons for Read-Write Locks: Make up for exclusive locks, company scenario: Read more and write less
Characteristic:
Fairness and Inequality
Supports lock demotion
Support reentrant
Support interruptible
Support condition
API aspects
readLock() | Get Read Lock Object |
---|---|
writeLock() | Get Write Lock Object |
int getReadLockCount() | Number of read locks |
getWriteHoldCount() | Number of write locks |
* Read-write exclusion * Write-write exclusion * Reading is not mutually exclusive
Principle of Read-Write Lock
Write lock: | By calculating the number of write locks until the state is locked, [if there is a lock] --> [if there is a read lock, the acquisition of a write lock fails] [if the write lock is occupied by other threads and the acquisition of a write lock fails], the maximum number of write locks is [MAX_COUNT, 65535], otherwise the lock is updated and the acquisition ends successfully. [No locks present] -->If a check lock is not acquired, check to see if there is a waiting queue or false. Then update the lock, if the update succeeds, the lock will be acquired successfully, otherwise the lock will fail. (Fair Lock checks whether the queue has a wait task, and unfair returns false directly.) Reflects: [Read and write mutually exclusive, write and write mutually exclusive] |
---|---|
Read lock: | Calculate the number of write locks: If other threads have write locks, acquiring them fails (read-write exclusion, the process in which the write lock is being written, we cannot read) by using a binary right operation to obtain the number of read locks. Acquire locks (wait queue, maximum number of write locks, cas update success, lock success) Return 1 indicates success, otherwise call fullTryAcquireShared(current) deadlock to get the final value (unlock success or unlock failure) |
state represents the number of read and write locks calculated
Read lock: return C >>>> SHARED_ SHIFT; Right operation
Write lock: return C & EXCLUSIVE_ MASK;
[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-jv7NH8MW-1621751537645) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-2024300;png)]
Disadvantage: Causes thread hunger
Postmark Lock StampedLock
Thread Hunger? Read locks are so intensive that write locks are always unavailable that write lock threads cannot access them. Waiting.
Stamp: Pass stamp check; Whether the lock has been acquired or not. Checkmark
Read/Write + Optimistic Read
public long tryOptimisticRead() | Get an optimistic read lock and return a "stamp" version number |
---|---|
public boolean validate(long stamp) | Checkmark |
public long tryReadLock() | Acquire Read Lock |
public long tryUnlockWrite() | Get Write Lock |
Note: Reentrant is not supported
Note: Condition waiting for notification is not supported
Read and write locks convert to each other.
Postmark, Increase Concurrency
package com.company; import java.util.concurrent.locks.StampedLock; /** * Common postmark lock api */ public class StampedLockDemo { public static void main(String[] args) { StampedLock lock=new StampedLock(); long stamped = lock.tryOptimisticRead(); System.out.println("Optimistic Postmark "+stamped); long r = lock.tryReadLock(); System.out.println("Read Lock Version Number "+r); lock.unlockRead(r); long w = lock.tryWriteLock(); System.out.println("Write Lock Version Number "+w); lock.unlockWrite(w); boolean validate = lock.validate(stamped); if(validate){ System.out.println("Not modified by write lock"+stamped); }else{ System.out.println("Write Lock Acquisition"+stamped); } } }
Optimistic reading: cases
package com.company; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock; /** * Calculate Sum and Resolve Thread Hunger */ public class StampedLockDemo { StampedLock lock=new StampedLock(); int a=10,b=10; public void sum(){ //Optimistic Read to Get Version Number long l = lock.tryOptimisticRead(); int a1=a; int b1=b; try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //Checkmark if(!lock.validate(l)){ //Pessimistic Reading long l1 = lock.readLock(); System.out.println("Getting a pessimistic read lock----->version number"+l1); a1=a; b1=b; System.out.println("Release Pessimistic Read Lock----->version number"+l1); lock.unlockRead(l1); } System.out.println("a1="+a1+"b1="+b1+"Calculated sum"+(a1+b1)); } public void udpateInt(int a,int b){ long w = lock.writeLock(); System.out.println("Write lock acquired"+w); try { this.a=a; this.b=b; } finally { System.out.println("Write lock released"+w); lock.unlockWrite(w); } } public static void main(String[] args) { StampedLockDemo demo=new StampedLockDemo(); new Thread(()->demo.sum()).start(); new Thread(()->demo.udpateInt(1,2)).start(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } }
Condition Wait Notification Interthread Writing Tool
- Reasons for the emergence of new wait-for-notification objects (conditional queues) in JUC packages: rich api s, powerful extensions.
- Same thing: You must acquire the lock object.
- The difference: api (wait,notify,notifyAll) inside the object, await,signal,signalAll inside the condition
- Get one, you can get multiple conditional queues,
- Wait timeout is also supported.
Lock+Condition substitution: synchronized+Object
void await() | Wait, release lock resource | |
---|---|---|
boolean await(long time, TimeUnit unit) | Timeout Wait | |
void signal() | Random wake-up | |
void signalAll() | Wake Up All |
package com.company; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Waiting for Notification Condition */ public class ConditionDemo { public static void main(String[] args) { ReentrantLock lock=new ReentrantLock(); Condition condition = lock.newCondition(); new Thread(()->{ lock.lock(); try { System.out.println(Thread.currentThread().getName()+"Start Waiting"); //Wait, release lock condition.await(); System.out.println(Thread.currentThread().getName()+"Release Waiting"); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } }).start(); new Thread(()->{ //Lock is required before lock.lock(); try { TimeUnit.SECONDS.sleep(3); } catch (Exception e) { e.printStackTrace(); } try { System.out.println(Thread.currentThread().getName()+"Start waking up"); //Wake up one at random condition.signal(); } catch (IllegalMonitorStateException e) { e.printStackTrace(); }finally { lock.unlock(); } }).start(); } }
Source level:
await(): LockSupport.park(this); Wait, suspend thread, [join conditional queue, release lock resource, check aqs blocking queue, suspend thread] finished waiting
siognal():LockSupport.unpark(node.thread); The core wake-up thread [joins the blocked queue, aqs queue, has the chance to get a lock], unpark wakes up.
[External chain picture transfer failed, source may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-tOsxbDVm-16211537646) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-2028019.png)]
LockSupport Block/Wake Tool
1, no locks required, blocking and waking up directly.
2. The process of waking up and blocking can be in an interchangeable order. Avoid Deadlock [License 0,1]
Static methods do not need to get objects, use them directly
public static void park() | block |
---|---|
public static void unpark(Thread thread) | Release, wake up thread |
package com.company; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; public class LockSupportTest { public static void main(String[] args) { Thread t=new Thread(()->{ System.out.println("Threads Waiting Infinitely"); //No locks required LockSupport.park(); System.out.println("Wake Threads"); }); t.start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } LockSupport.unpark(t); } }
Atomic class atomic
Multiple** Threads without locking, still thread safe, atomic update synchronization resources, different interrupts. ** java.util.concurrent.atomic
Based on CAS+unsafe+volatile
CAS: [Compare and Swap] Main Memory, Expected Value, Updated Value, 10==10? 20 instead of 10: no updates.
unsafe: manipulate c,c++ libraries, jni operations, send instructions through hardware resources, directly manipulate memory, atomic operations
volatile: Visibility.
api classification: four parts [basic type] [array type] [reference type] [field type] accumulator
Basic types | AtomicInteger,AtomicBoolean,AtomicLong |
---|---|
Array type | AtomicLongArray,AtomicIntegerArray,AtomicReferenceArray, |
reference type | AtomicReference,AtomicStampedReference,AtomicMarkableReference |
Field type | AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater, |
accumulator | LongAdder,DoubleAdder,LongAccumulator,DoubleAccumulator |
Atomic Integer
Reason for this: Atomic update operations are maintained in a multithreaded situation. No locks are required. Thread security, (i++, concurrency issues in multithreaded scenarios)
/** * Non-atomic operation (bulky) */ public class AtomicIntegerDemo { int i=0; public synchronized void sum(){ System.out.println(Thread.currentThread().getName()+" "+(i++)); } public static void main(String[] args) { AtomicIntegerDemo demo=new AtomicIntegerDemo(); for(int i=0;i<10;i++){ new Thread(()->demo.sum()).start(); } } }
package com.company; import java.util.concurrent.atomic.AtomicInteger; /** * CAS-based unlocked operation */ public class AtomicIntegerDemo { AtomicInteger atomicInteger=new AtomicInteger(); public void sum(){ System.out.println(Thread.currentThread().getName()+" "+(atomicInteger.getAndIncrement())); } public static void main(String[] args) { AtomicIntegerDemo demo=new AtomicIntegerDemo(); for(int i=0;i<10;i++){ new Thread(()->demo.sum()).start(); } } }
API
public final int get() | Get the latest value |
---|---|
public final int getAndSet(int newValue) | Get the old value and set it to the new value |
public final boolean compareAndSet(int expect, int update) | expect==current value? Replace with update: Do nothing |
public final int getAndIncrement() | Get old value, increase by 1 |
public final int getAndDecrement() | Get the old value minus 1 |
public final int getAndAdd(int delta) | Wait until the old address, add up delta |
public final int addAndGet(int delta) | Get a new value, add up delta |
package com.company; import java.util.concurrent.atomic.AtomicInteger; /** * api cas AtomicInteger */ public class AtomicIntegerDemo { public static void main(String[] args) { AtomicInteger atomicInteger=new AtomicInteger(); int andIncrement = atomicInteger.getAndIncrement(); System.out.println("andIncrement="+andIncrement); System.out.println("get="+atomicInteger.get()); int andAdd = atomicInteger.getAndAdd(3); System.out.println("andAdd="+andAdd); System.out.println("get="+atomicInteger.get()); int andDecrement = atomicInteger.getAndDecrement(); System.out.println("getAndDecrement"+andDecrement); System.out.println("get="+atomicInteger.get()); boolean b = atomicInteger.compareAndSet(7, 6); System.out.println("Compare Replacement"+b); System.out.println("get="+atomicInteger.get()); } }
Atomic Update Array
Update: Add a corner field
public final long getAndSet(int i, long newValue) | Returns the old value and updates it with the corner label |
---|---|
public final boolean compareAndSet(int i, long expect,long update) | Returns a Boolean value, Corner Scale: Expected value, Updated value (Expected value==Current value? Updated value: Do nothing) |
public final long getAndIncrement(int i) | Return the old value by adding one to the corner label |
public final long getAndDecrement(int i) | Returns the old value by subtracting one from the corner label |
Atomic Update Reference Type
public final boolean compareAndSet(V expect, V update) | Return the old value and replace the comparison with the new value |
---|---|
public final V getAndSet(V newValue) | Return old value, set to new value |
package com.company; import java.util.concurrent.atomic.AtomicReference; public class AtomicReferenceDemo { public static void main(String[] args) { AtomicReference reference=new AtomicReference("Jay Chou"); boolean b = reference.compareAndSet("Jay Chou", "Hannah"); System.out.println("b= "+b); System.out.println(reference.get()); Object fws = reference.getAndSet("Fang Wen Shan"); System.out.println(fws); System.out.println(reference.get()); } }
Atomic Update Field Type - AtomicIntegerFieldUpdater
public abstract boolean compareAndSet(T obj, int expect, int update) | |
---|---|
public int getAndAdd(T obj, int delta) |
Must:
Must be volatile modifier,
Must be int
Cannot static
Cannot be private
package com.company; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; public class AtomicIntegerFieldUpdaterDemo { public static void main(String[] args) { Persion persion=new Persion(); AtomicIntegerFieldUpdater f = AtomicIntegerFieldUpdater.newUpdater(Persion.class,"i"); boolean b = f.compareAndSet(persion, persion.i, 2); System.out.println(b); System.out.println(f.get(persion)); int andAdd = f.getAndAdd(persion, 4); System.out.println(andAdd); System.out.println(f.get(persion)); } }
Non-spinning CAS unsafe unSafe class
CAS: [Comparison and Exchange [ compareAndSwap] Main memory value, expected value, value to be updated, 10==10?20 Replace 10: Do not update. unSafe: Unsafe class, dangerous class, not recommended for use, can send instructions to operate hardware resources. c,c++Library. Operational memory
[External chain picture transfer failed, source may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-XcLmKHET-1621751537647) (C:UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-2027575.png)]
Problem: Constant spinning wastes cpu performance and is recommended for non-massive concurrency. No need to switch Online
ABA Question A-B--A
Solve ABA Problem-AtomicStampedReference
API
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) | Expected reference, new value, stamp, new stamp |
---|---|
public V getReference() | Get the current object application |
Both Atomic MarkableReference and AtomicStampedReference can solve ABA problems.
Inborn problem CAS
package com.company; import java.util.concurrent.atomic.AtomicStampedReference; public class AtomicStampedReferenceDemo { public static void main(String[] args) { AtomicStampedReference atomicStampedReference=new AtomicStampedReference("Jay Chou",1); boolean b = atomicStampedReference.compareAndSet("Jay Chou", "Zhou Yunfa", 1, 2); System.out.println(b); Object reference = atomicStampedReference.getReference(); System.out.println(reference); int stamp = atomicStampedReference.getStamp(); System.out.println(stamp); if(stamp!=2){ return; } } }
LongAdder-High Concurrent Atomic Accumulator
AtomicLong Why there are LongAdder ? AtomicLong Cas Operation failed, spin, waste of energy private volatile int value; First Hand, LongAdder cells[value] Array, which shares the pressure on competing lock resources and is divided into several parts. Cumulative. [No multiple threads competing for one bvaseValue. Use if multi-threaded cells[]Array) Construction method: accumulate from 0 by default
API
public void increment() | add one-tenth |
---|---|
public long sum() | Summation |
package com.company; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; public class LongAdderDemoo { public static void main(String[] args) { LongAdder l=new LongAdder(); for (int i=0;i<10000;i++){ new Thread(()->l.increment()).start(); } try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Summation"+l.sum()); } }
Concurrent Collection-CopyOnWriteArrayList
ArrayList : elementData[size++] = e; Thread insecurity caused data loss.
Solution:
Vector: All methods are synchronized. Performance degradation. Threads hungry, a lock
Collections. SynchronizedList(list);/ Synchronized transforms an unsafe collection into a secure collection. Performance degradation. Threads hungry, a lock
CopyOnWriteArrayList: Copy a new array when writing (add, delete, change) to ensure the security of the collection, (can be used when the amount of data is small, a waste of memory space).
API
addIfAbsent(E e) | If there are elements, do not add them, have the function of weighting |
---|---|
public List subList(int fromIndex, int toIndex) | Intercept elements within a specified range (including header, not tail) |
package com.company; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; public class CopyOnWriteArrayListDemo { public static void main(String[] args) { ArrayList list=new ArrayList(); list.add("Jay Chou"); Vector vector=new Vector(); vector.add(""); // vector.get(9); //Turn an unsafe collection into a safe collection Collections.synchronizedList(list); //A secure collection, adding, deleting, and altering replicated collections CopyOnWriteArrayList clist=new CopyOnWriteArrayList(); clist.add("Jay Chou"); clist.addIfAbsent("Chapter 5"); clist.add("Cai Yilin"); clist.addIfAbsent("Jay Chou"); ListIterator listIterator = clist.listIterator(); while (listIterator.hasNext()){ Object next = listIterator.next(); System.out.println(next); } //Truncate partial values [including header, not tail] List list1 = clist.subList(0, 1); System.out.println(list.get(0)); } }
principle
Lock, copy the array, set the new array pointing.
public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; //Copy new elements to a new array Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; //Point to New Array setArray(newElements); return true; } finally { lock.unlock(); } }
Concurrent Collection-CopyOnWriteArraySet
hashSet: Thread insecurity: bottom used HashMap
Collections. Synchronized Set (hashSet); Locking, poor performance.
CopyOnWriteArraySet : CopyOnWriteArrayList.addIfAbsent()api,No more data can be added to the container.
Concurrent Collection-ConcurrentHashMap
hashMap Why can't I use it in a multi-threaded situation?: Concurrent modification of exceptions, expansion leads to closed-loop, deadlock, put Data Loss [Multi-threading scenario]-Thread Pool)
Series not recommended Hashtable hashtable=new Hashtable(); Collections.synchronizedMap(new HashMap<>());
//Unsafe concurrent modification exception HashMap hashMap=new HashMap(); for (int i=0;i<1000;i++){ int finalI = i; new Thread(()->{ hashMap.put(Thread.currentThread().getName()+"-->"+ finalI, finalI); System.out.println(hashMap); }).start(); }
//Thread Security ConcurrentHashMap hash=new ConcurrentHashMap(); for (int i=0;i<1000;i++){ int finalI = i; new Thread(()->{ hash.put(Thread.currentThread().getName()+"-->"+ finalI, finalI); System.out.println(hash); }).start(); }
Avoid expansion issues?
Number of expansion: int SC = n - (n >>> 2);
Do not use as an array
/** map Maximum capacity */ private static final int MAXIMUM_CAPACITY = 1 << 30; /** Default capacity */ private static final int DEFAULT_CAPACITY = 16; /** * Number of default concurrency level locks */ private static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** * Load factor: calculate hash value to reduce hash collisions */ private static final float LOAD_FACTOR = 0.75f; /** * More than or equal to 8-chain list structure converted to red and black numbers */ static final int TREEIFY_THRESHOLD = 8; /** * Converting structures with red and black numbers less than 6 to a chain table */ static final int UNTREEIFY_THRESHOLD = 6;
Capacity Expansion Quantity Calculation
// put 30 elements is not an array [30] ConcurrentHashMap cHashMap=new ConcurrentHashMap(40); cHashMap.put("zjl",48); int sc = 16 - (16 >>> 2); System.out.println(sc); double i=30/0.75; System.out.println(i);
When map simple principle put?
Version 1.7: Segment Lock seGement[]Lock+hashEntry Internal Chained Table Bacon (Reentrantlock+Entity+Chain List)
[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-feaCpAbt-16211537648) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-2021625252525252.png)]
1.8 CAS+node+red-black tree
ConcurrentHashMap principles:
synchronized: Performance is not currently optimized very poorly. That's great Node Node: Packaged data, part of a chain table CAS Spin operation: Initialize add first node use Red-Black Tree, Chain List: Less than eight are chains, and more than eight are converted to red-black trees. Red-Black Tree: Good performance can reduce io Number of times to improve traversal efficiency
put: calculates the hash value, initializes the table, waits for the expansion value, and adds the reason for the Cas operation to the node array if it is empty. Otherwise, if you want to see if you are expanding, instead of expanding, add a node node (chain table interface, red-black tree) and convert it to a red-black tree if the number is more than or equal to eight. Add count value.
Chat and Queue
Queue Classification: Blocked Queue + Non-Blocked Queue
Queue characteristics: FIFO, FIFO. [Priority Queue]
Blocking queue: See if the queue capacity is [empty, full] blocking the current thread waiting. Until the queue is not empty or the data is not satisfied, continue to operate the container and the thread wakes up.
Non-blocking queue: The capacity of the queue is wireless. Integer.MaxValue(); Based on CAS, non-blocking algorithm, concurrency capability is strong, data does not need to wait.
Blocked Queues: ReentrantLock [Pessimistic Policy], Non-blocked Queues [Optimistic Lock Mechanism for CAS Conflict Monitoring]
Blocked Queue &Nonblocked Matters Scenario: [Producer Consumer], [Thread Pool]
Non-blocking queue API:
ConcurrentLinkedQueue Non-blocking queue CAS No lock, unlimited capacity
Blocking queue API:
Blocking+Non-blocking api
ArrayBlockingQueue | Bounded queue based on array + specified capacity |
---|---|
DelayQueue | Delayed queue, automatically executes fetch element at time |
LinkedBlockingQueue | Bounded Queue Based on Chain List+Specified Capacity |
PriorityBlockingQueue | Priority queue, which can be sorted when fetching data |
SynchronousQueue | Synchronized Queue |
Non-blocking Queue-ConcurrentLinkedQueue
Non-blocking queues are based on CAS Concurrency beyond blocked queue, insert to end of data, get from scratch, thread safe
boolean add(E e) | Add element, cannot be null, return Boolean value |
---|---|
boolean offer(E e) | Add element, cannot be null, return Boolean value |
E peek() | Get elements: peep, look, don't delete elements, cut back to the first element |
E poll() | Get Elements: Bright, Look, Delete Elements, Cut Back to First Element |
package com.company; import java.util.concurrent.ConcurrentLinkedQueue; public class ConcurrentLinkedQueueDemo { public static void main(String[] args) { ConcurrentLinkedQueue queue=new ConcurrentLinkedQueue(); boolean Jay Chou = queue.add("Jay Chou"); System.out.println(Jay Chou); boolean Wang Lihong = queue.offer("Wang Lihong"); System.out.println(Wang Lihong); Object peek = queue.peek(); System.out.println(peek); //Element deleted Object poll = queue.poll(); System.out.println(poll); //Inefficient int size = queue.size(); System.out.println(size); //Efficient boolean empty = queue.isEmpty(); System.out.println(empty); } }
Principle: offer adds elements
node.next--->Next node Node Chain Table Structure private transient volatile Node<E> head; private transient volatile Node<E> tail; UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val) CAS operation+Dead cycle
Array Blocking Queue
Provides a way to block functions: take (), pull (), the effect of flow first.
Interface BlockingQueue
Two types of methods: blocking and non-blocking
Construct method: public ArrayBlockingQueue(int capacity): fair lock unfair lock + capacity + initialization array [must write a capacity]
All queues cannot add empty elements
boolean add(E e) | Add elements and return results | true,false |
---|---|---|
boolean offer(E e) | Add elements and return results | true,false |
void put(E e) | Add Elements, Block | |
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException | Add element, timeout blocking |
E peek() | Get elements: peep, look, don't delete elements, cut back to the first element | |
---|---|---|
poll() | Delete the first element without blocking | |
public E poll(long timeout, TimeUnit unit) throws InterruptedException | Delete the first element without blocking (time-out) | |
public E take() throws InterruptedException | Delete first element, will block |
package com.company; import java.util.concurrent.ArrayBlockingQueue; public class ArrayBlockingQueueDemo { public static void main(String[] args) { ArrayBlockingQueue arrayBlockingQueue=new ArrayBlockingQueue(1); boolean add = arrayBlockingQueue.add(3); System.out.println(add); boolean offer = arrayBlockingQueue.offer(4); System.out.println(offer); int size = arrayBlockingQueue.size(); System.out.println(size); try { arrayBlockingQueue.put("32"); System.out.println("32"); } catch (InterruptedException e) { e.printStackTrace(); } Object peek = arrayBlockingQueue.peek(); System.out.println(peek); try { Object take = arrayBlockingQueue.take(); System.out.println("Printable logs"+take); Object take1 = arrayBlockingQueue.take(); System.out.println("Unprintable logs"); } catch (InterruptedException e) { e.printStackTrace(); } } }
Array structure
/* Element Storage */ final Object[] items; /** Number of data elements */ int count; /** A reentrant lock */ final ReentrantLock lock; /** Condition Conditional Queue */ private final Condition notEmpty; /** Condition Conditional Queue */ private final Condition notFull;
Blocking principle:
pull();notFull.await(); Blocking the current thread, waiting for data to be fetched take()Wake up after data is fetched await()Threads ( signal Random wake-up) take(); notEmpty.await(); Block the current thread, add data, and wake up. ( signal Random wake-up)
Chain List Blocking Queue-LinkedBlockingQueue
Be based on Node Node: item,next, Chain list structure, MAX_VALUE,Memory leak, size of specified capacity, element cannot be empty otherwise null pointer exception, 2 locks, lock separation, higher throughput
package com.company; import java.util.concurrent.LinkedBlockingQueue; public class LinkedBlockingQueueDemo { public static void main(String[] args) { LinkedBlockingQueue queue=new LinkedBlockingQueue(2); queue.offer("Jay Chou"); queue.add("Hannah"); System.out.println(queue); queue.peek(); queue.remove("Jay Chou"); try { queue.put("Fang Wen Shan"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(queue); } }
/** * Entity Object */ static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } } /** capacity */ private final int capacity; /** Number Node */ private final AtomicInteger count = new AtomicInteger(); /** * Header of Chain List */ transient Node<E> head; /** * Tail of linked list. * Tail of chain list */ private transient Node<E> last; /** AQS Blocking Queue First Element */ private final ReentrantLock takeLock = new ReentrantLock(); /** Conditional Queue (first element) */ private final Condition notEmpty = takeLock.newCondition(); /** AQS Blocking Queue Add Queue Lock */ private final ReentrantLock putLock = new ReentrantLock(); /** Conditional Queue (add element) */ private final Condition notFull = putLock.newCondition();
Principle:
put: When the queue is full, it will notFull.await();Blocking threads must wait for other threads to wake up blocking threads. Add Queue-- lastnode Point to New node Node, and modify count Value. take:When the queue is empty: it will notEmpty.await(); Blocking threads must wait for other threads to wake up blocking threads. first.item = null; Modify to null.
ArrayBlockingQueue&LinkedBlockingQueue Difference
Same:
ReentrantLock locks AQS blocking queues.
Condition: Conditional queue wakes up waiting.
Differences:
The former is one lock and the latter uses two locks. Read-write separation.
Data structure: an array and a chain table, which occupy a lot of memory.
Capacity part: can execute size, which does not specify that it is actually an unbound queue (MAX_VALUE)
Priority Queue - PriorityBlockingQueue
What's called a priority queue: We customize the comparator to specify the order of elements, such as vip from the bank handling business and queuing for dinner.
Storage Features: Binary Heap + Array + CAS, Unbounded Queue [default is 11 sizes, expandable to unbound queue MAX_VALUE] [sorted to the head of the queue]
Binary heap: [Maximum heap, minimum heap]
[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-5v4eeIpJ-1621751537649) (C:UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-2024.png)]
API
put inserts the specified element into this priority queue. Since the queue is unrestricted, this method will never be blocked. |
---|
package com.company; import java.util.concurrent.PriorityBlockingQueue; public class PriorityBlockingQueueDemo { static class Persion implements Comparable<Persion>{ int i; String name; public Persion(int i, String name) { this.i = i; this.name = name; } public int getI() { return i; } public void setI(int i) { this.i = i; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public int compareTo(Persion o) { return this.getI()>o.getI()?-1:1; } @Override public String toString() { return "Persion{" + "i=" + i + ", name='" + name + '\'' + '}'; } } public static void main(String[] args) { PriorityBlockingQueue queue=new PriorityBlockingQueue(); queue.offer(new Persion(10,"Jay Chou")); queue.offer(new Persion(4,"Hannah")); queue.offer(new Persion(7,"Fang Wen Shan")); queue.offer(new Persion(2,"Wang Lihong")); queue.offer(new Persion(9,"Lin Junjie")); queue.offer(new Persion(5,"Xue Zhiqian")); try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.err.println(queue); } }
Principle:
offer: After acquiring the lock, release the lock if expansion is required and based on CAS Operations to expand, otherwise another thread that did not succeed in the race will yield CPU Time slice. Assign a new array and execute a new reference queue Object. Add elements, sort. Wake the thread at random again. Conditional Queue (wakes up every time)
Delay Queue
What is a delayed queue: elements must expire before they can be removed peek(),poll(),take(), Element implementation java.util.concurrent.Delayed ,long getDelay(TimeUnit unit) Returns the remaining time, if less than or equal to 0, indicating expiration. Ordered, PriorityQueue (Array structure)
Countdown, cancel order
Case:
package com.company; import java.sql.Time; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class DelayQueueDemo { static class DelayedDemo implements Delayed{ int i; String name; //Expiration Time long exprie; public DelayedDemo(int i, String name, long exprie) { this.i = i; this.name = name; this.exprie = exprie+System.currentTimeMillis(); } /** * Expiration calculation convert<=0 indicates expiration * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { long convert = unit.convert(exprie - System.currentTimeMillis(), TimeUnit.MILLISECONDS); System.out.println(convert); return convert; } @Override public int compareTo(Delayed o) { return this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS)>=0?1:-1; } @Override public String toString() { return "DelayedDemo{" + "i=" + i + ", name='" + name + '\'' + ", exprie=" + exprie + '}'; } } public static void main(String[] args) { DelayQueue queue=new DelayQueue(); queue.offer(new DelayedDemo(1,"Jay Chou",111)); queue.offer(new DelayedDemo(2,"Hannah",10022)); System.out.println(queue); while (true){ try { Delayed take = queue.take(); System.out.println(take); } catch (InterruptedException e) { e.printStackTrace(); } } } }
Principle:
offer(); After adding the element, and then heading, reset the thread null,Random wake-up threads. take(); Dead cycle+available.await(); Check to see if the queue is empty, wait if it is empty, or get the timeout. If less than or equal to 0, the element pops up. Otherwise, check to see if someone is waiting to get the first element, and if someone is waiting, sleep and wait. Otherwise, wait for a fixed amount of time and wake up automatically. Go Again for Traverse through the first element to get the element.
Synchronous Queue
Synchronization Queue: A thread that wants to retrieve data is blocked until a thread adds data to the queue. So we can take it out. block
Features: Blocked, empty set, api not available, can not determine whether empty isEmpty() true, iterator is always empty, invalid. peek();
Thread pool: ExecutorService executorService = Executors.newCachedThreadPool();
Case:
package com.company; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; /** * The synchronous queue put take blocking method cannot be offer,peek */ public class SynchronousQueueDemo { public static void main(String[] args) { SynchronousQueue queue=new SynchronousQueue(); new Thread(){ @Override public void run() { super.run(); try { /*** * Block the child thread queue.put("Jay Chou"); */ queue.put("Jay Chou"); System.out.println("Ouch good!"+queue); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); try { Thread.sleep(3000); System.out.println("I'm up~~~~~"); } catch (InterruptedException e) { e.printStackTrace(); } try { Object take = queue.take(); System.out.println("Ha ha ha="+take); } catch (InterruptedException e) { e.printStackTrace(); } } }
Chat Thread Pool-Executor
Not recommended: new Thread().start();
1. Waste server resources, create and destroy wasted resources
2. The object cannot be destroyed immediately. A large number of objects, GC
3, what cycle of a thread is longer, it is not good to manage the state of the thread, interrupt..
4. Periodic, scheduling functions are not supported. Threads compete for resources easily
[Resource Destroy] [Poor Management]
** What is a thread pool: ** Maintain a batch of new threads (). Start() into a pool or queue collection. When a task is committed, a thread is taken out and used directly. It is fast and returns the thread to the thread pool after the task is completed. Or the idle thread is destroyed.
**Benefits: ** Threads do not need to be created and destroyed frequently. Reduce waste of resources.
Easy to manage the state of threads.
Support periodicity and increase speed accordingly. Reduce competition for resources,
Easy to control the number of threads
Briefly introduce the classification of thread pools:
Executor | void execute(Runnable command) top-level interface, submit tasks |
---|---|
ExecutorService | Extended Executor interface, provided submit, api |
ForkJoinPool | Job theft, intensive replenishment of previous thread pools |
ThreadPoolExecutor | Use when customizing thread pools |
Executors | There are static methods that return thread pool objects |
Four built-in thread pools for JDK
//Fixed size thread pool
package com.company; import java.util.concurrent.*; public class ExecurotrsDemo { public static void main(String[] args) { //Fixed-size thread pool 3 submit submit task executor (new Runable) oom memory overflow ExecutorService es= Executors.newFixedThreadPool(3); for (int i=0;i<5;i++){ es.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); es.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); Future<Object> submit = es.submit(new Callable<Object>() { @Override public Object call() throws Exception { System.out.println(Thread.currentThread().getName()); return "General Manager"; } }); try { Object o = submit.get(); System.out.println(o); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } }
package com.company; import java.util.concurrent.*; public class ExecurotrsDemo { public static void main(String[] args) { //A single thread pool has only one thread. Threads that exit abnormally are automatically created. Role of Synchronization [Test Environment Usage] Risk of oom memory overflow ExecutorService es= Executors.newSingleThreadExecutor(); for (int i=0;i<5;i++){ es.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); } } }
package com.company; import java.util.concurrent.*; public class ExecurotrsDemo { public static void main(String[] args) { //Keep creating thread reshaping max, oom [unlimited number of threads], threads that are not in use for 60 seconds will be terminated and deleted from the cache [Keep creating threads] ExecutorService es= Executors.newCachedThreadPool(); for (int i=0;i<45;i++){ es.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); } } }
package com.company; import java.util.concurrent.*; public class ExecurotrsDemo { public static void main(String[] args) { //The periodic thread pool of the int corePoolSize core thread oom ScheduledExecutorService es= Executors.newScheduledThreadPool(5); for (int i=0;i<2;i++){ //Execute after 3 seconds for (int i2=0;i2<45;i2++){ es.schedule(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } },3, TimeUnit.SECONDS); //The first time is after 4 seconds, then every three seconds es.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } },4,3, TimeUnit.SECONDS); } } }
Custom Thread Pool
Why use a custom thread pool: four built-in thread pools [fixed-size thread pool newFixedThreadPool(int nThreads)] [single-threaded thread pool xecutorService newSingleThreadExecutor()] [cacheable thread pool ExecutorService newCachedThreadPool()] [periodic ScheduledExecutorService newScheduledThreadPool(int corePoolSize)] Causes oom. Required Business Custom Thread Pool
1, Queue size
2, such as whether to run a discarded task.
ThreadPoolExecutor
package com.company; import java.sql.Time; import java.util.concurrent.*; public class ExecurotrsDemo { static class ThreadFactoryDemo implements ThreadFactory{ @Override public Thread newThread(Runnable r) { Thread t= new Thread(r,"Jay Chou's Customized Threads"); return t; } } public static void main(String[] args) { // int corePoolSize, Number of banks in the core thread pool // int maximumPoolSize, maximum thread pool // long keepAliveTime, 60 idle time: non-core thread // TimeUnit unit, time unit seconds, milliseconds, days // BlockingQueue <Runnable> workQueue, blocking queue [ArrayBlockingQueue] [LinkedBlockingQueue] Specifies queue size // ThreadFactory threadFactory, Thread Project, Execution Thread Name // RejectedExecutionHandler handler Rejection Policy [Throw Exception, Throw Task] [Throw Exception, Don't Throw Task] [Throw Oldest Task, New Task Add Queue] [Task Main Thread Runs] RejectedExecutionHandler handler=new ThreadPoolExecutor.DiscardOldestPolicy(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(20), new ThreadFactoryDemo(), handler); //A thread was created, with 3 core threads, 10 maximum threads, and 20 queues. The rejection policy is to discard old tasks and add them to new queues. threadPoolExecutor.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"===========Print Jay Chou is a custom thread"); } }); //Configure Thread Pool [io-intensive] [cpu-intensive] } }
Thread pool added process
// int corePoolSize, Number of banks in the core thread pool // int maximumPoolSize, maximum thread pool // long keepAliveTime, 60 idle time: non-core thread // TimeUnit unit, time unit seconds, milliseconds, days // BlockingQueue <Runnable> workQueue, blocking queue [ArrayBlockingQueue] [LinkedBlockingQueue] Specifies queue size // ThreadFactory threadFactory, Thread Factory, Custom Thread Name // RejectedExecutionHandler handler Rejection Policy [Throw Exception, Throw Task] [Throw Exception, Don't Throw Task] [Throw Oldest Task, New Task Add Queue] [Task Main Thread Runs]
Add Task: First determine if the number of core threads is full, and then hand it over directly to the core thread to perform the task, otherwise it will put the task in the queue and wait for the core thread to read.
If the core + queue is full. A non-core thread is created to process the task. Rejection policy is enabled until the maximum thread pool is full and the core + queue + maximum number of threads are full.
If there are no tasks beyond idle time, keep AliveTime deletes non-core threads.
Thread pool rejection four major rejection policies
Rejection Policy
ThreadPoolExecutor.AbortPolicy
ThreadPoolExecutor.DiscardPolicy
[Discard old tasks, add queues to new tasks] ThreadPoolExecutor.DiscardOldestPolicy
[Tasks run by the main thread (caller)] ThreadPoolExecutor.CallerRunsPolicy
//ThreadPoolExecutor.AbortPolicy Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@5cad8086 rejected from java.util.concurrent.ThreadPoolExecutor@6e0be858[Running, pool size = 10, active threads = 10, queued tasks = 20, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at com.company.ExecurotrsDemo.main(ExecurotrsDemo.java:42)
[Task Main Thread Run) ThreadPoolExecutor.CallerRunsPolicy Jay Chou's Customized Threads===========Print Jay Chou is a custom thread main===========Print Jay Chou is a custom thread
FutureTask&Future
Provides the ability to cancel task results and block the main thread from getting results. [Feature: Cancel Task + Block for Callable Return Value]
boolean cancel(boolean mayInterruptIfRunning) | Attempt to cancel this task. |
---|---|
V get() throws InterruptedException, ExecutionException | Wait for the calculation to complete and then retrieve the results. [Blocked Function] |
boolean isCancelled() | true`If this task is cancelled before it is completed |
boolean isDone() | true if this task is completed |
Future
package com.company; import java.util.concurrent.*; public class FutureDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); Future<String> submit = executorService.submit(new Callable<String>() { @Override public String call() throws Exception { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Ouch good oh~~~~~"); return "Jay Chou"; } }); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } // submit.cancel(false); try { System.out.println("Is it cancelled"+submit.isCancelled()); System.out.println("Is the task completed"+submit.isDone()); if(!submit.isCancelled()){ String s = submit.get(); System.out.println(s); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
package com.company; import java.util.concurrent.*; public class FutureDemo { public static void main(String[] args) { //Features: Cancel Task + Block to get Callable return value ExecutorService executorService = Executors.newFixedThreadPool(3); FutureTask task=new FutureTask(new Callable<String>() { @Override public String call() throws Exception { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Ouch good oh~~~~~"); return "Jay Chou"; } }); executorService.submit(task); try { Object o = task.get(); System.out.println(o); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
ForkJoinPool&Fork/Join Framework
1, jdk 1.7 ForkJoinPool handles CPU-intensive tasks. Replace [supplemented] threadpoolexecutor [Fork/Join+ForkJoinPool]
CPU intensive: operations, ternary, plus minus, multiply and divide. if switch is not an IO operation [look up database io]
A simple understanding of ForkJoinPool:
Thread pool, idea of work stealing or dividing and conquering: full interest CPU Multi-core performance
[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-Ze0gq2YE-1621751537651) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-queue 9.png)]
Where can Fork/Join be used?
ForkJoinTask class: +ForkJoinPool fast sum
Split a large task into smaller tasks. Finally, the sum yields the total number [Split Small Tasks + Iterative Recursive Thought]
RecursiveTask Processing tasks with return values are implemented ForkjoinTask RecursiveAction Processing task has no return value and is implemented ForkjoinTask
fork() :Submit Task to Queue.Execute. join() : Merge, return result set
package com.company; import com.sun.javafx.image.IntPixelGetter; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; /** * ForkJoinTask+ForkjoinPool Calculate one and split by [CPU intensive] */ public class ForkJoinPoolTest extends RecursiveTask<Integer> { //Minimum Split Quantity int min=3; int start; int end; public ForkJoinPoolTest(int start, int end) { this.start = start; this.end = end; } public static void main(String[] args) { //Runtime.getRuntime().availableProcessors()) resembles core threads ForkJoinPool pool=new ForkJoinPool(); ForkJoinTask<Integer> submit = pool.submit(new ForkJoinPoolTest(1,1000000)); Integer integer = null; try { integer = submit.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println(integer); } //Split the task and return a sum @Override protected Integer compute() { //No splitting [range too small] minus, less than 31,3-1=2<=3,8,10=2 if(end-start<=min){ int sum=0; for(int i=start;i<=end;i++){ sum+=i; } return sum; //split }else{ int mid=(end+start)/2; ForkJoinPoolTest test1=new ForkJoinPoolTest(start,mid); // 0,5 ForkJoinPoolTest test2=new ForkJoinPoolTest(mid+1,end);//6,10 //split test1.fork(); test2.fork(); //Get small task results Integer join1 = test1.join(); Integer join2 = test2.join(); int sum= join1+join2; System.out.println(Thread.currentThread().getName()+"====="+sum); //Returns the sum return sum; } } }
CountDownLatch-Synchronization Counter
What is a synchronization counter: Let one thread wait (block/wait wait.await(), and the other wake the waiting thread after each thread has completed its own task.
int i=10; --i 0;
Scenario: First step check, multithreading turned on, second batch repository.
void await() | Meaning of blocking waiting |
---|---|
public void countDown() | Similar i- |
Initial value: countDown will be reduced by 1. Until it becomes 0. awaken
package com.company; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountdownLatchTest { public static void main(String[] args) { //Number of count threads for construction method CountDownLatch latch=new CountDownLatch(10); ExecutorService executorService = Executors.newFixedThreadPool(10); //First step verification for(int i=0;i<10;i++) executorService.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"Start Task"); //Reduce 1 latch.countDown(); } }); try { //Bulk repositories cannot be synchronized in one step such as join System.out.println("Start Thread Waiting"); //Condition await(); After 0, wake up automatically latch.await(); System.out.println("Thread is awakened"); } catch (InterruptedException e) { e.printStackTrace(); } } }
Integrated AQS state=10 quantity + CAS dead cycle
Semaphore-semaphore
What is a semaphore: the lobby of a bank. Number of parking spaces: [Role of flow restriction]
Scenario: The number of database connection pools is not exhausted.
acquire() | Acquire a license, reduce the number of licenses available by one if available and return immediately |
---|---|
void release() | Issuing licenses increases the number of licenses available by one. |
package com.company; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; public class SemaphoreTest { public static void main(String[] args) { //Only 10 threads can execute simultaneously Semaphore semaphore=new Semaphore(10); ExecutorService executorService = Executors.newFixedThreadPool(21); //First step verification for(int i=0;i<21;i++) executorService.submit(new Runnable() { @Override public void run() { try { //Reduce 1 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"In the bank lobby"); TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName()+"Out of the bank lobby"); //Add 1 out semaphore.release(); } } }); } }
LockSupport.park(this);Blocking method, LockSupport.unpark(s.thread); One is minus 1, one plus 1,
CyclicBarrier-Cyclic Barrier
What is a circular fence: Multiple threads have one wave of operations. Then when the operation is complete. You can start an event (thread Runable) and return an operation (listening for events)
public int await() throws InterruptedException Wait, minus 1
Scenario: Sum: Last thread operation listens for events
package com.company; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest { public static void main(String[] args) { class CycLicDemo extends Thread{ CyclicBarrier cyclicBarrier; public CycLicDemo(CyclicBarrier cyclicBarrier) { this.cyclicBarrier=cyclicBarrier; } @Override public void run() { super.run(); try { cyclicBarrier.await(); cyclicBarrier.await(); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName()+"I wake up"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } CyclicBarrier cyclicBarrier=new CyclicBarrier(3, new Runnable() { @Override public void run() { System.out.println("completion of enforcement"); } }); for (int i=0;i<3;i++){ new CycLicDemo(cyclicBarrier).start(); } } }
Principle: await();ReentrantLock,Condition.Wake up all threads. Subtract, wake up all threads if equal to 0, but before that, call run Method. trip.await();