Multithreading case

catalogue

1, Singleton mode

1. What is singleton mode

2. Single case pattern classification

2, Producer consumer model

1. What is the blocking queue

2. Blocking queue in standard library

3. Producer consumer model

3, Timer

1. What is the timer

2. Timer in standard library

3. Implement timer

4, Thread pool

1. What is thread pool

2. Thread pool in standard library

3. Implement thread pool

1, Singleton mode

1. What is singleton mode

Design pattern is like "chess score" in chess The red side takes the lead and the black side jumps For some moves of the red side, there are some fixed routines when the black side responds If you follow the routine, the situation will not suffer

There are also many common "problem scenarios" in software development In response to these problem scenes, the bosses summed up some fixed routines If you implement the code according to this routine, you won't suffer any loss

2. Single case pattern classification

Hungry man model

Class is loaded and an instance is created

//First create a class that represents a singleton
    //We require that the class of Singletion can only have one instance
    //Hungry man mode, "hungry" means that when the class is loaded, the instance will be created
    static class Singletion{
        //When the constructor is made private, the instance of new cannot be created outside the class
        private Singletion(){}

        //Create a static member that represents a unique instance of the class
        private static Singletion instance = new Singletion();

        public static Singletion getInstance() {
            return instance;
        }
    }

    public static void main(String[] args) {
        Singletion s1 = Singletion.getInstance();
        Singletion s2 = Singletion.getInstance();
        System.out.println(s1 == s2);
    }

Lazy mode

Single thread

class Singleton {
    private static Singleton instance = null;
    private Singleton() {}
    public static Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton();
       }
        return instance;
   }
}

However, the above code has the problem of thread insecurity

For lazy mode, multiple threads call getInstance, which does four things

Read the content of instance

Judge whether instance is empty

If instance is null, a new instance will be created

Return instance address

When a class is new twice, it is not in singleton mode. If there are more threads, there may be more times of new, which does not conform to singleton mode

Multithreading

static class Singletion{
        private Singletion() { }
        private static Singletion instance = null;

        public static Singletion getInstance(){
            if (instance == null){
                instance = new Singletion();
            }
            return instance;
        }

    }

To ensure thread safety, we choose to lock

static class Singletion{
        private Singletion() { }
        private static Singletion instance = null;

        public static Singletion getInstance(){
            synchronized (Singletion.class){
                if (instance == null){
                    instance = new Singletion();
                }
                return instance;
            }
        }
    }

 

When multiple threads call getInstance for the first time, everyone may find that instance is null, so they continue to execute to compete for locks. Among them, the threads that compete successfully will complete the operation of creating instances

When this instance is created, other threads competing for locks are blocked by the inner if layer You will not continue to create other instances

 static class Singletion{
        private Singletion() { }
        private static Singletion instance = null;

        public static Singletion getInstance(){
            if (instance == null){
                synchronized (Singletion.class){
                    if (instance == null){
                        instance = new Singletion();
                    }
                }
            }
            return instance;
        }
    }
}

Multiple read operations are involved and may be optimized by the compiler. In order to avoid the deviation of the read instance caused by "memory visibility", volatile is added.

static class Singletion{
        private Singletion() { }
        private volatile static Singletion instance = null;

        public static Singletion getInstance(){
            if (instance == null){
                synchronized (Singletion.class){
                    if (instance == null){
                        instance = new Singletion();
                    }
                }
            }
            return instance;
        }

    }

In order to ensure thread safety, three key points are involved

1. Lock to ensure thread safety

2. Double if guarantee efficiency

3. volatile avoids the problem of memory visibility

2, Producer consumer model

1. What is the blocking queue

Blocking queue is a special kind of queue Also abide by the "first in, first out" principle Blocking queue can be a thread safe data structure and has the following characteristics:

When the queue is full, the queue will be blocked until another thread takes the element from the queue

When the queue is empty, continuing out of the queue will also block until another thread inserts elements into the queue

A typical application scenario of blocking queue is "producer consumer model" This is a very typical development model

2. Blocking queue in standard library

A blocking queue is built into the Java standard library If we need to use blocking queues in some programs, we can directly use those in the standard library

BlockingQueue is an interface The real implementation class is LinkedBlockingQueue

The put method is used for blocking incoming queues and take for blocking outgoing queues

BlockingQueue also has offer, poll, peek and other methods, but these methods do not have blocking characteristics

BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// Queue
queue.put("abc");
// Get out of the queue If there is no put or take directly, it will block 
String elem = queue.take();

3. Producer consumer model

Producer consumer model is to solve the strong coupling problem between producers and consumers through a container. Producers and consumers do not communicate directly with each other, but communicate through the blocking queue. Therefore, after producing data, producers do not need to wait for consumers to process, but directly throw it to the blocking queue. Consumers do not ask producers for data, but directly take it from the blocking queue

1) The blocking queue is equivalent to a buffer, which balances the processing power of producers and consumers

For example, in the "second kill" scenario, the server may receive a large number of payment requests at the same time If these payment requests are processed directly, the server may not be able to handle them (the processing of each payment request requires a more complex process) At this time, these requests can be put into a blocking queue, and then the consumer thread will slowly process each payment request

2) Blocking queues can also decouple producers and consumers

For example, the family makes dumplings together during the Chinese New Year Generally, there is a clear division of labor. For example, one person is responsible for rolling dumpling skin and others are responsible for making dumplings The person who rolls dumpling skin is the "producer", and the person who makes dumplings is the "consumer" The person who rolls the dumpling skin doesn't care who the person who makes the dumpling is (whether it's made by hand, with the help of tools or machines), and the person who makes the dumpling doesn't care who the person who rolls the dumpling skin is (if there's a dumpling skin, whether it's rolled with a rolling pin, a can bottle, or bought directly from the supermarket)

static class BlockingQueue{
        //Based on linked list
        //Array based
        private int[] array = new int[100];
        private volatile int head = 0;
        private volatile int tail = 0;
        //head and tail constitute a section that is closed in front and opened in back
        //When the two coincide, it may indicate that the queue is empty or full
        //In order to distinguish between empty and full, you need to introduce an additional size to represent it

        private volatile int size = 0;
        //Queued list queue take the first element of the queue
        public void put(int value) throws InterruptedException {
            synchronized (this){
                while (size == array.length){
                    wait();
                }

                //Just put value at the end of the team
                array[tail] = value;
                tail++;
                if (tail == array.length){
                    tail = 0;
                }
                size++;
                notify();
            }
        }

