JUCxu Learning Arrangement

JUC Knowledge Point-Complete Set

What is juc?

java.util.concurrent
java.util.concurrent.locksLock-related API s
java.util.concurrent.atomicAtomic operation api
Thread Safe and Flexible

JUC classification summary?

lockReentrantLock, ReadWriteLock, StampedLock, Wait for Condition notification, Block wake-up LockSupport
AtomsBasic Type, Array Type, Reference Type, Field Type [AtomicLong] [AtomicLongArray] [AtomicReference] [AtomicReference]
Blocking QueueNon-blocking queue ConcurrentLinkedDeque, blocking queue [BlockingDeque, AbstractQueue], ArrayBlockingQueue, ConcurrentLinkedQueue, synchronous queue, delayed queue, priority PriorityQueue
Thread Pool[Single, Cacheable, Fixed Size, Cycle], Custom Thread Pool, [forkjoin poll] Divide and conquer, Work Stealing, future callable
Concurrent CollectionCopyOnWriteArrayList,CopyOnWriteArraySet,ConcurrentHashMap
Concurrent Tool ClassSemaphore, Current Limiting Signal, Starter Gun: CountDownLatch, Cyclic Barrier, ThreadLocalRandom

Three Core Ideas of JUC

AQSAbstractQueuedSynchronizer Abstract Synchronizer juc Lock Core Ideas
CASCompareAndSwap compares and replaces Atomic updates without locking, does not require cpu context switching, and wastes cpu performance
volatileVisibility, main memory data, consistent with the number of internal copies of threads

Lock

[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-iBWRYdfE-1621751537636) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-2021110.png)]

Pessimistic Lock: When fetching data, worry about data being modified by other threads, so pessimistic add a lock, pessimistic lock. All-in-one approach to reduce performance. Thread security. ReentrantLock
 Optimistic lock: When fetching data, do not worry about the data being modified by other threads, so do not lock, wait for updates to check if the data is modified. If the check data is modified, the update fails. Don't switch contexts, waste cpu. [CAS]

Shared lock: Read lock, multiple threads can hold thread locks. Can be considered unlocked
 Exclusive locks: Mutual exclusive locks, where a thread holds a lock.

Fair Lock: First come, first get, order get. Reduced performance.
Unfair Lock: No need, luck provides performance,

Spin lock: CAS Thought: No spin, no waste cpu,No blocking.
Non-spin lock: Causes the thread to wait.

Interruptible lock: Send signal, interrupt acquisition lock, increase flexibility.
Non-interruptable lock:The acquisition of a lock cannot be interrupted, and the lock will be released all the time.

Re-lockable: Acquires locks multiple times, avoiding deadlocks to some extent
 Non-Re-Lockable: Locks cannot be acquired multiple times, but only once.

ReentrantLock

Characteristic

  • Lock timeout supported
  • Support interruptible
  • Supports fair locks, unfair locks
  • Get the result of the lock
  • ConditionWaiting for Notification
  • Reentrant locks

API

lockAcquire locks
unlockRelease lock
boolean trylock()Attempting to acquire a lock
boolean trylock(time,unit)Acquire locks within a limited time
lockInterruptibly()Corresponding interruption
newCondition()Waiting for Notification Queue

Re-lockable

  Non-reentrant: Threads can only acquire one lock. Deadlock is prone. sun.lock

  Reentrable: Threads can acquire the same lock multiple times. Avoid deadlocks  
package com.company;

import sun.misc.Lock;

/**
 * Non-reentrant lock
 */
public class NoLockDemo {

    public static void main(String[] args) {

        Lock lock=new Lock();
        try {
            lock.lock();
            System.out.println("First Acquisition Lock");
            lock.lock();
            System.out.println("Acquire lock again");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
package com.company;


import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Re-lockable
 */
public class NoLockDemo {

    public static void main(String[] args) {

        Lock lock=new ReentrantLock();
        try {
            lock.lock();
            System.out.println("First Acquisition Lock");
            lock.lock();
            System.out.println("Acquire lock again");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

Reentrainable Lock Principle

 A counter is maintained internally. One lock acquisition adds one. Release the lock once, subtract it once. 

How do I get the lock?

Namely AQS A variable is maintained internally state Equal to 0 means no lock, equal to 1 means acquired lock, greater than 1 means re-lockable.

[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-pyTraeHg-16211537638) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-202107042.png)]

Fair-unfair locks

Fair Lock: care about queues, order, first come first. Cooking

Unfair Locks: Do not care about order, do not care about queue conditions. Bandit

Case Demonstration

package com.company;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 *  Fair lock construction method true:
 *
 *  The unfair lock construction method false defaults to unfair locks:
 *
 */
public class ImpartialLockTest {
    
    //The default construction method is an unfair lock, a false fair lock, and a true is a fair lock
   Lock lock  =new ReentrantLock(true);

    public void getLocks() {
            lock.lock();//Unfair does not queue, fair locks queue
            System.out.println(Thread.currentThread().getName()+" Acquire locks");
            lock.unlock();
    }

      public static void main(String[] args) {
        //Synchronize resources
          ImpartialLockTest tt=new ImpartialLockTest();
          new Thread(()->{
              tt.getLocks();
          },"thread A").start();

          new Thread(()->{

              tt.getLocks();

          },"thread B").start();
        }
}

principle

   Unfair Lock: Acquire locks first to pass CAS,If it fails try to acquire the lock, try to acquire the lock (unlock, acquire the lock, check if the current thread is its own) If it succeeds, end, otherwise wrap yourself up as a Node Nodes are placed in the blocking queue and the blocking is released.

  Fair Lock: Determine if there are any waiting nodes in the blocked queue first. If it does, it wraps itself up as Node Node to wait (reentrant check) or pass CAS Update acquisition lock.      

[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-zCUSUIeA-16211537642) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-20249.png)]

Interrupt Get Lock Thread

lockInterruptibly()  throws InterruptedException       //Thread.interrupt()

Attempt to acquire lock&acquire lock timeout

   boolean tryLock :Attempt to acquire a lock and get a successful return true,Otherwise return false,Don't wait 

   boolean tryLock(time,unit),Get a successful return within a specified time true,Otherwise return false,wait for 

Cases

package com.company;

import java.util.concurrent.locks.ReentrantLock;

public class MyTryLock {
public ReentrantLock lock = new ReentrantLock();
 
public void serviceMethod(){
    try {
        if (lock.tryLock()) {
      // if (lock.tryLock(3,TimeUnit.SECONDS)) {
         if (lock.tryLock(3, TimeUnit.SECONDS)) {
            System.out.println(Thread.currentThread().getName() + "Acquire Lock ");
            Thread.sleep(4000);
        } else {
            System.out.println(Thread.currentThread().getName() + "Lock not acquired");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }finally {
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
}
public static void main(String[] args) {
    //Synchronize resources
    MyTryLock myTryLock = new MyTryLock();
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            myTryLock.serviceMethod();
        }
    };
    Thread aa = new Thread(runnable, "aa");
    aa.start();
    Thread bb = new Thread(runnable, "bb");
    bb.start();
}
}

Get wait notification, wait for queue instance Condition

Condition newCondition()  replace object Inside wait and notify() More flexible use of wait for notification in multi-threaded scenarios.

ReentrantReadWriteLock Read-Write Lock

Reasons for Read-Write Locks: Make up for exclusive locks, company scenario: Read more and write less

Characteristic:

Fairness and Inequality

Supports lock demotion

Support reentrant

Support interruptible

Support condition

API aspects

readLock()Get Read Lock Object
writeLock()Get Write Lock Object
int getReadLockCount()Number of read locks
getWriteHoldCount()Number of write locks
* Read-write exclusion
* Write-write exclusion
* Reading is not mutually exclusive

Principle of Read-Write Lock

Write lock:By calculating the number of write locks until the state is locked, [if there is a lock] --> [if there is a read lock, the acquisition of a write lock fails] [if the write lock is occupied by other threads and the acquisition of a write lock fails], the maximum number of write locks is [MAX_COUNT, 65535], otherwise the lock is updated and the acquisition ends successfully. [No locks present] -->If a check lock is not acquired, check to see if there is a waiting queue or false. Then update the lock, if the update succeeds, the lock will be acquired successfully, otherwise the lock will fail. (Fair Lock checks whether the queue has a wait task, and unfair returns false directly.) Reflects: [Read and write mutually exclusive, write and write mutually exclusive]
Read lock:Calculate the number of write locks: If other threads have write locks, acquiring them fails (read-write exclusion, the process in which the write lock is being written, we cannot read) by using a binary right operation to obtain the number of read locks. Acquire locks (wait queue, maximum number of write locks, cas update success, lock success) Return 1 indicates success, otherwise call fullTryAcquireShared(current) deadlock to get the final value (unlock success or unlock failure)

state represents the number of read and write locks calculated

Read lock: return C >>>> SHARED_ SHIFT; Right operation

Write lock: return C & EXCLUSIVE_ MASK;

[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-jv7NH8MW-1621751537645) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-2024300;png)]

Disadvantage: Causes thread hunger

Postmark Lock StampedLock

