Blocking queues and producer consumer cases

Blocking queue, as the name suggests, is a queue first, and the role of blocking queue in data structure is:

Thread 1 adds elements to the queue, and thread 2 takes elements from the queue.
When the blocking queue is empty, the operation of fetching elements from the queue will be blocked.
When the blocking queue is full, adding elements to the queue is blocked.

Threads trying to get elements from an empty blocking queue will be blocked until other threads insert new elements into the empty queue.
Similarly, a thread trying to add elements to a full blocking queue will also be blocked until another thread removes one or more elements from the queue or completely empties the queue, making the queue idle again and adding new ones later.

In the field of multithreading: the so-called blocking will suspend the thread (i.e. blocking) in some cases. Once the conditions are met, the suspended thread will be awakened automatically.

Why BlockingQueue?

The advantage is that you don't need to care when you need to block the thread and wake up the thread, because all this is arranged by BlockingQueue.

Common types

  • ArrayBlockingQueue: a bounded blocking queue consisting of an array structure.
  • LinkedBlockingQueue: a bounded (but the default size is Integer.MAX_VALUE) blocking queue composed of a linked list structure.
  • PriorityBlockingQueue: an unbounded blocking queue that supports prioritization.
  • DelayQueue: delay unbounded blocking queue implemented using priority queue.
  • Synchronous queue: a blocking queue that does not store elements, that is, a queue of individual elements.
  • LinkedTransferQueue: an unbounded blocking queue with a linked list structure.
  • LinkedBlockingDeque: a bidirectional blocking queue composed of a linked list structure.

Method introduction

Throw exceptionWhen the blocking queue is full, add ing an element to the queue will throw IllegalStateException: Queue full; When the blocking queue is empty, removing elements from the queue will throw NoSuchElementException
Special valueInsert method, success true, failure false; Remove the method and successfully return the elements out of the queue. If there are no elements in the queue, null will be returned
blockWhen the blocking queue is full, the producer thread continues to put elements into the queue, and the queue will block the producer thread until the queue is full; When the blocking queue is empty, the consumer thread attempts to take elements from the queue, and the queue will block the consumer thread until there are elements in the queue.
overtime

case

Throw exception
add

package juc;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        //At this time, the queue is full, and the IllegalStateException will be thrown when adding again
        System.out.println(blockingQueue.add("x"));

    }

}


remove

package juc;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));

        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());

        //At this time, the queue is empty. Deleting it again will throw NoSuchElementException
        System.out.println(blockingQueue.remove());


    }

}


Special value

package juc;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("a"));
        //Insert failed, return false
        System.out.println(blockingQueue.offer("x"));

        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        //Failed to fetch element, null returned
        System.out.println(blockingQueue.poll());
    }

}


block
put

package juc;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        blockingQueue.put("a");
        blockingQueue.put("a");
        blockingQueue.put("a");
        System.out.println("=======");
        //If it is full, it will be blocked if you continue to insert
        blockingQueue.put("x");


    }

}


take

package juc;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        blockingQueue.put("a");
        blockingQueue.put("a");
        blockingQueue.put("a");

        
        blockingQueue.take();
        blockingQueue.take();
        blockingQueue.take();

        System.out.println("=======");
        //If it is empty, it will block
        blockingQueue.take();


    }

}



overtime

package juc;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

        System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));

        //After two seconds of full blocking, if it is still full, false is returned
        System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
    }

}

SynchronousQueue

Blocking queue for a single element. That is, if there is an element in the queue, it will not continue to be added to the queue.

case

package juc;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousBlockingDemo {
    public static void main(String[] args) {
        BlockingQueue<String> objects = new SynchronousQueue<>();

        //If it is another blocked queue, it will put continuously, but only one synchronous queue will be put in. If it is not fetched, it will not be put in again
        new Thread(()->{
            try{
                System.out.println(Thread.currentThread().getName()+"\t put 1");
                objects.put("1");
                System.out.println(Thread.currentThread().getName()+"\t put 2");
                objects.put("2");
                System.out.println(Thread.currentThread().getName()+"\t put 3");
                objects.put("3");
            }catch (Exception e){
                e.printStackTrace();
            }
        },"t1").start();



        new Thread(()->{
            try{
                try{
                    TimeUnit.SECONDS.sleep(5);
                }catch(InterruptedException e){
                    e.printStackTrace();
                }

                System.out.println(Thread.currentThread().getName()+"\t"+objects.take());
                try{
                    TimeUnit.SECONDS.sleep(5);
                }catch(InterruptedException e){
                    e.printStackTrace();
                }

                System.out.println(Thread.currentThread().getName()+"\t"+objects.take());
                try{
                    TimeUnit.SECONDS.sleep(5);
                }catch(InterruptedException e){
                    e.printStackTrace();
                }

                System.out.println(Thread.currentThread().getName()+"\t"+objects.take());
            }catch (Exception e){
                e.printStackTrace();
            }
        },"t2").start();
    }
}

The difference between synchronized and lock

Original composition

  • synchronized is a keyword and belongs to the jvm level. Its bottom layer is implemented through monitor. monitorenter indicates entry and monitorexit indicates departure.
  • Lock is a concrete class and an api level.

usage method

  • Synchronized does not need to release the lock manually. When the code is executed, the system will automatically let the thread release the occupation of the lock.
  • ReentrantLock requires the user to release the lock manually. If the lock is not released actively, it may lead to deadlock. The Lock() and unLock() methods are required to be completed with the try/catch statement block.

Is waiting interruptible

  • Synchronized cannot be interrupted unless an exception is thrown or normal operation is completed.
  • ReentrantLock can be interrupted. Set the timeout method trylock (long timeout, timeunit); In LockInterruptibly(), the interrupt() method can be interrupted.

