Chapter 3 - management of shared model

Chapter 3 - management of shared model (3)

wait-notify

Little story - why wait

  • Because the conditions are not met, Xiaonan cannot continue the calculation
  • But if Xiaonan keeps occupying the lock, others will have to block all the time. The efficiency is too low

  • So Lao Wang opened a single Lounge (calling the wait method) and asked Xiao Nan to wait in the WaitSet, but at this time, the lock was released and others could enter the room at random
  • Until xiaom sends the cigarette, shout "your cigarette is here" (call the notify method)

  • Xiao Nan can then leave the lounge and re-enter the queue of competing locks

Principle of wait / notify

  • When the Owner thread finds that the conditions are not met, it calls the wait method to enter the WaitSet and change to the WAITING state
  • Both BLOCKED and WAITING threads are BLOCKED and do not occupy CPU time slices
  • The BLOCKED thread wakes up when the Owner thread releases the lock
  • The WAITING thread will wake up when the Owner thread calls notify or notifyAll, but waking up does not mean obtaining the lock immediately. It still needs to enter the EntryList to compete again

API introduction

  • obj.wait() causes the thread entering the object monitor to wait in the waitSet
  • obj.notify() selects one of the threads waiting for waitSet on the object to wake up
  • obj.notifyAll() wakes up all the threads waiting for waitSet on the object

They are all means of cooperation between threads and belong to the methods of Object objects. You must obtain a lock on this Object to call these methods

  • Call the wait() method directly
@Slf4j(topic = "c.Test18")
public class Test18 {
    static final Object lock = new Object();
    public static void main(String[] args) {

            try {
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        
    }
}

Program running results

  • Because the lock of the object has not been obtained, the direct call will report an error.
  • You should obtain the lock of the object before calling the wait() method
@Slf4j(topic = "c.Test18")
public class Test18 {
    static final Object lock = new Object();
    public static void main(String[] args) {
        synchronized (lock) {
            try {
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

notify and notifyAll

@Slf4j(topic = "c.TestWaitNotify")
public class TestWaitNotify {
    final static Object obj = new Object();

    public static void main(String[] args) {

        new Thread(() -> {
            synchronized (obj) {
                log.debug("implement....");
                try {
                    obj.wait(); // Let the thread wait on obj
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("Other codes....");
            }
        },"t1").start();

        new Thread(() -> {
            synchronized (obj) {
                log.debug("implement....");
                try {
                    obj.wait(); // Let the thread wait on obj
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("Other codes....");
            }
        },"t2").start();

        // The main thread executes in two seconds
        sleep(2);
        log.debug("awaken obj Other threads on");
        synchronized (obj) {
//            obj.notify(); //  Wake up obj previous thread
            obj.notifyAll(); // Wake up all waiting threads on obj
        }
    }
}

A result of notify (t1 and t2 are possible, depending on which thread executes first)

16:46:22.477 c.TestWaitNotify [t1] - implement....
16:46:22.480 c.TestWaitNotify [t2] - implement....
16:46:24.480 c.TestWaitNotify [main] - awaken obj Other threads on
16:46:24.481 c.TestWaitNotify [t1] - Other codes....

Results of notifyAll

16:47:23.572 c.TestWaitNotify [t1] - implement....
16:47:23.574 c.TestWaitNotify [t2] - implement....
16:47:25.575 c.TestWaitNotify [main] - awaken obj Other threads on
16:47:25.575 c.TestWaitNotify [t2] - Other codes....
16:47:25.576 c.TestWaitNotify [t1] - Other codes....
  • The wait() method will release the lock of the object and enter the WaitSet waiting area, so that other threads can get the lock of the object. Unlimited wait until notify
  • wait(long n) is a time limited wait that ends in n milliseconds or is notified
@Slf4j(topic = "c.TestWaitNotify")
public class TestWaitNotify {
    final static Object obj = new Object();

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (obj) {
                log.debug("implement....");
                try {
                    //If no other thread of the same object wakes up t1 within 1 second, t1 will continue to execute downward after 1 second
                    obj.wait(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("Other codes....");
            }
        },"t1").start();
		}

}

Program running results

16:50:20.502 c.TestWaitNotify [t1] - implement....
16:50:21.506 c.TestWaitNotify [t1] - Other codes....

Correct posture of wait notify

The difference between sleep(long n) and wait(long n)

  • sleep is the method of Thread, while wait is the method of Object
  • sleep does not need to be used with synchronized forcibly, but wait needs to be used with synchronized
  • sleep will not release the object lock while sleeping, but wait will release the object lock while waiting
  • They are all TIMED_WAITING
@Slf4j(topic = "c.Test19")
public class Test19 {

    static final Object lock = new Object(); //It is recommended to add final to all shared variables
    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (lock) {
                log.debug("Acquire lock");
                try {
//                    Thread.sleep(20000);
                    lock.wait(20000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();

        Sleeper.sleep(1);
        synchronized (lock) {
            log.debug("Acquire lock");
        }
    }
}

Thread.sleep(20000); The lock will not be released during sleep. It will be given to you when you wake up:

lock.wait(20000); The lock is released during the wait:

step 1

@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep1 {
    static final Object room = new Object();
    static boolean hasCigarette = false; // Is there any smoke
    static boolean hasTakeout = false;

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (room) {
                log.debug("Any smoke?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("No smoke, take a break!");
                    sleep(2);
                }
                log.debug("Any smoke?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("You can start working");
                }
            }
        }, "Xiaonan").start();

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("You can start working");
                }
            }, "Other people").start();
        }

        sleep(1);
        new Thread(() -> {
          // Can I add synchronized (room) here?
          hasCigarette = true;
          log.debug("Here comes the smoke!");
        }, "Cigarette delivery").start();
    }

}