  Thread Hunger?   Read locks are so intensive that write locks are always unavailable that write lock threads cannot access them. Waiting.

Stamp: Pass stamp check; Whether the lock has been acquired or not. Checkmark

Read/Write + Optimistic Read

public long tryOptimisticRead()Get an optimistic read lock and return a "stamp" version number
public boolean validate(long stamp)Checkmark
public long tryReadLock()Acquire Read Lock
public long tryUnlockWrite()Get Write Lock

Note: Reentrant is not supported

Note: Condition waiting for notification is not supported

Read and write locks convert to each other.

Postmark, Increase Concurrency

package com.company;

import java.util.concurrent.locks.StampedLock;

/**
 * Common postmark lock api
 */
public class StampedLockDemo {


    public static void main(String[] args) {

        StampedLock lock=new StampedLock();

        long stamped = lock.tryOptimisticRead();

        System.out.println("Optimistic Postmark "+stamped);
        
        long r = lock.tryReadLock();
        
        System.out.println("Read Lock Version Number "+r);
        
        lock.unlockRead(r);
        
        long w = lock.tryWriteLock();
        
        System.out.println("Write Lock Version Number "+w);
        
        lock.unlockWrite(w);


        boolean validate = lock.validate(stamped);
        
        if(validate){
            System.out.println("Not modified by write lock"+stamped);
        }else{
            System.out.println("Write Lock Acquisition"+stamped);
        }

         
    }
    
}

Optimistic reading: cases

package com.company;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

/**
 * Calculate Sum and Resolve Thread Hunger
 */
public class StampedLockDemo {

    StampedLock lock=new StampedLock();
    int a=10,b=10;

    public void sum(){

        //Optimistic Read to Get Version Number
        long l = lock.tryOptimisticRead();

        int a1=a;
        int b1=b;


        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //Checkmark
        if(!lock.validate(l)){

            //Pessimistic Reading
            long l1 = lock.readLock();
            System.out.println("Getting a pessimistic read lock----->version number"+l1);

              a1=a;
              b1=b;

            System.out.println("Release Pessimistic Read Lock----->version number"+l1);
            lock.unlockRead(l1);
        }



        System.out.println("a1="+a1+"b1="+b1+"Calculated sum"+(a1+b1));


    }

    public void udpateInt(int a,int b){

        long w = lock.writeLock();
        System.out.println("Write lock acquired"+w);
        try {
            this.a=a;
            this.b=b;
        } finally {
            System.out.println("Write lock released"+w);
           lock.unlockWrite(w);
        }


    }

    public static void main(String[] args) {

        StampedLockDemo demo=new StampedLockDemo();

        new Thread(()->demo.sum()).start();

        new Thread(()->demo.udpateInt(1,2)).start();


        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

}

Condition Wait Notification Interthread Writing Tool

  • Reasons for the emergence of new wait-for-notification objects (conditional queues) in JUC packages: rich api s, powerful extensions.
  • Same thing: You must acquire the lock object.
  • The difference: api (wait,notify,notifyAll) inside the object, await,signal,signalAll inside the condition
  • Get one, you can get multiple conditional queues,
  • Wait timeout is also supported.

Lock+Condition substitution: synchronized+Object

void await()Wait, release lock resource
boolean await(long time, TimeUnit unit)Timeout Wait
void signal()Random wake-up
void signalAll()Wake Up All
package com.company;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Waiting for Notification Condition
 */
public class ConditionDemo {


    public static void main(String[] args) {


        ReentrantLock lock=new ReentrantLock();
        Condition condition = lock.newCondition();


        new Thread(()->{

            lock.lock();

            try {
                System.out.println(Thread.currentThread().getName()+"Start Waiting");
                //Wait, release lock
                condition.await();
                System.out.println(Thread.currentThread().getName()+"Release Waiting");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }

        }).start();


        new Thread(()->{


            //Lock is required before
            lock.lock();


            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            }


            try {
                System.out.println(Thread.currentThread().getName()+"Start waking up");
                //Wake up one at random
                condition.signal();
            } catch (IllegalMonitorStateException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }


        }).start();


    }

}

Source level:

await(): LockSupport.park(this); Wait, suspend thread, [join conditional queue, release lock resource, check aqs blocking queue, suspend thread] finished waiting

siognal():LockSupport.unpark(node.thread); The core wake-up thread [joins the blocked queue, aqs queue, has the chance to get a lock], unpark wakes up.

[External chain picture transfer failed, source may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-tOsxbDVm-16211537646) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-2028019.png)]

