2018.05.08-1. Thread foundation, sharing and collaboration between threads - Golden Lion | 2018.05.08-2. Thread concurrency tool class - Golden Lion mp4

Sharing data between threads can cause shared-variable conflicts and thread-safety problems, so a protection mechanism is needed. The synchronized built-in lock, volatile and ThreadLocal are used to solve these problems. But what if threads need to cooperate with each other?

Inter-thread collaboration

Sometimes we need one thread to modify the value of an object, and another thread to start working once it senses that the value has changed. The former thread acts as a producer, and the latter as a consumer. How can the consumer perceive the change?

The first approach is polling: the consumer periodically checks the object to see whether the value has changed. Polling has two disadvantages:

  • It is difficult to guarantee timeliness
  • High resource overhead
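
A minimal sketch of the polling approach may make both disadvantages concrete. The class name PollingDemo, the helper pollUntilAbove and the 50 ms interval are assumptions chosen for illustration and do not come from the original course code. A change made while the consumer sleeps goes unnoticed until the sleep ends, and the consumer keeps waking up even when nothing has changed.

```java
import java.util.concurrent.atomic.AtomicInteger;

class PollingDemo {
    /**
     * Polls the shared value every intervalMillis until it exceeds the threshold.
     * Returns how many times the consumer had to sleep and re-check.
     */
    static int pollUntilAbove(AtomicInteger value, int threshold, long intervalMillis) {
        int sleeps = 0;
        while (value.get() <= threshold) {
            try {
                // A change made during this sleep is not seen until the sleep ends.
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            sleeps++;
        }
        return sleeps;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger km = new AtomicInteger(0);
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(120);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            km.set(101); // the producer changes the value
        });
        producer.start();
        int sleeps = pollUntilAbove(km, 100, 50);
        producer.join();
        System.out.println("consumer saw km=" + km.get() + " after " + sleeps + " sleep(s)");
    }
}
```

A shorter interval improves timeliness but wastes more CPU on useless checks, which is the trade-off the two bullet points describe.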

The other approach is the wait/notify mechanism. After the producer modifies the value of the object, it actively sends a notification to the consumers waiting for that value to change.

Java uses the wait and notify/notifyAll methods for this. Note that these thread-collaboration methods are defined in Object, so they operate on the lock of each individual object. By contrast, sleep is a method of the Thread class. wait releases the lock, while sleep does not; the two should be clearly distinguished.
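
The difference between wait and sleep can be observed with a small experiment. The following is only a sketch: the class WaitVsSleepDemo and the method lockFreeWhilePaused are hypothetical names, and the 300 ms pause is an arbitrary choice. A holder thread pauses inside a synchronized block, either with wait(300) or with Thread.sleep(300), while a prober thread tries to enter the same lock and records whether the holder was still paused when it got in.

```java
import java.util.concurrent.CountDownLatch;

class WaitVsSleepDemo {
    /** Returns true if another thread could take the lock while the holder was paused. */
    static boolean lockFreeWhilePaused(boolean useWait) {
        final Object lock = new Object();
        final CountDownLatch holderInside = new CountDownLatch(1);
        final boolean[] paused = {false};   // guarded by lock
        final boolean[] observed = {false}; // written by the prober, read after join

        Thread holder = new Thread(() -> {
            synchronized (lock) {
                holderInside.countDown();
                paused[0] = true;
                try {
                    if (useWait) {
                        lock.wait(300);    // releases the lock while waiting
                    } else {
                        Thread.sleep(300); // keeps the lock while sleeping
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                paused[0] = false;
            }
        });
        Thread prober = new Thread(() -> {
            try {
                holderInside.await(); // start probing only once the holder owns the lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (lock) {
                observed[0] = paused[0];
            }
        });
        holder.start();
        prober.start();
        try {
            holder.join();
            prober.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed[0];
    }

    public static void main(String[] args) {
        System.out.println("wait:  lock free while paused = " + lockFreeWhilePaused(true));
        System.out.println("sleep: lock free while paused = " + lockFreeWhilePaused(false));
    }
}
```

With sleep, the prober can only enter after the holder has left the synchronized block, so it never sees the paused flag set. With wait, the prober normally gets in during the pause.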

In actual development, thread collaboration code generally follows a standard paradigm.

Standard paradigm for waiting and notification

Waiting party:

1. Obtain the lock of the object;

2. Check in a loop whether the condition is met. If not, call the wait method;

3. Execute the business logic once the condition is met.

Notifying party:

1. Obtain the lock of the object;

2. Change the condition;

3. Notify all threads waiting on the object.
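
The two sides of the paradigm can be sketched in a few lines. The Mailbox class below is a hypothetical one-slot container invented for this illustration; the numbered comments map to the steps listed above.

```java
class GuardedWaitDemo {
    /** A one-slot mailbox; the condition the waiter cares about is "message != null". */
    static class Mailbox {
        private String message;

        // Waiting party
        synchronized String take() throws InterruptedException { // 1. obtain the lock
            while (message == null) {   // 2. check the condition in a loop
                wait();                 //    not met: release the lock and wait
            }
            String taken = message;     // 3. condition met: business logic
            message = null;
            return taken;
        }

        // Notifying party
        synchronized void put(String newMessage) { // 1. obtain the lock
            message = newMessage;                  // 2. change the condition
            notifyAll();                           // 3. notify all waiting threads
        }
    }

    /** Starts one consumer, publishes one message, and returns what the consumer received. */
    static String demo() {
        Mailbox box = new Mailbox();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = box.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        box.put("hello");
        try {
            consumer.join(); // join makes the consumer's write to received[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("consumer received: " + demo());
    }
}
```

The condition is checked in a while loop rather than an if, so the waiter re-checks it every time it wakes up. This also makes the code correct when put runs before take: the consumer then finds the message already present and never waits.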

We will complete an example of thread collaboration according to this standard paradigm:

 1 /**
 2  * Class description: the express class defines how many kilometers the express has traveled and which site it has reached
 3  * It also defines the notification method after changing km / arrival and the waiting km / arrival change method
 4  */
 5 public class Express {
 6     public final static String CITY = "ShangHai";
 7     private int km;/*Express transportation mileage*/
 8     private String site;/*Express arrival place*/
10     public Express() {
11     }
13     public Express(int km, String site) {
14         this.km = km;
15         this.site = site;
16     }
18     /* Change the kilometers, and then notify the thread in the wait state that needs to process the kilometers for business processing*/
19     public synchronized void changeKm() {
20         this.km = 101;
21         notifyAll();
22         //Other business codes
23     }
25     /* Change the location, and then notify the thread that is in the wait state and needs to process the location for business processing*/
26     public synchronized void changeSite() {
27         this.site = "BeiJing";
28         notifyAll();
29     }
31     /*Wait for the change of kilometers and make other business codes after meeting the conditions*/
32     public synchronized void waitKm() {
33         while (this.km <= 100) {
34             try {
35                 System.out.println("check Km thread[" + Thread.currentThread().getId()
36                         + "] is will wait.");
37                 wait();
38                 //Execution to the following sentence indicates waiting to be awakened
39                 System.out.println("check km thread[" + Thread.currentThread().getId()
40                         + "] is be notified.");
41             } catch (InterruptedException e) {
42                 e.printStackTrace();
43             }
44         }
45         //Business operation
46         System.out.println("the km is" + this.km + ",I will  XXX...");
47     }
49     /*Wait for the arrival point to change and make other business codes after meeting the conditions*/
50     public synchronized void waitSite() {
51         while (CITY.equals(this.site)) {
52             try {
53                 System.out.println("check site thread[" + Thread.currentThread().getId()
54                         + "] is will wait.");
55                 wait();
56                 //Execution to the following sentence indicates waiting to be awakened
57                 System.out.println("check site thread[" + Thread.currentThread().getId()
58                         + "] is be notified.");
59             } catch (InterruptedException e) {
60                 e.printStackTrace();
61             }
62         }
63         //Business operation
64         System.out.println("the site is" + this.site + ",I will XXX...");
65     }
66 }

The thread executing waitKm() waits until the mileage is greater than 100 before doing its other work. After the thread executing changeKm() changes the mileage to a value greater than 100, it notifies the threads waiting on the mileage so they can continue. Changing and waiting for the arrival site work the same way. wait and notifyAll may only be called while holding the object's lock, so the synchronized keyword is added to all four methods.

Test class:

 1 public class TestWN {
 2     private static Express express = new Express(0, Express.CITY);
 4     /*Thread that checks for a mileage change. It keeps waiting while the condition is not met*/
 5     private static class CheckKm extends Thread {
 6         @Override
 7         public void run() {
 8             express.waitKm();
 9         }
10     }
12     /*Thread that checks for a site change. It keeps waiting while the condition is not met*/
13     private static class CheckSite extends Thread {
14         @Override
15         public void run() {
16             express.waitSite();
17         }
18     }
20     public static void main(String[] args) throws InterruptedException {
21         for (int i = 0; i < 3; i++) {//Three threads monitor changes in the city
22             new CheckSite().start();
23         }
24         for (int i = 0; i < 3; i++) {//Monitor mileage changes
25             new CheckKm().start();
26         }
28         Thread.sleep(1000);//Sleep here for 1 second to ensure that the above threads have been started
29         express.changeKm();//Change the express mileage, which notifies the waiting threads
30     }
32 }

Three threads are started to monitor mileage changes and three to monitor arrival-city changes. After starting, each thread simply waits for the mileage or the city of the same express to change. In line 29, we change the express mileage to 101.

Execution results:

 1 check site thread[12] is will wait.
 2 check site thread[13] is will wait.
 3 check site thread[14] is will wait.
 4 check Km thread[16] is will wait.
 5 check Km thread[15] is will wait.
 6 check Km thread[17] is will wait.
 7 check km thread[17] is be notified.
 8 the km is101,I will  XXX...
 9 check km thread[15] is be notified.
10 the km is101,I will  XXX...
11 check km thread[16] is be notified.
12 the km is101,I will  XXX...
13 check site thread[14] is be notified.
14 check site thread[14] is will wait.
15 check site thread[13] is be notified.
16 check site thread[13] is will wait.
17 check site thread[12] is be notified.
18 check site thread[12] is will wait.

Lines 1-6 show the 6 threads waiting for a km or city change. After changing the km, notifyAll in changeKm wakes all 6 threads waiting on this object (lines 7-18). Only the threads waiting for the km meet the business condition of more than 100 km and execute the business code (lines 8, 10, 12). The threads waiting for the city change wake up, find that the city is still ShangHai, and continue to wait (lines 14, 16, 18).

Use notify or notifyAll?

In the above example, we wake up threads with notifyAll instead of notify. If we changed the wake-up call in changeKm to notify, only one of the six waiting threads would be woken, and the other five would stay asleep because the notification never reaches them. If the thread that wakes up happens to be one waiting for the city, it simply waits again and no mileage thread proceeds. Therefore, unless you know for certain that only one thread is waiting, notifyAll is recommended, to prevent threads from waiting for a long time or even forever.
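
The effect can be counted in a small experiment. This is a sketch under assumptions: the class NotifyVsNotifyAllDemo and the method proceededAfterOneSignal are hypothetical, and all waiters here wait for the same condition, which is simpler than the Express example. The method starts several waiters, sends exactly one signal, and reports how many waiters got past their wait loop within a short grace period.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class NotifyVsNotifyAllDemo {
    /** Starts `waiters` threads waiting on one lock, signals once, and reports how many proceeded. */
    static int proceededAfterOneSignal(boolean useNotifyAll, int waiters) {
        final Object lock = new Object();
        final boolean[] changed = {false}; // guarded by lock
        final AtomicInteger proceeded = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < waiters; i++) {
            Thread t = new Thread(() -> {
                synchronized (lock) {
                    while (!changed[0]) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
                proceeded.incrementAndGet();
            });
            t.start();
            threads.add(t);
        }
        try {
            for (Thread t : threads) { // make sure every waiter is parked in wait()
                while (t.getState() != Thread.State.WAITING) {
                    Thread.sleep(5);
                }
            }
            synchronized (lock) {
                changed[0] = true;
                if (useNotifyAll) {
                    lock.notifyAll();
                } else {
                    lock.notify();
                }
            }
            for (Thread t : threads) {
                t.join(200); // give woken threads time to finish
            }
            int result = proceeded.get();
            synchronized (lock) {
                lock.notifyAll(); // clean-up: release any thread still waiting
            }
            for (Thread t : threads) {
                t.join();
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return proceeded.get();
        }
    }

    public static void main(String[] args) {
        System.out.println("notifyAll: " + proceededAfterOneSignal(true, 3) + " of 3 waiters proceeded");
        System.out.println("notify:    " + proceededAfterOneSignal(false, 3) + " of 3 waiters proceeded");
    }
}
```

With notifyAll, all three waiters proceed. With notify, typically only one does, and the rest would stay blocked forever without the clean-up call at the end.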

Wait timeout mode

In fact, the above example has a flaw. If a thread is never woken after it enters the waiting state, or still fails the business condition after waking and goes back to waiting, then the moment it finally proceeds becomes uncontrollable, and it may even be stuck there forever. Therefore, we generally introduce a wait-timeout mode, in which the thread stops waiting once the timeout expires. A familiar example is fetching a connection from a connection pool: the fetch has a timeout, because you cannot wait forever.

The pseudo code of wait timeout mode is as follows:

//Suppose the waiting timeout is T, so the wait times out at time now + T
long overtime = now + T;
long remain = T; //Remaining waiting time, initialized to T

while (result does not meet the condition && remain > 0) { //If remain <= 0 and the condition is still not met, stop waiting
      wait(remain); //No longer a bare wait(); it states how long to wait
      remain = overtime - now; //wait may return early when another thread wakes it, so recompute the remaining time
}
return result;
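
The pseudo code can be turned into a small runnable class. This is a sketch, not the course's code: TimedWaitDemo, await and publish are hypothetical names, and treating an interruption like a timeout is an assumption made to keep the example short.

```java
class TimedWaitDemo {
    private final Object lock = new Object();
    private String result; // guarded by lock; null means "condition not met yet"

    /** Waits up to timeoutMillis for a result; returns null on timeout. */
    String await(long timeoutMillis) {
        long overtime = System.currentTimeMillis() + timeoutMillis; // absolute deadline
        long remain = timeoutMillis;                                // time still allowed to wait
        synchronized (lock) {
            while (result == null && remain > 0) {
                try {
                    lock.wait(remain);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // assumption: treat interruption like a timeout
                    break;
                }
                // wait may return early, so recompute how much time is left
                remain = overtime - System.currentTimeMillis();
            }
            return result;
        }
    }

    void publish(String value) {
        synchronized (lock) {
            result = value;
            lock.notifyAll();
        }
    }

    public static void main(String[] args) {
        TimedWaitDemo demo = new TimedWaitDemo();
        System.out.println("before publish: " + demo.await(100)); // nothing published, so this times out
        new Thread(() -> demo.publish("done")).start();
        System.out.println("after publish:  " + demo.await(1000));
    }
}
```

Recomputing remain from a fixed deadline keeps the total waiting time bounded by the timeout no matter how many times the thread is woken early.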

Next, we use the wait timeout mode to simulate and implement a database connection pool.

First, implement the Connection interface to simulate the implementation of a database Connection, and define the fetchConnection method to create a database Connection. (Connection implementation is not the focus of this example, which is simply simulated here)

/**
 *Class description: simulate the implementation of database connection
 */
public class SqlConnectImpl implements Connection{
    /*Get a database connection*/
    public static final Connection fetchConnection(){
        return new SqlConnectImpl();
    }
    //The methods required by the Connection interface are omitted here
}
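
Writing out every method of java.sql.Connection by hand is tedious. One alternative for experiments, which is a swapped-in technique and not what the original SqlConnectImpl does, is to let java.lang.reflect.Proxy generate a do-nothing implementation. The class name FakeConnections is hypothetical.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;

class FakeConnections {
    /** Returns a Connection whose methods do nothing and return default values. */
    static Connection fetchConnection() {
        return (Connection) Proxy.newProxyInstance(
                Connection.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    // Methods inherited from Object are also routed through the handler.
                    switch (method.getName()) {
                        case "toString": return "FakeConnection@" + System.identityHashCode(proxy);
                        case "hashCode": return System.identityHashCode(proxy);
                        case "equals":   return proxy == args[0];
                        default:         break;
                    }
                    Class<?> type = method.getReturnType();
                    if (type == boolean.class) return false;
                    if (type == int.class) return 0;
                    return null; // void methods and methods returning objects
                });
    }

    public static void main(String[] args) throws Exception {
        Connection conn = fetchConnection();
        conn.createStatement(); // returns null in this fake
        conn.commit();          // does nothing
        System.out.println("isClosed = " + conn.isClosed());
    }
}
```

Such a fake is only suitable for exercising the pool logic, since every statement-creating method returns null.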

Then we implement the custom connection pool class DBPool. It mainly implements the fetchConn method, which fetches a connection in wait-timeout mode, and the releaseConn method, which returns a connection to the pool after use.

 1 /**
 2  *Class description: implement a database connection pool
 3  */
 4 public class DBPool {
 6     //Container for database pool
 7     private static LinkedList<Connection> pool = new LinkedList<>();
 8     //When instantiating the pool, initialize the number of connections and put them into the pool
 9     public DBPool(int initalSize) {
10         if(initalSize>0) {
11             for(int i=0;i<initalSize;i++) {
12                 pool.addLast(SqlConnectImpl.fetchConnection());
13             }
14         }
15     }
17     //Fetch a connection in wait-timeout mode: if no connection can be obtained within mills milliseconds, return null and give up
18     public Connection fetchConn(long mills) throws InterruptedException {
19         synchronized (pool) {
20             //A time parameter < 0 means the fetch never times out
21             if (mills<0) {
22                 while(pool.isEmpty()) {
23                     pool.wait();
24                 }
25                 return pool.removeFirst();
26             }else {
27                 //Core code of wait timeout mode
28                 long overtime = System.currentTimeMillis()+mills;
29                 long remain = mills;
30                 while(pool.isEmpty()&&remain>0) {
31                     pool.wait(remain);
32                     remain = overtime - System.currentTimeMillis();
33                 }
34                 Connection result  = null;
35                 if(!pool.isEmpty()) {
36                     result = pool.removeFirst();
37                 }
38                 return result;
39             }
40         }
41     }
43     //Method of putting back database connection
44     public void releaseConn(Connection conn) {
45         if(conn!=null) {
46             synchronized (pool) {
47                 pool.addLast(conn);
48                 //When a thread releases a connection, it wakes up the thread waiting for another connection
49                 pool.notifyAll();
50             }
51         }
52     }
53 }

Lines 27-38 are the core code that waits with a timeout when fetching a connection, that is, the implementation of the pseudo code above. On line 49, after putting the connection back, releaseConn calls notifyAll to wake up the threads waiting for a connection.

We write code to test it. The scenario uses 50 threads to fetch connections from the pool. Each thread makes 20 attempts, so there are 1000 attempts in total. Finally, we count how many attempts succeeded and how many timed out without getting a connection.

 1 /**
 2  *
 3  *Class description: test class of connection pool
 4  */
 5 public class DBPoolTest {
 6     static DBPool pool  = new DBPool(10);
 7     // Latch: the main thread waits until all Workers have finished before it continues
 8     static CountDownLatch end;
10     public static void main(String[] args) throws Exception {
11         // Number of threads
12         int threadCount = 50;
13         end = new CountDownLatch(threadCount);
14         int count = 20;//The number of attempts to connect to the operation database per thread
15         AtomicInteger got = new AtomicInteger();//Counter: counts the number of connections obtained
16         AtomicInteger notGot = new AtomicInteger();//Counter: counts the number of connections not obtained
17         for (int i = 0; i < threadCount; i++) {
18             Thread thread = new Thread(new Worker(count, got, notGot), 
19                     "worker_"+i);
20             thread.start();
21         }
22         end.await();// The main thread waits here until all worker threads have finished, then prints the statistics below
23         System.out.println("Total attempts: " + (threadCount * count));
24         System.out.println("Number of connections obtained:  " + got);
25         System.out.println("Number of failed connections: " + notGot);
26     }
28     static class Worker implements Runnable {
29         int           count;
30         AtomicInteger got;
31         AtomicInteger notGot;
33         public Worker(int count, AtomicInteger got,
34                                AtomicInteger notGot) {
35             this.count = count;
36             this.got = got;
37             this.notGot = notGot;
38         }
40         public void run() {
41             while (count > 0) {
42                 try {
43                     // Get a connection from the connection pool; if none is available within 1000 ms, null is returned
44                     // Count the connections obtained (got) and not obtained (notGot) separately
45                     Connection connection = pool.fetchConn(1000);
46                     if (connection != null) {
47                         try {
48                             //Simulate database business operation
49                             connection.createStatement();
50                             connection.commit();
51                         } finally {
52                             pool.releaseConn(connection);
53                             got.incrementAndGet();
54                         }
55                     } else {
56                         notGot.incrementAndGet();
57                         System.out.println(Thread.currentThread().getName()
58                                 +"Get connection wait timeout!");
59                     }
60                 } catch (Exception ex) {
61                 } finally {
62                     count--;
63                 }
64             }
65             //After all 20 operations of this thread are completed, the counter is decremented by one
66             end.countDown();
67         }
68     }
69 }

50 workers (threads) each fetch a connection from the pool for 20 operations. On line 45, a fetch times out after 1000 milliseconds if no connection is obtained. After each thread ends, it decrements the latch counter by one (line 66). On line 22, the main thread waits for all threads to complete and then prints the statistics. got is the total number of connections obtained by all threads, and notGot is the number of attempts that obtained none.

Test results:

worker_7 Get connection wait timeout!
worker_25 Get connection wait timeout!
worker_36 Get connection wait timeout!
worker_26 Get connection wait timeout!
worker_16 Get connection wait timeout!
worker_29 Get connection wait timeout!
worker_33 Get connection wait timeout!
worker_41 Get connection wait timeout!
worker_27 Get connection wait timeout!
Total attempts: 1000
Number of connections obtained:  875
Number of failed connections: 125

Process finished with exit code 0

Of the 1000 attempts, 875 obtained a connection and 125 timed out without getting one. This is the wait-timeout mode at work: it prevents threads from waiting indefinitely.

==================== 2018.05.08-2. Thread concurrency tool class (1) - Golden Lion mp4 ====================

Join method "queue jumping"

Sometimes we need to let other threads "jump in the queue" during the execution of one thread, and then execute its subsequent work. At this time, we can use the join method. (interview point)

Join: when thread A calls the join method of thread B, thread A must wait for B to finish executing before A can continue its own work.

Of course, the thread being joined can itself join other threads. It is just like letting someone jump in front of you in a queue: that person may in turn let a roommate jump in front of them, so when you finally get your dinner is not certain.
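
The basic rule can be checked with two threads before looking at a longer chain. This is a minimal sketch with a hypothetical class name, JoinOrderDemo. Thread B is started before A, because calling join on a thread that has not been started yet returns immediately.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class JoinOrderDemo {
    /** Runs threads A and B, where A joins B, and returns the order in which they finished. */
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Thread b = new Thread(() -> log.add("B finished"), "B");
        Thread a = new Thread(() -> {
            try {
                b.join(); // A waits here until B has terminated
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            log.add("A finished");
        }, "A");
        b.start(); // start B first so that b.join() in A really waits for B's work
        a.start();
        try {
            a.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [B finished, A finished]
    }
}
```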

Let's use an example to demonstrate the use of join

 1 /**
 2  *
 3  *Class description: demonstrates the use of the join method
 4  */
 5 public class UseJoin {
 8     static class JumpQueue implements Runnable {
 9         private Thread thread;//Thread used to jump the queue
11         public JumpQueue(Thread thread) {
12             this.thread = thread;
13         }
15         public void run() {
16             try {
17                 System.out.println(thread.getName()+" will be join before "
18                         +Thread.currentThread().getName());
19                 thread.join();
20             } catch (InterruptedException e) {
21                 e.printStackTrace();
22             }
23             System.out.println(Thread.currentThread().getName()+" terminted.");
24         }
25     }
27     public static void main(String[] args) throws Exception {
28         Thread previous = Thread.currentThread();//The thread to be joined first; initially the main thread
29         for (int i = 0; i < 10; i++) {
30             Thread thread = new Thread(new JumpQueue(previous),String.valueOf(i));
31             thread.start();
32             //Sleep here only to keep the queue-jumping logs in a regular order: start() just makes the thread ready, and the CPU does not necessarily run it right away. After the sleep, the started thread has almost certainly printed its queue-jumping log
33             SleepTools.second(1);
34             //The thread just started becomes the one the next thread will join
35             previous=thread;
36         }
38         SleepTools.second(2);//Let the main thread sleep for 2 seconds
39         System.out.println(Thread.currentThread().getName() + " terminate.");
40     }
41 }

Each JumpQueue holds a reference to the thread that jumps in front of it, that is, the thread it must join. Ten threads are started, each running a JumpQueue.

Print results:

main will be join before 0
0 will be join before 1
1 will be join before 2
2 will be join before 3
3 will be join before 4
4 will be join before 5
5 will be join before 6
6 will be join before 7
7 will be join before 8
8 will be join before 9
main terminate.
0 terminted.
1 terminted.
2 terminted.
3 terminted.
4 terminted.
5 terminted.
6 terminted.
7 terminted.
8 terminted.
9 terminted.

Process finished with exit code 0

Each thread joins the thread created before it: thread 0 waits for main, 1 waits for 0, and so on, until 9 waits for 8. So only after main finishes can 0 finish, then 1, then 2, and so on down the chain.

Summary: the impact of calling yield(), sleep(), wait(), notify() and other methods on locks

Finally, we summarize how the main basic methods for thread sharing and collaboration affect locks.

yield and sleep are both methods of the Thread class. Thread.yield() and Thread.sleep() hand over the CPU but do not release any lock. The difference is that after yield hands over the CPU, the thread enters the runnable (ready) state and keeps competing for the CPU, while sleep puts the thread into a blocked state for the duration of the sleep, and the thread returns to the ready state afterwards.

wait and notify/notifyAll are methods of Object, and the lock must be obtained before they are called. After wait is called, the thread releases the lock, hands over the CPU, and enters the blocked (waiting) state.

notify does not release the lock by itself; the lock is released only after the synchronized code block finishes executing. If there is business code after the notify call in the block holding the lock, that code still has to run, so the lock cannot be given up immediately. For this reason, notify is generally called on the last line of the synchronized block, so that the awakened thread can obtain the lock sooner.
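
The ordering can be observed directly. In this sketch (NotifyKeepsLockDemo is a hypothetical name), the notifier logs a message after calling notify but before leaving its synchronized block, and the waiter logs a message as soon as it resumes. Because the waiter must reacquire the lock first, the notifier's message always comes first.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class NotifyKeepsLockDemo {
    /** Returns the log lines in the order in which they were recorded. */
    static List<String> run() {
        final Object lock = new Object();
        final boolean[] ready = {false}; // guarded by lock
        final List<String> log = Collections.synchronizedList(new ArrayList<>());

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                while (!ready[0]) {
                    try {
                        lock.wait(); // releases the lock until notified
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                log.add("waiter: resumed"); // runs only after the lock has been reacquired
            }
        });
        waiter.start();
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.yield(); // spin until the waiter is parked in wait()
        }
        synchronized (lock) {
            ready[0] = true;
            lock.notify();
            // notify has been called, but this thread still owns the lock
            log.add("notifier: after notify, still inside synchronized");
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```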

So among these methods, only wait releases the lock. notify, yield and sleep do not release the lock.


20: Thread foundation, thread sharing and collaboration end



Added by Xpheyel on Mon, 03 Jan 2022 00:46:00 +0200