JUC Note Arrangement

JUC

Silicon Valley JUC Video

1.Summary

The difference between threads and processes

A process is the basic unit of resource allocation: a running program is a process.

A thread is the basic unit of CPU scheduling; the threads of one process share its memory

1.1 JUC

JUC is short for the java.util.concurrent toolkit, which was introduced in JDK 1.5

1.2 Thread State

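The classic model has five states: New, Ready, Running, Blocked, Dead

In Java, the Thread.State enum defines six: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED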
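
The states can be observed at runtime with Thread.getState(), which reports the JDK's own names (NEW, TERMINATED and so on). A minimal sketch; the class name ThreadStateSketch is made up for illustration:

```java
class ThreadStateSketch {
    // Returns the state seen before start() and the state seen after join().
    static Thread.State[] observe() {
        Thread t = new Thread(() -> { });
        Thread.State before = t.getState();   // created but not started
        t.start();
        try {
            t.join();                         // wait until run() has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Thread.State after = t.getState();    // thread has finished
        return new Thread.State[] { before, after };
    }

    public static void main(String[] args) {
        Thread.State[] s = observe();
        System.out.println(s[0] + " -> " + s[1]);
    }
}
```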

1.3 Differences between wait and sleep

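sleep always takes a duration; wait can be called without one, in which case it waits until it is notified

sleep is a static method of the Thread class; wait is a method of Object and must be called inside a synchronized block or method that holds the lock of that object

sleep does not release any locks the thread holds; wait releases the lock of the object it is called on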

1.4 Daemon and non-daemon threads

A daemon thread is a background thread; the JVM exits, and with it all daemon threads, once every user thread has finished

Non-daemon threads: normal threads

2. Lock interface

2.1 Synchronized

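synchronized can modify a code block or a method

The synchronized modifier on a method is not inherited: an overriding method in a subclass is synchronized only if it is declared synchronized again

Method: an ordinary (instance) method locks on this, while a static method locks on the Class object of its class

Code block: the lock is the monitor object named in the parentheses, for example synchronized(this)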

class Ticket {
    //Number of tickets left
    private int number = 30;

    //Operation method: sell one ticket; synchronized makes the check and the decrement atomic
    public synchronized void sale() {
        //Check: is there a ticket left?
        if (number > 0) {
            System.out.println(Thread.currentThread().getName() + " :" + (number--) + " " + number);
        }
    }
}
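
The point that synchronized is a property of one method declaration and does not carry over to an override can be checked with reflection. A small sketch; BaseCounter, ChildCounter and touch() are hypothetical names used only here:

```java
import java.lang.reflect.Modifier;

class BaseCounter {
    public synchronized void touch() { }
}

class ChildCounter extends BaseCounter {
    @Override
    public void touch() { }   // overrides without repeating the synchronized modifier
}

class SynchronizedInheritanceSketch {
    // Reports whether the touch() method declared by the given class is synchronized.
    static boolean isSynchronized(Class<?> type) {
        try {
            return Modifier.isSynchronized(type.getDeclaredMethod("touch").getModifiers());
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("BaseCounter.touch synchronized:  " + isSynchronized(BaseCounter.class));
        System.out.println("ChildCounter.touch synchronized: " + isSynchronized(ChildCounter.class));
    }
}
```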

2.2 Lock

The difference between Lock and Synchronized:

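Synchronized releases the lock automatically (also when an exception is thrown); a Lock must be released manually with unlock()

Synchronized is a keyword implemented at the JVM level; Lock is an interface at the API level

Synchronized is always an unfair lock; ReentrantLock is unfair by default and can be made fair with new ReentrantLock(true)

A thread waiting for a synchronized lock cannot be interrupted; a Lock can be acquired interruptibly (lockInterruptibly()) and, through multiple Condition objects, can wake up specific waiting threads precisely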

2.3 Lock interface

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
1. lock

Acquires the lock. If the lock is held by another thread, the current thread waits until it becomes available

The lock must be released manually, normally in a finally block

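Lock lock = new ReentrantLock();
lock.lock();            //acquire before try: if lock() fails there is nothing to unlock
try {
    //critical section
} finally {
    lock.unlock();      //always release, even if the critical section throws
}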
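
Besides the blocking lock(), the interface offers tryLock(), which returns immediately instead of waiting. A minimal sketch of that difference, assuming a second thread probes the lock while the caller holds it; TryLockSketch and its method names are invented for this example:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockSketch {
    // Asks a second thread to tryLock() and reports whether it succeeded.
    static boolean otherThreadCanLock(Lock lock) {
        boolean[] result = new boolean[1];
        Thread other = new Thread(() -> {
            if (lock.tryLock()) {             // does not block when the lock is taken
                try {
                    result[0] = true;
                } finally {
                    lock.unlock();
                }
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    // Element 0: result while the caller holds the lock. Element 1: result after release.
    static boolean[] demo() {
        Lock lock = new ReentrantLock();
        boolean whileHeld;
        lock.lock();
        try {
            whileHeld = otherThreadCanLock(lock);
        } finally {
            lock.unlock();
        }
        boolean afterRelease = otherThreadCanLock(lock);
        return new boolean[] { whileHeld, afterRelease };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("while held: " + r[0] + ", after release: " + r[1]);
    }
}
```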
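2. newCondition

With synchronized, waiting and notification use wait() and notify()

With Lock, newCondition() returns a Condition object bound to the lock, and waiting and notification go through that object

await() corresponds to wait()

signal() corresponds to notify(), and signalAll() to notifyAll()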

2.4 ReentrantLock

Reentrant: a thread that already holds the lock can acquire it again without blocking itself
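
Reentrancy can be seen through ReentrantLock.getHoldCount(), which counts how many times the current thread holds the lock. A short sketch; the class ReentrancySketch and its methods are hypothetical:

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancySketch {
    private final ReentrantLock lock = new ReentrantLock();

    // Takes the lock, then calls a method that takes the same lock again.
    int outer() {
        lock.lock();
        try {
            return inner();
        } finally {
            lock.unlock();
        }
    }

    // Re-acquires the lock on the same thread and reports the hold count.
    int inner() {
        lock.lock();
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
        }
    }

    boolean isLocked() {
        return lock.isLocked();
    }

    public static void main(String[] args) {
        ReentrancySketch s = new ReentrancySketch();
        System.out.println("hold count inside inner(): " + s.outer());
        System.out.println("still locked afterwards: " + s.isLocked());
    }
}
```

Each lock() must be paired with one unlock(); the lock is free again only when the hold count is back to zero.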

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Airmal{
    private int number;
    public Lock lock=new ReentrantLock();
    public  Condition condition=lock.newCondition();
    public void incrnumber(){
        lock.lock();

        try {
           while (number!=0){
               condition.await();
           }
           number++;
           System.out.println(Thread.currentThread().getName()+"\t"+number);
           condition.signalAll();
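        }catch (InterruptedException e){
            Thread.currentThread().interrupt(); //restore the interrupt flag instead of swallowing it
        }finally {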
            lock.unlock();
        }
    }
    public void decrenumber(){
        lock.lock();

        try {
              while (number==0){
                  condition.await();
              }
              number--;
              System.out.println(Thread.currentThread().getName()+"\t"+number);
              condition.signalAll();
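        }catch (InterruptedException e){
            Thread.currentThread().interrupt(); //restore the interrupt flag instead of swallowing it
        }finally {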
            lock.unlock();
        }
    }

}
public class ProdeCustomerLockDemo {
    public static void main(String[]ags){
        Airmal airmal=new Airmal();
        new Thread(()->{
           for (int i=1;i<=10;i++){
               airmal.incrnumber();
           }
        },"A").start();

        new Thread(()->{
            for (int i=1;i<=10;i++){
                airmal.decrenumber();
            }
        },"B").start();

        new Thread(()->{
            for (int i=1;i<=10;i++){
                airmal.incrnumber();
            }
        },"C").start();
        new Thread(()->{
            for (int i=1;i<=10;i++){
                airmal.decrenumber();
            }
        },"D").start();
    }
}
 
 

Precise wake-up

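Three threads A, B and C are started and must run in the order A -> B -> C: A prints 5 times, B prints 10 times, C prints 15 times, and the cycle repeats

Idea: one lock with several keys (one Condition per thread); a flag field decides whose turn it is, and each thread signals only the Condition of the next thread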

//Condition Precision Control

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A B C Three threads
 * A -B-C Execute in sequence
 * A Print 5 times
 * B Print 10
 * C Print 15 times
 */
class Airmals{
    private int number=1;  //Flag 1:A B:2 C:3
    private Lock lock=new ReentrantLock();
    Condition c1=lock.newCondition();
   Condition c2=lock.newCondition();
   Condition c3=lock.newCondition();
    public void pring5(){
        lock.lock();
        try {
              while (number!=1){
                  c1.await();
              }
           for (int i=1;i<=5;i++){
               System.out.println(Thread.currentThread().getName()+"\t"+i);
           }

           number=2;
        c2.signal(); //Wake up B
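        }catch (InterruptedException e){
            Thread.currentThread().interrupt(); //restore the interrupt flag instead of swallowing it
        }finally {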
             lock.unlock();
        }

    }

    public void pring10(){
        lock.lock();
        try {
             while (number!=2){
                 c2.await();
             }
             for (int i=1;i<=10;i++){
                 System.out.println(Thread.currentThread().getName()+"\t"+i);
             }
             number=3;
             c3.signal();
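        }catch (InterruptedException e){
            Thread.currentThread().interrupt(); //restore the interrupt flag instead of swallowing it
        }finally {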
            lock.unlock();
        }
    }

   public  void pring15(){
          lock.lock();
          try {
              while (number!=3){
                  c3.await();
              }
              for (int i=1;i<=15;i++){
                  System.out.println(Thread.currentThread().getName()+"\t"+i);
              }
              number=1;
              c1.signal();
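          }catch (InterruptedException e){
              Thread.currentThread().interrupt(); //restore the interrupt flag instead of swallowing it
          }finally {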
              lock.unlock();
          }
   }

}
public class ConditionDemo {
    public static void main(String[] args) {
        Airmals airmals=new Airmals();
        new Thread(()->{
                 for (int i=1;i<=10;i++){
                     airmals.pring5();
                 }
        },"A").start();

        new Thread(()->{
            for (int i=1;i<=10;i++){
                airmals.pring10();
            }
        },"B").start();

        new Thread(()->{
            for (int i=1;i<=10;i++){ //same number of rounds as A and B, otherwise C waits forever
                airmals.pring15();
            }
        },"C").start();
    }
}
 
 

2.5 ReadWriteLock

ReadWriteLock is an interface; its implementation class is ReentrantReadWriteLock

ReentrantReadWriteLock offers many methods, but the two main ones are readLock() and writeLock(), which return the read lock and the write lock.

An ordinary Lock serializes reads as well as writes, which is inefficient. With a ReadWriteLock, many readers can hold the read lock at the same time, while the write lock excludes both readers and other writers

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

//Multiple threads operate on one resource
//Read-Read can coexist
//Read-Write cannot coexist
//Write-Write cannot coexist
class MyLock {
    private final Map<String, Object> mylock = new HashMap<>();
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    public void put(String key, Object value) throws InterruptedException {
        readWriteLock.writeLock().lock();   //exclusive: blocks readers and other writers
        try {
            System.out.println(Thread.currentThread().getName() + "\t" + "---Write data");
            TimeUnit.MILLISECONDS.sleep(300);   //simulate a slow write
            mylock.put(key, value);
            System.out.println(Thread.currentThread().getName() + "\t" + "---Write data successfully");
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    public void get(String key) throws InterruptedException {
        readWriteLock.readLock().lock();    //shared: several readers may hold it together
        try {
            System.out.println(Thread.currentThread().getName() + "\t" + "----Read data");
            TimeUnit.MILLISECONDS.sleep(300);   //simulate a slow read
            Object value = mylock.get(key);
            System.out.println(Thread.currentThread().getName() + "\t" + "---Read data successfully: " + value);
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}

public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyLock myLock = new MyLock();
        //Three writer threads
        for (int i = 1; i <= 3; i++) {
            final String temp = String.valueOf(i);
            new Thread(() -> {
                try {
                    myLock.put(temp, new Object());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
        //Three reader threads
        for (int i = 1; i <= 3; i++) {
            final String temp = String.valueOf(i);
            new Thread(() -> {
                try {
                    myLock.get(temp);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}

Sample output (the exact interleaving may vary between runs):

1 - Write data
1 - Successful writing of data
2 - Write data
2 - Successful writing of data
3 - Write data
3 - Successful writing of data
1 - Read data
2 - Read data
3 - Read Data
1 - Successful reading of data
2 - Successful reading of data
3 - Successful reading of data

3. Communication between threads

Communication model: shared memory and messaging

3.1 synchronized scheme

High cohesion and low coupling

/**
 * Producer-consumer demo
 * Multithreaded programming pattern:
 *    1. High cohesion, low coupling: threads operate on a resource class
 *    2. Check / work / notify
 *    3. Guard against spurious wake-ups: check the condition in a while loop, not an if
 */

class Airmant{
    private  int number=0;

    public synchronized void  increnumber() throws InterruptedException {
        //judge
        while (number!=0){
            this.wait();
        }
        //work
        number++;
        System.out.println(Thread.currentThread().getName()+"\t"+number);
        //notice
        this.notifyAll();
    }
    public  synchronized  void decrennumber() throws InterruptedException {
        //judge
        while (number==0){
            this.wait();
        }
        //work
        number--;
        System.out.println(Thread.currentThread().getName()+"\t"+number);

        //notice
        this.notifyAll();

    }
}
public class ProdeCustomerDemo {
     public static void  main(String[]ags){
         Airmant airmant=new Airmant(); //High Cohesion

         new Thread(()->{

                      try {
                       for(int i=1;i<=10;i++){
                           airmant.increnumber();
                       }
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }

         },"A").start();

         new Thread(()->{
             try {
                 for(int i=1;i<=10;i++){
                     airmant.decrennumber();
                 }

             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         },"B").start();


     }
}
 
 

3.2 Lock scheme

The Lock-based version is the same Airmal / ProdeCustomerLockDemo example listed in section 2.4 ReentrantLock: await() and signalAll() on a Condition take the place of wait() and notifyAll().

4. Collection Thread Safety

Symptom: java.util.ConcurrentModificationException

public class NotSafeDemo {
/**
* Multiple threads modify the collection at the same time
* @param args
*/
public static void main(String[] args) {  
    List list = new ArrayList();
    for (int i = 0; i < 100; i++) {
        new Thread(() ->{
           list.add(UUID.randomUUID().toString());
            list.forEach(System.out::println);
        }, "thread" + i).start();
     }
   }
}

4.1 List

Underlying structure: an ArrayList is backed by an Object[] array

The default capacity is 10. In JDK 1.7 the array of 10 is allocated at construction; in JDK 1.8 it is allocated lazily on the first add()

Each expansion grows the array to 1.5 times the old capacity

The copy is done with Arrays.copyOf(), which calls System.arraycopy()
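
The 1.5x expansion can be worked through as integer arithmetic. The sketch below uses oldCapacity + (oldCapacity >> 1), the expression found in OpenJDK 8's ArrayList.grow(); other JDK versions may compute the new size differently, and GrowthSketch is a made-up class name:

```java
class GrowthSketch {
    // New capacity after one expansion: old capacity plus half of it (integer shift).
    static int grow(int oldCapacity) {
        return oldCapacity + (oldCapacity >> 1);
    }

    public static void main(String[] args) {
        int capacity = 10;                      // default capacity
        StringBuilder steps = new StringBuilder().append(capacity);
        for (int i = 0; i < 4; i++) {
            capacity = grow(capacity);
            steps.append(" -> ").append(capacity);
        }
        System.out.println(steps);              // 10 -> 15 -> 22 -> 33 -> 49
    }
}
```

Because the shift discards the remainder, the factor is exactly 1.5 only for even capacities (15 grows to 22, not 22.5).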

Solution

Vector

public class nosaleCollection {
    public static void main(String[]ags){
       /* List <String>list=new ArrayList();*/
        List <String>list=new Vector<>();
        for(int i=1;i<=30;i++){
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,8));
                System.out.println(list);
            },String.valueOf(i)).start();
        }

    }
}

Collections.synchronizedList

public class nosaleCollection {
    public static void main(String[]ags){
        List<String>list= Collections.synchronizedList(new ArrayList<>());
        for(int i=1;i<=30;i++){
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,8));
                System.out.println(list);
            },String.valueOf(i)).start();
        }

    }
}

CopyOnWriteArrayList (copy on write)

It separates reads from writes and guarantees eventual consistency. A write creates a new array, copies the existing elements into it, adds the new element, and then points the list's internal reference at the new array

Advantages: reads take no lock, so read concurrency is high

Disadvantages: every write copies the whole array, which costs memory and may trigger frequent GC

A read that runs during a write may see the old data: strong consistency is not guaranteed, only eventual consistency

public class nosaleCollection {
    public static void main(String[]ags){
        List<String> list=new CopyOnWriteArrayList<>();
        for(int i=1;i<=30;i++){
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,8));
                System.out.println(list);
            },String.valueOf(i)).start();
        }

    }
}
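
The copy-on-write behavior is visible even on a single thread: an iterator works on the array as it was when iteration started, so adding during iteration neither fails nor shows up in that iteration. A minimal sketch contrasting it with ArrayList; SnapshotIterationSketch and its methods are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIterationSketch {
    // Appends to the list while iterating over it; returns how many elements the loop saw.
    static int iterateWhileAdding(List<String> list) {
        int seen = 0;
        for (String s : list) {
            list.add(s + "-copy");
            seen++;
        }
        return seen;
    }

    // Element 0: elements seen by the loop. Element 1: list size afterwards.
    static int[] copyOnWriteResult() {
        List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
        int seen = iterateWhileAdding(list);
        return new int[] { seen, list.size() };
    }

    // The same loop on an ArrayList trips the fail-fast check.
    static boolean arrayListFailsFast() {
        List<String> list = new ArrayList<>(Arrays.asList("a", "b"));
        try {
            iterateWhileAdding(list);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int[] r = copyOnWriteResult();
        System.out.println("seen=" + r[0] + ", size afterwards=" + r[1]);
        System.out.println("ArrayList threw: " + arrayListFailsFast());
    }
}
```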
 

4.2 Map

The default capacity of a HashMap is 16, and each expansion doubles it, for example from 16 to 32

Solution

Collections.synchronizedMap

ConcurrentHashMap

Hashtable
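
Of the three options, ConcurrentHashMap is the one that also provides atomic compound updates such as merge(). A small sketch that counts from several threads; ConcurrentCountSketch and the key "hits" are invented for illustration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentCountSketch {
    // Every thread increments the same key perThread times; returns the final count.
    static int countWithThreads(int threads, int perThread) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counts.merge("hits", 1, Integer::sum);   // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counts.getOrDefault("hits", 0);
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(8, 1000));
    }
}
```

With a plain HashMap the same loop can lose updates, because get-then-put is not atomic.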

4.3 Set

HashSet is backed by a HashMap

HashSet's add(e) takes one argument while HashMap's put(key, value) takes two: the element is stored as the key, and every key maps to the same constant placeholder Object
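
The idea can be sketched in a few lines. This is a simplified illustration of the technique, not the JDK source; MapBackedSet is a hypothetical class, and PRESENT is the name used here for the shared placeholder value:

```java
import java.util.HashMap;

class MapBackedSet<E> {
    // One shared dummy value: only the keys carry information.
    private static final Object PRESENT = new Object();
    private final HashMap<E, Object> map = new HashMap<>();

    // put() returns the previous value, so null means the element was not there before.
    boolean add(E e) {
        return map.put(e, PRESENT) == null;
    }

    boolean contains(E e) {
        return map.containsKey(e);
    }

    int size() {
        return map.size();
    }

    public static void main(String[] args) {
        MapBackedSet<String> set = new MapBackedSet<>();
        System.out.println(set.add("a"));   // first insert of "a"
        System.out.println(set.add("a"));   // duplicate insert
        System.out.println(set.size());
    }
}
```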

Solution

Collections.synchronizedSet

public class nosaleCollection {
    public static void main(String[]ags){
        Set <String>set=Collections.synchronizedSet(new HashSet<>());
        for (int i=1;i<=30;i++){
            new Thread(()->{
                set.add(UUID.randomUUID().toString().substring(0,8));
            },String.valueOf(i)).start();
        }

    }

}

CopyOnWriteArraySet

public class nosaleCollection {
    public static void main(String[]ags){
        Set <String>set=new CopyOnWriteArraySet<>();
        for (int i=1;i<=30;i++){
            new Thread(()->{
                set.add(UUID.randomUUID().toString().substring(0,8));
                System.out.println(set);
            },String.valueOf(i)).start();
        }

    }

}

5. Multithreaded problem

//Shown in the form used by the last scenarios (static sendSMS); the scenarios below
//toggle the static modifier and the 4-second pause
class Phone {
    public static synchronized void sendSMS() throws Exception {
        //Pause for 4 seconds
        TimeUnit.SECONDS.sleep(4);
        System.out.println("------sendSMS");
    }

    public synchronized void sendEmail() throws Exception {
        System.out.println("------sendEmail");
    }

    public void getHello() {
        System.out.println("------getHello");
    }
}

Eight scenarios. In each one, which line is printed first?

1. Standard access, one phone, SMS or email first: ------sendSMS ------sendEmail

2. sendSMS pauses for 4 seconds, SMS or email first: ------sendSMS ------sendEmail

3. A plain (unsynchronized) getHello method is added, SMS or hello first: ------getHello ------sendSMS

4. Two phones, SMS or email first: ------sendEmail ------sendSMS

5. Two static synchronized methods, one phone, SMS or email first: ------sendSMS ------sendEmail

6. Two static synchronized methods, two phones, SMS or email first: ------sendSMS ------sendEmail

7. One static synchronized method, one ordinary synchronized method, one phone, SMS or email first: ------sendEmail ------sendSMS

8. One static synchronized method, one ordinary synchronized method, two phones, SMS or email first: ------sendEmail ------sendSMS

Every Java object can serve as a lock. This shows up in three forms.

For an ordinary synchronized method, the lock is the current instance object.

For a static synchronized method, the lock is the Class object of the current class.

For a synchronized block, the lock is the object given in the synchronized parentheses
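
Which object is locked in each form can be checked with Thread.holdsLock(). A minimal sketch; LockOwnerSketch and its method names are made up for this example:

```java
class LockOwnerSketch {
    // Ordinary synchronized method: the lock is this.
    synchronized boolean instanceMethodHoldsThis() {
        return Thread.holdsLock(this);
    }

    // Static synchronized method: the lock is the Class object.
    static synchronized boolean staticMethodHoldsClass() {
        return Thread.holdsLock(LockOwnerSketch.class);
    }

    // An ordinary synchronized method does NOT hold the Class lock,
    // which is why scenarios 7 and 8 do not block each other.
    synchronized boolean instanceMethodHoldsClass() {
        return Thread.holdsLock(LockOwnerSketch.class);
    }

    // Synchronized block: the lock is the object in the parentheses.
    boolean blockHolds(Object monitor) {
        synchronized (monitor) {
            return Thread.holdsLock(monitor);
        }
    }

    public static void main(String[] args) {
        LockOwnerSketch s = new LockOwnerSketch();
        System.out.println(s.instanceMethodHoldsThis());
        System.out.println(staticMethodHoldsClass());
        System.out.println(s.instanceMethodHoldsClass());
        System.out.println(s.blockHolds(new Object()));
    }
}
```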

6. Callable

6.1 How threads are implemented

  • Extend the Thread class

  • Implement the Runnable interface

  • Implement the Callable interface

  • Use a thread pool

6.2 Callable interface

//Thread interface callable with return value
class MyThread implements Callable<Integer>
{

    @Override
    public Integer call() throws Exception {
        System.out.println("======Callable==========");
        return 1024;
    }
}
public class CallbackDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());
        new Thread(futureTask,"a").start();
        Integer task = futureTask.get(); //blocks until call() has returned
        System.out.println(task);
    }
}
 

6.3 Future Interface

  • boolean cancel(boolean mayInterruptIfRunning): attempts to cancel the task. If the task has not started, it will never run. If it has already started, it is interrupted only when mayInterruptIfRunning is true
  • V get() throws InterruptedException, ExecutionException: returns the result of the task. If the task is already complete, the result is returned immediately; otherwise the call waits for the task to complete and then returns the result
  • boolean isDone(): returns true if the task is complete, otherwise false

6.4 FutureTask

When the main thread has a time-consuming operation to perform but should not be blocked by it, the work can be handed to a FutureTask that runs it in the background

  • The main thread calls get() on the Future when it actually needs the result
  • Used mostly for time-consuming calculations
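
A minimal sketch of this pattern, assuming the background job is a simple summation; FutureTaskSketch, sumInBackground and the thread name "worker" are hypothetical:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskSketch {
    // Computes 1 + 2 + ... + n on a background thread and returns the result.
    static int sumInBackground(int n) {
        Callable<Integer> job = () -> {
            int sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            return sum;
        };
        FutureTask<Integer> task = new FutureTask<>(job);
        new Thread(task, "worker").start();

        // The calling thread is free to do other work here.

        try {
            return task.get();                  // blocks only if the result is not ready yet
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInBackground(100));
    }
}
```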

7. Three JUC auxiliary classes

7.1 CountDownLatch

Countdown latch

CountDownLatch is constructed with an initial count

Each call to countDown() decrements the count by 1 and does not block the calling thread

A thread that calls await() blocks until the count reaches 0

When the count reaches 0, every thread blocked in await() is woken up and continues with the statements after await()

//main closes the door only after six students have left the classroom
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
      CountDownLatch countDownLatch=new CountDownLatch(6);
        for (int i=1;i<=6;i++)
        {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"\t  Leave Classroom");
                countDownLatch.countDown();
            },String.valueOf(i)).start();
        }
        countDownLatch.await();
        System.out.println("main close");
    }
}

7.2 CyclicBarrier

Cyclic barrier

The first argument of the CyclicBarrier constructor is the number of parties. Each thread that calls await() counts as one party that has arrived and blocks there. Once the number of waiting parties reaches the target, the optional barrier action (the second constructor argument) runs and all waiting threads continue with the statements after cyclicBarrier.await(). The barrier then resets and can be reused, which is why it is called cyclic.

public class CyclicBrrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
            System.out.println("*****Call Dragon");
        });
        for (int i=1;i<=7;i++){
            final  Integer temp=i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"\t Collected"+temp+"Dragon bead");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

7.3 Semaphore

The first argument of the Semaphore constructor is the number of permits, which can be viewed as the maximum number of threads allowed to use the resource at the same time. acquire() takes one permit and blocks if none is available; release() gives one permit back

Scenario: 6 cars compete for 3 parking spaces

public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore=new Semaphore(3); //Simulated resource class with 3 empty parking spaces
        for (int i=1;i<=6;i++){
            new Thread(()->{
                try {
                    semaphore.acquire(); //Number-1
                    System.out.println(Thread.currentThread().getName()+"\t Grab parking space");
                    TimeUnit.SECONDS.sleep(4);
                    System.out.println(Thread.currentThread().getName()+"\t Leave parking");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release(); //Number + 1
                }
            },String.valueOf(i)).start();
        }
    }
}
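
The permit counting can also be followed on a single thread with the non-blocking tryAcquire(). A small sketch under the same 3-space assumption; PermitSketch is a made-up class name:

```java
import java.util.concurrent.Semaphore;

class PermitSketch {
    // Element 0: tryAcquire() with all spaces taken. Element 1: tryAcquire() after one release.
    static boolean[] demo() {
        Semaphore parking = new Semaphore(3);        // 3 permits = 3 parking spaces
        parking.acquireUninterruptibly(3);           // three cars park
        boolean whenFull = parking.tryAcquire();     // a fourth car finds no space
        parking.release();                           // one car leaves
        boolean afterRelease = parking.tryAcquire(); // the fourth car can park now
        return new boolean[] { whenFull, afterRelease };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("when full: " + r[0] + ", after a release: " + r[1]);
    }
}
```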

8. Read and write locks

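Typical scenario: a shared resource is read far more often than it is written.

Reads do not conflict with one another, so readers do not need to exclude each other

ReentrantReadWriteLock provides two locks: the read lock is a shared lock and the write lock is an exclusive lock.

Read lock: while it is held, other threads can acquire only the read lock

Write lock: while it is held, no other thread can acquire the read lock or the write lock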

The code is the same MyLock / ReadWriteLockDemo example listed in section 2.5.

9. Blocking queues

9.1 BlockingQueue

A queue is first in, first out (FIFO)

A blocking queue adds blocking behavior: taking from an empty queue blocks the calling thread, which gives up the CPU until an element is available

Adding to a full queue blocks the calling thread until space is available

Blocked threads are woken up automatically once the condition is met

Method type   Throws exception   Special value   Blocks    Times out
Insert        add(e)             offer(e)        put(e)    offer(e, time, unit)
Remove        remove()           poll()          take()    poll(time, unit)
Inspect       element()          peek()          N/A       N/A

Throws exception: when the queue is full, add(e) throws IllegalStateException; when the queue is empty, remove() throws NoSuchElementException
Special value: offer(e) returns true on success and false otherwise; poll() returns the head element, or null if the queue is empty
Blocks: when the queue is full, put(e) blocks the producer thread until space is available or the thread is interrupted; when the queue is empty, take() blocks the consumer thread until an element is available
Times out: when the queue is full, offer(e, time, unit) blocks the producer for at most the given time and then gives up
/**
* Blocking Queue
*/
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// List list = new ArrayList();
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//first group
// System.out.println(blockingQueue.add("a"));
// System.out.println(blockingQueue.add("b"));
// System.out.println(blockingQueue.add("c"));
// System.out.println(blockingQueue.element());
//System.out.println(blockingQueue.add("x"));
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
// Group 2
// System.out.println(blockingQueue.offer("a"));
// System.out.println(blockingQueue.offer("b"));
// System.out.println(blockingQueue.offer("c"));
// System.out.println(blockingQueue.offer("x"));
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll());
// Group 3
// blockingQueue.put("a");
// blockingQueue.put("b");
// blockingQueue.put("c");
// //blockingQueue.put("x");
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take());
// Group 4
   System.out.println(blockingQueue.offer("a"));
   System.out.println(blockingQueue.offer("b"));
   System.out.println(blockingQueue.offer("c"));
   System.out.println(blockingQueue.offer("a",3L, TimeUnit.SECONDS));
 }
}
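
The four method groups can be compared side by side on a queue of capacity 1. A compact sketch; QueueMethodGroupsSketch is a hypothetical class, and the 50 ms timeout is an arbitrary choice for the example:

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueMethodGroupsSketch {
    // Behavior of the insert methods on a full queue.
    static String fullQueueBehaviour() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        q.add("a");                                   // queue is now full
        StringBuilder out = new StringBuilder();
        try {
            q.add("x");
            out.append("add=ok");
        } catch (IllegalStateException e) {
            out.append("add=IllegalStateException");
        }
        out.append(", offer=").append(q.offer("x"));
        try {
            out.append(", timedOffer=").append(q.offer("x", 50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    // Behavior of the remove and inspect methods on an empty queue.
    static String emptyQueueBehaviour() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        StringBuilder out = new StringBuilder();
        try {
            q.remove();
            out.append("remove=ok");
        } catch (NoSuchElementException e) {
            out.append("remove=NoSuchElementException");
        }
        out.append(", poll=").append(q.poll());
        out.append(", peek=").append(q.peek());
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(fullQueueBehaviour());
        System.out.println(emptyQueueBehaviour());
    }
}
```

put() and take() are left out because they would block this single thread forever.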

9.2 Kinds

1.ArrayBlockingQueue

A bounded blocking queue backed by an array

It maintains a fixed-length array

Two integer variables mark the positions of the queue's head and tail in the array

Producers and consumers share a single lock, so putting and taking cannot run truly in parallel; LinkedBlockingQueue uses separate locks for the two sides

LinkedBlockingQueue creates an extra Node object for every inserted element; ArrayBlockingQueue does not

2. LinkedBlockingQueue

A blocking queue backed by a linked list

The default capacity is Integer.MAX_VALUE unless a capacity is passed to the constructor

The queue consists of linked nodes

LinkedBlockingQueue handles concurrent access efficiently because it uses separate locks for producers and consumers.

3.PriorityBlockingQueue

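An unbounded blocking queue ordered by priority

Priority is determined by the Comparator passed to the constructor, or by the elements' natural ordering

It never blocks producers; it blocks consumers only when there is no data to consume

Because the queue is unbounded, producers must not stay faster than consumers for long, otherwise the heap is eventually exhausted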

4.SynchronousQueue

A blocking queue that does not store elements: each put must wait for a matching take by another thread, so elements are handed over one at a time
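
A minimal sketch of the hand-off, assuming one producer and one consumer thread; HandOffSketch and its method names are invented for illustration:

```java
import java.util.concurrent.SynchronousQueue;

class HandOffSketch {
    // With no consumer waiting, a non-blocking offer has nowhere to put the element.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("a");
    }

    // put() returns only once the consumer thread has taken the element.
    static String handOff(String item) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put(item);
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("offer without consumer: " + offerWithoutConsumer());
        System.out.println("handed off: " + handOff("hello"));
    }
}
```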

5. DelayQueue

An unbounded blocking queue of delayed elements, implemented on top of a priority queue

An element in a DelayQueue can only be retrieved from a queue if its specified delay time is reached. A DelayQueue is a queue with no size limit, so the operation to insert data into the queue (producer) will never be blocked, and only the operation to get data (consumer) will be blocked.
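
Elements must implement the Delayed interface. A minimal sketch, assuming a 300 ms delay; DelayedMessage and DelayQueueSketch are hypothetical classes written for this example:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedMessage implements Delayed {
    final String text;
    private final long readyAtNanos;

    DelayedMessage(String text, long delayMillis) {
        this.text = text;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    // Remaining delay; zero or negative means the element may be taken.
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Elements that expire sooner come first.
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelayQueueSketch {
    // Element 0: what poll() returned right after insertion. Element 1: what take() returned.
    static String[] demo() {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("later", 300));  // never blocks: the queue is unbounded
        DelayedMessage early = queue.poll();          // delay has not expired yet
        DelayedMessage ready;
        try {
            ready = queue.take();                     // blocks until the delay has expired
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new String[] { early == null ? "null" : early.text, ready.text };
    }

    public static void main(String[] args) {
        String[] r = demo();
        System.out.println("immediate poll: " + r[0] + ", take: " + r[1]);
    }
}
```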

6. LinkedTransferQueue

An unbounded blocking queue backed by a linked list

Preemptive mode

When a consumer takes an element and the queue is not empty, it takes the element directly

If the queue is empty, the consumer enqueues a node whose element is null and waits on it. When a producer arrives and finds that null node, it does not enqueue its element; it fills the element into the node and wakes the waiting thread, which then takes the element.

7. LinkedBlockingDeque

A double-ended blocking queue backed by a linked list

9.3 Summary

  1. In multithreaded programming, blocking means that a thread is suspended under certain conditions; once the condition is met, the suspended thread is woken up automatically

  2. Why do we need BlockingQueue? Before the concurrent package was released, programmers in a multithreaded environment had to control these details themselves, paying particular attention to efficiency and thread safety, which added considerable complexity to programs. With BlockingQueue we no longer need to care about when to block a thread or when to wake it up, because BlockingQueue handles all of that for us

10. Thread pool

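A thread pool holds a set of reusable threads

Threads are not created and destroyed for every task, which reduces resource consumption

Response time improves, because a task can start without waiting for a thread to be created

Threads are easier to manage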


10.1 Core Parameters

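  • corePoolSize: number of core threads
  • maximumPoolSize: maximum number of threads
  • keepAliveTime: how long an idle non-core thread is kept alive
  • unit: time unit of keepAliveTime
  • workQueue: blocking queue that holds submitted tasks waiting to run
  • threadFactory: factory that creates the threads
  • handler: rejection policy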

10.2 Principle

When a task is submitted and the current number of threads < corePoolSize, a core thread is created to run it

When the number of threads >= corePoolSize and the blocking queue is not full, the task is put into the blocking queue

When the blocking queue is full and the number of threads < maximumPoolSize, a non-core thread is created to run the task

When the blocking queue is full and the number of threads has reached maximumPoolSize, the rejection policy is applied
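
The flow can be traced with a deliberately tiny pool. The sketch below assumes corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1, and submits four tasks that all block on a latch; PoolFlowSketch is a made-up class name:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolFlowSketch {
    // Records "threads/queued" after each accepted task, or "rejected".
    static String trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();                 // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder out = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocked);
                    out.append(pool.getPoolSize()).append('/')
                       .append(pool.getQueue().size()).append(' ');
                } catch (RejectedExecutionException e) {
                    out.append("rejected");      // default AbortPolicy
                }
            }
        } finally {
            release.countDown();                 // let the blocked tasks finish
            pool.shutdown();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

Task 1 starts the core thread, task 2 waits in the queue, task 3 finds the queue full and starts a non-core thread, and task 4 is rejected.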

10.3 Rejection Policy

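AbortPolicy: rejects the task by throwing RejectedExecutionException; this is the thread pool's default rejection policy

CallerRunsPolicy: hands the task back to the caller, which runs it in the thread that called execute()

DiscardPolicy: silently discards the rejected task

DiscardOldestPolicy: discards the oldest task waiting in the queue, then tries to submit the new task again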
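
As an illustration of one policy, the sketch below shows CallerRunsPolicy executing an overflow task on the submitting thread. It assumes a pool with one thread and a queue of capacity 1; CallerRunsSketch is a hypothetical class:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsSketch {
    // Returns true if the third task ran on the thread that submitted it.
    static boolean thirdTaskRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread[] ranOn = new Thread[1];
        try {
            pool.execute(blocked);                                   // occupies the only thread
            pool.execute(blocked);                                   // waits in the queue
            pool.execute(() -> ranOn[0] = Thread.currentThread());   // overflow: runs on the caller
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn[0] == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("ran on caller: " + thirdTaskRunsOnCaller());
    }
}
```

Running the task on the caller slows the submitter down, which acts as simple back-pressure.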

10.4 Executors Tool Class

1.newCachedThreadPool

Role: creates a cacheable thread pool. Idle threads beyond what the workload needs are reclaimed, and a new thread is created when none is available

  • The number of threads is not fixed and can grow up to Integer.MAX_VALUE
  • Idle threads in the pool are reused, and threads that stay idle are reclaimed
  • A new thread is created when no idle thread is available in the pool
/**
* Cacheable Thread Pool
* @return
*/
public static ExecutorService newCachedThreadPool(){
/**
* corePoolSize Number of core threads in the thread pool
* maximumPoolSize Maximum number of threads the pool can hold
* keepAliveTime How long an idle thread is kept alive
* unit Time unit of keepAliveTime
* workQueue Queue that stores tasks submitted but not yet executed
* threadFactory Factory class for creating threads: can be omitted
* handler Rejection policy applied when the queue is full and the pool is at its maximum size: can be omitted
*/
return new ThreadPoolExecutor(0,
	Integer.MAX_VALUE,
	60L,
	TimeUnit.SECONDS,
	new SynchronousQueue<>(),
	Executors.defaultThreadFactory(),
	new ThreadPoolExecutor.AbortPolicy());
}

Scenario: Suited to many short-lived tasks on a lightly loaded server, where a pool that can grow almost without limit is acceptable

2.newFixedThreadPool

Creates a thread pool that reuses a fixed number of threads

The number of threads in the pool is fixed, which makes it easy to control thread concurrency

* Threads are reused and persist until the pool is explicitly shut down

* When more tasks are submitted than there are threads, the extra tasks wait in the queue

/**
* Fixed-length thread pool
* @return
*/
public static ExecutorService newFixedThreadPool(){
/**
* corePoolSize Number of core threads in the thread pool
* maximumPoolSize Maximum number of threads the pool can hold
* keepAliveTime How long an idle thread is kept alive
* unit Time unit of keepAliveTime
* workQueue Queue that stores tasks submitted but not yet executed
* threadFactory Factory class for creating threads: can be omitted
* handler Rejection policy applied when the queue is full and the pool is at its maximum size: can be omitted
*/
return new ThreadPoolExecutor(10,
	10,
	0L,
	TimeUnit.SECONDS,
	new LinkedBlockingQueue<>(),
	Executors.defaultThreadFactory(),
	new ThreadPoolExecutor.AbortPolicy());
}

Scenario: Suited to cases where the number of threads needed can be predicted, or where the server is heavily loaded and the number of threads must be strictly limited

3.newSingleThreadExecutor

Creates an Executor that uses a single worker thread

Feature: At most one thread executes in the pool; tasks submitted later wait in the queue

/**
* Single Thread Pool
* @return
*/
public static ExecutorService newSingleThreadExecutor(){
/**
* corePoolSize Number of core threads in the thread pool
* maximumPoolSize Maximum number of threads the pool can hold
* keepAliveTime How long an idle thread is kept alive
* unit Time unit of keepAliveTime
* workQueue Queue that stores tasks submitted but not yet executed
* threadFactory Factory class for creating threads: can be omitted
* handler Rejection policy applied when the queue is full and the pool is at its maximum size: can be omitted
*/
return new ThreadPoolExecutor(1,
	1,
	0L,
	TimeUnit.SECONDS,
	new LinkedBlockingQueue<>(),
	Executors.defaultThreadFactory(),
	new ThreadPoolExecutor.AbortPolicy());
}

Scenario: Suited to cases where tasks must run sequentially and no more than one thread may be active at any point in time
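
The ordering guarantee of the single-thread executor can be checked with a short sketch. SingleThreadOrderSketch and runInOrder are hypothetical names, and the 10-second wait is an arbitrary upper bound chosen for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original notes.
class SingleThreadOrderSketch {

    // Submits tasks 1..n and returns the order in which they actually ran.
    static List<Integer> runInOrder(int n) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= n; i++) {
            final int id = i;
            single.execute(() -> order.add(id)); // queued behind earlier tasks
        }
        single.shutdown(); // already queued tasks still run
        try {
            single.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

With a single worker and a FIFO queue, tasks run in exactly the order they were submitted.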

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo1 {
	/**
	* Three ticket windows in the railway station, 10 users buy tickets
	* @param args
	*/
	public static void main(String[] args) {
		//Fixed number of threads: 3 threads - 3 windows
		ExecutorService threadService = new ThreadPoolExecutor(3,
			3,
			60L,
			TimeUnit.SECONDS,
			new LinkedBlockingQueue<>(),
			Executors.defaultThreadFactory(),
			new ThreadPoolExecutor.DiscardOldestPolicy());
		try {
			//Ten people buy tickets
			for (int i = 1; i <= 10; i++) {
				threadService.execute(() -> {
					try {
						System.out.println(Thread.currentThread().getName() + " window: start selling tickets");
						Thread.sleep(5000);
						System.out.println(Thread.currentThread().getName() + " window: finished selling tickets");
					} catch (Exception e) {
						e.printStackTrace();
					}
				});
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			//Shut down once all tasks are submitted; queued tasks still run
			threadService.shutdown();
		}
	}
}

10.5 Thread Creation Method

Thread pools should not be created through Executors: the LinkedBlockingQueue used underneath FixedThreadPool and SingleThreadExecutor has a maximum length of Integer.MAX_VALUE, so waiting tasks can pile up until an OOM occurs, and the maximum number of threads for CachedThreadPool is Integer.MAX_VALUE, so it can create a huge number of threads (CPU pressure)

10.6 Manual Creation

Create the pool directly with new ThreadPoolExecutor(...), so that every parameter is chosen explicitly
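
A minimal sketch of manual creation, assuming a bounded queue and named threads. All sizes (2 core threads, 4 maximum, queue capacity 100), the thread name prefix, and the names ManualPoolSketch, newBoundedPool and runTasks are illustrative assumptions, not recommendations from the original notes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original notes.
class ManualPoolSketch {

    // Builds a pool whose queue and thread count are both bounded.
    static ThreadPoolExecutor newBoundedPool() {
        AtomicInteger seq = new AtomicInteger(1);
        // Named threads make thread dumps and logs easier to read.
        ThreadFactory factory =
                r -> new Thread(r, "ticket-window-" + seq.getAndIncrement());
        return new ThreadPoolExecutor(
                2,                              // corePoolSize (assumed)
                4,                              // maximumPoolSize (assumed)
                30L, TimeUnit.SECONDS,          // keepAliveTime and unit
                new ArrayBlockingQueue<>(100),  // bounded, unlike Executors
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Runs n trivial tasks and returns how many of them completed.
    static int runTasks(int n) {
        ThreadPoolExecutor pool = newBoundedPool();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(10)); // prints 10
    }
}
```

The bounded queue caps memory use, and the explicit maximumPoolSize caps the number of threads, which addresses both risks listed in 10.5.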

11. Fork/Join

fork splits a large task into small ones

join merges the results of the small tasks

1.Fork

1.ForkJoinTask

First you need to create a ForkJoin task. This class provides the mechanism for executing fork and join within a task. Normally we do not need to extend ForkJoinTask directly; we only need to extend one of its subclasses. The Fork/Join framework provides two subclasses:

a.RecursiveAction: for tasks that do not return a result

b.RecursiveTask: for tasks that return a result

2.ForkJoinPool

ForkJoinPool: a ForkJoinTask needs to be executed by a ForkJoinPool

3.RecursiveTask:

After extending it, a task can be invoked recursively (it calls itself)
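
For the subclass that returns no result, a short RecursiveAction sketch may help. It doubles every element of an array in place; the class name DoubleAction, the helper doubled, and the threshold of 4 are assumptions made for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical demo class, not part of the original notes.
class DoubleAction extends RecursiveAction {
    private static final int THRESHOLD = 4; // arbitrary small cut-off
    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    DoubleAction(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly, nothing to return
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = (from + to) >>> 1;
        // invokeAll forks both halves and waits for both to finish
        invokeAll(new DoubleAction(data, from, mid),
                  new DoubleAction(data, mid, to));
    }

    // Returns a doubled copy of the input, leaving the input untouched.
    static int[] doubled(int[] input) {
        int[] copy = input.clone();
        ForkJoinPool.commonPool().invoke(new DoubleAction(copy, 0, copy.length));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(doubled(new int[]{1, 2, 3, 4, 5, 6})));
        // prints [2, 4, 6, 8, 10, 12]
    }
}
```

The subtasks work on disjoint index ranges, so no synchronization is needed between them.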

2. Examples

import java.util.concurrent.RecursiveTask;

/**
* Recursive Accumulation
*/
public class TaskExample extends RecursiveTask<Long> {
	private int start;
	private int end;
	private long sum;
	/**
	* Constructor
	* @param start
	* @param end
	*/
	public TaskExample(int start, int end){
        this.start = start;
        this.end = end;
    }
	/**
	* The main computation performed by this task.
	*
	* @return the result of the computation
	*/
    @Override
	protected Long compute() {
		System.out.println("task" + start + "=========" + end + "Accumulation Start");
		//When end - start is at most 100, add directly; otherwise split the range
		if(end - start <= 100){
			for (int i = start; i <= end; i++) {
				//accumulation
				sum += i;
			}
		}else {
			//Divide into 2 pieces: start..start+100 and the rest
			int middle = start + 100;
			//Recursive call, divided into two small tasks
			TaskExample taskExample1 = new TaskExample(start, middle);
			TaskExample taskExample2 = new TaskExample(middle + 1, end);
			//Execution: asynchronous
			taskExample1.fork();
			taskExample2.fork();
			//Block until both subtasks finish, then combine their results
			sum = taskExample1.join() + taskExample2.join();
		}
		//Return the accumulated sum
		return sum;
	}
}

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class ForkJoinPoolDemo {
/**
* Generate a calculation task, calculate 1+2+3.... +1000
* @param args
*/
public static void main(String[] args) {
	//Define Tasks
	TaskExample taskExample = new TaskExample(1, 1000);
	//Define execution object
	ForkJoinPool forkJoinPool = new ForkJoinPool();
	//Submit the task to the pool for execution
	ForkJoinTask<Long> result = forkJoinPool.submit(taskExample);
	//Output Results
	try {
		System.out.println(result.get());
	}catch (Exception e){
        e.printStackTrace();
	}finally {
		forkJoinPool.shutdown();
    }
  }
}
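
As a possible variation on the example above, the sketch below splits the range at its midpoint and computes one half in the current thread, forking only the other half. This is a commonly used pattern rather than the approach taken in these notes; the class name RangeSum and the helper sum are hypothetical, and the threshold of 100 is carried over from the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of TaskExample, not part of the original notes.
class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 100;
    private final int start; // inclusive
    private final int end;   // inclusive

    RangeSum(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        // Split at the midpoint so both halves carry similar amounts of work
        int middle = start + (end - start) / 2;
        RangeSum left = new RangeSum(start, middle);
        RangeSum right = new RangeSum(middle + 1, end);
        left.fork();                      // run the left half asynchronously
        long rightSum = right.compute();  // compute the right half here
        return left.join() + rightSum;    // wait for the left half, combine
    }

    // Sums start..end (inclusive) on the common pool.
    static long sum(int start, int end) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(start, end));
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 1000)); // prints 500500
    }
}
```

Splitting in halves keeps the task tree shallow, and computing one half directly avoids leaving the current thread idle while it waits.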

Keywords: Java JUC

Added by starsol on Fri, 24 Sep 2021 19:58:17 +0300