LockSupport Block/Wake Tool

1, no locks required, blocking and waking up directly.

2. The process of waking up and blocking can be in an interchangeable order. Avoid Deadlock [License 0,1]

Static methods do not need to get objects, use them directly

public static void park()block
public static void unpark(Thread thread)Release, wake up thread
package com.company;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;

public class LockSupportTest {
    
    public static void main(String[] args) {

        Thread t=new Thread(()->{

            System.out.println("Threads Waiting Infinitely");
            //No locks required
            LockSupport.park();
            System.out.println("Wake Threads");

        });

        t.start();


        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        LockSupport.unpark(t);
     
    }

}

Atomic class atomic

Multiple** Threads without locking, still thread safe, atomic update synchronization resources, different interrupts. ** java.util.concurrent.atomic

Based on CAS+unsafe+volatile

CAS: [Compare and Swap] Main Memory, Expected Value, Updated Value, 10==10? 20 instead of 10: no updates.

unsafe: manipulate c,c++ libraries, jni operations, send instructions through hardware resources, directly manipulate memory, atomic operations

volatile: Visibility.

api classification: four parts [basic type] [array type] [reference type] [field type] accumulator

Basic typesAtomicInteger,AtomicBoolean,AtomicLong
Array typeAtomicLongArray,AtomicIntegerArray,AtomicReferenceArray,
reference typeAtomicReference,AtomicStampedReference,AtomicMarkableReference
Field typeAtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater,
accumulatorLongAdder,DoubleAdder,LongAccumulator,DoubleAccumulator

Atomic Integer

Reason for this: Atomic update operations are maintained in a multithreaded situation. No locks are required. Thread security, (i++, concurrency issues in multithreaded scenarios)

/**
 * Non-atomic operation (bulky)
 */
public class AtomicIntegerDemo {
    int i=0;
    public synchronized void sum(){
        System.out.println(Thread.currentThread().getName()+" "+(i++));
    }
    
    public static void main(String[] args) {

        AtomicIntegerDemo  demo=new AtomicIntegerDemo();

        for(int i=0;i<10;i++){

            new Thread(()->demo.sum()).start();

        }
    }
}
package com.company;


import java.util.concurrent.atomic.AtomicInteger;

/**
 * CAS-based unlocked operation
 */
public class AtomicIntegerDemo {
    AtomicInteger atomicInteger=new AtomicInteger();
    public   void sum(){
        System.out.println(Thread.currentThread().getName()+" "+(atomicInteger.getAndIncrement()));
    }

    public static void main(String[] args) {

        AtomicIntegerDemo  demo=new AtomicIntegerDemo();

        for(int i=0;i<10;i++){

            new Thread(()->demo.sum()).start();

        }
    }
}

API

public final int get()Get the latest value
public final int getAndSet(int newValue)Get the old value and set it to the new value
public final boolean compareAndSet(int expect, int update)expect==current value? Replace with update: Do nothing
public final int getAndIncrement()Get old value, increase by 1
public final int getAndDecrement()Get the old value minus 1
public final int getAndAdd(int delta)Wait until the old address, add up delta
public final int addAndGet(int delta)Get a new value, add up delta
package com.company;


import java.util.concurrent.atomic.AtomicInteger;

/**
 * api cas AtomicInteger
 */
public class AtomicIntegerDemo {



    public static void main(String[] args) {
        AtomicInteger atomicInteger=new AtomicInteger();
        int andIncrement = atomicInteger.getAndIncrement();
        System.out.println("andIncrement="+andIncrement);
        System.out.println("get="+atomicInteger.get());

        int andAdd = atomicInteger.getAndAdd(3);

        System.out.println("andAdd="+andAdd);

        System.out.println("get="+atomicInteger.get());

        int andDecrement = atomicInteger.getAndDecrement();

        System.out.println("getAndDecrement"+andDecrement);

        System.out.println("get="+atomicInteger.get());

        boolean b = atomicInteger.compareAndSet(7, 6);

        System.out.println("Compare Replacement"+b);

        System.out.println("get="+atomicInteger.get());

    }
}

Atomic Update Array

Update: Add a corner field

public final long getAndSet(int i, long newValue)Returns the old value and updates it with the corner label
public final boolean compareAndSet(int i, long expect,long update)Returns a Boolean value, Corner Scale: Expected value, Updated value (Expected value==Current value? Updated value: Do nothing)
public final long getAndIncrement(int i)Return the old value by adding one to the corner label
public final long getAndDecrement(int i)Returns the old value by subtracting one from the corner label

Atomic Update Reference Type

public final boolean compareAndSet(V expect, V update)Return the old value and replace the comparison with the new value
public final V getAndSet(V newValue)Return old value, set to new value
package com.company;

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceDemo {

    public static void main(String[] args) {
        AtomicReference reference=new AtomicReference("Jay Chou");

        boolean b = reference.compareAndSet("Jay Chou", "Hannah");
        System.out.println("b=  "+b);

        System.out.println(reference.get());

        Object fws = reference.getAndSet("Fang Wen Shan");

        System.out.println(fws);

        System.out.println(reference.get());
    }

}

Atomic Update Field Type - AtomicIntegerFieldUpdater

public abstract boolean compareAndSet(T obj, int expect, int update)
public int getAndAdd(T obj, int delta)

Must:

Must be volatile modifier,

Must be int

Cannot static

Cannot be private

package com.company;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class AtomicIntegerFieldUpdaterDemo {

    public static void main(String[] args) {

        Persion persion=new Persion();

        AtomicIntegerFieldUpdater f  = AtomicIntegerFieldUpdater.newUpdater(Persion.class,"i");

        boolean b = f.compareAndSet(persion, persion.i, 2);

        System.out.println(b);

        System.out.println(f.get(persion));

        int andAdd = f.getAndAdd(persion, 4);

        System.out.println(andAdd);

        System.out.println(f.get(persion));

    }


}

Non-spinning CAS unsafe unSafe class