  • Other working threads have to be blocked all the time, which is too inefficient
  • Xiaonan thread can't wake up until it has slept for 2 seconds. Even if the smoke is delivered in advance, it can't wake up immediately
  • After adding synchronized (room), it's like Xiao Nan sleeping inside locking the door, and the smoke can't be sent in at all. Without synchronized (main), it's like the main thread comes in through the window
  • The solution is to use the wait - notify mechanism

Note: if the main thread of cigarette delivery is also synchronized, Xiaonan doesn't leave, and the cigarette can't be delivered; When Xiao Nan left, the smoke was delivered, but the man left before he worked

		sleep(1);
    new Thread(() -> {
        // Can I add synchronized (room) here?
        synchronized (room) {
            hasCigarette = true;
            log.debug("Here comes the smoke!");
        }
    }, "Cigarette delivery").start();
}

step 2

Think about the following implementation, okay? Why?

@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep2 {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (room) {
                log.debug("Any smoke?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("No smoke, take a break!");
                    try {
                        room.wait(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("Any smoke?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("You can start working");
                }
            }
        }, "Xiaonan").start();

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (room) {
                    log.debug("You can start working");
                }
            }, "Other people").start();
        }

        sleep(1);
        new Thread(() -> {
            synchronized (room) {
                hasCigarette = true;
                log.debug("Here comes the smoke!");
                room.notify();
            }
        }, "Cigarette delivery").start();
    }

}

  • It solves the problem of thread blocking of other working threads
  • But what if there are other threads waiting for conditions? Will notify, the main thread of cigarette delivery, wake up other threads by mistake?

step 3

@Slf4j(topic = "c.TestCorrectPosture")
public class TestCorrectPostureStep3 {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeout = false;

    // spurious wakeup 
    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (room) {
                log.debug("Any smoke?[{}]", hasCigarette);
                if (!hasCigarette) {
                    log.debug("No smoke, take a break!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("Any smoke?[{}]", hasCigarette);
                if (hasCigarette) {
                    log.debug("You can start working");
                } else {
                    log.debug("No dry survival...");
                }
            }
        }, "Xiaonan").start();

        new Thread(() -> {
            synchronized (room) {
                Thread thread = Thread.currentThread();
                log.debug("Did you deliver the takeout?[{}]", hasTakeout);
                if (!hasTakeout) {
                    log.debug("No takeout, take a break!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("Did you deliver the takeout?[{}]", hasTakeout);
                if (hasTakeout) {
                    log.debug("You can start working");
                } else {
                    log.debug("No dry survival...");
                }
            }
        }, "my daughter").start();

        sleep(1);
        new Thread(() -> {
            synchronized (room) {
                hasTakeout = true;
                log.debug("Here's the takeout!");
//                room.notifyAll();
                room.notify();
            }
        }, "Delivery").start();

    }

}