        public int take() throws InterruptedException {
            int ret = -1;
            synchronized (this) {
                while (size == 0) {
                    this.wait();
                }
                ret = array[head];
                head++;
                if (head == array.length) {
                    head = 0;
                }
                size--;
                notify();
            }
            return ret;
        }
    }

    public static void main(String[] args) {
        BlockingQueue blockingQueue = new BlockingQueue();
        //For the first time, let consumers consume faster and producers produce slower
        //You will see that consumers are blocking and waiting. Consumers can only consume when there are new producer production elements
        //Second, let consumers consume slower and producers produce faster
        //You will see that the producer inserts elements into the queue. When the inserted elements are full, the wait will be blocked
        //Then consumers consume one element at a time, and production can produce new elements
        Thread producer = new Thread(){
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++){
                    try {
                        blockingQueue.put(i);
                        System.out.println("Production elements:" + i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        Thread consumer = new Thread(){
            @Override
            public void run() {
                while (true){
                    try {
                        int ret = blockingQueue.take();
                        System.out.println("Consumption elements:" + ret);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();
    }

There are two conditions for wait to touch the method. One is that the queue is empty and the other is that the queue is full

Features of realizing blocking queue:

1. If the queue is empty, it will be blocked until other threads enter the queue

2. If the queue is full, the queue will be blocked until other threads leave the queue

public static void main(String[] args) {
        BlockingQueue blockingQueue = new BlockingQueue();
        //For the first time, let consumers consume faster and producers produce slower
        //You will see that consumers are blocking and waiting. Consumers can only consume when there are new producer production elements
        //Second, let consumers consume slower and producers produce faster
        //You will see that the producer inserts elements into the queue. When the inserted elements are full, the wait will be blocked
        //Then consumers consume one element at a time, and production can produce new elements
        Thread producer = new Thread(){
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++){
                    try {
                        blockingQueue.put(i);
                        System.out.println("Production elements:" + i);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        Thread consumer = new Thread(){
            @Override
            public void run() {
                while (true){
                    try {
                        int ret = blockingQueue.take();
                        System.out.println("Consumption elements:" + ret);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();
    }

or (int i = 0; i < 10000; i++){
                    try {
                        blockingQueue.put(i);
                        System.out.println("Production elements:" + i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        Thread consumer = new Thread(){
            @Override
            public void run() {
                while (true){
                    try {
                        int ret = blockingQueue.take();
                        System.out.println("Consumption elements:" + ret);
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();
    }

3, Timer

1. What is the timer

Timer is also an important component in software development Similar to an alarm clock After a set time is reached, a specified code is executed

2. Timer in standard library

A standard class is provided in the Timer library The core method of Timer class is schedule

schedule contains two parameters The first parameter specifies the task code to be executed, and the second parameter specifies how long it will be executed (in milliseconds)

Timer timer = new Timer();
timer.schedule(new TimerTask() {
    @Override
    public void run() {
        System.out.println("hello");
   }
}, 3000);

3. Implement timer

Composition of timer:

A blocking team with priority

Each element in the queue is a Task object

The Task has a time attribute, and the first element of the team is the upcoming

At the same time, a worker thread has been scanning the first element of the queue to see if the first element needs to be executed

import java.util.concurrent.PriorityBlockingQueue;

public class Thread4 {

    //The elements of the priority queue must be comparable
    //There are two kinds of comparison rules:
    //1. Let Task implement Comparable interface
    //2. When the priority queue is constructed, an object comparator is passed in
    static class Task implements Comparable<Task>{
        //There is a run method in Runnable. What are the specific tasks to be performed with the help of the run method
        private Runnable command;
        //Time indicates when to execute the command. It is an absolute time
        private long time;
        //Parameters of construction method
        public Task(Runnable command,long after){
            this.command = command;
            this.time = System.currentTimeMillis() + after;
        }

        //Specific logical tasks
        public void run(){
            command.run();
        }

        @Override
        public int compareTo(Task o) {
            return (int) (this.time - o.time);
        }
    }

    static class Worker extends Thread{
        private PriorityBlockingQueue<Task> queue = null;
        private Object mailBox = null;

        public Worker(PriorityBlockingQueue<Task> queue, Object mailBox){
            this.queue = queue;
            this.mailBox = mailBox;
        }

        @Override
        public void run() {
            while (true){
                try{
                    //1. Take out the team head element
                    Task task = queue.take();
                    //2. Check whether the current task time is up
                    long curTime = System.currentTimeMillis();
                    if (task.time > curTime){
                        //It's not time yet. Put the task back in the queue
                        queue.put(task);
                        synchronized (mailBox){
                            mailBox.wait(task.time - curTime);
                        }
                    }else {
                        task.run();
                    }
                }catch (InterruptedException e){
                    e.printStackTrace();
                    break;
                }
            }
        }
    }

    static class Timer{
        //To avoid being busy, you need to use the wait method
        //Use a separate object to assist the wait
        //You can also use this
        private Object mailBox = new Object();
        //Basic composition of timer
        //1. Use a class to describe the team
        //2. Use a blocking priority queue to organize several tasks, so that the first element of the team is the task with the earliest time. If the time of the first element of the team is not up, other elements cannot be executed
        //3. Use a thread to scan the queue head element of the current blocking queue in a loop. If the time is up, let it execute the specified task
        private PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();

        public Timer(){
            //Create thread
            Worker worker = new Worker(queue,mailBox);
            worker.start();

        }
        //4. It is also necessary to provide a method for the caller to "arrange" the task
        public void schedule(Runnable command,long after){
            Task task = new Task(command,after);
            queue.put(task);
            synchronized (mailBox){
                mailBox.notify();
            }
        }
    }

    public static void main(String[] args) {
        Timer timer = new Timer();
        timer.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("he he");
                timer.schedule(this,2000);
            }
        },2000);
    }


}

1. There is a class to describe a task

2. There is a blocking priority queue to organize these tasks

3. There is a scanning thread to scan the first element of the team regularly

4. There is an interface that allows the caller to arrange tasks for the timer

5. Solve busy problems, plus wait and notify

Initially, the queue is empty, and the blocking is to take blocking at the first position. Calling schedlule will wake up the blocking at the take position and get the task.

Get the current time, get the task time, compare and find that the time is not yet up, and continue to wait.

When the time comes, wait returns. In the next cycle, try to get elements from the head of the team. There are elements at the head of the team. They are not blocked and can be taken out directly.

4, Thread pool

1. What is thread pool

Imagine such a scene: a new express store has been opened near the school. The boss is very smart and thinks of a different way to operate it. The store didn't hire people, but every time a business came, it found a classmate to deliver the express, and then fired the classmate. In this analogy, we usually have a task and a thread processing mode.

Soon the boss found that the problem came, and the cost of recruiting + firing students every time was still very high. The boss is still very flexible. He knows why everyone has to hire people, so he has specified an index. The company's business personnel will expand to three, but he will hire people gradually with the business. So there is business again. The boss sees that if there are no three people in the company, hire one person to deliver the express. Otherwise, just put the business on a notebook and wait for the three express workers to deal with it when they are free. This is the pattern of thread pool we want to bring out.

The biggest advantage of thread pool is to reduce the loss of threads every time they are started and destroyed.

2. Thread pool in standard library

Use executors Newfixed ThreadPool (10) can create a fixed thread pool containing 10 threads

The return value type is ExecutorService

Through executorservice Submit can register a task to the thread pool

Several ways for Executors to create thread pool

newFixedThreadPool: create a thread pool with a fixed number of threads

newCachedThreadPool: create a thread pool with dynamically increasing number of threads

newSingleThreadExecutor: create a thread pool that contains only a single thread

newScheduledThreadPool: execute the command after setting the delay time, or execute the command regularly It's an advanced version of Timer

Executors are essentially an encapsulation of the ThreadPoolExecutor class

ThreadPoolExecutor provides more optional parameters to further refine the setting of thread pool behavior

3. Implement thread pool

The core operation is submit, adding the task to the thread pool

Use the Worker class to describe a Worker thread Use Runnable to describe a task

Use a BlockingQueue to organize all tasks

What each worker thread has to do: keep getting tasks from BlockingQueue and executing them

Specify the maximum number of threads in a thread pool maxWorkerCount; When the number of threads exceeds this maximum, no new threads will be added

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Thread5 {

    //Use a class to describe the worker thread
    static class Worker extends Thread{
        private int id = 0;
        //Each Worker thread needs to fetch tasks from the task queue
        //You need to get an instance of the task queue
        private BlockingQueue<Runnable> queue = null;

        public Worker(BlockingQueue<Runnable> queue,int id){
            this.queue = queue;
            this.id = id;
        }

        @Override
        public void run() {
           //The try here wraps the while in order to end the run method (also the thread) as soon as the thread receives an exception
            try {
                while (!Thread.currentThread().isInterrupted()){
                    Runnable command = queue.take();
                    System.out.println("thread: " + id + "running...");
                    command.run();
                }
            }catch (InterruptedException e){
                System.out.println("Thread terminated");
            }
        }
    }

    //The essence is the producer consumption model
    //The code that calls execute is the producer and produces the task
    //A worker is a consumer, a task in the consumer queue
    //The exchange is BlockingQueue
    static class MyThreadPool{
        //Several tasks of blocking queue organization
        private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        //List organizes several threads
        private List<Worker> workers = new ArrayList<>();
        //Assume 10 thread capacity first
        private static final int maxWorkerCount = 10;

        //Implement the execute method and shutdown method
        public void execute(Runnable command) throws InterruptedException {
            //When the number of threads is small, create a new thread as a worker thread
            //There are enough threads to reach the threshold without creating new threads
            if (workers.size() < maxWorkerCount){
                Worker worker = new Worker(queue,workers.size());
                worker.start();
                workers.add(worker);
            }
            queue.put(command);
        }

        //When shutdown ends, all threads end
        public void shutdown() throws InterruptedException {
            //Terminate all threads
            for (Worker worker : workers){
                worker.interrupt();
            }
            //You also need to wait for each thread to finish executing
            for (Worker worker : workers){
                worker.join();
            }
        }

        static class Command implements Runnable{
            private int num;
            public Command(int num){
                this.num = num;
            }

            @Override
            public void run() {
                System.out.println("Executing task:" + num);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            MyThreadPool myThreadPool = new MyThreadPool();
            for (int i = 0; i < 1000; i++){
                myThreadPool.execute(new Command(i));
            }
            Thread.sleep(2000);
            myThreadPool.shutdown();
            System.out.println("The thread has been destroyed");
        }

    }

}

When the thread pool instance is initially created, there are no threads in the thread pool. When you continue to call execute, the operation of creating threads will be triggered.

interrupt triggers an exception or modifies the flag bit so that the thread run method ends, the thread ends, and the jion ends.

If the queue is empty, take will be blocked until other threads call execute to insert elements

Keywords: Java Back-end Singleton pattern

Added by fohanlon on Thu, 03 Mar 2022 06:11:34 +0200