CAS:  [Comparison and Exchange [ compareAndSwap]  Main memory value, expected value, value to be updated, 10==10?20 Replace 10: Do not update.

unSafe: Unsafe class, dangerous class, not recommended for use, can send instructions to operate hardware resources. c,c++Library. Operational memory

[External chain picture transfer failed, source may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-XcLmKHET-1621751537647) (C:UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-2027575.png)]

Problem: Constant spinning wastes cpu performance and is recommended for non-massive concurrency. No need to switch Online

ABA Question A-B--A

Solve ABA Problem-AtomicStampedReference

API

public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)Expected reference, new value, stamp, new stamp
public V getReference()Get the current object application

Both Atomic MarkableReference and AtomicStampedReference can solve ABA problems.

Inborn problem CAS

package com.company;

import java.util.concurrent.atomic.AtomicStampedReference;

public class AtomicStampedReferenceDemo {

    public static void main(String[] args) {

        AtomicStampedReference  atomicStampedReference=new AtomicStampedReference("Jay Chou",1);

        boolean b = atomicStampedReference.compareAndSet("Jay Chou", "Zhou Yunfa", 1, 2);

        System.out.println(b);
        Object reference = atomicStampedReference.getReference();
        System.out.println(reference);
        int stamp = atomicStampedReference.getStamp();
        System.out.println(stamp);

        if(stamp!=2){
            return;
        }

    }

}

LongAdder-High Concurrent Atomic Accumulator

AtomicLong Why there are  LongAdder  ?
AtomicLong Cas Operation failed, spin, waste of energy     private volatile int value;  First Hand,
LongAdder cells[value] Array, which shares the pressure on competing lock resources and is divided into several parts. Cumulative. [No multiple threads competing for one bvaseValue. Use if multi-threaded cells[]Array)
Construction method: accumulate from 0 by default

API

public void increment()add one-tenth
public long sum()Summation
package com.company;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class LongAdderDemoo {

    public static void main(String[] args) {
        LongAdder l=new LongAdder();

        for (int i=0;i<10000;i++){

            new Thread(()->l.increment()).start();

        }
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Summation"+l.sum());
    }


}

Concurrent Collection-CopyOnWriteArrayList

  ArrayList :  elementData[size++] = e;  Thread insecurity caused data loss.

Solution:

Vector: All methods are synchronized. Performance degradation. Threads hungry, a lock

Collections. SynchronizedList(list);/ Synchronized transforms an unsafe collection into a secure collection. Performance degradation. Threads hungry, a lock

CopyOnWriteArrayList: Copy a new array when writing (add, delete, change) to ensure the security of the collection, (can be used when the amount of data is small, a waste of memory space).

API

addIfAbsent(E e)If there are elements, do not add them, have the function of weighting
public List subList(int fromIndex, int toIndex)Intercept elements within a specified range (including header, not tail)
package com.company;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteArrayListDemo {

    public static void main(String[] args) {
        ArrayList list=new ArrayList();
        list.add("Jay Chou");

        Vector vector=new Vector();
        vector.add("");
      //  vector.get(9);

        //Turn an unsafe collection into a safe collection
        Collections.synchronizedList(list);


        
        //A secure collection, adding, deleting, and altering replicated collections
        CopyOnWriteArrayList clist=new CopyOnWriteArrayList();

        clist.add("Jay Chou");

        clist.addIfAbsent("Chapter 5");

        clist.add("Cai Yilin");

        clist.addIfAbsent("Jay Chou");


        ListIterator listIterator = clist.listIterator();

        while (listIterator.hasNext()){
            Object next = listIterator.next();
            System.out.println(next);
        }

        //Truncate partial values [including header, not tail]
        List list1 = clist.subList(0, 1);

        System.out.println(list.get(0));
    }

}

principle

Lock, copy the array, set the new array pointing.

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        //Copy new elements to a new array
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        //Point to New Array
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

Concurrent Collection-CopyOnWriteArraySet

hashSet: Thread insecurity: bottom used HashMap

Collections. Synchronized Set (hashSet); Locking, poor performance.

CopyOnWriteArraySet : CopyOnWriteArrayList.addIfAbsent()api,No more data can be added to the container.

Concurrent Collection-ConcurrentHashMap

hashMap Why can't I use it in a multi-threaded situation?: Concurrent modification of exceptions, expansion leads to closed-loop, deadlock, put Data Loss [Multi-threading scenario]-Thread Pool)
Series not recommended
Hashtable hashtable=new Hashtable();
Collections.synchronizedMap(new HashMap<>());
//Unsafe concurrent modification exception
HashMap hashMap=new HashMap();
for (int i=0;i<1000;i++){
    int finalI = i;
    new Thread(()->{
        hashMap.put(Thread.currentThread().getName()+"-->"+ finalI, finalI);
        System.out.println(hashMap);
    }).start();

}
//Thread Security
ConcurrentHashMap hash=new ConcurrentHashMap();
for (int i=0;i<1000;i++){
    int finalI = i;
    new Thread(()->{
        hash.put(Thread.currentThread().getName()+"-->"+ finalI, finalI);
        System.out.println(hash);
    }).start();

}

Avoid expansion issues?

Number of expansion: int SC = n - (n >>> 2);

Do not use as an array

/**
      map Maximum capacity
 */
private static final int MAXIMUM_CAPACITY = 1 << 30;

/**
   Default capacity
 */
private static final int DEFAULT_CAPACITY = 16;


/**
 * Number of default concurrency level locks
 */
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
 * Load factor: calculate hash value to reduce hash collisions  
 */
private static final float LOAD_FACTOR = 0.75f;

/**
 *  More than or equal to 8-chain list structure converted to red and black numbers
 */
static final int TREEIFY_THRESHOLD = 8;

/**
 *  Converting structures with red and black numbers less than 6 to a chain table
 */
static final int UNTREEIFY_THRESHOLD = 6;

Capacity Expansion Quantity Calculation

// put 30 elements is not an array [30]

ConcurrentHashMap cHashMap=new ConcurrentHashMap(40);
cHashMap.put("zjl",48);

int sc = 16 - (16 >>> 2);
System.out.println(sc);


double i=30/0.75;
System.out.println(i);

When map simple principle put?

Version 1.7: Segment Lock seGement[]Lock+hashEntry Internal Chained Table Bacon (Reentrantlock+Entity+Chain List)

[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-feaCpAbt-16211537648) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-2021625252525252.png)]

1.8 CAS+node+red-black tree

ConcurrentHashMap principles:

synchronized: Performance is not currently optimized very poorly. That's great
Node Node: Packaged data, part of a chain table
CAS Spin operation: Initialize add first node use
 Red-Black Tree, Chain List: Less than eight are chains, and more than eight are converted to red-black trees.