Is locking fair

  • synchronized unfair lock.
  • ReentrantLock is a non fair lock by default. The constructor can pass in a boolean value. true is a fair lock and false is a non fair lock.

Lock binding multiple conditions

  • synchronized No.
  • ReentrantLock is used to wake up the threads required for grouping. It can wake up accurately, rather than waking up one thread randomly or all threads like synchronized.
package juc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 *  Multiple threads are called in order to start three threads a - > b - > C:
 *  AA Print 5 times, BB print 10 times, CC print 15 times..... Ten rounds.
 */

class ShareResource{

    private int number = 1;
    private Lock lock = new ReentrantLock();
    private Condition c1 = lock.newCondition();
    private Condition c2 = lock.newCondition();
    private Condition c3 = lock.newCondition();
    public void print5(){
        lock.lock();
        try{
            while (number!=1){
                c1.await();
            }

            for (int i = 1; i <= 5 ; i++) {
                System.out.println(Thread.currentThread().getName()+"\t "+number);
            }

            number = 2;
            c2.signal();

        } catch(Exception e){
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
    }


    public void print10(){
        lock.lock();
        try{
            while (number!=2){
                c2.await();
            }

            for (int i = 1; i <= 10 ; i++) {
                System.out.println(Thread.currentThread().getName()+"\t "+number);
            }

            number = 3;
            c3.signal();

        } catch(Exception e){
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
    }

    public void print15(){
        lock.lock();
        try{
            while (number!=3){
                c3.await();
            }

            for (int i = 1; i <= 15 ; i++) {
                System.out.println(Thread.currentThread().getName()+"\t "+number);
            }

            number = 1;
            c1.signal();

        } catch(Exception e){
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
    }

}

public class SynchronizedReentrantDemo {
    public static void main(String[] args) {

        ShareResource shareResource = new ShareResource();

        new Thread(()->{
            for (int i = 1; i <= 10 ; i++) {
                shareResource.print5();
            }
        },"AA").start();

        new Thread(()->{
            for (int i = 1; i <= 10 ; i++) {
                shareResource.print10();
            }
        },"BB").start();

        new Thread(()->{
            for (int i = 1; i <= 10 ; i++) {
                shareResource.print15();
            }
        },"CC").start();

    }
}

Blocking queue usage: producer consumer mode

Traditional edition

package juc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//For a variable with an initial value of 0, two threads operate alternately, one plus 1 and the other minus 1, for 5 rounds.
public class ProdConsumer_TraditionDemo {
    public static void main(String[] args) {
        ShareData shareData = new ShareData();

        new Thread(()->{
            for (int i = 1; i <=5 ; i++) {
                try {
                    shareData.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"t1").start();


        new Thread(()->{
            for (int i = 1; i <=5 ; i++) {
                try {
                    shareData.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"t2").start();
    }
}

class ShareData{
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    //Addition method
    public void increment() throws InterruptedException {
        lock.lock();
        try{
            //If the value of the current thread is not 0, do not add and wait.

            /**
             * Prevent false wakes: in the single parameter version, interrupts and false wakes are possible, and this method should always be used in the loop
             * As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop:
             *
             *      synchronized (obj) {
             *          while (<condition does not hold>)
             *              obj.wait();
             *          ... // Perform action appropriate to condition
             *      }
             */
            while (number!=0){
                //wait for
                condition.await();
            }

            number++;
            System.out.println(Thread.currentThread().getName()+"\t "+number);

            condition.signalAll();
        } catch(Exception e){
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
    }


    //Subtraction method
    public void decrement() throws InterruptedException {
        lock.lock();
        try{
            while (number==0){
                //wait for
                condition.await();
            }

            number--;
            System.out.println(Thread.currentThread().getName()+"\t "+number);

            condition.signalAll();
        } catch(Exception e){
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
    }

}

Blocking queue version

package juc;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MyResource{
    private volatile boolean flag = true; //Enabled by default, production and consumption
    private AtomicInteger atomicInteger = new AtomicInteger();

    BlockingQueue<String> blockingQueue = null;

    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());
    }

    public void myProd() throws Exception{
        String data = null;
        boolean retValue;
        while (flag){
            data = atomicInteger.incrementAndGet()+"";
            retValue =  blockingQueue.offer(data,2L, TimeUnit.SECONDS);
            if(retValue){
                System.out.println(Thread.currentThread().getName()+"\t Insert queue"+data+"success");
            }else{
                System.out.println(Thread.currentThread().getName()+"\t Insert queue"+data+"fail");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+"\t End of production");
    }

    public void myConsumer() throws Exception{
        String result = null;
        while (flag){
            result = blockingQueue.poll(2L,TimeUnit.SECONDS);
            if(result == null || result.equalsIgnoreCase("")){
                flag = false;
                System.out.println(Thread.currentThread().getName()+"\t Consumption exit");
                System.out.println();
                System.out.println();
                return;
            }
            System.out.println(Thread.currentThread().getName()+"\t Consumption queue"+result+"success");
        }
    }

    public void stop(){
        flag = false;
    }

}

public class ProdConsumer_BlockQueueDemo {

    public static void main(String[] args) {
        MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t Production thread start");
            try {
                myResource.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"Prod").start();


        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"\t Production thread start");
            try {
                myResource.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"Consumer").start();

        try{
            TimeUnit.SECONDS.sleep(5);
        }catch(InterruptedException e){
            e.printStackTrace();
        }

        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println("5 Seconds to, end");

        myResource.stop();

    }
}

Keywords: Java Multithreading queue

Added by TTT on Thu, 30 Dec 2021 09:46:31 +0200