CountDownLatch&Semaphore&CyclicBarrier&Executors of concurrent programming

1, Semaphore

Semaphore definition

Semaphore literally means semaphore. Its function is to control the number of threads accessing specific resources. The bottom layer depends on the State of AQS. It is a tool class commonly used in production.

Common methods of Semaphore

Construction method

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
  • permits indicates the number of threads allowed
  • fair indicates fairness. If this is set to true, the next thread to execute will be the thread that waits the longest

common method

public void acquire() throws InterruptedException

public void release()

tryAcquire(long timeout, TimeUnit unit)
  • acquire() indicates blocking and
  • release() means to release the license

Basic use

Usage scenario:
Resource access and service flow restriction (in Hystrix, the flow restriction is based on semaphore).

Code implementation:

package com.jihu.test.semaphore;

import java.util.concurrent.Semaphore;

public class SemaphoreRunner {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 5; i++) {
            new Thread(new Task(semaphore, "xiao yan+" + i)).start();
        }
    }

    static class Task extends Thread {
        Semaphore semaphore;

        public Task(Semaphore semaphore, String tname) {
            this.semaphore = semaphore;
            this.setName(tname);
        }

        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + ":aquire() at time:" + System.currentTimeMillis());
                Thread.sleep(1000);
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + ":aquire() at time:" + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

result:

Thread-1:aquire() at time:1622590044923
Thread-3:aquire() at time:1622590044924

Thread-3:aquire() at time:1622590045931
Thread-5:aquire() at time:1622590045931

Thread-7:aquire() at time:1622590045932
Thread-1:aquire() at time:1622590045932

Thread-9:aquire() at time:1622590046931
Thread-5:aquire() at time:1622590046931

Thread-7:aquire() at time:1622590046932
Thread-9:aquire() at time:1622590047932

Process finished with exit code 0

It can be seen from the print results that only two threads execute acquire() at a time. Only after the thread performs the release() method can other threads execute acquire().

2, CountDownLatch

definition

CountDownLatch enables a thread to wait for other threads to finish their work before executing. For example, the main thread of an application wants to execute after the thread responsible for starting the framework service has started all the framework services. Wait for all threads in the thread pool to finish executing before ending.

Usage scenario:
Zookeeper distributed lock, Jmeter simulation, high concurrency, etc.

working principle

CountDownLatch is implemented through a counter. The initial value of the counter is the number of threads. Every time a thread completes its task, the value of the counter will be reduced by 1. When the counter value reaches 0, it indicates that all threads have completed the task, and then the thread waiting on the lock can resume executing the task.

Common API

CountDownLatch.countDown()  // The number of threads in the wait counter is decremented by 1
CountDownLatch.await();  // Wait for all threads in the counter to complete execution

==============  Source code   ==============
public void countDown() {
        sync.releaseShared(1);
  }

Application scenario example

For example, accompany your daughter-in-law to see a doctor.

There are many people lining up in the hospital. If you are alone, you should see the doctor first, and then go to the queue to pay for the medicine. Now we are dual core and can do these two things at the same time (multithreading). Suppose it takes 3 seconds to see the doctor and 5 seconds to queue up to pay for the medicine. If we do it at the same time, we can finish it in five seconds, and then go home together (back to the main thread).

The code is as follows:

/**
 * Doctor mission
 * */
public class SeeDoctorTask implements Runnable {
    private CountDownLatch countDownLatch;

    public SeeDoctorTask(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    public void run() {
        try {
            System.out.println("Start seeing a doctor");
            Thread.sleep(3000);
            System.out.println("After seeing the doctor, get ready to leave the ward");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (countDownLatch != null)
                countDownLatch.countDown();
        }
    }
}

/**
 * Queued tasks
 */
public class QueueTask implements Runnable {

    private CountDownLatch countDownLatch;