Red-Black Tree: Good performance can reduce io Number of times to improve traversal efficiency

put: calculates the hash value, initializes the table, waits for the expansion value, and adds the reason for the Cas operation to the node array if it is empty. Otherwise, if you want to see if you are expanding, instead of expanding, add a node node (chain table interface, red-black tree) and convert it to a red-black tree if the number is more than or equal to eight. Add count value.

Chat and Queue

Queue Classification: Blocked Queue + Non-Blocked Queue

Queue characteristics: FIFO, FIFO. [Priority Queue]

Blocking queue: See if the queue capacity is [empty, full] blocking the current thread waiting. Until the queue is not empty or the data is not satisfied, continue to operate the container and the thread wakes up.

Non-blocking queue: The capacity of the queue is wireless. Integer.MaxValue(); Based on CAS, non-blocking algorithm, concurrency capability is strong, data does not need to wait.

Blocked Queues: ReentrantLock [Pessimistic Policy], Non-blocked Queues [Optimistic Lock Mechanism for CAS Conflict Monitoring]

Blocked Queue &Nonblocked Matters Scenario: [Producer Consumer], [Thread Pool]

Non-blocking queue API:

ConcurrentLinkedQueue  Non-blocking queue  CAS No lock, unlimited capacity

Blocking queue API:

Blocking+Non-blocking api

ArrayBlockingQueueBounded queue based on array + specified capacity
DelayQueueDelayed queue, automatically executes fetch element at time
LinkedBlockingQueueBounded Queue Based on Chain List+Specified Capacity
PriorityBlockingQueuePriority queue, which can be sorted when fetching data
SynchronousQueueSynchronized Queue

Non-blocking Queue-ConcurrentLinkedQueue

Non-blocking queues are based on CAS Concurrency beyond blocked queue, insert to end of data, get from scratch, thread safe
boolean add(E e)Add element, cannot be null, return Boolean value
boolean offer(E e)Add element, cannot be null, return Boolean value
E peek()Get elements: peep, look, don't delete elements, cut back to the first element
E poll()Get Elements: Bright, Look, Delete Elements, Cut Back to First Element
package com.company;

import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueueDemo {

    public static void main(String[] args) {
        ConcurrentLinkedQueue queue=new ConcurrentLinkedQueue();
        boolean Jay Chou = queue.add("Jay Chou");
        System.out.println(Jay Chou);
        boolean Wang Lihong = queue.offer("Wang Lihong");
        System.out.println(Wang Lihong);
        Object peek = queue.peek();
        System.out.println(peek);
        //Element deleted
        Object poll = queue.poll();
        System.out.println(poll);

        //Inefficient
        int size = queue.size();
        System.out.println(size);

        //Efficient
        boolean empty = queue.isEmpty();
        System.out.println(empty);

    }

}

Principle: offer adds elements

node.next--->Next node Node Chain Table Structure
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val)  CAS operation+Dead cycle

Array Blocking Queue

Provides a way to block functions: take (), pull (), the effect of flow first.

Interface BlockingQueue

Two types of methods: blocking and non-blocking

Construct method: public ArrayBlockingQueue(int capacity): fair lock unfair lock + capacity + initialization array [must write a capacity]

All queues cannot add empty elements

