Under the shared model of JUC

Under the shared model of JUC

wait notify

Little story - why wait

Because the conditions are not met, Xiaonan cannot continue the calculation

But if Xiaonan keeps occupying the lock, others will have to block all the time. The efficiency is too low

So Lao Wang opened a single Lounge (calling the wait method) and asked Xiao Nan to wait in the WaitSet. But at this time, the lock was released and others could enter the room at random

Until xiaom sends the cigarette, shout "your cigarette is here" (call the notify method)

Xiao Nan can then leave the lounge and re-enter the queue of competing locks

wait notify principle

  • When the Owner thread finds that the conditions are not met, it calls the wait method to enter the WaitSet and change to the WAITING state
  • Both BLOCKED and WAITING threads are BLOCKED and do not occupy CPU time slices
  • The BLOCKED thread wakes up when the Owner thread releases the lock
  • The WAITING thread will wake up when the Owner thread calls notify or notifyAll, but after waking up, it does not mean that the Owner obtains the lock immediately. It still needs to enter the EntryList to compete again
  • If there are two WAITING threads at the same time, when they are awakened, enter the EntryList first to re compete for the lock. If the competition fails, the thread that does not obtain the lock will enter the WAITING state again

API introduction

  • obj.wait() causes the thread entering the object monitor to wait in the waitSet
  • obj.notify() selects one of the threads waiting for waitSet on the object to wake up
  • obj.notifyAll() wakes up all the threads waiting for waitSet on the object

They are all means of cooperation between threads and belong to the methods of Object objects. You must obtain a lock on this Object to call these methods

The precondition for the execution of notify and notifyall methods is that the object obtains the lock

@Slf4j
public class Main
{
    final static Object obj = new Object();
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (obj) {
                log.debug("implement....");
                try {
                    obj.wait(); // Let the thread wait on obj
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("Other codes....");
            }
        }).start();
        new Thread(() -> {
            synchronized (obj) {
                log.debug("implement....");
                try {
                    obj.wait(); // Let the thread wait on obj
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("Other codes....");
            }
        }).start();
        // The main thread executes in two seconds
        sleep(2);
        log.debug("awaken obj Other threads on");
        synchronized (obj) {
            obj.notify(); // Wake up obj previous thread
            // obj.notifyAll(); //  Wake up all waiting threads on obj
        }
    }
}

A result of notify

20:00:53.096 [Thread-0] c.TestWaitNotify - implement.... 
20:00:53.099 [Thread-1] c.TestWaitNotify - implement.... 
20:00:55.096 [main] c.TestWaitNotify - awaken obj Other threads on
20:00:55.096 [Thread-0] c.TestWaitNotify - Other codes....

Results of notifyAll

19:58:15.457 [Thread-0] c.TestWaitNotify - implement.... 
19:58:15.460 [Thread-1] c.TestWaitNotify - implement.... 
19:58:17.456 [main] c.TestWaitNotify - awaken obj Other threads on
19:58:17.456 [Thread-1] c.TestWaitNotify - Other codes.... 
19:58:17.456 [Thread-0] c.TestWaitNotify - Other codes....
  • The wait() method will release the lock of the object and enter the WaitSet waiting area, so that other threads can get the lock of the object. Wait indefinitely until notify
  • wait(long n) is a time limited wait that ends after n milliseconds or is notified

Correct posture of wait notify

The difference between sleep(long n) and wait(long n)

  • 1) sleep is the Thread method, while wait is the Object method
  • 2) sleep does not need to be used with synchronized forcibly, but wait needs to be used with synchronized
  • 3) sleep will not release the object lock while sleeping, but wait will release the object lock while waiting
  • 4) They are all state TIMED_WAITING

Use case

Think about the following solutions. Why?

import lombok.extern.slf4j.Slf4j;

import static java.lang.Thread.sleep;

@Slf4j
public class Main {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args)
    {
        new Thread(() -> {
            synchronized (room) {
                log.debug("Any smoke?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("No smoke, take a break!");
                    try {
                        sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("Any smoke?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("You can start working");
                }
            }
        }, "Xiaonan").start();
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("You can start working");
                }
            }, "Other people").start();
        }
        try {
            //Throw an exception that may be interrupted by other threads
            sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(() -> {
            // Can I add synchronized (room) here?
            hasCigarette = true;
            log.debug("Here comes the smoke!");
        }, "Cigarette delivery").start();
    }
}

output

20:49:49.883 [Xiaonan] c.TestCorrectPosture - Any smoke?[false] 
20:49:49.887 [Xiaonan] c.TestCorrectPosture - No smoke, take a break!
20:49:50.882 [Cigarette delivery] c.TestCorrectPosture - Here comes the smoke!
20:49:51.887 [Xiaonan] c.TestCorrectPosture - Any smoke?[true] 
20:49:51.887 [Xiaonan] c.TestCorrectPosture - You can start working
20:49:51.887 [Other people] c.TestCorrectPosture - You can start working
20:49:51.887 [Other people] c.TestCorrectPosture - You can start working
20:49:51.888 [Other people] c.TestCorrectPosture - You can start working
20:49:51.888 [Other people] c.TestCorrectPosture - You can start working
20:49:51.888 [Other people] c.TestCorrectPosture - You can start working
  • Other working threads have to be blocked all the time, which is too inefficient
  • Xiaonan thread can't wake up until it has slept for 2s. Even if the smoke is delivered in advance, it can't wake up immediately
  • After adding synchronized (room), it's like Xiao Nan sleeping inside locking the door. Cigarettes can't be sent in at all. main doesn't add it synchronized is like the main thread coming in through the window
  • The solution is to use the wait - notify mechanism

Case optimization 1

import lombok.extern.slf4j.Slf4j;

import static java.lang.Thread.sleep;

@Slf4j
public class Main {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args)
    {
        new Thread(() -> {
            synchronized (room) {
                log.debug("Any smoke?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("No smoke, take a break!");
                    try {
                        room.wait(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("Any smoke?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("You can start working");
                }
            }
        }, "Xiaonan").start();

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("You can start working");
                }
            }, "Other people").start();
        }

        try {
            sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> {
            synchronized (room) {
                hasCigarette = true;
                log.debug("Here comes the smoke!");
                room.notify();
            }
        }, "Cigarette delivery").start();
        
    }
}

output

20:51:42.489 [Xiaonan] c.TestCorrectPosture - Any smoke?[false] 
20:51:42.493 [Xiaonan] c.TestCorrectPosture - No smoke, take a break!
20:51:42.493 [Other people] c.TestCorrectPosture - You can start working
20:51:42.493 [Other people] c.TestCorrectPosture - You can start working
20:51:42.494 [Other people] c.TestCorrectPosture - You can start working
20:51:42.494 [Other people] c.TestCorrectPosture - You can start working
20:51:42.494 [Other people] c.TestCorrectPosture - You can start working
20:51:43.490 [Cigarette delivery] c.TestCorrectPosture - Here comes the smoke!
20:51:43.490 [Xiaonan] c.TestCorrectPosture - Any smoke?[true] 
20:51:43.490 [Xiaonan] c.TestCorrectPosture - You can start working

Question:

  • It solves the blocking problem of other working threads, but what if other threads are also waiting for conditions?
  • Because notify will wake up a sleeping thread randomly, what if it wakes up a thread that doesn't want to wake up

False wake-up problem demonstration

import lombok.extern.slf4j.Slf4j;

import static java.lang.Thread.sleep;

