IV. Thread concurrency tools

Concurrent tool classes for threads

I. CountDownLatch

[1] what is CountDownLatch?

CountDownLatch is a synchronization helper class that allows one or more threads to wait until a set of operations being performed in other threads has completed.

A latch delays the progress of threads until it reaches its terminal state. Latches can be used to ensure that certain activities do not proceed until other activities have completed:

  • Ensure that a calculation will not continue until all the resources it needs are initialized;
  • Ensure that a service does not start until all other services it depends on have started;
  • Wait until all participants of an operation are ready to continue.

CountDownLatch holds a counter initialized to a non-negative value. The countDown() method decrements the counter, and the await() method waits for the counter to reach 0. Every thread that calls await() blocks until the counter reaches 0, the waiting thread is interrupted, or the wait times out.
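The counter behaviour described above can be sketched in a single thread. This is a minimal illustration, not part of the original example: the class name LatchCounterSketch and the 50 ms timeout are arbitrary choices made here. It relies only on countDown(), getCount() and the timed await(long, TimeUnit) overload, which returns false when the wait times out before the counter reaches 0.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name and the timeout value are assumptions for illustration.
class LatchCounterSketch {

    // Returns { result of await while count > 0, result of await at count 0, count is 0 at the end }
    static boolean[] demo() {
        CountDownLatch latch = new CountDownLatch(2);
        try {
            latch.countDown();                       // counter: 2 -> 1
            // The counter is still 1, so the timed await gives up and returns false
            boolean beforeZero = latch.await(50, TimeUnit.MILLISECONDS);

            latch.countDown();                       // counter: 1 -> 0
            latch.countDown();                       // already 0: this call has no effect
            // The counter is 0, so await returns true without blocking
            boolean atZero = latch.await(50, TimeUnit.MILLISECONDS);

            return new boolean[] { beforeZero, atZero, latch.getCount() == 0 };
        } catch (InterruptedException e) {
            // await() also ends early if the waiting thread is interrupted
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting on the latch", e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("await before zero: " + r[0]);
        System.out.println("await at zero: " + r[1]);
        System.out.println("count stays at zero: " + r[2]);
    }
}
```

Once the counter has reached 0 it stays there, which is why a CountDownLatch cannot be reused for a second countdown.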

A latch (countdown latch) is mainly used to ensure that the preconditions for a task are met before the task runs. It is a synchronization tool class that coordinates multiple threads and is usually used to control thread waiting: it lets a thread wait until the countdown has finished before it continues.

[2] two typical uses of CountDownLatch

① A thread waits for n other threads to finish executing before it continues.

Initialize the counter of CountDownLatch to n: new CountDownLatch(n). Each time a task thread finishes executing, it decrements the counter by 1 with countDownLatch.countDown(). When the counter reaches 0, the threads blocked in await() on the CountDownLatch are woken up. A typical application scenario is starting a service, where the main thread needs to wait for multiple components to load before it continues.

② Multiple threads start executing their tasks at the same moment, for maximum parallelism.

Note that this is about parallelism, not concurrency: the emphasis is on multiple threads starting to execute at the same instant. It is similar to a race: put multiple threads at the starting line, wait for the starting gun, and then let them all run at once. The method is to create a shared CountDownLatch object with its counter initialized to 1: new CountDownLatch(1). Before the threads start executing their tasks, each of them first calls countDownLatch.await(). When the main thread calls countDown(), the counter changes to 0 and all the waiting threads wake up at the same time.
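The starting-gun pattern can be sketched as follows. This is a minimal sketch rather than the author's code: the names StartingGunSketch, startSignal, finished and crossedLine are hypothetical, and a second latch is added here only so that the calling thread can wait for the runners to finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; all names are assumptions for illustration.
class StartingGunSketch {

    // Starts the given number of runner threads and returns how many of them ran after the signal.
    static int race(int runners) {
        CountDownLatch startSignal = new CountDownLatch(1);   // the starting gun
        CountDownLatch finished = new CountDownLatch(runners); // one count per runner
        AtomicInteger crossedLine = new AtomicInteger();

        for (int i = 0; i < runners; i++) {
            new Thread(() -> {
                try {
                    startSignal.await();           // wait at the starting line
                    crossedLine.incrementAndGet(); // the "task" each runner performs
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();          // report completion either way
                }
            }, "runner-" + i).start();
        }

        startSignal.countDown();                   // fire the gun: counter 1 -> 0 releases every runner
        try {
            finished.await();                      // wait until every runner has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return crossedLine.get();
    }

    public static void main(String[] args) {
        System.out.println("runners finished: " + race(4));
    }
}
```

The sketch combines both typical uses: startSignal releases all runners at once, and finished lets one thread wait for n others.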

The following example runs several threads and measures the total time they take:

import java.util.concurrent.CountDownLatch;

public class TestCountDownLatch {
    // The latch is the only shared resource; its count matches the 5 worker threads
    static CountDownLatch countDownLatch = new CountDownLatch(5);

    static class LatchDemo extends Thread{
        @Override
        public void run() {
            int sum = 0;
            for (int i = 0; i < 1000000; i++) {
                sum++;
            }
            System.out.println(getName() + " Calculation result: " + sum);
            countDownLatch.countDown();
        }
    }
    public static void main(String[] args) throws InterruptedException {

        long begin = System.currentTimeMillis();
        System.out.println("Here we go-----"+begin);
        for (int i = 0; i < 5; i++) {
            new LatchDemo().start();
        }
        countDownLatch.await();

        long end = System.currentTimeMillis();
        System.out.println("It's over-----"+end);
        System.out.println("Total time:"+(end-begin));
    }
}

/**
Sample output (timestamps omitted; thread order and total time vary between runs):
Here we go
Thread-3 Calculation result: 1000000
Thread-0 Calculation result: 1000000
Thread-1 Calculation result: 1000000
Thread-2 Calculation result: 1000000
Thread-4 Calculation result: 1000000
It's over
Total time: 8
*/

II. CyclicBarrier

[1] what is CyclicBarrier?

CyclicBarrier is a barrier (fence) class, similar to CountDownLatch: it blocks a group of threads until an event occurs. The key difference between a fence and a latch is that all threads must reach the fence at the same time before any of them can continue. A latch waits for events, whereas a fence waits for other threads.

CyclicBarrier enables a fixed number of threads to gather repeatedly at the fence position. When a thread reaches the fence position, it calls the await() method, which blocks until all threads have reached the fence position. Once all threads have arrived, the fence opens, all threads are released, and the fence is reset for the next use.

[2] construction method of CyclicBarrier

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

The basic constructor of CyclicBarrier is CyclicBarrier(int parties). Its parameter is the number of threads that must arrive at the barrier before it opens. Each thread calls the await() method to tell the CyclicBarrier that it has reached the barrier, and is then blocked until the remaining threads arrive.

The other constructor, CyclicBarrier(int parties, Runnable barrierAction), runs barrierAction once the last thread has reached the barrier and before the waiting threads are released, which is convenient for handling more complex business scenarios.
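To illustrate the two-argument constructor, here is a minimal sketch that records the order of events in a shared list. The class name BarrierActionSketch and the event strings are assumptions made for this illustration. It relies on the documented behaviour that the barrier action runs once per barrier trip, after the last party arrives and before any thread is released.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class; names and messages are assumptions for illustration.
class BarrierActionSketch {

    // Runs `parties` threads through one barrier and returns the recorded event order.
    static List<String> run(int parties) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        // The action runs once, when the last thread arrives, before anyone is released
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> events.add("barrier action"));

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    barrier.await();        // block until all parties have arrived
                    events.add("released"); // recorded only after the barrier has opened
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

The first recorded event is always the barrier action, followed by one "released" entry per thread.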

[3] application example of CyclicBarrier

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    // Custom worker thread
    private static class Worker extends Thread {
        private CyclicBarrier cyclicBarrier;
        
        public Worker(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        
        @Override
        public void run() {
            super.run();
            
            try {
                System.out.println(Thread.currentThread().getName() + " Start waiting for other threads");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + " Start execution");
                // The worker thread starts processing. Thread.sleep() simulates the business processing.
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " completion of enforcement");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
 
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
        
        for (int i = 0; i < threadCount; i++) {
            System.out.println("Create worker thread " + i);
            Worker worker = new Worker(cyclicBarrier);
            worker.start();
        }
    }
}
/**
Create worker thread 0
Create worker thread 1
Thread-0 Start waiting for other threads
Create worker thread 2
Thread-1 Start waiting for other threads
Thread-2 Start waiting for other threads
Thread-2 Start execution
Thread-0 Start execution
Thread-1 Start execution
Thread-1 completion of enforcement
Thread-0 completion of enforcement
Thread-2 completion of enforcement
*/

In the code above, each custom worker thread must wait until all participating threads are ready before it starts its work, and the CyclicBarrier class handles this for us. The execution result confirms it: only after all three worker threads have called await() and reached the fence position do they start their business processing.

[4] difference between CyclicBarrier and CountDownLatch

  • The counter of CountDownLatch can be used only once, while the counter of CyclicBarrier can be reset with the reset() method and used many times, so CyclicBarrier can handle more complex scenarios;
  • CyclicBarrier also provides other useful methods. For example, getNumberWaiting() returns the number of threads currently blocked at the barrier, and isBroken() reports whether the barrier is in a broken state, for example because a waiting thread was interrupted or timed out;
  • CountDownLatch allows one or more threads to wait for a set of events to occur, while CyclicBarrier is used to wait for other threads to run to the fence location.
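The reuse difference in the list above can be sketched as follows. This is an illustration under assumptions: the class name BarrierReuseSketch and the counter trips are hypothetical, and the sketch uses the barrier's automatic reset after each trip rather than an explicit reset() call.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are assumptions for illustration.
class BarrierReuseSketch {

    // Sends `parties` threads through the same barrier `rounds` times; returns how often it opened.
    static int runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action counts each time the barrier opens
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object is reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // After a clean run nobody is waiting and the barrier is not broken
        if (barrier.isBroken() || barrier.getNumberWaiting() != 0) {
            throw new IllegalStateException("barrier ended in an unexpected state");
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier opened " + runRounds(3, 4) + " times");
    }
}
```

A CountDownLatch could not be used this way, because its counter stays at 0 after the first countdown.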

III. Semaphore

[1] what is Semaphore?

A Semaphore is used to coordinate threads in a multi-threaded environment so that they use shared resources correctly and reasonably. A semaphore maintains a set of permits. When initializing a Semaphore, we pass in the number of permits in this set, which represents the number of threads that can access the shared resource at the same time.

[2] basic usage of Semaphore

A thread obtains a permit through the acquire() method and can then operate on the shared resource. Note that if all permits have already been handed out, the thread enters a waiting state and does not obtain a permit until another thread releases one. A thread releases a permit through the release() method, which returns the permit to the Semaphore.
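The acquire()/release() cycle with more than one permit can be sketched like this. The class name PermitLimitSketch, the counters inside and peak, and the 20 ms sleep are assumptions made for illustration; the sketch measures the highest number of threads that held a permit at the same time.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and the sleep duration are assumptions for illustration.
class PermitLimitSketch {

    // Runs `threads` workers against `permits` permits and returns the peak number inside at once.
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger(); // threads currently holding a permit
        AtomicInteger peak = new AtomicInteger();   // highest value of `inside` observed

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire();            // blocks while all permits are taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                         // no permit acquired, nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(20);               // simulate work on the shared resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();            // return the permit to the semaphore
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent holders: " + maxConcurrent(3, 10));
    }
}
```

Whatever the scheduling, the reported peak never exceeds the number of permits passed to the constructor.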

[3] using Semaphore as a mutually exclusive lock

import java.util.concurrent.Semaphore;

public class TestSemaphore {
    // One permit: the semaphore behaves as a mutually exclusive lock
    private static final Semaphore mutex = new Semaphore(1);

    static class Worker extends Thread {
        Worker(String name) {
            super(name);
        }

        @Override
        public void run() {
            try {
                mutex.acquire();
            } catch (InterruptedException e) {
                // No permit was acquired, so there is nothing to release
                Thread.currentThread().interrupt();
                return;
            }
            try {
                System.out.println(getName() + " Start-up");
            } finally {
                // Release the permit only after it has actually been acquired
                mutex.release();
                System.out.println(getName() + " Lock release!!!");
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Worker(String.valueOf(i)).start();
        }
    }
}

The example creates a mutually exclusive Semaphore with a single permit and then starts 10 threads concurrently. Each thread uses the Semaphore to control its execution. Because the Semaphore has only 1 permit, only one thread can hold it at a time, and the other threads enter the waiting state.

IV. Callable, Future and FutureTask

The Future interface is generally used to retrieve the result and status of a Callable's execution. The main methods are as follows:

  • cancel(), cancels the execution of the Callable if it has not yet completed
  • get(), gets the return value of the Callable, blocking until it is available
  • isCancelled(), checks whether the task has been cancelled
  • isDone(), checks whether the task has finished
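The methods listed above can be tried out with a short sketch. It is an illustration only: the class name FutureSketch and its three helper methods are hypothetical, and it shows two common ways to obtain a Future, either by submitting a Callable to an ExecutorService or by wrapping it in a FutureTask and running that on a plain thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical demo class; method names are assumptions for illustration.
class FutureSketch {

    // Submits a Callable to an executor and reads its result through a Future.
    static int sumWithExecutor() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                int sum = 0;
                for (int i = 1; i <= 100; i++) {
                    sum += i;
                }
                return sum;
            };
            Future<Integer> future = pool.submit(task);
            return future.get();                 // blocks until the Callable has returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // FutureTask is both a Runnable and a Future, so a plain Thread can run it.
    static int sumWithFutureTask() {
        FutureTask<Integer> futureTask = new FutureTask<>(() -> 1 + 2 + 3);
        new Thread(futureTask).start();
        try {
            return futureTask.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Cancels a task that was never started and checks the resulting status flags.
    static boolean cancelBeforeRun() {
        FutureTask<Integer> neverRun = new FutureTask<>(() -> 42);
        boolean cancelled = neverRun.cancel(false);
        return cancelled && neverRun.isCancelled() && neverRun.isDone();
    }

    public static void main(String[] args) {
        System.out.println("executor result: " + sumWithExecutor());
        System.out.println("FutureTask result: " + sumWithFutureTask());
        System.out.println("cancelled before run: " + cancelBeforeRun());
    }
}
```

Note that isDone() also returns true for a cancelled task, since "done" covers normal completion, failure and cancellation.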

Keywords: Java

Added by monloi on Wed, 16 Oct 2019 12:44:24 +0300