boolean add(E e)Add elements and return resultstrue,false
boolean offer(E e)Add elements and return resultstrue,false
void put(E e)Add Elements, Block
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedExceptionAdd element, timeout blocking
E peek()Get elements: peep, look, don't delete elements, cut back to the first element
poll()Delete the first element without blocking
public E poll(long timeout, TimeUnit unit) throws InterruptedExceptionDelete the first element without blocking (time-out)
public E take() throws InterruptedExceptionDelete first element, will block
package com.company;

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueDemo {

    public static void main(String[] args) {
        ArrayBlockingQueue arrayBlockingQueue=new ArrayBlockingQueue(1);
        boolean add = arrayBlockingQueue.add(3);
        System.out.println(add);
        boolean offer = arrayBlockingQueue.offer(4);
        System.out.println(offer);
        int size = arrayBlockingQueue.size();
        System.out.println(size);

        try {
            arrayBlockingQueue.put("32");
            System.out.println("32");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        Object peek = arrayBlockingQueue.peek();
        System.out.println(peek);

        try {
            Object take = arrayBlockingQueue.take();
            System.out.println("Printable logs"+take);
            Object take1 = arrayBlockingQueue.take();
            System.out.println("Unprintable logs");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

}

Array structure

/* Element Storage */
final Object[] items;

/** Number of data elements */
int count;

/** A reentrant lock */
final ReentrantLock lock;

/** Condition  Conditional Queue */
private final Condition notEmpty;

/** Condition Conditional Queue */
private final Condition notFull;

Blocking principle:

   pull();notFull.await(); Blocking the current thread, waiting for data to be fetched take()Wake up after data is fetched await()Threads ( signal Random wake-up)

   take(); notEmpty.await();  Block the current thread, add data, and wake up. ( signal Random wake-up)

Chain List Blocking Queue-LinkedBlockingQueue

Be based on Node Node: item,next, Chain list structure, MAX_VALUE,Memory leak, size of specified capacity, element cannot be empty otherwise null pointer exception, 2 locks, lock separation, higher throughput
package com.company;

import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueDemo {

    public static void main(String[] args) {
        LinkedBlockingQueue queue=new LinkedBlockingQueue(2);
        queue.offer("Jay Chou");
        queue.add("Hannah");
        System.out.println(queue);
        queue.peek();
        queue.remove("Jay Chou");
        try {
            queue.put("Fang Wen Shan");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(queue);
    }

}
/**
 * Entity Object
 */
static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) { item = x; }
}

/** capacity */
private final int capacity;

/** Number Node */
private final AtomicInteger count = new AtomicInteger();

/**
 * Header of Chain List
 */
transient Node<E> head;

/**
 * Tail of linked list.
 *  Tail of chain list
 */
private transient Node<E> last;

/**  AQS Blocking Queue First Element */
private final ReentrantLock takeLock = new ReentrantLock();

/** Conditional Queue (first element) */
private final Condition notEmpty = takeLock.newCondition();

/** AQS Blocking Queue Add Queue Lock */
private final ReentrantLock putLock = new ReentrantLock();

/**  Conditional Queue (add element) */
private final Condition notFull = putLock.newCondition();

Principle:

put:  When the queue is full, it will notFull.await();Blocking threads must wait for other threads to wake up blocking threads. Add Queue-- lastnode Point to New node Node, and modify count Value.

take:When the queue is empty: it will notEmpty.await(); Blocking threads must wait for other threads to wake up blocking threads. first.item = null; Modify to null. 

ArrayBlockingQueue&LinkedBlockingQueue Difference

Same:

ReentrantLock locks AQS blocking queues.

Condition: Conditional queue wakes up waiting.

Differences:

The former is one lock and the latter uses two locks. Read-write separation.

Data structure: an array and a chain table, which occupy a lot of memory.

Capacity part: can execute size, which does not specify that it is actually an unbound queue (MAX_VALUE)

Priority Queue - PriorityBlockingQueue

What's called a priority queue: We customize the comparator to specify the order of elements, such as vip from the bank handling business and queuing for dinner.

Storage Features: Binary Heap + Array + CAS, Unbounded Queue [default is 11 sizes, expandable to unbound queue MAX_VALUE] [sorted to the head of the queue]

Binary heap: [Maximum heap, minimum heap]

[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-5v4eeIpJ-1621751537649) (C:UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-2024.png)]

API

put inserts the specified element into this priority queue. Since the queue is unrestricted, this method will never be blocked.
package com.company;

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueDemo {
   static class  Persion implements Comparable<Persion>{
        int i;
        String name;

       public Persion(int i, String name) {
           this.i = i;
           this.name = name;
       }

       public int getI() {
            return i;
        }

        public void setI(int i) {
            this.i = i;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public int compareTo(Persion o) {
            return this.getI()>o.getI()?-1:1;
        }

       @Override
       public String toString() {
           return "Persion{" +
                   "i=" + i +
                   ", name='" + name + '\'' +
                   '}';
       }
   }

    public static void main(String[] args) {
        PriorityBlockingQueue queue=new PriorityBlockingQueue();
        queue.offer(new Persion(10,"Jay Chou"));
        queue.offer(new Persion(4,"Hannah"));
        queue.offer(new Persion(7,"Fang Wen Shan"));
        queue.offer(new Persion(2,"Wang Lihong"));
        queue.offer(new Persion(9,"Lin Junjie"));
        queue.offer(new Persion(5,"Xue Zhiqian"));
        try {
            queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.err.println(queue);
    }

}

Principle:

offer: After acquiring the lock, release the lock if expansion is required and based on CAS Operations to expand, otherwise another thread that did not succeed in the race will yield CPU Time slice. Assign a new array and execute a new reference queue Object. Add elements, sort. Wake the thread at random again. Conditional Queue (wakes up every time)

Delay Queue

What is a delayed queue: elements must expire before they can be removed peek(),poll(),take(),   Element implementation java.util.concurrent.Delayed ,long getDelay(TimeUnit unit) Returns the remaining time, if less than or equal to 0, indicating expiration. Ordered, PriorityQueue (Array structure)

Countdown, cancel order

Case:

package com.company;

import java.sql.Time;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {


    static class DelayedDemo implements Delayed{


        int i;
        String name;
        //Expiration Time
        long exprie;

        public DelayedDemo(int i, String name, long exprie) {
            this.i = i;
            this.name = name;
            this.exprie = exprie+System.currentTimeMillis();
        }

        /**
         * Expiration calculation convert<=0 indicates expiration
         * @param unit
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            long convert = unit.convert(exprie - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            System.out.println(convert);
            return convert;
        }

        @Override
        public int compareTo(Delayed o) {
            return this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS)>=0?1:-1;
        }

        @Override
        public String toString() {
            return "DelayedDemo{" +
                    "i=" + i +
                    ", name='" + name + '\'' +
                    ", exprie=" + exprie +
                    '}';
        }
    }



    public static void main(String[] args) {

        DelayQueue queue=new DelayQueue();
        queue.offer(new DelayedDemo(1,"Jay Chou",111));
        queue.offer(new DelayedDemo(2,"Hannah",10022));

        System.out.println(queue);

        while (true){

            try {
                Delayed take = queue.take();
                System.out.println(take);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }


    }

}

Principle:

offer();  After adding the element, and then heading, reset the thread null,Random wake-up threads.
take();   Dead cycle+available.await(); Check to see if the queue is empty, wait if it is empty, or get the timeout. If less than or equal to 0, the element pops up. Otherwise, check to see if someone is waiting to get the first element, and if someone is waiting, sleep and wait. Otherwise, wait for a fixed amount of time and wake up automatically. Go Again for Traverse through the first element to get the element.

Synchronous Queue

Synchronization Queue: A thread that wants to retrieve data is blocked until a thread adds data to the queue. So we can take it out. block

Features: Blocked, empty set, api not available, can not determine whether empty isEmpty() true, iterator is always empty, invalid. peek();

Thread pool: ExecutorService executorService = Executors.newCachedThreadPool();

Case:

package com.company;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;

/**
 * The synchronous queue put take blocking method cannot be offer,peek
 */
public class SynchronousQueueDemo {

    public static void main(String[] args) {

        SynchronousQueue queue=new SynchronousQueue();

        new Thread(){
            
            
            @Override
            public void run() {
                super.run();
                try {
                    /***
                     * Block the child thread queue.put("Jay Chou");
                     */
                    queue.put("Jay Chou");
                    System.out.println("Ouch good!"+queue);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();

        try {
            Thread.sleep(3000);
            System.out.println("I'm up~~~~~");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            Object take = queue.take();
            System.out.println("Ha ha ha="+take);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

Chat Thread Pool-Executor

Not recommended: new Thread().start();

1. Waste server resources, create and destroy wasted resources

2. The object cannot be destroyed immediately. A large number of objects, GC

3, what cycle of a thread is longer, it is not good to manage the state of the thread, interrupt..

4. Periodic, scheduling functions are not supported. Threads compete for resources easily

[Resource Destroy] [Poor Management]

** What is a thread pool: ** Maintain a batch of new threads (). Start() into a pool or queue collection. When a task is committed, a thread is taken out and used directly. It is fast and returns the thread to the thread pool after the task is completed. Or the idle thread is destroyed.

**Benefits: ** Threads do not need to be created and destroyed frequently. Reduce waste of resources.

Easy to manage the state of threads.

Support periodicity and increase speed accordingly. Reduce competition for resources,

Easy to control the number of threads

Briefly introduce the classification of thread pools:

Executorvoid execute(Runnable command) top-level interface, submit tasks
ExecutorServiceExtended Executor interface, provided submit, api
ForkJoinPoolJob theft, intensive replenishment of previous thread pools
ThreadPoolExecutorUse when customizing thread pools
ExecutorsThere are static methods that return thread pool objects

Four built-in thread pools for JDK

//Fixed size thread pool

package com.company;

import java.util.concurrent.*;

public class ExecurotrsDemo {

    public static void main(String[] args) {

        //Fixed-size thread pool 3 submit submit task executor (new Runable) oom memory overflow 
        ExecutorService es= Executors.newFixedThreadPool(3);
        for (int i=0;i<5;i++){


            es.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });


            es.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });

            Future<Object> submit = es.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    System.out.println(Thread.currentThread().getName());
                    return "General Manager";
                }
            });

            try {
                Object o = submit.get();
                System.out.println(o);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

        }





    }


}
package com.company;

import java.util.concurrent.*;

public class ExecurotrsDemo {

    public static void main(String[] args) {

        //A single thread pool has only one thread. Threads that exit abnormally are automatically created. Role of Synchronization [Test Environment Usage] Risk of oom memory overflow  
        ExecutorService es= Executors.newSingleThreadExecutor();
        for (int i=0;i<5;i++){
            es.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
            
        }
 

    }


}
package com.company;

import java.util.concurrent.*;

public class ExecurotrsDemo {

    public static void main(String[] args) {

        //Keep creating thread reshaping max, oom [unlimited number of threads], threads that are not in use for 60 seconds will be terminated and deleted from the cache [Keep creating threads]
        ExecutorService es= Executors.newCachedThreadPool();
        for (int i=0;i<45;i++){
            es.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });

        }


    }


}
package com.company;

import java.util.concurrent.*;

public class ExecurotrsDemo {

    public static void main(String[] args) {

        //The periodic thread pool of the int corePoolSize core thread oom
        ScheduledExecutorService es= Executors.newScheduledThreadPool(5);


        for (int i=0;i<2;i++){

            //Execute after 3 seconds
            for (int i2=0;i2<45;i2++){
                es.schedule(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName());
                    }
                },3, TimeUnit.SECONDS);



                //The first time is after 4 seconds, then every three seconds
            es.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            },4,3, TimeUnit.SECONDS);

        }

    }

}