    public QueueTask(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    public void run() {
        try {
            System.out.println("Began to queue up at the hospital pharmacy to buy medicine....");
            Thread.sleep(5000);
            System.out.println("If the queue is successful, you can start paying for medicine");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (countDownLatch != null)
                countDownLatch.countDown();
        }
    }
}
/**
 * When it's her turn to see a doctor
 * I began to line up to pay.
 */

public class CountDownLaunchRunner {
    public static void main(String[] args) throws InterruptedException {
        long now = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(2);

        new Thread(new SeeDoctorTask(countDownLatch)).start();
        new Thread(new QueueTask(countDownLatch)).start();
        //Wait until the execution of two tasks in the thread pool is completed, otherwise it will continue
        countDownLatch.await();
        long time = System.currentTimeMillis() - now;
        System.out.println("over,get home cost:" + time);
    }
}

3, CyclicBarrier

Fence barrier, so that a group of threads will be blocked when they reach a barrier (also known as synchronization point). The barrier will not open until the last thread reaches the barrier, and all threads intercepted by the barrier will continue to run.

The default construction method of CyclicBarrier is CyclicBarrier (int parties). Its parameter indicates the number of threads intercepted by the barrier. Each thread calls the await method to tell CyclicBarrier that I have reached the barrier, and then the current thread is blocked.

Common methods

cyclicBarrier.await();

Application scenario

It can be used in the scenario of multi-threaded calculation of data and finally merging the calculation results. For example, an Excel is used to save all the user's Bank flow, and each sheet is used to save each bank flow of an account for nearly a year. Now it is necessary to count the user's daily average bank flow. First, use multiple threads to process the bank flow in each sheet, and then get the daily average bank flow of each sheet. Finally, use the calculation results of these threads with barrierAction, Calculate the daily average bank flow of the whole Excel. Example code:

public class CyclicBarrierRunner implements Runnable {
    private CyclicBarrier cyclicBarrier;
    private int index;

    public CyclicBarrierRunner(CyclicBarrier cyclicBarrier, int index) {
        this.cyclicBarrier = cyclicBarrier;
        this.index = index;
    }

    public void run() {
        try {
            System.out.println("index: " + index);
            index--;
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
            public void run() {
                System.out.println("All agents reached the barrier and were ready to start a secret mission");
            }
        });
        for (int i = 0; i < 10; i++) {
            new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
        }
        cyclicBarrier.await();
        System.out.println("All reach the barrier....");
    }
}

result:

index: 0
index: 1
index: 2
index: 3
index: 4
index: 5
index: 6
index: 7
index: 8
index: 9
 All agents reached the barrier and were ready to start a secret mission
 All reach the barrier....

Process finished with exit code 0

4, Executors

It is mainly used to create thread pool. It acts as an agent for the creation of thread pool, which makes your creation entry parameters simple.

Related to thread pool can be found in this article: Java Concurrent ThreadPoolExecutor principle source code explanation- https://blog.csdn.net/qq_43631716/article/details/114788744

Important methods

  • newCachedThreadPool: create a cacheable thread pool. If the length of the thread pool exceeds the processing needs, you can flexibly recycle idle threads. If there is no recyclable thread, you can create a new thread.
  • newFixedThreadPool: create a fixed length thread pool to control the maximum concurrent number of threads. The exceeded threads will wait in the queue.
  • newScheduledThreadPool: create a fixed length thread pool to support scheduled and periodic task execution.
  • Newsinglethreadexecution creates a singleton thread pool, which only uses a unique worker thread to execute tasks, ensuring that all tasks are executed in the specified order (FIFO, LIFO, priority).

Disadvantages (not recommended by Alibaba):

The thread pool created by Executors uses unbounded queues, which will bring many disadvantages. The most important thing is that it can save tasks indefinitely, so it is likely to cause OOM exceptions. At the same time, in some types of thread pools, the use of unbounded queue will also lead to the invalidation of parameters such as maximumpoolsize, keepAliveTime and handler

CachedThreadPool: this maximum creates an integer MAX_ Value is a thread, which will consume a lot of CPU resources.

5, Exchange

When one thread runs to the exchange() method, it will block. When another thread runs to exchange(), they exchange data and then execute the following program.

There are few application scenarios. You can understand them.

package com.jihu.test.exchanger;

import java.util.concurrent.Exchanger;

public class ExchangerTest {

    public static void main(String[] args) {
        final Exchanger<Integer> exchanger = new Exchanger<Integer>();
        for (int i = 0; i < 10; i++) {
            final Integer num = i;
            new Thread() {
                public void run() {
                    System.out.println("I am a thread: Thread_" + this.getName() + "My data is:" + num);
                    try {
                        Integer exchangeNum = exchanger.exchange(num);
                        Thread.sleep(1000);
                        System.out.println("I am a thread: Thread_" + this.getName() + "My original data is:" + num + " , The data after exchange is:" + exchangeNum);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        }
    }
}

Keywords: Java Multithreading Concurrent Programming

Added by ca87 on Wed, 02 Feb 2022 14:12:53 +0200