@Slf4j
public class Main {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args)
    {
        new Thread(() -> {
            synchronized (room) {
                log.debug("Any smoke?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("No smoke, take a break!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("Any smoke?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("You can start working");
                } else {
                    log.debug("No dry survival...");
                }
            }
        }, "Xiaonan").start();
        
        new Thread(() -> {
            synchronized (room) {
                Thread thread = Thread.currentThread();
                log.debug("Did you deliver the takeout?[{}]", hasTakeout);
                if (!hasTakeout) {
                    log.debug("No takeout, take a break!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("Did you deliver the takeout?[{}]", hasTakeout);
                if (hasTakeout) {
                    log.debug("You can start working");
                } else {
                    log.debug("No dry survival...");
                }
            }
        }, "my daughter").start();
        
        try {
            sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        new Thread(() -> {
            synchronized (room) {
                hasTakeout = true;
                log.debug("Here's the takeout!");
                room.notify();
            }
        }, "Delivery").start();
        

    }
}

output

20:53:12.173 [Xiaonan] c.TestCorrectPosture - Any smoke?[false] 
20:53:12.176 [Xiaonan] c.TestCorrectPosture - No smoke, take a break!
20:53:12.176 [my daughter] c.TestCorrectPosture - Did you deliver the takeout?[false] 
20:53:12.176 [my daughter] c.TestCorrectPosture - No takeout, take a break!
20:53:13.174 [Delivery] c.TestCorrectPosture - Here's the takeout!
20:53:13.174 [Xiaonan] c.TestCorrectPosture - Any smoke?[false] 
20:53:13.174 [Xiaonan] c.TestCorrectPosture - No dry survival...
  • notify can only randomly wake up one thread in the WaitSet. At this time, if other threads are also waiting, the correct thread may not be awakened Cheng, call it "false awakening"
  • Solution, change to notifyAll

Case optimization 2

new Thread(() -> {
 synchronized (room) {
 hasTakeout = true;
 log.debug("Here's the takeout!");
 room.notifyAll();
 }
}, "Delivery").start();

output

20:55:23.978 [Xiaonan] c.TestCorrectPosture - Any smoke?[false] 
20:55:23.982 [Xiaonan] c.TestCorrectPosture - No smoke, take a break!
20:55:23.982 [my daughter] c.TestCorrectPosture - Did you deliver the takeout?[false] 
20:55:23.982 [my daughter] c.TestCorrectPosture - No takeout, take a break!
20:55:24.979 [Delivery] c.TestCorrectPosture - Here's the takeout!
20:55:24.979 [my daughter] c.TestCorrectPosture - Did you deliver the takeout?[true] 
20:55:24.980 [my daughter] c.TestCorrectPosture - You can start working
20:55:24.980 [Xiaonan] c.TestCorrectPosture - Any smoke?[false] 
20:55:24.980 [Xiaonan] c.TestCorrectPosture - No dry survival...
  • notifyAll only solves the wake-up problem of a thread, but if + wait is used to judge that there is only one chance. Once the condition is not tenable, it will not be restarted A chance to judge
  • The solution is to use while + wait. When the condition is not tenable, wait again

Case optimization 3

Change if to while

if (!hasCigarette) {
 log.debug("No smoke, take a break!");
 try {
 room.wait();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
}

After modification

while (!hasCigarette) {
 log.debug("No smoke, take a break!");
 try {
 room.wait();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
}

output

20:58:34.322 [Xiaonan] c.TestCorrectPosture - Any smoke?[false] 
20:58:34.326 [Xiaonan] c.TestCorrectPosture - No smoke, take a break!
20:58:34.326 [my daughter] c.TestCorrectPosture - Did you deliver the takeout?[false] 
20:58:34.326 [my daughter] c.TestCorrectPosture - No takeout, take a break!
20:58:35.323 [Delivery] c.TestCorrectPosture - Here's the takeout!
20:58:35.324 [my daughter] c.TestCorrectPosture - Did you deliver the takeout?[true] 
20:58:35.324 [my daughter] c.TestCorrectPosture - You can start working
20:58:35.324 [Xiaonan] c.TestCorrectPosture - No smoke, take a break!

wait and notify templates

synchronized(lock) {
 while(The condition is not tenable) {
 lock.wait();
 }
 // work
}
//Another thread
synchronized(lock) {
 lock.notifyAll();
}

Protective pause in synchronization mode

1. Definitions

That is, Guarded Suspension, which is used to wait for the execution result of another thread

main points

  • One result needs to be passed from one thread to another to associate them with the same GuardedObject
  • If there are results continuously from one thread to another, you can use message queuing (see producer / consumer)
  • In JDK, this mode is adopted for the implementation of join and Future
  • Because we have to wait for the results of the other party, we are classified into synchronous mode

2. Realization

Protected object:

class GuardedObject 
{
    private Object response;
    private final Object lock = new Object();

    public Object get() {
        synchronized (lock) {
          // Wait if the conditions are not met - prevent false wake-up, because other threads may also call the notifyall method
            while (response == null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }

    public void complete(Object response) {
        synchronized (lock) {
// If the condition is met, notify the waiting thread
            this.response = response;
            lock.notifyAll();
        }
    }
}

application

One thread waits for the execution result of another thread

import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

@Slf4j
public class Main
{
    public static void main(String[] args) {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            try {
              // The child thread performs the download
                List<String> response = download();
                log.debug("download complete...");
                guardedObject.complete(response);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        log.error("waiting...");
        // Main thread blocking wait
        Object response = guardedObject.get();
        log.error("get response: [{}] lines", ((List<String>) response).size());
    }

    private static List<String> download() throws IOException {
        HttpURLConnection httpURLConnection = (HttpURLConnection) new URL("https://www.baidu.com").openConnection();
        List<String> res=new ArrayList<>();
        InputStream in;
        try(BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream(),"UTF-8"));)
        {
         String line;
         while((line=bufferedReader.readLine())!=null)
         {
             res.add(line);
         }
        }
        return res;
    }
}

GuardedObject with timeout version

What if you want to control the timeout

import lombok.extern.slf4j.Slf4j;

@Slf4j
class GuardedObjectV2 {
    private Object response;
    private final Object lock = new Object();

    public Object get(long millis) {
        synchronized (lock) {
             // 1) Record initial time
            long begin = System.currentTimeMillis();
             // 2) Time that has passed
            long timePassed = 0;
            while (response == null) {
                  // 4) Assuming that the millis is 1000, and the result wakes up at 400, there is still 600 to wait
                long waitTime = millis - timePassed;
                log.debug("waitTime: {}", waitTime);
                if (waitTime <= 0) {
                    log.debug("break...");
                    break;
                }
                try {
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                 // 3) If you are awakened in advance, the elapsed time is assumed to be 400
                timePassed = System.currentTimeMillis() - begin;
                log.debug("timePassed: {}, object is null {}",
                        timePassed, response == null);
            }
            return response;
        }
    }

    public void complete(Object response) {
        synchronized (lock) {
             // If the condition is met, notify the waiting thread
            this.response = response;
            log.debug("notify...");
            lock.notifyAll();
        }
    }
}
  • Test, no timeout
    public static void main(String[] args) {
        GuardedObjectV2 v2 = new GuardedObjectV2();
        new Thread(() -> {
            try {
                sleep(1);
                v2.complete(null);
                sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            v2.complete(Arrays.asList("a", "b", "c"));
        }).start();
        Object response = v2.get(2500);
        if (response != null) {
            log.debug("get response: [{}] lines", ((List<String>) response).size());
        } else {
            log.debug("can't get response");
        }
    }

output

08:49:39.917 [main] c.GuardedObjectV2 - waitTime: 2500
08:49:40.917 [Thread-0] c.GuardedObjectV2 - notify...
08:49:40.917 [main] c.GuardedObjectV2 - timePassed: 1003, object is null true
08:49:40.917 [main] c.GuardedObjectV2 - waitTime: 1497
08:49:41.918 [Thread-0] c.GuardedObjectV2 - notify...
08:49:41.918 [main] c.GuardedObjectV2 - timePassed: 2004, object is null false
08:49:41.918 [main] c.TestGuardedObjectV2 - get response: [3] lines
  • Test, timeout
// Insufficient waiting time
List<String> lines = v2.get(1500);

output

08:47:54.963 [main] c.GuardedObjectV2 - waitTime: 1500
08:47:55.963 [Thread-0] c.GuardedObjectV2 - notify...
08:47:55.963 [main] c.GuardedObjectV2 - timePassed: 1002, object is null true
08:47:55.963 [main] c.GuardedObjectV2 - waitTime: 498
08:47:56.461 [main] c.GuardedObjectV2 - timePassed: 1500, object is null true
08:47:56.461 [main] c.GuardedObjectV2 - waitTime: 0
08:47:56.461 [main] c.GuardedObjectV2 - break...
08:47:56.461 [main] c.TestGuardedObjectV2 - can't get response
08:47:56.963 [Thread-0] c.GuardedObjectV2 - notify...

Principle join

Is the caller polling to check the thread alive status

t1.join();

Equivalent to the following code

synchronized (t1) {
 // The caller thread enters the waitSet of t1 and waits until t1 runs
 while (t1.isAlive()) {
 t1.wait(0);
 }
}

join source code

    public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

Pay attention to the difference between the monitored thread and the sleeping waiting thread:

//t1 thread is the monitored thread
Thread t1 = new Thread(() -> {
 sleep(2);
 });

//The join method is invoked in the main thread, so the main thread is the thread waiting to sleep.
t1.join();

be careful join embodies the [protective pause] mode

Multi tasking GuardedObject

In the picture, Futures is like a mailbox on the first floor of a residential building (each mailbox has a room number). t0, t2 and t4 on the left are like residents waiting for mail, right Side t1, t3, t5 are like postmen

If you need to use GuardedObject objects between multiple classes, it is not convenient to pass them as parameters. Therefore, design an intermediate class for decoupling, This can not only decouple the result wait and result producer, but also support the management of multiple tasks at the same time

realization

The new id is used to identify the Guarded Object

class GuardedObject {
    // Identifies the Guarded Object
    private int id;
    public GuardedObject(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
    // result
    private Object response;
    // Get results
    // timeout indicates how long to wait
    public Object get(long timeout) {
        synchronized (this) {
            // Start time: 15:00:00
            long begin = System.currentTimeMillis();
            // Time experienced
            long passedTime = 0;
            while (response == null) {
                // How long should this cycle wait
                long waitTime = timeout - passedTime;
                // Exit the loop when the elapsed time exceeds the maximum waiting time
                if (timeout - passedTime <= 0) {
                    break;
                }
                try {
                    this.wait(waitTime); // False wake-up 15:00:01
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Find the experience time
                passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
            }
            return response;
        }
    }
    // Produce results
    public void complete(Object response) {
        synchronized (this) {
            // Assign a value to the result member variable
            this.response = response;
            this.notifyAll();
        }
    }
}

Intermediate decoupling class

import java.util.Hashtable;
import java.util.Map;
import java.util.Set;

class Mailboxes {
 //Hashtable is thread safe
 private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
 //id of each resident
 private static int id = 1;

 // Generate unique id
 private static synchronized int generateId() 
 {
  return id++;
 }

 public static GuardedObject getGuardedObject(int id)
 {
  return boxes.remove(id);
 }

 public static GuardedObject createGuardedObject() 
 {
  GuardedObject go = new GuardedObject(generateId());
  boxes.put(go.getId(), go);
  return go;
 }

 public static Set<Integer> getIds()
 {
  return boxes.keySet();
 }
 

Business related

Diao Min:

import lombok.extern.slf4j.Slf4j;

@Slf4j
class People extends Thread {
    @Override
    public void run() 
    {
        // Receive a letter
        GuardedObject guardedObject = Mailboxes.createGuardedObject();
        log.debug("Start receiving letters id:{}", guardedObject.getId());
        //Residents can wait up to five seconds. If they can't receive a letter during these five seconds, diaomin will leave
        Object mail = guardedObject.get(5000);
        log.debug("Received letter id:{}, content:{}", guardedObject.getId(), mail);
    }
}

Post office staff

import lombok.extern.slf4j.Slf4j;

@Slf4j
class Postman extends Thread {
    private int id;
    private String mail;

    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }

    @Override
    public void run() {
        GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
        log.debug("Deliver a letter id:{}, content:{}", id, mail);
        guardedObject.complete(mail);
    }
}

test

public class Main
{
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            new People().start();
        }
        Thread.sleep(1);
        for (Integer id : Mailboxes.getIds()) {
            new Postman(id, "content" + id).start();
        }
    }
}

Result of a run

10:35:05.689 c.People [Thread-1] - Start receiving letters id:3
10:35:05.689 c.People [Thread-2] - Start receiving letters id:1
10:35:05.689 c.People [Thread-0] - Start receiving letters id:2
10:35:06.688 c.Postman [Thread-4] - Deliver a letter id:2, content:Content 2
10:35:06.688 c.Postman [Thread-5] - Deliver a letter id:1, content:Content 1
10:35:06.688 c.People [Thread-0] - Received letter id:2, content:Content 2
10:35:06.688 c.People [Thread-2] - Received letter id:1, content:Content 1
10:35:06.688 c.Postman [Thread-3] - Deliver a letter id:3, content:Content 3
10:35:06.689 c.People [Thread-1] - Received letter id:3, content:Content 3

Producer / consumer of asynchronous mode

1. Definitions

main points

  • Unlike the GuardObject in the previous protective pause, there is no need for one-to-one correspondence between the threads that generate and consume results
  • Consumption queues can be used to balance thread resources for production and consumption
  • The producer is only responsible for generating the result data and does not care about how to deal with the data, while the consumer focuses on dealing with the result data
  • Message queues have capacity limits. When full, data will not be added, and when empty, data will not be consumed
  • This mode is adopted for various blocking queues in JDK

2. Realization

Message class

//Message entity classes stored in message queues
final class Message 
{
    private int id;
    private Object message;

    public Message(int id, Object message) 
    {
        this.id = id;
        this.message = message;
    }

    public int getId() {
        return id;
    }

    public Object getMessage() {
        return message;
    }
}

Message queue

package producerAndCustomer;

import lombok.extern.slf4j.Slf4j;

import java.util.LinkedList;

//Message queue
@Slf4j
class MessageQueue {
 //Bidirectional linked list queue for storing messages
    private LinkedList<Message> queue;
 //Queue capacity
    private int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }

    public Message take() {
        synchronized (queue) {
            //If there is no message in the current queue, put the current thread into sleep
            while (queue.isEmpty()) {
                log.debug("It's out of stock, wait");
                try {
                    //When a thread puts a message into the queue, it will wake up the waiting consumer
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //There are remaining messages for consumers to consume
            Message message = queue.removeFirst();
            //After consumption, wake up all sleeping producers
            queue.notifyAll();
            return message;
        }
    }

    public void put(Message message) {
        synchronized (queue) {
            //If the queue is full
            while (queue.size() == capacity) {
                log.debug("Inventory has reached the upper limit, wait");
                try {
                    //The current thread enters a waiting state
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //The queue is not full. Put a message into the queue
            queue.addLast(message);
            //Wake up consumers in sleep and tell them the news is coming
            queue.notifyAll();
        }
    }
}

application

package producerAndCustomer;

import lombok.extern.slf4j.Slf4j;


@Slf4j
public class Main {
    public static void main(String[] args) {
        //The maximum capacity of the message queue is 2
        MessageQueue messageQueue = new MessageQueue(2);
        //Three producers
        for (int i = 0; i < 3; i++) {
            int id = i;
            new Thread(() -> {
                messageQueue.put(new Message(id, "Huyou " + id + "number"));
                  System.out.println("Producer production message");
            }, "thread " + i).start();
        }
        //Consumers continue to consume news
        new Thread(() ->
        {
            while (true) {
                try {
                    Thread.sleep(1);
                    //Consumer News
                    Message take = messageQueue.take();
                    System.out.println("The consumption message is: " + take.getMessage());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

At the end of execution, there is another consumer thread in the lounge, so the program does not end, but the main thread and the three producer threads have finished execution

Park & Unpark

Basic use

They are methods in the LockSupport class

// Pauses the current thread
LockSupport.park(); 
// Resume a thread
LockSupport.unpark(Pause thread object)

park before unpark

@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            log.debug("start...");
            try {
                sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("park...");
            LockSupport.park();
            log.debug("resume...");
        }, "t1");
        t1.start();
        sleep(2);
        log.debug("unpark...");
        LockSupport.unpark(t1);
    }
}

output

18:42:52.585 c.TestParkUnpark [t1] - start... 
18:42:53.589 c.TestParkUnpark [t1] - park... 
18:42:54.583 c.TestParkUnpark [main] - unpark... 
18:42:54.583 c.TestParkUnpark [t1] - resume...

unpark first and then park

Thread t1 = new Thread(() -> {
 log.debug("start...");
 sleep(2);
 log.debug("park...");
 LockSupport.park();
 log.debug("resume...");
}, "t1");
t1.start();
sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);

output

18:43:50.765 c.TestParkUnpark [t1] - start... 
18:43:51.764 c.TestParkUnpark [main] - unpark... 
18:43:52.769 c.TestParkUnpark [t1] - park... 
18:43:52.769 c.TestParkUnpark [t1] - resume...

characteristic

Compared with wait & notify of Object

  • wait, notify and notifyAll must be used together with Object Monitor, but park and unpark do not
  • Park & unpark is used to block and wake up threads by thread, while notify can only wake up one waiting thread randomly, notifyAll It is not so accurate to wake up all waiting threads
  • Park & unpark can unpark first, while wait & notify cannot notify first and then wait

Park & unpark principle

Each thread has its own Parker object, which is composed of three parts_ counter , _ cond and_ mutex, make an analogy

  • Thread is like a traveler, Parker is like his backpack, and condition variables are like tents in his backpack_ counter is like the spare dry food in the backpack (0 is exhausted and 1 is sufficient)
  • Call park to see if you need to stop and rest
  • If the spare dry food runs out, get into the tent and rest
  • If there is enough spare dry food, don't stop and move on
  • Calling unpark is like making enough dry food
  • If the thread is still in the tent, wake up and let him move on
  • If the thread is still running at this time, the next time he calls park, he will only consume the spare dry food without staying and moving on
  • Because the backpack space is limited, calling unpark multiple times will only supplement one spare dry food

Parse of park method call procedure

  1. The current thread calls unsafe Park () method ----- > see if you need to rest
  2. Check_ counter, this case is 0. In this case, the_ mutex mutual exclusion lock ----- > dry food is gone, rest
  3. Thread entry_ cond conditional variable blocking ----- > enter the tent to rest
  4. Set_ Counter = 0 --- > marked dry food is exhausted

Call park first and then unpark

  1. Call unsafe Unpark (thread_0) method, setting_ counter is 1 --- > supplement dry food
  2. Awaken_ Thread in cond condition variable_ Wake up the rest and continue on the road
  3. Thread_0 resume operation ----- > continue on the road
  4. Set_ counter is 0 --- > it will consume physical strength on the road and need to be supplemented by dry food

Call unpark first and then park

  1. Call unsafe Unpark (thread_0) method, setting_ counter is 1 ----- > supplement dry food first
  2. The current thread calls unsafe Park () method ----- > pedestrians are tired and want to rest
  3. Check_ counter, this case is 1. At this time, the thread does not need to be blocked and continues to run. - > if you find that there is still dry food, you eat dry food to supplement your physical stren gt h, so you don't need to rest
  4. Set_ counter is 0 ------ > when the dry food is finished, set the flag

Re understand thread state transitions

Suppose there is a thread Thread t

  • Case 1 new -- > runable

When the t.start() method is called, new -- > runable

  • Case 2 runnable < -- > waiting

After the t thread obtains the object lock with synchronized(obj)

  • Call obj When using the wait() method, the t thread starts from runnable -- > waiting
  • Call obj notify() , obj. When notifyall(), t.interrupt()

The contention lock succeeds, and the t thread starts running -- > from waiting Contention lock failed, t thread from waiting -- > blocked

package park;

import lombok.extern.slf4j.Slf4j;

import static java.lang.Thread.sleep;

@Slf4j
public class Main {

    final static Object obj = new Object();

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (obj) {
                log.debug("implement....");
                try {
                    obj.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("Other codes...."); // breakpoint
            }
        }, "t1").start();
        new Thread(() -> {
            synchronized (obj) {
                log.debug("implement....");
                try {
                    obj.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("Other codes...."); // breakpoint
            }
        }, "t2").start();

        sleep((long) 0.5);
        log.debug("awaken obj Other threads on");
        synchronized (obj) {
            obj.notify(); // Randomly wakes up obj the last waiting thread breakpoint
            System.out.println("123");
        }
    }
}
  • Case 3 runnable < -- > waiting

When the current thread calls the t.join() method, the current thread starts from runnable -- > waiting

Note that the current thread is waiting on the monitor of the t thread object

t when the thread ends running, or the interrupt() of the current thread is called, the current thread starts from waiting -- > runnable

  • Case 4 runnable < -- > waiting

The current thread calls locksupport The park () method will make the current thread from runnable -- > waiting

Call locksupport Unpark (target thread) or interrupt() of the thread is called to make the target thread from waiting -- > runnable

  • Case 5 runnable < -- > timed_ WAITING

After the t thread obtains the object lock with synchronized(obj)

Call obj When using the wait (long n) method, the t thread starts from runnable -- > timed_ WAITING

T thread waiting time exceeds n milliseconds, or call obj notify() , obj. When notifyall(), t.interrupt()

Contention lock successful, t thread from timed_ WAITING --> RUNNABLE

Contention lock failed, t thread from timed_ WAITING --> BLOCKED

  • Case 6 runnable < -- > timed_ WAITING

When the current thread calls the t.join(long n) method, the current thread starts from runnable -- > timed_ WAITING

Note that the current thread is waiting on the monitor of the t thread object

When the waiting time of the current thread exceeds n milliseconds, or the running of t thread ends, or the interrupt() of the current thread is called, the current thread starts from TIMED_WAITING --> RUNNABLE

  • Case 7 runnable < -- > timed_ WAITING

The current thread calls thread Sleep (long n), the current thread from runnable -- > timed_ WAITING

The waiting time of the current thread has exceeded n milliseconds. The current thread is from timed_ WAITING --> RUNNABLE

  • Case 8 runnable < -- > timed_ WAITING

The current thread calls locksupport Parknanos (long nanos) or locksupport Parkuntil (long miles) Program from runnable -- > timed_ WAITING

Call locksupport Unpark (target thread) or call thread interrupt() or wait timeout will cause the target thread to TIMED_WAITING--> RUNNABLE

  • Case 9 runnable < -- > blocked

When the t thread obtains the object lock with synchronized(obj), if the contention fails, it will start from runnable -- > blocked

After the synchronization code block of the obj lock thread is executed, all BLOCKED threads on the object will be awakened to compete again. If t threads compete Successful, from BLOCKED -- > runnable, other failed threads are still BLOCKED

  • Case 10 runnable < -- > terminated

After all the codes of the current thread have been run, enter TERMINATED

Multiple locks

Multiple irrelevant locks

A big room has two functions: sleep and study.

Now Xiao Nan has to learn and the little girl has to sleep, but if only one room (one object lock) is used, the concurrency is very low

The solution is to prepare multiple rooms (multiple object locks)

for example

@Slf4j
class BigRoom {
    public void sleep() throws InterruptedException {
        synchronized (this) {
            log.debug("sleeping 2 hour");
            Thread.sleep(2);
        }
    }
    public void study() throws InterruptedException {
        synchronized (this) {
            log.debug("study 1 hour");
            Thread.sleep(1);
        }
    }
}

implement

        BigRoom bigRoom = new BigRoom();
        new Thread(() -> {
            try {
                bigRoom.study();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"Xiaonan").start();
        new Thread(() -> {
            try {
                bigRoom.sleep();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"my daughter").start();

A result

12:13:54.471 [Xiaonan] c.BigRoom - study 1 hour
12:13:55.476 [my daughter] c.BigRoom - sleeping 2 hour

Obviously, Xiao Nan's behavior belongs to standing in the manger without taking a shit

Therefore, the lock of the study and the lock of the bedroom must not be the same lock

improvement

@Slf4j
class BigRoom {

    private final Object studyRoom = new Object();
    private final Object bedRoom = new Object();

    public void sleep() throws InterruptedException {
        synchronized (bedRoom) {
            log.debug("sleeping 2 hour");
            Thread.sleep(2);
        }
    }
    public void study() throws InterruptedException {
        synchronized (studyRoom) {
            log.debug("study 1 hour");
            Thread.sleep(1);
        }
    }
}

Execution result

12:15:35.069 [Xiaonan] c.BigRoom - study 1 hour
12:15:35.069 [my daughter] c.BigRoom - sleeping 2 hour

Subdivide the granularity of locks

  • The advantage is that it can enhance concurrency
  • The disadvantage is that if a thread needs to obtain multiple locks at the same time, it is prone to deadlock

Activity

deadlock

There is such a situation: a thread needs to obtain multiple locks at the same time, and deadlock is easy to occur

Thread t1 obtains the lock of object A, and then wants to obtain the lock of object B. thread t2 obtains the lock of object B, and then wants to obtain the lock of object A

Example:

package park;

import lombok.extern.slf4j.Slf4j;

import static java.lang.Thread.sleep;


@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException
    {
        Object A = new Object();
        Object B = new Object();
        Thread t1 = new Thread(() -> {
            synchronized (A) {
                log.debug("lock A");
                try {
                    sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (B) {
                    log.debug("lock B");
                    log.debug("operation...");
                }
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            synchronized (B) {
                log.debug("lock B");
                try {
                    sleep((long) 0.5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (A) {
                    log.debug("lock A");
                    log.debug("operation...");
                }
            }
        }, "t2");
        t1.start();
        t2.start();
    }
}

result

12:22:06.962 [t2] c.TestDeadLock - lock B 
12:22:06.962 [t1] c.TestDeadLock - lock A

Positioning deadlock

  • To detect deadlocks, you can use the * * jconsole * * tool, or use jps to locate the process id, and then use jstack to locate deadlocks
cmd > jps
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
12320 Jps
22816 KotlinCompileDaemon
33200 TestDeadLock // JVM process
11508 Main
28468 Launcher
cmd > jstack 33200
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
2018-12-29 05:51:40
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.91-b14 mixed mode):
"DestroyJavaVM" #13 prio=5 os_prio=0 tid=0x0000000003525000 nid=0x2f60 waiting on condition 
[0x0000000000000000]
 java.lang.Thread.State: RUNNABLE
"Thread-1" #12 prio=5 os_prio=0 tid=0x000000001eb69000 nid=0xd40 waiting for monitor entry 
[0x000000001f54f000]
 java.lang.Thread.State: BLOCKED (on object monitor)
 at thread.TestDeadLock.lambda$main$1(TestDeadLock.java:28)
 - waiting to lock <0x000000076b5bf1c0> (a java.lang.Object)
 - locked <0x000000076b5bf1d0> (a java.lang.Object)
 at thread.TestDeadLock$$Lambda$2/883049899.run(Unknown Source)
 at java.lang.Thread.run(Thread.java:745)
"Thread-0" #11 prio=5 os_prio=0 tid=0x000000001eb68800 nid=0x1b28 waiting for monitor entry 
[0x000000001f44f000]
 java.lang.Thread.State: BLOCKED (on object monitor)
 at thread.TestDeadLock.lambda$main$0(TestDeadLock.java:15)
 - waiting to lock <0x000000076b5bf1d0> (a java.lang.Object)
- locked <0x000000076b5bf1c0> (a java.lang.Object)
 at thread.TestDeadLock$$Lambda$1/495053715.run(Unknown Source)
 at java.lang.Thread.run(Thread.java:745)
 
// Omit some output
Found one Java-level deadlock:
=============================
"Thread-1":
 waiting to lock monitor 0x000000000361d378 (object 0x000000076b5bf1c0, a java.lang.Object),
 which is held by "Thread-0"
"Thread-0":
 waiting to lock monitor 0x000000000361e768 (object 0x000000076b5bf1d0, a java.lang.Object),
 which is held by "Thread-1"
Java stack information for the threads listed above:
===================================================
"Thread-1":
 at thread.TestDeadLock.lambda$main$1(TestDeadLock.java:28)
 - waiting to lock <0x000000076b5bf1c0> (a java.lang.Object)
 - locked <0x000000076b5bf1d0> (a java.lang.Object)
 at thread.TestDeadLock$$Lambda$2/883049899.run(Unknown Source)
 at java.lang.Thread.run(Thread.java:745)
"Thread-0":
 at thread.TestDeadLock.lambda$main$0(TestDeadLock.java:15)
 - waiting to lock <0x000000076b5bf1d0> (a java.lang.Object)
 - locked <0x000000076b5bf1c0> (a java.lang.Object)
 at thread.TestDeadLock$$Lambda$1/495053715.run(Unknown Source)
 at java.lang.Thread.run(Thread.java:745)
Found 1 deadlock.
  • To avoid deadlock, pay attention to the locking sequence
  • In addition, if a thread enters an endless loop, causing other threads to wait all the time, in this case, linux can locate it first through top For Java processes with high CPU consumption, use the top -Hp process id to locate which thread it is, and then use jstack to check

Dining problem of philosophers

There are five philosophers sitting around the round table.

  • They only do two things, thinking and eating. They think for a while, have a meal, and then think after dinner.
  • When eating, you should use two chopsticks. There are five chopsticks on the table. Each philosopher has one chopstick on his left and right hands.
  • If the chopsticks are held by the people around you, you have to wait

Chopsticks

package park;

class Chopstick 
{

   String name;

   public Chopstick(String name) {
  this.name = name;
 }

   @Override
   public String toString() {
  return "chopsticks{" + name + '}';
 }

}

Philosophers

package park;

import lombok.extern.slf4j.Slf4j;

@Slf4j
class Philosopher extends Thread {
    Chopstick left;
    Chopstick right;

    public Philosopher(String name, Chopstick left, Chopstick right) {
        super(name);
        this.left = left;
        this.right = right;
    }

    private void eat() {
        log.debug("eating...");
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true) {
            // Get left-hand chopsticks
            synchronized (left) {
                // Get right-hand chopsticks
                synchronized (right) {
                    // having dinner
                    eat();
                }
                // Put down your right chopsticks
            }
            // Put down your left chopsticks
        }
    }
}

eat

Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("Socrates", c1, c2).start();
new Philosopher("Plato", c2, c3).start();
new Philosopher("Aristotle", c3, c4).start();
new Philosopher("Heraclitus", c4, c5).start();
new Philosopher("Archimedes", c5, c1).start();

If it is not implemented for many meetings, it cannot be implemented

12:33:15.575 [Socrates] c.Philosopher - eating... 
12:33:15.575 [Aristotle] c.Philosopher - eating... 
12:33:16.580 [Archimedes] c.Philosopher - eating... 
12:33:17.580 [Archimedes] c.Philosopher - eating... 
// Stuck here, not running down

Using jconsole to detect deadlocks, it is found that

-------------------------------------------------------------------------
name: Archimedes
 state: cn.itcast.Chopstick@1540e19d (Chopsticks 1) Upper BLOCKED, Owner: Socrates
 Total blocked: 2, Total wait: 1
 stack trace :
cn.itcast.Philosopher.run(TestDinner.java:48)
 - Locked cn.itcast.Chopstick@6d6f6e28 (Chopsticks 5)
-------------------------------------------------------------------------
name: Socrates
 state: cn.itcast.Chopstick@677327b6 (Chopsticks 2) Upper BLOCKED, Owner: Plato
 Total blocked: 2, Total wait: 1
 stack trace :
cn.itcast.Philosopher.run(TestDinner.java:48)
 - Locked cn.itcast.Chopstick@1540e19d (Chopsticks 1)
-------------------------------------------------------------------------
name: Plato
 state: cn.itcast.Chopstick@14ae5a5 (Chopsticks 3) Upper BLOCKED, Owner: Aristotle
 Total blocked: 2, Total wait: 0
 stack trace :
cn.itcast.Philosopher.run(TestDinner.java:48)
 - Locked cn.itcast.Chopstick@677327b6 (Chopsticks 2)
-------------------------------------------------------------------------
name: Aristotle
 state: cn.itcast.Chopstick@7f31245a (Chopsticks 4) Upper BLOCKED, Owner: Heraclitus
 Total blocked: 1, Total wait: 1
 stack trace :
cn.itcast.Philosopher.run(TestDinner.java:48)
 - Locked cn.itcast.Chopstick@14ae5a5 (Chopsticks 3)
-------------------------------------------------------------------------
name: Heraclitus
 state: cn.itcast.Chopstick@6d6f6e28 (Chopsticks 5) Upper BLOCKED, Owner: Archimedes
 Total blocked: 2, Total wait: 0
 stack trace :
cn.itcast.Philosopher.run(TestDinner.java:48)
 - Locked cn.itcast.Chopstick@7f31245a (Chopsticks 4)

This kind of thread does not end as expected and cannot be executed. It is classified as [activity] problem. In addition to deadlock, there are also two kinds of situations: Live lock and hungry condition

Livelock

A livelock occurs when two threads change each other's end conditions, and finally no one can end, for example

package park;

import lombok.extern.slf4j.Slf4j;

import static java.lang.Thread.sleep;

@Slf4j
public class TestLiveLock {
 static volatile int count = 10;
 static final Object lock = new Object();

 public static void main(String[] args) {
  new Thread(() -> {
   // Expect to decrease to 0 and exit the loop
   while (count > 0) {
    try {
     sleep((long) 0.2);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
    count--;
    log.debug("count: {}", count);
   }
  }, "t1").start();
  
  new Thread(() -> {
   // Expect more than 20 to exit the cycle
   while (count < 20) {
    try {
     sleep((long) 0.2);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
    count++;
    log.debug("count: {}", count);
   }
  }, "t2").start();
 }
}

In the above code, two threads perform addition and subtraction operations on a variable count in parallel, resulting in the exit condition always being unable to be met, resulting in livelock

hunger

In many tutorials, hunger is defined as that a thread cannot be scheduled and executed by the CPU because its priority is too low, and it cannot end. Hunger is not a problem Easy to demonstrate. Hunger will be involved when talking about read-write locks

Sequential locking solves the previous deadlock problem

Sequential locking solves the previous deadlock problem

Solution of sequential locking

ReentrantLock

Compared with synchronized, it has the following characteristics

  • Interruptible
  • You can set the timeout
  • It can be set as fair lock (first in first out, which can be used to solve the problem of hunger)
  • Support multiple conditional variables (there are multiple waitset s, which can avoid the problem of false wake-up)

Like synchronized, reentrant is supported (the current thread repeatedly locks the same object)

Basic grammar

// Acquire lock
reentrantLock.lock();
try {
 // Critical zone
} finally {
 // Release lock
 reentrantLock.unlock();
}

Here reentrantlock lock(); This line of code is written in the try... catch statement block. It's OK. It's usually written outside

Reentrant

Reentrant means that if the same thread obtains the lock for the first time, it has the right to obtain the lock again because it is the owner of the lock

If it is a non reentrant lock, you will be blocked by the lock the second time you obtain the lock

@Slf4j
public class TestLiveLock {
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        method1();
    }

    public static void method1() {
        lock.lock();
        try {
            log.debug("execute method1");
            method2();
        } finally {
            lock.unlock();
        }
    }

    public static void method2() {
        lock.lock();
        try {
            log.debug("execute method2");
            method3();
        } finally {
            lock.unlock();
        }
    }

    public static void method3() {
        lock.lock();
        try {
            log.debug("execute method3");
        } finally {
            lock.unlock();
        }
    }
}

output

17:59:11.862 [main] c.TestReentrant - execute method1 
17:59:11.865 [main] c.TestReentrant - execute method2 
17:59:11.865 [main] c.TestReentrant - execute method3

Interruptible

    //Indicates that the current thread is interruptible
    lock.lockInterruptibly();
    //The lock() method cannot be interrupted
        lock.lock();

Review: interrupt interrupt

Calling the park method will also block the current thread and interrupt the park thread. The interrupt state will not be cleared, that is, no exception will be thrown When the interrupt flag is false, the park() method will block the current thread; When the interrupt flag is true, the park() method does not block the current thread

package park;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

import static java.lang.Thread.sleep;


@Slf4j
public class TestLiveLock {
 static ReentrantLock lock = new ReentrantLock();

 public static void main(String[] args) {

  Thread t1 = new Thread(() -> {
   log.debug("start-up...");
   try {
    //Indicates that the current thread is interruptible
    lock.lockInterruptibly();
   } catch (InterruptedException e) {
    e.printStackTrace();
    log.debug("The process of waiting for the lock is interrupted");
    return;
   }
   try {
    log.debug("Got the lock");
   } finally {
    lock.unlock();
   }
   
  }, "t1");

//The main route gets the lock first
  lock.lock();
  log.debug("Got the lock");
 //t1 thread start
  t1.start();

  try {
   sleep(1);
   //Interrupt the t1 thread --- t1 is still blocking, waiting to obtain the lock
   t1.interrupt();
   log.debug("Execution interrupt");
  } catch (InterruptedException e) {
   e.printStackTrace();
  } finally {
   lock.unlock();
  }
 }

}

output

18:02:40.520 [main] c.TestInterrupt - Got the lock
18:02:40.524 [t1] c.TestInterrupt - start-up... 
18:02:41.530 [main] c.TestInterrupt - Execution interrupt
java.lang.InterruptedException 
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchr
onizer.java:898) 
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchron
izer.java:1222) 
 at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) 
 at cn.itcast.n4.reentrant.TestInterrupt.lambda$main$0(TestInterrupt.java:17) 
 at java.lang.Thread.run(Thread.java:748) 
18:02:41.532 [t1] c.TestInterrupt - The process of waiting for the lock is interrupted

Note that if it is a non interruptible mode, even if interrupt is used, it will not make the wait interrupt

interrupt

18:06:56.261 [main] c.TestInterrupt - Got the lock
18:06:56.265 [t1] c.TestInterrupt - start-up... 
18:06:57.266 [main] c.TestInterrupt - Execution interrupt // At this time, t1 is not really interrupted, but continues to wait for the lock
18:06:58.267 [main] c.TestInterrupt - The lock was released
18:06:58.267 [t1] c.TestInterrupt - Got the lock

Lock timeout

Immediate failure

@Slf4j
public class TestLiveLock {
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) 
    {
        Thread t1 = new Thread(() -> {
            log.debug("start-up...");
            //tryLock() attempts to obtain the lock. If it cannot be obtained, it returns false
            if (!lock.tryLock()) {
                log.debug("Get immediate failure, return");
                return;
            }
            try {
                log.debug("Got the lock");
            } finally {
                lock.unlock();
            }
        }, "t1");
        
        lock.lock();
        log.debug("Got the lock");
        t1.start();
        try {
            sleep(2);
        } catch (InterruptedException e) {
         e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

output

18:15:02.918 [main] c.TestTimeout - Got the lock
18:15:02.921 [t1] c.TestTimeout - start-up... 
18:15:02.921 [t1] c.TestTimeout - Get immediate failure, return

Unlike lock and lockInterruptibly, the tryLock method immediately returns a boolean value indicating whether a lock has been obtained if it cannot obtain a lock

Timeout failed

package park;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import static java.lang.Thread.sleep;


@Slf4j
public class TestLiveLock {
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            log.debug("start-up...");
            try {
                if (!lock.tryLock(1, TimeUnit.SECONDS)) {
                    log.debug("Get wait 1 s Failed after, return");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                log.debug("Got the lock");
            } finally {
                lock.unlock();
            }
        }, "t1");
        lock.lock();
        log.debug("Got the lock");
        t1.start();
        try {
            sleep(2);
        } catch (InterruptedException e) {
         e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

output

18:19:40.537 [main] c.TestTimeout - Got the lock
18:19:40.544 [t1] c.TestTimeout - start-up... 
18:19:41.547 [t1] c.TestTimeout - Get wait 1 s Failed after, return

For the tryLock method, if the waiting time limit is added, it means that if the lock is not obtained during the maximum waiting time, it returns false. Otherwise, once the lock is obtained during the waiting process, it returns true immediately Because there is a waiting process, it can be interrupted by other threads, so an interrupt exception will be thrown

Solving dining problems with tryLock

class Chopstick extends ReentrantLock {
 String name;

 public Chopstick(String name) {
  this.name = name;
 }

 @Override
 public String toString() {
  return "chopsticks{" + name + '}';
 }
}
package park;

import lombok.extern.slf4j.Slf4j;

@Slf4j
class Philosopher extends Thread {
    Chopstick left;
    Chopstick right;

    public Philosopher(String name, Chopstick left, Chopstick right) {
        super(name);
        this.left = left;
        this.right = right;
    }

    @Override
    public void run() {
        while (true) {
            // Try to get left-handed chopsticks
            if (left.tryLock()) {
                try {
                    // Try to get right-hand chopsticks
                    if (right.tryLock()) {
                        try {
                            eat();
                        } finally {
                            right.unlock();
                        }
                    }
                } finally {
                    left.unlock();
                }
            }
        }
    }

    private void eat() {
        log.debug("eating...");
     try {
      Thread.sleep(1);
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
}

What is the idea above to solve the problem of philosophers eating?

Fair lock

ReentrantLock is unfair by default

The fairness here is reflected in. The first in, first out (FIFO) in the blocking queue means that the threads entering the blocking queue first. When the lock is released and the threads in the blocking queue are awakened, they can obtain the lock first, rather than all threads competing for the lock

package park;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

import static java.lang.Thread.sleep;


@Slf4j
public class Main {
//Set to unfair first
    private static ReentrantLock lock=new ReentrantLock(fasle);
    public static void main(String[] args) throws InterruptedException
    {
        lock.lock();
        for (int i = 0; i < 500; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " running...");
                } finally {
                    lock.unlock();
                }
            }, "t" + i).start();
        }
       // Fight for the lock after 1s
        Thread.sleep(1000);
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " start...");
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " running...");
            } finally {
                lock.unlock();
            }
        }, "Force insertion").start();
        lock.unlock();
    }
}

Forcibly insert and have the opportunity to output in the middle

Note: this experiment may not always be repeated

t39 running... 
t40 running... 
t41 running... 
t42 running... 
t43 running... 
Force insertion start... 
Force insertion running... 
t44 running... 
t45 running... 
t46 running... 
t47 running... 
t49 running...

After changing to fair lock

ReentrantLock lock = new ReentrantLock(true);

Force insertion, always output at the end

t465 running... 
t464 running... 
t477 running... 
t442 running... 
t468 running... 
t493 running... 
t482 running... 
t485 running... 
t481 running... 
Force insertion running..

Fair locks are generally unnecessary and reduce concurrency. The principles will be explained later

Conditional variable

There are also conditional variables in synchronized, that is, the waitSet lounge when we talk about the principle. When the conditions are not met, enter the waitSet and wait

The advantage of ReentrantLock's conditional variables over synchronized is that it supports multiple conditional variables, which is like

  • synchronized means that those threads that do not meet the conditions are waiting for messages in a lounge
  • ReentrantLock supports multiple lounges, including a lounge dedicated to waiting for cigarettes and a lounge dedicated to waiting for breakfast. It also wakes up according to the lounge when waking up

Key points of use:

  • You need to obtain a lock before await
  • After await is executed, it will release the lock and enter the conditionObject to wait
  • await's thread is awakened (or interrupted, or timed out) to re compete for the lock lock
  • After the contention lock is successful, the execution continues after await
  • Contention lock failed, continue to wait

await can be easily understood by referring to wait

example:

package park;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import static java.lang.Thread.sleep;


@Slf4j
public class Main {
    static ReentrantLock lock = new ReentrantLock();
    static Condition waitCigaretteQueue = lock.newCondition();
    static Condition waitbreakfastQueue = lock.newCondition();
    static volatile boolean hasCigrette = false;
    static volatile boolean hasBreakfast = false;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            try {
                lock.lock();
                while (!hasCigrette) {
                    try {
                        waitCigaretteQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("Wait until its smoke");
            } finally {
                lock.unlock();
            }
        }).start();

        new Thread(() -> {
            try {
                lock.lock();
                while (!hasBreakfast) {
                    try {
                        waitbreakfastQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("Wait until it's breakfast");
            } finally {
                lock.unlock();
            }
        }).start();
        sleep(1);
        sendBreakfast();
        sleep(1);
        sendCigarette();
    }

    private static void sendCigarette() {
        lock.lock();
        try {
            log.debug("Here comes the cigarette");
            hasCigrette = true;
            waitCigaretteQueue.signal();
        } finally {
            lock.unlock();
        }
    }

    private static void sendBreakfast() {
        lock.lock();
        try {
            log.debug("Breakfast is coming");
            hasBreakfast = true;
            waitbreakfastQueue.signal();
        } finally {
            lock.unlock();
        }
    }
}

output

18:52:27.680 [main] c.TestCondition - Breakfast is coming
18:52:27.682 [Thread-1] c.TestCondition - Wait until it's breakfast
18:52:28.683 [main] c.TestCondition - Here comes the cigarette
18:52:28.683 [Thread-0] c.TestCondition - Wait until its smoke

Sequential control of synchronous mode

1. Fixed operation sequence

For example, you must print 2 before 1

wait notify version

package park;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Main {

    // Objects used for synchronization
    static Object obj = new Object();
    // t2 run flag, indicating whether t2 has been executed
    static boolean t2runed = false;

    public static void main(String[] args) throws InterruptedException
    {
      //Make sure t2 runs before t1
        new Thread(()->{
            synchronized (obj)
            {
                //t2 has not been executed
                while (!t2runed)
                {
                    try {
                        //Go to the lounge and wait
                        obj.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("t1 implement");
            }
        },"t1").start();

      new Thread(()->{
          synchronized (obj)
          {
              System.out.println("t2 implement");
              //Modify run tag
              t2runed=true;
              // Notify the waiting thread on obj (there may be multiple threads, so you need to use notifyAll)
              obj.notifyAll();
          }
      },"t2").start();
    }

}

Park Unpark version

It can be seen that the implementation is troublesome:

  • First, you need to ensure that you wait before notify ing, otherwise the wait thread will never wake up. Therefore, the "run flag" is used to judge whether to wait
  • Second, if some interfering threads incorrectly notify the wait thread and wait again when the conditions are not met, a while loop is used to solve this problem
  • Finally, notifyAll needs to be used to wake up the wait thread on the object, because there may be more than one waiting thread on the synchronization object
  • You can use park and unpark of LockSupport class to simplify the above problem:
package park;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.LockSupport;

@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException
    {
        Thread t1 = new Thread(() -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) { }
            // When there is no "license", the current thread is suspended; When there is a "license", use the "license" and the current thread will resume operation
            LockSupport.park();
            System.out.println("1");
        });
        Thread t2 = new Thread(() -> {
            System.out.println("2");
            // Issue a "license" to thread t1 (only one "license" will be issued if unpark is called repeatedly)
            LockSupport.unpark(t1);
        });
        t1.start();
        t2.start();
    }

}

The park and unpark methods are flexible. They will call first and who will call later. And "pause" and "resume" are performed in threads, "Synchronization object" and "operation flag" are not required

Alternate output

Thread 1 outputs a 5 times, thread 2 outputs b 5 times, and thread 3 outputs c 5 times. Now you need to output abcabcabcabcabc. How do you implement abcabc

wait notify version

package park;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.LockSupport;

@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException
    {
        SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);
        new Thread(() -> {
            try {
                syncWaitNotify.print(1, 2, "a");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                syncWaitNotify.print(2, 3, "b");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                syncWaitNotify.print(3, 1, "c");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }


}

/*
   Output order: ABC ABC ABC

   Thread name output next output corresponding number next number
*  Thread a b 0 1
*  Thread B C 1 2
   Thread C a 2 0
*
* */
class SyncWaitNotify
{
    //Record whether the current thread is executing the method
    private Integer flag;
    //Number of cycles
    private Integer loopNum;

    public SyncWaitNotify(Integer flag, Integer loopNum) {
        this.flag = flag;
        this.loopNum = loopNum;
    }

    //Output method
    public void print(Integer curNum, Integer nextNum,String str) throws InterruptedException {
       //How many times does the current thread need to cycle through printing
        for(int i=0;i<loopNum;i++)
        {
            synchronized (this)
            {
                //If it is inconsistent with the expected thread, let the current thread rest first and release the lock
                while(flag!=curNum)
                {
                    this.wait();
                }
                //In line with expectations
                System.out.print(str);
                //Set next expected thread
                this.flag=nextNum;
                //Wake up waiting threads
                this.notifyAll();
            }
        }
    }
}

Await & Signal version - Lock conditional variable version

abc thread ----- > go to the lounge to rest at the beginning - > the main thread wakes up thread a, a executes first, gets the lock ----- > thread a executes, wakes up b, and then releases the lock. In the next cycle, if you don't get the lock, you will block the wait and get the next execution, or go to the lounge first - > after b executes, wake up c, the same process as a ----- > after c executes, wake up a, the same as above

package park;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException
    {
        AwaitSignal as = new AwaitSignal(5);
        Condition aWaitSet = as.newCondition();
        Condition bWaitSet = as.newCondition();
        Condition cWaitSet = as.newCondition();
        new Thread(() -> {
            as.print("a", aWaitSet, bWaitSet);
        }).start();
        new Thread(() -> {
            as.print("b", bWaitSet, cWaitSet);
        }).start();
        new Thread(() -> {
            as.print("c", cWaitSet, aWaitSet);
        }).start();
        as.start(aWaitSet);
    }
}


class AwaitSignal extends ReentrantLock
{
    //Number of cycles
    private Integer loopNumber;

    public AwaitSignal(int loopNumber) {
        this.loopNumber = loopNumber;
    }

    public void start(Condition first) {
        this.lock();
        try {
            System.out.println("start....");
            //Wake up first
            first.signal();
        } finally {
            this.unlock();
        }
    }

    public void print(String str, Condition current, Condition next) {
        for (int i = 0; i < loopNumber; i++) {
            this.lock();
            try {
                current.await();
                System.out.print(str);
                next.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                this.unlock();
            }
        }
    }

}

be careful: The implementation does not consider that a, b and c threads are ready to start

Park Unpark version

package park;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.LockSupport;


@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException
    {
        SyncPark syncPark = new SyncPark(5);
        Thread t1 = new Thread(() -> {
            syncPark.print("a");
        });
        Thread t2 = new Thread(() -> {
            syncPark.print("b");
        });
        Thread t3 = new Thread(() -> {
            syncPark.print("c\n");
        });
        syncPark.setThreads(t1, t2, t3);
        syncPark.start();
    }
}


class SyncPark {
    private int loopNumber;
    private Thread[] threads;
    public SyncPark(int loopNumber) {
        this.loopNumber = loopNumber;
    }
    public void setThreads(Thread... threads) {
        this.threads = threads;
    }
    public void print(String str) {
        for (int i = 0; i < loopNumber; i++) {
            LockSupport.park();
            System.out.print(str);
            LockSupport.unpark(nextThread());
        }
    }
    private Thread nextThread() {
        Thread current = Thread.currentThread();
        int index = 0;
        for (int i = 0; i < threads.length; i++) {
            if(threads[i] == current) {
                index = i;
                break;
            }
        }
        if(index < threads.length - 1) {
            return threads[index+1];
        } else {
            return threads[0];
        }
    }
    public void start() {
        for (Thread thread : threads) {
            thread.start();
        }
        LockSupport.unpark(threads[0]);
    }
}

Summary of this chapter

What we need to focus on in this chapter is

  • Analyze which code fragments belong to critical areas when multithreading accesses shared resources
  • Using synchronized mutex to solve thread safety problems in critical areas

Master the syntax of synchronized lock object Master the syntax of synchronized loading member method and static method Master the wait/notify synchronization method

  • Using lock mutual exclusion to solve the thread safety problem of critical area

Master the use details of lock: interruptible, lock timeout, fair lock and condition variable Learn to analyze the thread safety of variables and master the use of common thread safety classes

  • Understand thread activity issues: deadlocks, livelocks, starvation
  • Application aspect

Mutual exclusion: use synchronized or Lock to achieve the mutual exclusion effect of shared resources Synchronization: use the condition variable of wait/notify or Lock to achieve the effect of inter thread communication

  • Principle aspect

monitor, synchronized, wait/notify principles synchronized advanced principle Park & unpark principle

  • Mode aspect

Protective pause in synchronization mode Producer consumer of asynchronous mode Sequential control of synchronous mode

Added by stone on Sat, 25 Dec 2021 08:11:33 +0200