Custom Thread Pool

Why use a custom thread pool: four built-in thread pools [fixed-size thread pool newFixedThreadPool(int nThreads)] [single-threaded thread pool xecutorService newSingleThreadExecutor()] [cacheable thread pool ExecutorService newCachedThreadPool()] [periodic ScheduledExecutorService newScheduledThreadPool(int corePoolSize)] Causes oom. Required Business Custom Thread Pool

1, Queue size

2, such as whether to run a discarded task.

ThreadPoolExecutor

package com.company;

import java.sql.Time;
import java.util.concurrent.*;

public class ExecurotrsDemo {

    static  class ThreadFactoryDemo implements ThreadFactory{


        @Override
        public Thread newThread(Runnable r) {
            Thread t= new Thread(r,"Jay Chou's Customized Threads");
            return t;
        }
    }

    public static void main(String[] args) {

//        int corePoolSize, Number of banks in the core thread pool
//        int maximumPoolSize, maximum thread pool
//        long keepAliveTime, 60 idle time: non-core thread
//        TimeUnit unit, time unit seconds, milliseconds, days
//        BlockingQueue <Runnable> workQueue, blocking queue [ArrayBlockingQueue] [LinkedBlockingQueue] Specifies queue size
//        ThreadFactory threadFactory, Thread Project, Execution Thread Name
//        RejectedExecutionHandler handler Rejection Policy [Throw Exception, Throw Task] [Throw Exception, Don't Throw Task] [Throw Oldest Task, New Task Add Queue] [Task Main Thread Runs]

        RejectedExecutionHandler handler=new ThreadPoolExecutor.DiscardOldestPolicy();

        ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(20), new ThreadFactoryDemo(), handler);

        //A thread was created, with 3 core threads, 10 maximum threads, and 20 queues. The rejection policy is to discard old tasks and add them to new queues.

        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"===========Print Jay Chou is a custom thread");
        
            }
        });
      
        //Configure Thread Pool [io-intensive] [cpu-intensive]
    }


}

Thread pool added process

//        int corePoolSize, Number of banks in the core thread pool
//        int maximumPoolSize, maximum thread pool
//        long keepAliveTime, 60 idle time: non-core thread
//        TimeUnit unit, time unit seconds, milliseconds, days
//        BlockingQueue <Runnable> workQueue, blocking queue [ArrayBlockingQueue] [LinkedBlockingQueue] Specifies queue size
//        ThreadFactory threadFactory, Thread Factory, Custom Thread Name
//        RejectedExecutionHandler handler Rejection Policy [Throw Exception, Throw Task] [Throw Exception, Don't Throw Task] [Throw Oldest Task, New Task Add Queue] [Task Main Thread Runs]

Add Task: First determine if the number of core threads is full, and then hand it over directly to the core thread to perform the task, otherwise it will put the task in the queue and wait for the core thread to read.

If the core + queue is full. A non-core thread is created to process the task. Rejection policy is enabled until the maximum thread pool is full and the core + queue + maximum number of threads are full.

If there are no tasks beyond idle time, keep AliveTime deletes non-core threads.

Thread pool rejection four major rejection policies

Rejection Policy

ThreadPoolExecutor.AbortPolicy

ThreadPoolExecutor.DiscardPolicy

[Discard old tasks, add queues to new tasks] ThreadPoolExecutor.DiscardOldestPolicy

[Tasks run by the main thread (caller)] ThreadPoolExecutor.CallerRunsPolicy

