Four common concurrent tool classes in JUC

JUC is Java util. Concurrent package, commonly known as JUC, contains some things to solve concurrency problems.
This package is located under the rt.jar package under java

Four common concurrent tool classes:

CountDownLatch

CountDownLatch is a class I use more at present. A count will be given during the initialization of CountDownLatch, and then the count will be reduced by 1 every time countDown() is called,

When the count does not reach 0, the await() method is blocked until the count is reduced to 0.

Usage scenario: it is used to divide tasks, which are executed by multiple threads. For example, when a watercress crawler was recently written, it needs to crawl the first five pages of each movie, which can be divided into five threads to process data. Via latch Await () guarantees that all are completed before returning.

    public void latch() throws InterruptedException {
        int count= 5;
        CountDownLatch latch = new CountDownLatch(count);
        for (int x=0;x<count;x++){
            new Worker(x*20,latch).start();
        }
        latch.await();
        System.out.println("All implemented");
    }
    
    class Worker extends Thread{
        Integer start;
        CountDownLatch latch;
        public Worker(Integer start,CountDownLatch latch){
            this.start=start;
            this.latch=latch;
        }        @Override
        public void run() {
            System.out.println(start+" executed");
            latch.countDown();
        }
    }

The output is as follows:

20 executed
0 executed
40 executed
60 executed
80 executed
 All implemented

CyclicBarrier

It allows a group of threads to wait for each other until they reach a common barrier point, that is, blocking the call to cyclicbarrier Await ().

It seems that CyclicBarrier is similar to CountDownLatch in function, java training It is explained in the description of CountDownLatch on the official doc that the count of CountDownLatch cannot be reset. If you need to reset the count, please consider using CyclicBarrier.

At the beginning of the CyclicBarrier, you can also add a parameter of Runnable, which is executed by the last thread entering the CyclicBarrier after the number of cyclicbarriers reaches and before all other threads are awakened

Usage scenario: similar to CyclicBarrier, but CyclicBarrier provides several methods that countdowncatch does not have to deal with more complex scenarios, such as:

getNumberWaiting() gets the number of blocked threads,

isBroken() is used to know whether the blocked thread is interrupted or not.

reset() resets the barrier to its initial state. If all participants are currently waiting at the barrier, they will return with a BrokenBarrierException thrown.

    public void latch() throws InterruptedException {
        int count = 5;
        CyclicBarrier cb = new CyclicBarrier(count, new Runnable() {
            @Override
            public void run() {
                System.out.println("All implemented");
            }
        });
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        while (true){
            for (int x=0;x<count;x++){
                executorService.execute(new Worker(x,cb));
            }
        }
    }    
    
    class Worker extends Thread {
        Integer start;
        CyclicBarrier cyclicBarrier;        public Worker(Integer start, CyclicBarrier cyclicBarrier) {
            this.start = start;
            this.cyclicBarrier = cyclicBarrier;
        }        @Override
        public void run() {
            System.out.println(start + " executed");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

The output is as follows:

0 executed
3 executed
4 executed
2 executed
1 executed
 All implemented
0 executed
1 executed
2 executed
3 executed
4 executed
 All implemented

Semaphore

Semaphore semaphore maintains a license set. Each time it is used, execute acquire() to obtain a license from semaphore. If not, it will be blocked. After each use, execute release() to release the license. Beijing java training
Usage scenario: Semaphore is used to control resources. For example, the data connection is limited, and Semaphore is used to limit the number of threads accessing the database.

    public void latch() throws InterruptedException, IOException {
        int count = 5;
        Semaphore semaphore = new Semaphore(1);
        ExecutorService executorService = Executors.newFixedThreadPool(count);
            for (int x=0;x<count;x++){
                executorService.execute(new Worker(x,semaphore));
            }
        System.in.read();
    }    
    
    class Worker extends Thread {
        Integer start;
        Semaphore semaphore;        public Worker(Integer start, Semaphore semaphore) {
            this.start = start;
            this.semaphore = semaphore;
        }        @Override
        public void run() throws IllegalArgumentException {
            try {
                System.out.println(start + " Ready to execute");
                TimeUnit.SECONDS.sleep(1);
                semaphore.acquire();
                System.out.println(start + " Already implemented");
                semaphore.release();
                System.out.println(start + " Released");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }        }
    }

The output is as follows:

0 Ready to execute
2 Ready to execute
1 Ready to execute
3 Ready to execute
4 Ready to execute
2 Already implemented
2 Released
4 Already implemented
4 Released
1 Already implemented
1 Released
0 Already implemented
0 Released
3 Already implemented
3 Released

Exchanger

Exchange is used for data exchange between two threads. It provides a synchronization point where two threads can exchange data with each other.
Usage scenario: two threads wait for processing results and transfer data to each other.

    public void latch() throws InterruptedException, IOException {
        int count = 5;
        Exchanger<String> exchanger = new Exchanger<>();
        ExecutorService executorService = Executors.newFixedThreadPool(count);
            for (int x=0;x<count;x++){
                executorService.execute(new Worker(x,exchanger));
            }
        System.in.read();
    }    
    
    class Worker extends Thread {
        Integer start;
        Exchanger<String>  exchanger;        public Worker(Integer start, Exchanger<String> exchanger) {
            this.start = start;
            this.exchanger = exchanger;
        }        @Override
        public void run() throws IllegalArgumentException {
            try {
                System.out.println(Thread.currentThread().getName() + " Ready to execute");
                TimeUnit.SECONDS.sleep(start);
                System.out.println(Thread.currentThread().getName() + " Waiting for exchange");
                String value = exchanger.exchange(Thread.currentThread().getName());
                System.out.println(Thread.currentThread().getName() + " The data exchanged are:"+value);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }        }
    }

The output is as follows:

pool-1-thread-1 Ready to execute
pool-1-thread-1 Waiting for exchange
pool-1-thread-3 Ready to execute
pool-1-thread-2 Ready to execute
pool-1-thread-5 Ready to execute
pool-1-thread-4 Ready to execute
pool-1-thread-2 Waiting for exchange
pool-1-thread-1 The data exchanged are: pool-1-thread-2
pool-1-thread-2 The data exchanged are: pool-1-thread-1
pool-1-thread-3 Waiting for exchange
pool-1-thread-4 Waiting for exchange
pool-1-thread-4 The exchanged data are: pool-1-thread-3
pool-1-thread-3 The data exchanged are: pool-1-thread-4
pool-1-thread-5 Waiting for exchange

Exchange must appear in pairs, otherwise, like the above code execution results, pool-1-thread-5 will always block the threads waiting to exchange data with them. To avoid this phenomenon, you can use exchange(V x, long timeout, TimeUnit unit) to set the maximum waiting time.

Keywords: JUC

Added by NCC1701 on Tue, 18 Jan 2022 03:07:06 +0200