  • notify can only wake up one thread in the WaitSet randomly. If other threads are waiting at this time, it may not wake up the correct thread, which is called false wake-up
  • The solution is to change to notifyAll

step 4

sleep(1);
new Thread(() -> {
    synchronized (room) {
        hasTakeout = true;
        log.debug("Here's the takeout!");
        room.notifyAll();
    }
}, "Delivery").start();

  • notifyAll only solves the wake-up problem of a thread, but there is only one chance to judge with if + wait. Once the condition is not tenable, there is no chance to judge again
  • The solution is to use while + wait. When the condition is not tenable, wait again

step 5

Change if to while

new Thread(() -> {
    synchronized (room) {
        log.debug("Any smoke?[{}]", hasCigarette);
        if (!hasCigarette) {
            log.debug("No smoke, take a break!");
            try {
                room.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.debug("Any smoke?[{}]", hasCigarette);
        if (hasCigarette) {
            log.debug("You can start working");
        } else {
            log.debug("No dry survival...");
        }
    }
}, "Xiaonan").start();
  • After modification
new Thread(() -> {
    synchronized (room) {
        log.debug("Any smoke?[{}]", hasCigarette);
        while (!hasCigarette) {
            log.debug("No smoke, take a break!");
            try {
                room.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.debug("Any smoke?[{}]", hasCigarette);
        if (hasCigarette) {
            log.debug("You can start working");
        } else {
            log.debug("No dry survival...");
        }
    }
}, "Xiaonan").start();

Correct posture

synchronized(lock) {
  while(The condition is not tenable) {
    lock.wait();
  }
  // work
}

//Another thread
synchronized(lock) {
  lock.notifyAll();
}

Protective pause in synchronization mode

definition

That is, Guarded Suspension, which is used when one thread waits for the execution result of another thread

  • One result needs to be passed from one thread to another to associate them with the same GuardedObject
  • If there are results continuously from one thread to another, you can use message queuing (see producer / consumer)
  • In JDK, the implementation of join and Future adopts this mode
  • Because we have to wait for the result of the other party, we are classified into synchronous mode

realization

/**
 * @author xiexu
 * @create 2022-01-31 10:38 AM
 */
@Slf4j(topic = "c.Test20")
public class Test20 {

		// t1 waits for the download result of t2
    public static void main(String[] args) {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            // Waiting for results
            log.debug("Waiting for results");
            List<String> list = (List<String>) guardedObject.get();
            log.debug("Result size:{}", list.size());
        }, "t1").start();

        new Thread(() -> {
            log.debug("Perform Download");
            try {
                List<String> list = Downloader.download();
                //Produce results
                guardedObject.complete(list);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, "t2").start();
    }

}

class GuardedObject {

    // result
    private Object response;
    private final Object lock = new Object();

    // Get results
    public Object get() {
        synchronized (lock) {
            // If there is no result, wait all the time
						// Prevent false Awakening
            while (response == null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }

    // Produce results
    public void complete(Object response) {
        synchronized (lock) {
            // Assign values to member variables
            this.response = response;
            // Results are generated and the waiting thread is notified
            lock.notifyAll();
        }
    }

}

Downloader class

public class Downloader {
    public static List<String> download() throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL("https://www.baidu.com/").openConnection();
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}

GuardedObject with timeout version

// Increase timeout effect
class GuardedObject1 {

    // result
    private Object response;
    private final Object lock = new Object();

    // Get results
    // timeout: indicates how long to wait
    public Object get(long timeout) {
        synchronized (lock) {
            // 1. Start time
            long begin = System.currentTimeMillis();
            // 2. Time experienced
            long passesTime = 0;
						// If there is no result, wait all the time
						// Prevent false Awakening
            while (response == null) {
                // The waiting time of this cycle (assuming that the timeout is 1000 and the result is awakened at 400, there is still 600 to wait)
                long waitTime = timeout - passesTime;
                // Exit the loop when the elapsed time exceeds the maximum waiting time
                if (waitTime <= 0) {
                    break;
                }
                try {
										// this. The problem of wait (timeout): when the false wake-up occurs at 15:00:01, the response is still null, and the elapsed time is 1s,
                    // When entering the while loop, the response is still empty. At this time, judge 1s < = timeout 2S, and this again Wait (2S)? At this time, 1s has been experienced,
                    // So just wait a second So the waiting time should be timeout - elapsed time
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 3. Calculate the elapsed time (if you are awakened in advance, the elapsed time is assumed to be 400)
                passesTime = System.currentTimeMillis() - begin;
            }
            return response;
        }
    }

    // Produce results
    public void complete(Object response) {
        synchronized (lock) {
            // Assign values to member variables
            this.response = response;
            // Results are generated and the waiting thread is notified
            lock.notifyAll();
        }
    }

}

When the result is not timed out

		// t1 waits for the download result of t2
    public static void main(String[] args) {
        GuardedObject1 guardedObject = new GuardedObject1();
        new Thread(() -> {
            // Waiting for results
            log.debug("Waiting for results");
            Object response = guardedObject.get(2000);
            log.debug("Result size:{}", response);
        }, "t1").start();

        new Thread(() -> {
            log.debug("Perform Download");
            try {
                Thread.sleep(1000);
                //Produce results
                guardedObject.complete(new Object());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "t2").start();
    }

In case of result timeout

new Thread(() -> {
            log.debug("Perform Download");
            try {
                Thread.sleep(3000);
                //Produce results
                guardedObject.complete(new Object());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "t2").start();

Simulated false wake-up

new Thread(() -> {
            log.debug("Perform Download");
            try {
                Thread.sleep(1000);
                //Produce results
                guardedObject.complete(null);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "t2").start();

Principle join

t. The join () method blocks the calling thread calling this method from entering {TIMED_WAITING , state, and this thread will continue until the execution of thread t is completed;

It is usually used in the main() main thread to wait for other threads to finish before ending the main() main thread

  • Is the caller polling to check the thread alive status
t1.join();
  • Equivalent to the following code
synchronized (t1) {
  // The caller thread enters the waitSet of t1 and waits until t1 runs
  // Here, the t1 thread object is used as a lock
  while (t1.isAlive()) {
    // The calling thread entered the waitSet of lock t1
    // Note that the calling thread is not t1, where t1 is used as a lock rather than as a thread
    // The calling thread is other threads, generally the main thread
    t1.wait(0);
  }
}

join source code

public final void join() throws InterruptedException {
    join(0);
}
public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
						// The timeout enhancement principle is the same as above
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }
  • From the code, we can find. When millis==0, it will enter the while (isalive()) cycle; That is, as long as the child thread is alive, the main thread keeps waiting.
  • The function of wait() is to make the "current thread" wait, and the "current thread" here refers to the currently running thread. Although it calls the wait () method of the child thread, it is called through the "main thread"; Therefore, it is the main thread that sleeps, not the "child thread"!
  • The Thread t in the example is just an object. isAlive() determines whether the current object (the t object in the example) is alive. wait() blocks the thread currently executing (generally the main method)
  • As you can see, the Join method is implemented through wait(). When the main thread calls t.join, the main thread will obtain the lock of the thread object t (wait means getting the lock of the object) and call the wait() of the object until the object wakes up the main thread, such as after exiting. This means that when the main thread calls t.join, it must be able to get the lock of the thread t object.

Multi tasking GuardedObject

  • In the picture, Futures is like a mailbox on the first floor of a residential building (each mailbox has a room number). t0, t2 and t4 on the left are like residents waiting for mail, and t1, t3 and t5 on the right are like postmen
  • If you need to use the GuardedObject object object between multiple classes, it is not very convenient to pass it as a parameter. Therefore, design an intermediate class for decoupling, which can not only decouple the [result waiting] and the [result producer], but also support the management of multiple tasks at the same time

The new id is used to identify the Guarded Object

class GuardedObject {

    // Identify GuardedObject
    private int id;

    public GuardedObject(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    // result
    private Object response;
    private final Object lock = new Object();

    // Get results
    // timeout: indicates how long to wait
    public Object get(long timeout) {
        synchronized (lock) {
            // 1. Start time
            long begin = System.currentTimeMillis();
            // 2. Time experienced
            long passesTime = 0;
            // If there is no result, wait all the time
            // Prevent false Awakening
            while (response == null) {
                // The waiting time of this cycle (assuming that the timeout is 1000 and the result is awakened at 400, there is still 600 to wait)
                long waitTime = timeout - passesTime;
                // Exit the loop when the elapsed time exceeds the maximum waiting time
                if (waitTime <= 0) {
                    break;
                }
                try {
                    // this. The problem of wait (timeout): when the false wake-up occurs at 15:00:01, the response is still null, and the elapsed time is 1s,
                    // When entering the while loop, the response is still empty. At this time, judge 1s < = timeout 2S, and this again Wait (2S)? At this time, 1s has been experienced,
                    // So just wait a second So the waiting time should be timeout - elapsed time
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 3. Calculate the elapsed time (if you are awakened in advance, the elapsed time is assumed to be 400)
                passesTime = System.currentTimeMillis() - begin;
            }
            return response;
        }
    }
    
    // Produce results
    public void complete(Object response) {
        synchronized (lock) {
            // Assign values to member variables
            this.response = response;
            // Results are generated and the waiting thread is notified
            lock.notifyAll();
        }
    }

}

Intermediate decoupling class (Mailboxes is only used to produce GuardedObject with id, so that GuardedObject objects can be used between multiple classes, so that multiple objects, i.e. postman and recipient, can correspond one by one)

// Mailbox class
class Mailboxes {

    // Hashtable is thread safe
    private static Map<Integer, GuardedObject> boxes = new Hashtable<>();

    private static int id = 1;

    // Production unique id
    private static synchronized int generateId() {
        return id++;
    }

    // HashTable is thread safe and does not need to be synchronized
    public static GuardedObject getGuardedObject(int id) {
				//Get the box according to the id and delete the corresponding key and value to avoid heap memory explosion
        return boxes.remove(id);
    }

    // HashTable is thread safe and does not need to be synchronized
    public static GuardedObject createGuardedObject() {
        GuardedObject go = new GuardedObject(generateId());
        boxes.put(go.getId(), go);
        return go;
    }

    // HashTable is thread safe and does not need to be synchronized
    public static Set<Integer> getIds() {
        return boxes.keySet();
    }

}

Business related

@Slf4j(topic = "c.People")
// Resident class
class People extends Thread {

    @Override
    public void run() {
        // Receive a letter
        GuardedObject guardedObject = Mailboxes.createGuardedObject();
        log.debug("Start receiving letters id:{}", guardedObject.getId());
        Object mail = guardedObject.get(5000);
        log.debug("Received letter id:{}, content:{}", guardedObject.getId(), mail);
    }
}
@Slf4j(topic = "c.Postman")
// Mailman class
class Postman extends Thread {

    private int id;
    private String mail;

    public Postman(int id, String mail) {
        this.id = id;
        this.mail = mail;
    }

    @Override
    public void run() {
        GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
        log.debug("Deliver a letter id:{}, content:{}", id, mail);
        guardedObject.complete(mail);
    }
}

Test class

/**
 * @author xiexu
 * @create 2022-01-31 10:38 AM
 */
@Slf4j(topic = "c.GuardedObjectTest")
public class GuardedObjectTest {

    public static void main(String[] args) {
        // Create 3 residents
        for (int i = 0; i < 3; i++) {
            new People().start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (Integer id : Mailboxes.getIds()) {
            new Postman(id, "content" + id).start();
        }
    }

}

Program running results

Producer / consumer of asynchronous mode (key)

definition

  • Unlike the GuardObject in the previous protective pause, there is no need for one-to-one correspondence between the threads that produce and consume results (one production and one consumption)
  • Consumption queues can be used to balance thread resources for production and consumption
  • The producer is only responsible for generating the result data and does not care about how to deal with the data, while the consumer focuses on dealing with the result data
  • Message queues are limited in capacity. When full, data will not be added, and when empty, data will not be consumed
  • Various blocking queues in JDK adopt this mode

In asynchronous mode, the message is not consumed immediately after the producer generates the message

In synchronous mode, messages are consumed immediately after they are generated

realization

/**
 * @author xiexu
 * @create 2022-01-31 12:36 PM
 */
@Slf4j(topic = "c.ProductConsumerTest")
public class ProductConsumerTest {

    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue(2);
        for (int i = 0; i < 3; i++) {
            int id = i;
            new Thread(() -> {
                queue.put(new Message(id, "value" + id));
            }, "producer" + i).start();
        }

        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Message message = queue.take();
            }
        }, "consumer").start();
    }

}

// Message queue class, which communicates between java threads
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
    // Queue collection of messages
    private LinkedList<Message> list = new LinkedList<>();
    // Capacity of queue
    private int capcity;

    public MessageQueue(int capcity) {
        this.capcity = capcity;
    }

    // Get message
    public Message take() {
        // Check whether the queue is empty
        synchronized (list) {
            while (list.isEmpty()) {
                try {
                    log.debug("Queue is empty, Consumer thread wait");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // Get the message from the queue header and return it
            Message message = list.removeFirst();
            log.debug("Consumed message {}", message);
            list.notifyAll();
            return message;
        }
    }

    // Save message
    public void put(Message message) {
        synchronized (list) {
            // Check whether the queue is full
            while (list.size() == capcity) {
                try {
                    log.debug("The queue is full, Producer thread wait");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // Add a message to the end of the queue
            list.addLast(message);
            log.debug("Produced message {}", message);
            list.notifyAll();
        }
    }

}

final class Message {
    private int id;
    private Object value;

    public Message(int id, Object value) {
        this.id = id;
        this.value = value;
    }

    public int getId() {
        return id;
    }

    public Object getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Message{" + "id=" + id + ", value=" + value + '}';
    }
}

Park & Unpark

Basic use

  • They are methods in the LockSupport class
// Pauses the current thread
LockSupport.park(); 
// Resume the operation of a thread
LockSupport.unpark(Pause thread object)

Call park first and then unpark

@Slf4j(topic = "c.TestParkUnpark")
public class TestParkUnpark {

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            log.debug("start...");
            sleep(1);
            log.debug("park...");
            LockSupport.park();
            log.debug("resume...");
        }, "t1");
        t1.start();

        sleep(2);
        log.debug("unpark...");
        LockSupport.unpark(t1);
    }

}

Call unpark first and then park. At this time, park will not pause the thread

@Slf4j(topic = "c.TestParkUnpark")
public class TestParkUnpark {

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            log.debug("start...");
            sleep(2); // Call park after 2 seconds
            log.debug("park...");
            LockSupport.park();
            log.debug("resume...");
        }, "t1");
        t1.start();

        sleep(1); // Call unpark after 1 second.
        log.debug("unpark...");
        LockSupport.unpark(t1);
    }

}

characteristic

Compared with wait & notify of Object

  • wait, notify and notifyAll must be used together with Object Monitor, while park and unpark do not
  • Park & unpark refers to [blocking] and [waking up] threads in the unit of threads, while notify can wake up only one waiting thread randomly. notifyAll refers to waking up all waiting threads, which is not so [accurate]
  • Park & unpark can unpark first, while wait & notify cannot notify first

Park & unpark principle

Each thread has its own Parker object, which is composed of three parts_ counter , _ cond and_ mutex, make an analogy

  • Thread is like a traveler, Parker is like his backpack, and condition variables are like tents in his backpack_ counter is like the spare dry food in the backpack (0 is exhausted and 1 is sufficient)
  • Call park to see if you need to stop and rest
    • If the spare dry food runs out, get into the tent and rest
    • If there is enough spare dry food, you don't need to stop and move on
  • Calling unpark is like making enough dry food
    • If the thread is still in the tent, wake up and let him move on
    • If the thread is still running at this time, the next time he calls park, he will only consume the spare dry food without staying and moving on
    • Because the backpack space is limited, calling unpark multiple times will only replenish one spare dry food

The procedure of calling park first and then upark

  • Call park first
    • The current thread calls {unsafe Park() method
    • Check_ counter, which is 0 in this case. In this case, the_ Mutex mutex (mutex object has a waiting queue _cond)
    • Thread entry_ cond conditional variable blocking
    • Set_ counter = 0 (no dry food)

  • Call unpark
    • Call unsafe Unpark (thread_0) method, setting_ counter is 1
    • Awaken_ Thread in cond condition variable_ 0
    • Thread_0 # resume operation
    • Set_ counter is 0

The procedure of calling upark first and then park

  • Call {unsafe Unpark (thread_0) method, setting_ counter is 1
  • The current thread calls {unsafe Park() method
  • Check_ counter, this case is , 1. At this time, the thread , does not need to be blocked and continues to run
  • Set_ counter is 0

Keywords: Java Multithreading synchronized

Added by celsoendo on Wed, 02 Feb 2022 11:43:17 +0200