//ThreadPoolExecutor.AbortPolicy
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@5cad8086 rejected from java.util.concurrent.ThreadPoolExecutor@6e0be858[Running, pool size = 10, active threads = 10, queued tasks = 20, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.company.ExecurotrsDemo.main(ExecurotrsDemo.java:42)
[Task Main Thread Run) ThreadPoolExecutor.CallerRunsPolicy 
Jay Chou's Customized Threads===========Print Jay Chou is a custom thread
main===========Print Jay Chou is a custom thread

FutureTask&Future

Provides the ability to cancel task results and block the main thread from getting results. [Feature: Cancel Task + Block for Callable Return Value]

boolean cancel(boolean mayInterruptIfRunning)Attempt to cancel this task.
V get() throws InterruptedException, ExecutionExceptionWait for the calculation to complete and then retrieve the results. [Blocked Function]
boolean isCancelled()true`If this task is cancelled before it is completed
boolean isDone()true if this task is completed

Future

package com.company;

import java.util.concurrent.*;

public class FutureDemo {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(3);

        Future<String> submit = executorService.submit(new Callable<String>() {

            @Override
            public String call() throws Exception {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Ouch good oh~~~~~");
                return "Jay Chou";
            }
        });

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    //    submit.cancel(false);

        try {
            System.out.println("Is it cancelled"+submit.isCancelled());
            System.out.println("Is the task completed"+submit.isDone());
            if(!submit.isCancelled()){
                String s = submit.get();
                System.out.println(s);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


    }

}
package com.company;

import java.util.concurrent.*;

public class FutureDemo {

    public static void main(String[] args) {

        //Features: Cancel Task + Block to get Callable return value
        ExecutorService executorService = Executors.newFixedThreadPool(3);

            FutureTask task=new FutureTask(new Callable<String>() {

                @Override
                public String call() throws Exception {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Ouch good oh~~~~~");
                    return "Jay Chou";
                }
            });
         executorService.submit(task);

        try {
            Object o = task.get();
            System.out.println(o);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

ForkJoinPool&Fork/Join Framework

1, jdk 1.7 ForkJoinPool handles CPU-intensive tasks. Replace [supplemented] threadpoolexecutor [Fork/Join+ForkJoinPool]

CPU intensive: operations, ternary, plus minus, multiply and divide. if switch is not an IO operation [look up database io]

A simple understanding of ForkJoinPool:

Thread pool, idea of work stealing or dividing and conquering: full interest CPU Multi-core performance

[External chain picture transfer failed, source station may have anti-theft chain mechanism, it is recommended to save the picture and upload it directly (img-Ze0gq2YE-1621751537651) (C:\UsersAdministratorAppDataRoamingTyporatypora-user-imagesimage-queue 9.png)]

Where can Fork/Join be used?

ForkJoinTask class: +ForkJoinPool fast sum

Split a large task into smaller tasks. Finally, the sum yields the total number [Split Small Tasks + Iterative Recursive Thought]

RecursiveTask Processing tasks with return values are implemented ForkjoinTask
RecursiveAction Processing task has no return value and is implemented ForkjoinTask
fork() :Submit Task to Queue.Execute.
join() : Merge, return result set
package com.company;

import com.sun.javafx.image.IntPixelGetter;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * ForkJoinTask+ForkjoinPool Calculate one and split by [CPU intensive]
 */
public class ForkJoinPoolTest extends RecursiveTask<Integer> {
    //Minimum Split Quantity
    int min=3;
    int start;
    int end;

    public ForkJoinPoolTest(int start, int end) {
        this.start = start;
        this.end = end;
    }

    public static void main(String[] args) {

        //Runtime.getRuntime().availableProcessors()) resembles core threads
        ForkJoinPool pool=new ForkJoinPool();
        ForkJoinTask<Integer> submit = pool.submit(new ForkJoinPoolTest(1,1000000));
        Integer integer = null;
        try {
            integer = submit.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println(integer);

    }


    //Split the task and return a sum
    @Override
    protected Integer compute() {

        //No splitting [range too small] minus, less than 31,3-1=2<=3,8,10=2
        if(end-start<=min){
                int sum=0;
                for(int i=start;i<=end;i++){
                      sum+=i;
                }

                 return sum;
        //split
        }else{
             int mid=(end+start)/2;
             ForkJoinPoolTest test1=new ForkJoinPoolTest(start,mid); // 0,5
             ForkJoinPoolTest test2=new ForkJoinPoolTest(mid+1,end);//6,10

              //split
             test1.fork();
             test2.fork();

            //Get small task results
             Integer join1 = test1.join();
             Integer join2 = test2.join();
             int sum= join1+join2;
            System.out.println(Thread.currentThread().getName()+"====="+sum);
            //Returns the sum
            return sum;

        }
    }
}

CountDownLatch-Synchronization Counter

What is a synchronization counter: Let one thread wait (block/wait wait.await(), and the other wake the waiting thread after each thread has completed its own task.

int i=10; --i 0;

Scenario: First step check, multithreading turned on, second batch repository.

void await()Meaning of blocking waiting
public void countDown()Similar i-

Initial value: countDown will be reduced by 1. Until it becomes 0. awaken

package com.company;


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountdownLatchTest {

    public static void main(String[] args) {
        //Number of count threads for construction method
        CountDownLatch latch=new CountDownLatch(10);
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        //First step verification
        for(int i=0;i<10;i++)
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"Start Task");
                //Reduce 1
                latch.countDown();
            }
        });

        try {
            //Bulk repositories cannot be synchronized in one step such as join
            System.out.println("Start Thread Waiting");

            //Condition await(); After 0, wake up automatically
            latch.await();

            System.out.println("Thread is awakened");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Integrated AQS state=10 quantity + CAS dead cycle

Semaphore-semaphore

What is a semaphore: the lobby of a bank. Number of parking spaces: [Role of flow restriction]

Scenario: The number of database connection pools is not exhausted.

acquire()Acquire a license, reduce the number of licenses available by one if available and return immediately
void release()Issuing licenses increases the number of licenses available by one.
package com.company;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {

    public static void main(String[] args) {

        //Only 10 threads can execute simultaneously
        Semaphore semaphore=new Semaphore(10);

        ExecutorService executorService = Executors.newFixedThreadPool(21);

        //First step verification
        for(int i=0;i<21;i++)
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        //Reduce 1
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName()+"In the bank lobby");
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println(Thread.currentThread().getName()+"Out of the bank lobby");
                        //Add 1 out
                        semaphore.release();
                    }
                }
            });
    }
}
LockSupport.park(this);Blocking method, LockSupport.unpark(s.thread); 
One is minus 1, one plus 1,

CyclicBarrier-Cyclic Barrier

What is a circular fence: Multiple threads have one wave of operations. Then when the operation is complete. You can start an event (thread Runable) and return an operation (listening for events)

public int await() throws InterruptedException Wait, minus 1

Scenario: Sum: Last thread operation listens for events

package com.company;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    public static void main(String[] args) {

        class CycLicDemo extends Thread{
            CyclicBarrier cyclicBarrier;

            public CycLicDemo(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier=cyclicBarrier;
            }

            @Override
            public void run() {
                super.run();
                try {
                    cyclicBarrier.await();
                    cyclicBarrier.await();
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName()+"I wake up");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

            }
        }


        CyclicBarrier cyclicBarrier=new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                System.out.println("completion of enforcement");
            }
        });


        for (int i=0;i<3;i++){

            new CycLicDemo(cyclicBarrier).start();

        }


    }
}
Principle: await();ReentrantLock,Condition.Wake up all threads. Subtract, wake up all threads if equal to 0, but before that, call run Method. trip.await();

Added by printf on Fri, 11 Feb 2022 09:54:44 +0200