Concurrency - the Java platform has APIs to help you develop multithreaded programs
Computer users take it for granted that their system can do more than one thing at a time. They assume that they can continue to work in the word processor, while other applications download files, manage print queues, and stream audio. Even a single application is often expected to do more than one thing at a time. For example, a streaming audio application must simultaneously read digital audio from the network, decompress, manage playback, and update its display. Even a word processor should always be ready to respond to keyboard and mouse events, no matter how busy it is reformatting text or updating the display. Software that can do these things is called concurrent software.
The Java platform is designed from the ground up to support concurrent programming, with basic concurrency support in the Java programming language and the Java class libraries. Since version 5.0, the Java platform has also included high-level concurrency APIs. This lesson introduces the platform's basic concurrency support and summarizes some of the high-level APIs in the java.util.concurrent packages.
Processes and threads
In concurrent programming, there are two basic units of execution: processes and threads. In the Java programming language, concurrent programming is mostly concerned with threads. However, processes are also important.
Computer systems usually have many active processes and threads. This is true even in systems that have only a single execution core, and thus only one thread actually executing at any given moment. Processing time for a single core is shared among processes and threads through an operating system feature called time slicing.
It is becoming more and more common for computer systems to have multiple processors or processors with multiple execution cores. This greatly enhances a system's capacity for concurrent execution of processes and threads -- but concurrency is possible even on simple systems without multiple processors or execution cores.
Processes
A process has a self-contained execution environment. A process generally has a complete, private set of basic run-time resources; in particular, each process has its own memory space.
Processes are often considered synonymous with programs or applications. However, a single application seen by a user may actually be a set of collaborative processes. To facilitate communication between processes, most operating systems support Inter Process Communication(IPC) resources, such as pipes and sockets. IPC is used not only for the communication between processes on the same system, but also for the communication between processes on different systems.
Most implementations of the Java virtual machine run as a single process. A Java application can create additional processes using a ProcessBuilder object. Multiprocess applications are beyond the scope of this lesson.
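As a brief illustration (a minimal sketch, not part of this lesson's examples; the launched command and the class name are placeholders), a Java application might start a separate process and wait for it to finish like this:

import java.io.IOException;

public class LaunchProcess {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Build a process that runs an external command (here, a hypothetical "ls -l").
        ProcessBuilder builder = new ProcessBuilder("ls", "-l");
        builder.inheritIO();                 // reuse this process's standard streams
        Process process = builder.start();   // launch the new process
        int exitCode = process.waitFor();    // block until it terminates
        System.out.println("Process exited with code " + exitCode);
    }
}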
Threads
Threads are sometimes called lightweight processes. Both processes and threads provide an execution environment, but creating a new thread requires fewer resources than creating a new process.
Threads exist in processes -- at least one per process. Threads share the resources of the process, including memory and open files. This makes communication more efficient, but there are also potential problems.
Multithreaded execution is an essential feature of the Java platform. If you count "system" threads that do things like memory management and signal handling, every application has at least several threads. But from the application programmer's point of view, you start with just one thread, called the main thread. This thread can create additional threads, as we'll demonstrate in the next section.
Thread object
Each thread is associated with an instance of the class Thread. There are two basic strategies for creating a concurrent application using Thread objects.
- To directly control thread creation and management, simply instantiate Thread each time the application needs to start an asynchronous task.
- To abstract thread management from the rest of the application, pass the application's tasks to the executor.
This section documents the use of Thread objects. Executors are discussed with the other high-level concurrency objects.
Define and start threads
The application that creates the Thread instance must provide the code that will run in that Thread. There are two ways to do this:
- Implement the Runnable interface. The Runnable interface defines a single method, run, meant to contain the code executed in the thread. The Runnable object is passed to the Thread constructor, as in the HelloRunnable example:
public class HelloRunnable implements Runnable {

    public void run() {
        System.out.println("Hello from a thread!");
    }

    public static void main(String args[]) {
        (new Thread(new HelloRunnable())).start();
    }

}
- Subclass Thread. The Thread class itself implements Runnable, though its run method does nothing. An application can subclass Thread, providing its own implementation of run, as in the HelloThread example:
public class HelloThread extends Thread {

    public void run() {
        System.out.println("Hello from a thread!");
    }

    public static void main(String args[]) {
        (new HelloThread()).start();
    }

}
Notice that both examples invoke Thread.start() in order to start the new thread.
Which of these idioms should you use? The first, which employs a Runnable object, is more general, because the Runnable object can subclass a class other than Thread. The second is easier to use in simple applications, but is limited by the fact that your task class must be a descendant of Thread. This lesson focuses on the first approach, which separates the Runnable task from the Thread object that executes the task. Not only is this approach more flexible, it also applies to the high-level thread management APIs covered later.
The Thread class defines a number of methods useful for thread management. These include static methods, which provide information about, or affect the status of, the thread invoking the method. The other methods are invoked from other threads involved in managing the thread and its Thread object. We'll examine some of these methods in the following sections.
Pause a thread with sleep
Thread.sleep causes the current thread to suspend execution for a specified period. This is an efficient way of making processor time available to the other threads of an application or to other applications that might be running on a computer system. The sleep method can also be used for pacing, as shown in the example that follows, and for waiting for another thread with duties that are understood to have time requirements, as with the SimpleThreads example in a later section.
Two overloaded versions of sleep are provided: one that specifies the sleep time to the millisecond and one that specifies the sleep time to the nanosecond. However, these sleep times are not guaranteed to be precise, because they are limited by the facilities provided by the underlying operating system. Also, the sleep period can be terminated by interrupts, as we'll see in a later section. In any case, you cannot assume that invoking sleep will suspend the thread for precisely the time specified.
The SleepMessages sample uses sleep to print messages at 4-second intervals:
public class SleepMessages {
    public static void main(String args[]) throws InterruptedException {
        String importantInfo[] = {
            "Mares eat oats",
            "Does eat oats",
            "Little lambs eat ivy",
            "A kid will eat ivy too"
        };

        for (int i = 0; i < importantInfo.length; i++) {
            // Pause for 4 seconds
            Thread.sleep(4000);
            // Print a message
            System.out.println(importantInfo[i]);
        }
    }
}
Notice that the main method declares that it throws InterruptedException. This is an exception that sleep throws when another thread interrupts the current thread while sleep is active. Since this application has not defined another thread to cause the interrupt, it doesn't bother to catch InterruptedException.
Interrupts
An interrupt is an indication to a thread that it should stop what it is doing and do something else. It's up to the programmer to decide exactly how a thread responds to an interrupt, but it is very common for the thread to terminate. This is the usage emphasized in this lesson.
A thread sends an interrupt by invoking interrupt on the Thread object for the thread to be interrupted. For the interrupt mechanism to work correctly, the interrupted thread must support its own interruption.
Supporting interruption
How do threads support their own interrupts? It depends on what it is currently doing. If a thread frequently calls a method that throws an InterruptedException, it only needs to return from the run method after catching the exception. For example, suppose the for loop in the SleepMessages example is in the run method of the Runnable object of the thread. Then it may be modified as follows to support interrupts:
for (int i = 0; i < importantInfo.length; i++) {
    // Pause for 4 seconds
    try {
        Thread.sleep(4000);
    } catch (InterruptedException e) {
        // We've been interrupted: no more messages.
        return;
    }
    // Print a message
    System.out.println(importantInfo[i]);
}
Many methods that throw InterruptedException, such as sleep, are designed to cancel the current operation and return immediately when an interrupt is received.
What if a thread goes a long time without invoking a method that throws InterruptedException? Then it must periodically invoke Thread.interrupted, which returns true if an interrupt has been received. For example:
for (int i = 0; i < inputs.length; i++) {
    heavyCrunch(inputs[i]);
    if (Thread.interrupted()) {
        // We've been interrupted: no more crunching.
        return;
    }
}
In this simple example, the code simply tests the interrupt and exits the thread when it is received. In more complex applications, throwing InterruptedException may make more sense:
if (Thread.interrupted()) {
    throw new InterruptedException();
}
This allows interrupt handling code to be centralized in a catch clause.
Interrupt status flag
The interrupt mechanism is implemented using an internal flag known as the interrupt status, which is initially false. Invoking Thread.interrupt sets this flag. When a thread checks for an interrupt by invoking the static method Thread.interrupted, the interrupt status is cleared. The non-static isInterrupted method, which is used by one thread to query the interrupt status of another, does not change the interrupt status flag.
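The difference between the two query methods can be seen in a small sketch (the class name is illustrative and not part of this lesson's examples):

public class InterruptStatusDemo {
    public static void main(String[] args) {
        Thread current = Thread.currentThread();
        current.interrupt();                          // set the interrupt status flag
        System.out.println(current.isInterrupted());  // true; isInterrupted does not clear the flag
        System.out.println(Thread.interrupted());     // true; the static method clears the flag
        System.out.println(Thread.interrupted());     // false; the flag was cleared by the previous call
    }
}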
By convention, any method that throws an InterruptedException and exits will reset the interrupt state on exit. However, it is always possible that the interrupt state will be set immediately again by another thread calling the interrupt.
Joins
The join method allows one thread to wait for the completion of another. If t is a Thread object whose thread is currently executing, t.join() causes the current thread to pause execution until t's thread terminates. Overloads of join allow the programmer to specify a waiting period. However, as with sleep, join is dependent on the OS for timing, so you should not assume that join will wait exactly as long as you specify.
Like sleep, join responds to an interrupt by exiting with an InterruptedException.
SimpleThreads example
The following example brings together some of the concepts in this section. SimpleThreads consists of two threads. The first is the main thread that every Java application has. The main thread creates a new thread from the Runnable object MessageLoop and waits for it to complete. If the MessageLoop thread takes a long time to complete, the main thread will interrupt it.
The MessageLoop thread prints out a series of messages. If interrupted before it has printed all its messages, the MessageLoop thread prints a message and exits.
public class SimpleThreads {

    // Display a message, preceded by
    // the name of the current thread
    static void threadMessage(String message) {
        String threadName = Thread.currentThread().getName();
        System.out.format("%s: %s%n", threadName, message);
    }

    private static class MessageLoop implements Runnable {
        public void run() {
            String importantInfo[] = {
                "Mares eat oats",
                "Does eat oats",
                "Little lambs eat ivy",
                "A kid will eat ivy too"
            };
            try {
                for (int i = 0; i < importantInfo.length; i++) {
                    // Pause for 4 seconds
                    Thread.sleep(4000);
                    // Print a message
                    threadMessage(importantInfo[i]);
                }
            } catch (InterruptedException e) {
                threadMessage("I wasn't done!");
            }
        }
    }

    public static void main(String args[]) throws InterruptedException {

        // Delay, in milliseconds before
        // we interrupt MessageLoop
        // thread (default one hour).
        long patience = 1000 * 60 * 60;

        // If command line argument
        // present, gives patience
        // in seconds.
        if (args.length > 0) {
            try {
                patience = Long.parseLong(args[0]) * 1000;
            } catch (NumberFormatException e) {
                System.err.println("Argument must be an integer.");
                System.exit(1);
            }
        }

        threadMessage("Starting MessageLoop thread");
        long startTime = System.currentTimeMillis();
        Thread t = new Thread(new MessageLoop());
        t.start();

        threadMessage("Waiting for MessageLoop thread to finish");
        // loop until MessageLoop
        // thread exits
        while (t.isAlive()) {
            threadMessage("Still waiting...");
            // Wait maximum of 1 second
            // for MessageLoop thread
            // to finish.
            t.join(1000);
            if (((System.currentTimeMillis() - startTime) > patience)
                  && t.isAlive()) {
                threadMessage("Tired of waiting!");
                t.interrupt();
                // Shouldn't be long now
                // -- wait indefinitely
                t.join();
            }
        }
        threadMessage("Finally!");
    }
}
Synchronization
Threads communicate primarily by sharing access to fields and the objects that reference fields refer to. This form of communication is extremely efficient, but makes two kinds of errors possible: thread interference and memory consistency errors. The tool needed to prevent these errors is synchronization.
However, synchronization can introduce thread contention, which occurs when two or more threads try to access the same resource simultaneously and cause the Java runtime to execute one or more threads more slowly, or even suspend their execution. Starvation and livelock are forms of thread contention. See the section on Liveness for more information.
This section covers the following topics:
- Thread interference: describes how errors are introduced when multiple threads access shared data.
- Memory consistency errors: describes errors that result from inconsistent views of shared memory.
- Synchronized methods: describes a simple idiom that can effectively prevent thread interference and memory consistency errors.
- Intrinsic locks and synchronization: describes a more general synchronization idiom, and explains how synchronization is based on intrinsic locks.
- Atomic access: talks about the general idea of operations that can't be interfered with by other threads.
Thread interference
Consider a simple class called Counter
class Counter {
    private int c = 0;

    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }
}
Counter is designed so that each invocation of increment adds 1 to c, and each invocation of decrement subtracts 1 from c. However, if a Counter object is referenced from multiple threads, interference between threads may prevent this from happening as expected.
Interference happens when two operations, running in different threads but acting on the same data, interleave. This means that the two operations consist of multiple steps, and the sequences of steps overlap.
It might not seem possible for operations on instances of Counter to interleave, since both operations on c are single, simple statements. However, even simple statements can translate to multiple steps in the virtual machine. We won't examine the specific steps the virtual machine takes -- it is enough to know that the single expression c++ can be decomposed into three steps:
- Retrieve the current value of c
- Increase the retrieved value by 1
- Store the incremented value back into c
The expression c-- can be decomposed the same way, except that the second step decrements instead of increments.
Suppose thread A invokes increment at about the same time thread B invokes decrement. If the initial value of c is 0, their interleaved actions might follow this sequence:
- Thread A: Retrieve c.
- Thread B: Retrieve c.
- Thread A: Increment retrieved value; result is 1.
- Thread B: Decrement retrieved value; result is -1.
- Thread A: Store result in c; c is now 1.
- Thread B: Store result in c; c is now -1.
Thread A's result is lost, overwritten by thread B. This particular interleaving is only one possibility. Under different circumstances it might be thread B's result that gets lost, or there might be no error at all. Because they are unpredictable, thread interference bugs can be difficult to detect and fix.
Memory consistency error
Memory consistency errors occur when different threads have inconsistent views of the same data. The causes of memory consistency errors are complex and beyond the scope of this tutorial. Fortunately, programmers don't need to understand these reasons in detail. All that is needed is a strategy to avoid them.
The key to avoiding memory consistency errors is understanding the happens-before relationship. This relationship is simply a guarantee that memory writes by one specific statement are visible to another specific statement. To see this, consider the following example. Suppose a simple int field is defined and initialized:

int counter = 0;

The counter field is shared between two threads, A and B. Suppose thread A increments counter:

counter++;

Then, shortly afterwards, thread B prints out counter:

System.out.println(counter);

If the two statements had been executed in the same thread, it would be safe to assume that the value printed out would be "1". But if the two statements are executed in separate threads, the value printed out might well be "0", because there's no guarantee that thread A's change to counter will be visible to thread B -- unless the programmer has established a happens-before relationship between these two statements.
There are several actions that create happens-before relationships. One of them is synchronization, as we will see in the following sections.
We've already seen two actions that create happens-before relationships.
- When a statement invokes Thread.start, every statement that has a happens-before relationship with that statement also has a happens-before relationship with every statement executed by the new thread. The effects of the code that led up to the creation of the new thread are visible to the new thread.
- When a thread terminates and causes a Thread.join in another thread to return, then all the statements executed by the terminated thread have a happens-before relationship with all the statements following the successful join. The effects of the code in the thread are now visible to the thread that performed the join.
For a list of actions that create happens-before relationships, refer to the Summary page of the java.util.concurrent package.
Synchronized methods
The Java programming language provides two basic synchronization idioms: synchronized methods and synchronized statements. The more complex of the two, synchronized statements, are described in the next section. This section is about synchronized methods.
To synchronize a method, simply add the synchronized keyword to its declaration:
public class SynchronizedCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }

    public synchronized void decrement() {
        c--;
    }

    public synchronized int value() {
        return c;
    }
}
If count is an instance of SynchronizedCounter, then making these methods synchronized has two effects:
- First, it is not possible for two invocations of synchronized methods on the same object to interleave. When one thread is executing a synchronized method for an object, all other threads that invoke synchronized methods for the same object suspend execution until the first thread is done with the object.
- Second, when a synchronized method exits, it automatically establishes a happens-before relationship with any subsequent invocation of a synchronized method for the same object. This guarantees that changes to the state of the object are visible to all threads.
Note that constructors cannot be synchronized -- using the synchronized keyword with a constructor is a syntax error. Synchronizing constructors doesn't make sense, because only the thread that creates an object should have access to it while it is being constructed.
Warning: When constructing an object that will be shared between threads, be very careful that a reference to the object does not "leak" prematurely. For example, suppose you want to maintain a List named instances containing every instance of the class. You might be tempted to add the following line to your constructor: instances.add(this); But then other threads can use instances to access the object before construction of the object is complete.
Synchronized methods enable a simple strategy for preventing thread interference and memory consistency errors: if an object is visible to more than one thread, all reads or writes to that object's variables are done through synchronized methods. (An important exception: final fields, which cannot be modified after the object is constructed, can be safely read through non-synchronized methods once the object is constructed.) This strategy is effective, but can present problems with liveness, as we'll see later in this lesson.
Intrinsic locks and synchronization
Synchronization is built around an internal entity known as the intrinsic lock or monitor lock. (The API specification often refers to this entity simply as a "monitor".) Intrinsic locks play a role in both aspects of synchronization: enforcing exclusive access to an object's state and establishing the happens-before relationships that are essential to visibility.
Every object has an intrinsic lock associated with it. By convention, a thread that needs exclusive and consistent access to an object's fields has to acquire the object's intrinsic lock before accessing them, and then release the intrinsic lock when it's done with them. A thread is said to own the intrinsic lock between the time it has acquired the lock and released it. As long as a thread owns an intrinsic lock, no other thread can acquire the same lock; the other thread will block when it attempts to acquire the lock. When a thread releases an intrinsic lock, a happens-before relationship is established between that action and any subsequent acquisition of the same lock.
Locks in synchronized methods
When a thread invokes a synchronized method, it automatically acquires the intrinsic lock for that method's object and releases it when the method returns. The lock release occurs even if the return was caused by an uncaught exception.
You might wonder what happens when a static synchronized method is invoked, since a static method is associated with a class, not an object. In this case, the thread acquires the intrinsic lock for the Class object associated with the class. Thus access to the class's static fields is controlled by a lock that's distinct from the lock for any instance of the class.
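A minimal sketch of this distinction (the class and method names are illustrative, not part of this lesson's examples); the second method shows the equivalent synchronized-statement form on the Class object:

public class StaticLockExample {
    private static int instanceCount = 0;

    // Locks the Class object StaticLockExample.class, not any particular instance.
    public static synchronized void incrementCount() {
        instanceCount++;
    }

    // Equivalent form written as a synchronized statement on the Class object.
    public static void decrementCount() {
        synchronized (StaticLockExample.class) {
            instanceCount--;
        }
    }
}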
Synchronized statements
Another way to create synchronized code is with synchronized statements. Unlike synchronized methods, synchronized statements must specify the object that provides the intrinsic lock:
public void addName(String name) {
    synchronized(this) {
        lastName = name;
        nameCount++;
    }
    nameList.add(name);
}
In this example, the addName method needs to synchronize changes to lastName and nameCount, but also needs to avoid synchronizing the invocation of other objects' methods. (Invoking other objects' methods from synchronized code can create problems that are described in the section on Liveness.) Without synchronized statements, there would have to be a separate, unsynchronized method for the sole purpose of invoking nameList.add.
Synchronized statements are also useful for improving concurrency with fine-grained synchronization. Suppose, for example, class MsLunch has two instance fields, c1 and c2, that are never used together. All updates of these fields must be synchronized, but there's no reason to prevent an update of c1 from being interleaved with an update of c2; doing so reduces concurrency by creating unnecessary blocking. Instead of using synchronized methods or otherwise using the lock associated with this, we create two objects solely to provide locks.
public class MsLunch {
    private long c1 = 0;
    private long c2 = 0;
    private Object lock1 = new Object();
    private Object lock2 = new Object();

    public void inc1() {
        synchronized(lock1) {
            c1++;
        }
    }

    public void inc2() {
        synchronized(lock2) {
            c2++;
        }
    }
}
Use this idiom with extreme care. You must be absolutely sure that it really is safe to interleave access to the affected fields.
Reentrant synchronization
Recall that a thread cannot acquire a lock owned by another thread. But a thread can acquire a lock that it already owns. Allowing a thread to acquire the same lock more than once enables reentrant synchronization. This describes a situation where synchronized code, directly or indirectly, invokes a method that also contains synchronized code, and both sets of code use the same lock. Without reentrant synchronization, synchronized code would have to take many additional precautions to avoid having a thread cause itself to block.
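As a small sketch of reentrant synchronization (class and method names are illustrative), a synchronized method can safely invoke another synchronized method on the same object:

public class ReentrantExample {
    public synchronized void outer() {
        inner();  // we already own this object's lock; acquiring it again succeeds
    }

    public synchronized void inner() {
        System.out.println("Reached inner without blocking on our own lock");
    }
}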
Atomic access
In programming, an atomic action is one that effectively happens all at once. An atomic action cannot stop in the middle: it either happens completely, or it doesn't happen at all. No side effects of an atomic action are visible until the action is complete.
We have already seen that an increment expression, such as c++, does not describe an atomic action. Even very simple expressions can define complex actions that decompose into other actions. However, there are actions you can specify that are atomic:
- Reads and writes are atomic for reference variables and for most primitive variables (all types except long and double).
- For all variables declared volatile (including long and double variables), reading and writing are atomic.
Atomic actions cannot be interleaved, so they can be used without fear of thread interference. However, this does not eliminate all need to synchronize atomic actions, because memory consistency errors are still possible. Using volatile variables reduces the risk of memory consistency errors, because any write to a volatile variable establishes a happens-before relationship with subsequent reads of that same variable. This means that changes to a volatile variable are always visible to other threads. What's more, it also means that when a thread reads a volatile variable, it sees not just the latest change to the volatile, but also the side effects of the code that led up to the change.
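Here is a minimal sketch (illustrative names, not from this lesson's examples) of a volatile flag whose writes are guaranteed to be visible to threads that later read it:

public class VolatileFlag {
    // Writes to this flag are visible to every thread that subsequently reads it.
    private volatile boolean done = false;

    public void finish() {
        done = true;       // the write establishes a happens-before with later reads
    }

    public void awaitFinish() {
        while (!done) {    // busy-wait sketch; real code would usually block instead
            Thread.yield();
        }
    }
}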
Using simple atomic variable access is more efficient than accessing these variables through synchronized code, but requires more care by the programmer to avoid memory consistency errors. Whether the extra effort is worthwhile depends on the size and complexity of the application.
Some of the classes in the java.util.concurrent package provide atomic methods that do not rely on synchronization. We'll discuss them in the section on high-level concurrency objects.
Liveness
A concurrent application's ability to execute in a timely manner is known as its liveness. This section describes the most common kind of liveness problem, deadlock, and goes on to briefly describe two other liveness problems, starvation and livelock.
Deadlock
Deadlock describes a situation in which two or more threads are blocked forever waiting for each other. Here's an example.
Alphonse and Gaston are friends, and great believers in courtesy. A strict rule of courtesy is that when you bow to a friend, you must remain bowed until your friend has a chance to return the bow. Unfortunately, this rule does not account for the possibility that two friends might bow to each other at the same time. This example application, Deadlock, models this possibility:
public class Deadlock {
    static class Friend {
        private final String name;

        public Friend(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        public synchronized void bow(Friend bower) {
            System.out.format("%s: %s"
                + " has bowed to me!%n",
                this.name, bower.getName());
            bower.bowBack(this);
        }

        public synchronized void bowBack(Friend bower) {
            System.out.format("%s: %s"
                + " has bowed back to me!%n",
                this.name, bower.getName());
        }
    }

    public static void main(String[] args) {
        final Friend alphonse = new Friend("Alphonse");
        final Friend gaston = new Friend("Gaston");
        new Thread(new Runnable() {
            public void run() { alphonse.bow(gaston); }
        }).start();
        new Thread(new Runnable() {
            public void run() { gaston.bow(alphonse); }
        }).start();
    }
}
When Deadlock runs, it's extremely likely that both threads will block when they attempt to invoke bowBack. Neither block will ever end, because each thread is waiting for the other to exit bow.
Starvation and livelock
Starvation and livelocks are less common than deadlocks, but they are still problems that every concurrent software designer may encounter.
Starvation
Starvation describes a situation where a thread is unable to gain regular access to shared resources and is unable to make progress. This happens when shared resources are made unavailable for long periods by "greedy" threads. For example, suppose an object provides a synchronized method that often takes a long time to return. If one thread invokes this method frequently, other threads that also need frequent synchronized access to the same object will often be blocked.
Livelock
A thread often acts in response to the action of another thread. If the other thread's action is also a response to the action of another thread, then livelock may result. As with deadlock, livelocked threads are unable to make further progress. However, the threads are not blocked -- they are simply too busy responding to each other to resume work. This is comparable to two people attempting to pass each other in a corridor: Alphonse moves to his left to let Gaston pass, while Gaston moves to his right to let Alphonse pass. Seeing that they are still blocking each other, Alphonse moves to his right, while Gaston moves to his left. They're still blocking each other, and so on.
Guarded blocks
Threads often have to coordinate their actions. The most common coordination idiom is the guarded block. Such a block begins by polling a condition that must be true before the block can proceed. There are a number of steps to follow in order to do this correctly.
Suppose, for example, guardedJoy is a method that must not proceed until a shared variable joy has been set by another thread. Such a method could, in theory, simply loop until the condition is satisfied, but that loop is wasteful, since it executes continuously while waiting.
public void guardedJoy() {
    // Simple loop guard. Wastes
    // processor time. Don't do this!
    while(!joy) {}
    System.out.println("Joy has been achieved!");
}
A more efficient guard invokes Object.wait to suspend the current thread. The invocation of wait does not return until another thread has issued a notification that some special event may have occurred -- though not necessarily the event this thread is waiting for:
Note: Always invoke wait inside a loop that tests for the condition being waited for. Don't assume the interrupt was for the particular condition you were waiting for, or that the condition is still true.

public synchronized void guardedJoy() {
    // This guard only loops once for each special event, which may not
    // be the event we're waiting for.
    while(!joy) {
        try {
            wait();
        } catch (InterruptedException e) {}
    }
    System.out.println("Joy and efficiency have been achieved!");
}
Like many methods that pause execution, wait throws an InterruptedException exception. In this example, we can ignore this exception -- we only care about the value of joy.
Why is this version of guardedJoy synchronized? Suppose d is the object we're using to call wait. When a thread invokes d.wait, it must own the intrinsic lock for d; otherwise an error is thrown. Invoking wait inside a synchronized method is a simple way to acquire the intrinsic lock.
When wait is invoked, the thread releases the lock and suspends execution. At some future time, another thread will acquire the same lock and invoke Object.notifyAll, informing all threads waiting on that lock that something important has happened:
public synchronized void notifyJoy() {
    joy = true;
    notifyAll();
}
Some time after the second thread has released the lock, the first thread reacquires the lock and resumes by returning from the invocation of wait.
Note: There is a second notification method, notify, which wakes up a single thread. Because notify doesn't allow you to specify the thread that is woken up, it is useful only in massively parallel applications -- that is, programs with a large number of threads, all doing similar chores. In such an application, you don't care which thread gets woken up.

Let's use guarded blocks to create a Producer-Consumer application. This kind of application shares data between two threads: the producer, which creates the data, and the consumer, which does something with it. The two threads communicate using a shared object. Coordination is essential: the consumer thread must not attempt to retrieve the data before the producer thread has delivered it, and the producer thread must not attempt to deliver new data if the consumer hasn't retrieved the old data.
In this example, the data is a series of text messages shared through a Drop type object:
public class Drop {
    // Message sent from producer
    // to consumer.
    private String message;
    // True if consumer should wait
    // for producer to send message,
    // false if producer should wait for
    // consumer to retrieve message.
    private boolean empty = true;

    public synchronized String take() {
        // Wait until message is
        // available.
        while (empty) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty = true;
        // Notify producer that
        // status has changed.
        notifyAll();
        return message;
    }

    public synchronized void put(String message) {
        // Wait until message has
        // been retrieved.
        while (!empty) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty = false;
        // Store message.
        this.message = message;
        // Notify consumer that status
        // has changed.
        notifyAll();
    }
}
The Producer thread defined in Producer sends a series of ordinary messages. The string "DONE" indicates that all messages have been sent. To simulate the unpredictable nature of real-world applications, Producer threads pause randomly between messages.
public class Producer implements Runnable {
    private Drop drop;

    public Producer(Drop drop) {
        this.drop = drop;
    }

    public void run() {
        String importantInfo[] = {
            "Mares eat oats",
            "Does eat oats",
            "Little lambs eat ivy",
            "A kid will eat ivy too"
        };
        Random random = new Random();

        for (int i = 0; i < importantInfo.length; i++) {
            drop.put(importantInfo[i]);
            try {
                Thread.sleep(random.nextInt(5000));
            } catch (InterruptedException e) {}
        }
        drop.put("DONE");
    }
}
The Consumer thread defined in the Consumer simply retrieves the message and prints it out until it retrieves the "DONE" string. This thread will also be suspended randomly.
public class Consumer implements Runnable {
    private Drop drop;

    public Consumer(Drop drop) {
        this.drop = drop;
    }

    public void run() {
        Random random = new Random();
        for (String message = drop.take();
             ! message.equals("DONE");
             message = drop.take()) {
            System.out.format("MESSAGE RECEIVED: %s%n", message);
            try {
                Thread.sleep(random.nextInt(5000));
            } catch (InterruptedException e) {}
        }
    }
}
Finally, the main thread defined in ProducerConsumerExample starts the producer and consumer threads.
Note: The Drop class was written in order to demonstrate guarded blocks. To avoid re-inventing the wheel, examine the existing data structures in the Java Collections Framework before trying to code your own data-sharing objects. For more information, refer to the Questions and Exercises section.

public class ProducerConsumerExample {
    public static void main(String[] args) {
        Drop drop = new Drop();
        (new Thread(new Producer(drop))).start();
        (new Thread(new Consumer(drop))).start();
    }
}
Immutable object
If the state of an object cannot be changed after construction, the object is considered immutable. Maximum dependence on immutable objects is widely considered to be a good strategy for creating simple and reliable code.
Immutable objects are particularly useful in concurrent applications. Since they cannot change state, they cannot be corrupted by thread interference or observed in an inconsistent state.
Programmers are often reluctant to employ immutable objects, because they worry about the cost of creating a new object as opposed to updating an object in place. The impact of object creation is often overestimated, and can be offset by some of the efficiencies associated with immutable objects. These include decreased overhead due to garbage collection, and the elimination of code needed to protect mutable objects from corruption.
The following sections take a class whose instance is mutable as an example and derive a class with immutable instances. In doing so, they give the general rules of this transformation and show some advantages of immutable objects.
Synchronization class example
The class SynchronizedRGB defines objects that represent colors. Each object represents the color as three integers that stand for primary color values and a string that gives the name of the color.
public class SynchronizedRGB {

    // Values must be between 0 and 255.
    private int red;
    private int green;
    private int blue;
    private String name;

    private void check(int red, int green, int blue) {
        if (red < 0 || red > 255
                || green < 0 || green > 255
                || blue < 0 || blue > 255) {
            throw new IllegalArgumentException();
        }
    }

    public SynchronizedRGB(int red, int green, int blue, String name) {
        check(red, green, blue);
        this.red = red;
        this.green = green;
        this.blue = blue;
        this.name = name;
    }

    public void set(int red, int green, int blue, String name) {
        check(red, green, blue);
        synchronized (this) {
            this.red = red;
            this.green = green;
            this.blue = blue;
            this.name = name;
        }
    }

    public synchronized int getRGB() {
        return ((red << 16) | (green << 8) | blue);
    }

    public synchronized String getName() {
        return name;
    }

    public synchronized void invert() {
        red = 255 - red;
        green = 255 - green;
        blue = 255 - blue;
        name = "Inverse of " + name;
    }
}
SynchronizedRGB must be used carefully to avoid being seen in an inconsistent state. Suppose, for example, a thread executes the following code:
SynchronizedRGB color = new SynchronizedRGB(0, 0, 0, "Pitch Black");
int myColorInt = color.getRGB();      // Statement 1
String myColorName = color.getName(); // Statement 2
If another thread invokes color.set after Statement 1 but before Statement 2, the value of myColorInt won't match the value of myColorName. To avoid this outcome, the two statements must be bound together:
synchronized (color) {
    int myColorInt = color.getRGB();
    String myColorName = color.getName();
}
This kind of inconsistency is only possible for mutable objects -- it will not be an issue for the immutable version of SynchronizedRGB.
Define policies for immutable objects
The following rules define a simple strategy for creating immutable objects. Not all classes documented as "immutable" follow these rules. This does not necessarily mean the creators of these classes were sloppy; they may have good reason for believing that instances of their classes never change after construction. However, such reasoning requires sophisticated analysis and is not for beginners.
- Do not provide "setter" methods -- methods that modify fields or objects referenced by fields.
- Make all fields final and private.
- Do not allow subclasses to override methods. The simplest way is to declare the class final. A more sophisticated approach is to make the constructor private and construct an instance in the factory method.
- If the instance fields include references to mutable objects, don't allow those objects to be changed:
1) Don't provide methods that modify the mutable objects.
2) Don't share references to the mutable objects. Never store a reference to an external, mutable object passed to the constructor; if necessary, create copies and store references to the copies. Similarly, create copies of your internal mutable objects when necessary to avoid returning the originals in your methods. (A sketch after this list illustrates the factory-method and defensive-copy points.)
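Here is a minimal sketch of rules 3 and 4 (the Appointment class and its fields are invented for illustration, not part of this lesson's examples): the constructor is private, instances come from a factory method, and the mutable Date field is defensively copied on the way in and on the way out.

import java.util.Date;

public final class Appointment {
    private final String title;
    private final Date when;   // Date is mutable, so we copy it in both directions

    private Appointment(String title, Date when) {
        this.title = title;
        this.when = new Date(when.getTime());   // store a private copy, not the caller's object
    }

    // Factory method; the private constructor keeps construction under our control.
    public static Appointment of(String title, Date when) {
        return new Appointment(title, when);
    }

    public String getTitle() {
        return title;
    }

    public Date getWhen() {
        return new Date(when.getTime());        // return a copy so callers cannot mutate our state
    }
}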
Applying this strategy to SynchronizedRGB results in the following steps:
- There are two setter methods in this class. The first one, set, arbitrarily transforms the object, and has no place in the immutable version of the class. The second one, invert, can be adapted by having it create a new object instead of modifying the existing one.
- All fields have been private; They are further qualified as final.
- The class itself is declared final.
- The only field that refers to an object refers to a String, which is itself immutable. Therefore, no safeguards against changing the state of "contained" mutable objects are necessary.
After these changes, we have ImmutableRGB:
final public class ImmutableRGB {

    // Values must be between 0 and 255.
    final private int red;
    final private int green;
    final private int blue;
    final private String name;

    private void check(int red, int green, int blue) {
        if (red < 0 || red > 255
                || green < 0 || green > 255
                || blue < 0 || blue > 255) {
            throw new IllegalArgumentException();
        }
    }

    public ImmutableRGB(int red, int green, int blue, String name) {
        check(red, green, blue);
        this.red = red;
        this.green = green;
        this.blue = blue;
        this.name = name;
    }

    public int getRGB() {
        return ((red << 16) | (green << 8) | blue);
    }

    public String getName() {
        return name;
    }

    public ImmutableRGB invert() {
        return new ImmutableRGB(255 - red,
                                255 - green,
                                255 - blue,
                                "Inverse of " + name);
    }
}
High-level concurrency objects
So far, this lesson has focused on low-level APIs, which have been part of the Java platform from the beginning. These APIs are sufficient for very basic tasks, but higher-level modules are needed for more advanced tasks. This is especially true for large-scale concurrent applications that take full advantage of today's multiprocessor and multi-core systems.
In this section we'll look at some of the high-level concurrency features introduced with version 5.0 of the Java platform. Most of these features are implemented in the new java.util.concurrent packages. There are also new concurrent data structures in the Java Collections Framework.
- Lock objects support locking idioms that simplify many concurrent applications.
- Executors define a high-level API for launching and managing threads. Executor implementations provided by java.util.concurrent provide thread pool management suitable for large-scale applications.
- Concurrent Collections: make it easier to manage large data collections and greatly reduce the need for synchronization.
- Atomic variables: features that minimize synchronization and help avoid memory consistency errors.
- ThreadLocalRandom: (in JDK 7) provides the ability to efficiently generate pseudo-random numbers from multiple threads.
Lock object
Synchronized code relies on a simple kind of reentrant lock. This kind of lock is easy to use, but has many limitations. More sophisticated locking idioms are supported by the java.util.concurrent.locks package. We won't examine this package in detail, but will instead focus on its most basic interface, Lock.
The lock object works much like an implicit lock used by synchronous code. Like implicit locks, only one thread can have a lock object at a time. Lock objects also support the wait/notify mechanism through their related Condition objects.
The biggest advantage of Lock objects over implicit locks is their ability to back out of an attempt to acquire a lock. The tryLock method backs out if the lock is not available immediately or before a timeout expires (if specified). The lockInterruptibly method backs out if another thread sends an interrupt before the lock is acquired.
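A minimal sketch of backing out with tryLock (the class and method names are illustrative, and the one-second timeout is an arbitrary choice):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockExample {
    private final Lock lock = new ReentrantLock();

    public boolean updateIfAvailable() throws InterruptedException {
        // Wait at most one second for the lock; give up rather than block forever.
        if (!lock.tryLock(1, TimeUnit.SECONDS)) {
            return false;      // could not acquire the lock in time; back out
        }
        try {
            // ... mutate shared state while holding the lock ...
            return true;
        } finally {
            lock.unlock();     // always release the lock we acquired
        }
    }
}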
Let's use Lock objects to solve the deadlock problem we saw in the Liveness section. Alphonse and Gaston have trained themselves to notice when a friend is about to bow. We model this improvement by requiring that our Friend objects must acquire locks for both participants before proceeding with the bow. Here is the source code for the improved model, Safelock. To demonstrate the versatility of this idiom, we assume that Alphonse and Gaston are so infatuated with their newfound ability to bow safely that they can't stop bowing to each other:
public class Safelock {
    static class Friend {
        private final String name;
        private final Lock lock = new ReentrantLock();

        public Friend(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        public boolean impendingBow(Friend bower) {
            Boolean myLock = false;
            Boolean yourLock = false;
            try {
                myLock = lock.tryLock();
                yourLock = bower.lock.tryLock();
            } finally {
                if (! (myLock && yourLock)) {
                    if (myLock) {
                        lock.unlock();
                    }
                    if (yourLock) {
                        bower.lock.unlock();
                    }
                }
            }
            return myLock && yourLock;
        }

        public void bow(Friend bower) {
            if (impendingBow(bower)) {
                try {
                    System.out.format("%s: %s has"
                        + " bowed to me!%n",
                        this.name, bower.getName());
                    bower.bowBack(this);
                } finally {
                    lock.unlock();
                    bower.lock.unlock();
                }
            } else {
                System.out.format("%s: %s started"
                    + " to bow to me, but saw that"
                    + " I was already bowing to"
                    + " him.%n",
                    this.name, bower.getName());
            }
        }

        public void bowBack(Friend bower) {
            System.out.format("%s: %s has"
                + " bowed back to me!%n",
                this.name, bower.getName());
        }
    }

    static class BowLoop implements Runnable {
        private Friend bower;
        private Friend bowee;

        public BowLoop(Friend bower, Friend bowee) {
            this.bower = bower;
            this.bowee = bowee;
        }

        public void run() {
            Random random = new Random();
            for (;;) {
                try {
                    Thread.sleep(random.nextInt(10));
                } catch (InterruptedException e) {}
                bowee.bow(bower);
            }
        }
    }

    public static void main(String[] args) {
        final Friend alphonse = new Friend("Alphonse");
        final Friend gaston = new Friend("Gaston");
        new Thread(new BowLoop(alphonse, gaston)).start();
        new Thread(new BowLoop(gaston, alphonse)).start();
    }
}
Executors
In all of the previous examples, there's a close connection between the task being done by a new thread (as defined by its Runnable object) and the thread itself (as defined by a Thread object). This works well for small applications, but in large-scale applications it makes sense to separate thread management and creation from the rest of the application. Objects that encapsulate these functions are known as executors. The following subsections describe executors in detail.
- Executor interfaces define the three executor object types.
- Thread pools are the most common kind of executor implementation.
- Fork/join is a framework (new in JDK 7) for taking advantage of multiple processors.
Executor interfaces
The java.util.concurrent package defines three executor interfaces:
- Executor: a simple interface that supports starting new tasks.
- ExecutorService: a subinterface of Executor that adds features to help manage the life cycle, both of the individual tasks and of the executor itself.
- ScheduledExecutorService: the sub interface of ExecutorService, which supports future tasks and / or periodic execution.
Typically, variables that refer to executor objects are declared as one of these three interface types, not with an executor class type.
Executor interface
The Executor interface provides a single method, execute, designed to be a drop-in replacement for a common thread-creation idiom. If r is a Runnable object, and e is an Executor object, you can replace (new Thread(r)).start(); with e.execute(r); However, the definition of execute is less specific. The low-level idiom creates a new thread and launches it immediately. Depending on the Executor implementation, execute may do the same thing, but is more likely to use an existing worker thread to run r, or to place r in a queue to wait for a worker thread to become available. (We'll describe worker threads in the section on thread pools.)
The executor implementations in java.util.concurrent are designed to make full use of the more advanced ExecutorService and ScheduledExecutorService interfaces, although they also work with the base Executor interface.
ExecutorService interface
The ExecutorService interface supplements execute with a similar, but more versatile, submit method. Like execute, submit accepts Runnable objects, but it also accepts Callable objects, which allow the task to return a value. The submit method returns a Future object, which is used to retrieve the Callable return value and to manage the status of both Callable and Runnable tasks.
ExecutorService also provides methods for submitting large collections of Callable objects. Finally, ExecutorService provides a number of methods for managing the shutdown of the executor. To support immediate shutdown, tasks should handle interrupts correctly.
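For example (a sketch; the class name and the computed value are arbitrary), submitting a Callable and retrieving its result through a Future might look like this:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // A Callable, unlike a Runnable, can return a value.
        Callable<Integer> task = new Callable<Integer>() {
            public Integer call() {
                return 6 * 7;
            }
        };
        Future<Integer> future = executor.submit(task);
        System.out.println("Result: " + future.get());  // blocks until the task completes
        executor.shutdown();
    }
}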
ScheduledExecutorService interface
The ScheduledExecutorService interface supplements the methods of its parent ExecutorService with schedule, which executes a Runnable or Callable task after a specified delay. In addition, the interface defines scheduleAtFixedRate and scheduleWithFixedDelay, which execute specified tasks repeatedly, at defined intervals.
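A small sketch of periodic scheduling (the class name, message, and delays are arbitrary illustrative choices):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeartbeatExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        Runnable heartbeat = new Runnable() {
            public void run() {
                System.out.println("still alive");
            }
        };
        // Run after a 1-second initial delay, then every 5 seconds.
        // The program keeps running until the scheduler is shut down.
        scheduler.scheduleAtFixedRate(heartbeat, 1, 5, TimeUnit.SECONDS);
    }
}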
Thread pool
Most of the executor implementations in java.util.concurrent use thread pools, which consist of worker threads. This kind of thread exists separately from the Runnable and Callable tasks it executes and is often used to execute multiple tasks.
Using worker threads minimizes the overhead of thread creation. Thread objects use a lot of memory. In large-scale applications, allocating and recycling many Thread objects will produce huge memory management overhead.
One common type of thread pool is the fixed thread pool. This type of pool always has a specified number of threads running; if a thread is somehow terminated while it is still in use, it is automatically replaced with a new thread. Tasks are submitted to the pool via an internal queue, which holds extra tasks whenever there are more active tasks than threads.
An important advantage of the fixed thread pool is that applications using it degrade gracefully. To understand this, consider a web server application where each HTTP request is handled by a separate thread. If the application simply creates a new thread for every new HTTP request, and the system receives more requests than it can handle immediately, the application will suddenly stop responding to all requests when the overhead of all those threads exceeds the capacity of the system. With a limit on the number of threads that can be created, the application will not be servicing HTTP requests as quickly as they come in, but it will be servicing them as quickly as the system can sustain.
A simple way to create an executor that uses a fixed thread pool is to invoke the newFixedThreadPool factory method in java.util.concurrent.Executors. This class also provides the following factory methods:
- The newCachedThreadPool method creates an executor with an expandable thread pool. This executor is suitable for applications that launch many short-lived tasks.
- The newSingleThreadExecutor method creates an executor that executes a single task at a time.
- Several factory methods are ScheduledExecutorService versions of the above executors.
If none of the executors provided by the above factory methods meet your needs, constructing instances of java.util.concurrent.ThreadPoolExecutor or java.util.concurrent.ScheduledThreadPoolExecutor will give you additional options.
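A minimal sketch of a fixed thread pool (the pool size, task count, and class name are arbitrary choices for illustration):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedPoolExample {
    public static void main(String[] args) {
        // A pool of 4 worker threads; extra tasks wait in the pool's internal queue.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            pool.execute(new Runnable() {
                public void run() {
                    System.out.println(Thread.currentThread().getName()
                            + " handling task " + taskId);
                }
            });
        }
        pool.shutdown();  // no new tasks accepted; queued tasks still run
    }
}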
Fork/Join
The fork/join framework is an implementation of the ExecutorService interface that helps you leverage multiple processors. It is designed for work that can be recursively broken down into smaller pieces. The goal is to use all available processing power to enhance application performance.
Like any ExecutorService implementation, the fork/join framework assigns tasks to worker threads in the thread pool. The fork/join framework is different because it uses the work stealing algorithm. When the worker thread runs out of things to do, it can steal tasks from other threads that are still busy.
The center of the fork/join framework is the ForkJoinPool class, which is an extension of the AbstractExecutorService class. ForkJoinPool implements the core work stealing algorithm and can execute the ForkJoinTask process.
Basic use
The first step in using the fork/join framework is to write code that performs some of the work. Your code should be similar to the following pseudo code:
if (my portion of the work is small enough)
    do the work directly
else
    split my work into two pieces
    invoke the two pieces and wait for the results
Wrap this code in a ForkJoinTask subclass, typically using one of its more specialized types, either RecursiveTask (which can return a result) or RecursiveAction.
After your ForkJoinTask subclass is ready, create an object representing all the work to be done and pass it to the invoke() method of the ForkJoinPool instance.
Blurring for clarity
To help you understand how the fork/join framework works, consider the following example. Suppose you want to blur the image. The original source image is represented by an array of integers, each of which contains the color value of a single pixel. The blurred target image is also represented by an integer array with the same size as the source image.
Blurring is performed by processing the source array one pixel at a time. Each pixel is averaged with its surrounding pixels (red, green, and blue component averages), and the results are placed in the target array. Since the image is a large array, this process may take a long time. By implementing the algorithm using the fork/join framework, you can take advantage of concurrent processing on multiprocessor systems. Here is a possible implementation:
public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    // Processing window size; should be odd.
    private int mBlurWidth = 15;

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                      mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16) / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8) / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0) / mBlurWidth;
            }

            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                         (((int)rt) << 16) |
                         (((int)gt) <<  8) |
                         (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }

    ...
Now you implement the abstract compute() method, which either performs the blur directly or splits it into two smaller tasks. A simple array length threshold helps determine whether the work is performed or split.
protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }

    int split = mLength / 2;

    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}
If the previous method is in a subclass of the RecursiveAction class, setting the task to run in ForkJoinPool is simple and involves the following steps:
- Create a task that represents all of the work to be done.
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
- Create the ForkJoinPool that will run the task.
ForkJoinPool pool = new ForkJoinPool();
- Run the task.
pool.invoke(fb);
For the complete source code, including some additional code to create the target image file, see the ForkBlur example.
import java.awt.image.BufferedImage;
import java.io.File;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import javax.imageio.ImageIO;

public class ForkBlur extends RecursiveAction {

    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    // Average pixels from source, write results into destination.
    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth;
                gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth;
                bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth;
            }

            // Re-assemble destination pixel.
            int dpixel = (0xff000000)
                    | (((int) rt) << 16)
                    | (((int) gt) << 8)
                    | (((int) bt) << 0);
            mDestination[index] = dpixel;
        }
    }

    protected static int sThreshold = 10000;

    @Override
    protected void compute() {
        if (mLength < sThreshold) {
            computeDirectly();
            return;
        }

        int split = mLength / 2;

        invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
                  new ForkBlur(mSource, mStart + split, mLength - split, mDestination));
    }

    // Plumbing follows.
    public static void main(String[] args) throws Exception {
        String srcName = "red-tulips.jpg";
        File srcFile = new File(srcName);
        BufferedImage image = ImageIO.read(srcFile);

        System.out.println("Source image: " + srcName);

        BufferedImage blurredImage = blur(image);

        String dstName = "blurred-tulips.jpg";
        File dstFile = new File(dstName);
        ImageIO.write(blurredImage, "jpg", dstFile);

        System.out.println("Output image: " + dstName);
    }

    public static BufferedImage blur(BufferedImage srcImage) {
        int w = srcImage.getWidth();
        int h = srcImage.getHeight();

        int[] src = srcImage.getRGB(0, 0, w, h, null, 0, w);
        int[] dst = new int[src.length];

        System.out.println("Array size is " + src.length);
        System.out.println("Threshold is " + sThreshold);

        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println(Integer.toString(processors) + " processor"
                + (processors != 1 ? "s are " : " is ")
                + "available");

        ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

        ForkJoinPool pool = new ForkJoinPool();

        long startTime = System.currentTimeMillis();
        pool.invoke(fb);
        long endTime = System.currentTimeMillis();

        System.out.println("Image blur took " + (endTime - startTime) + " milliseconds.");

        BufferedImage dstImage =
                new BufferedImage(w, h, BufferedImage.TYPE_INT_ARGB);
        dstImage.setRGB(0, 0, w, h, dst, 0, w);

        return dstImage;
    }
}
Standard implementation
In addition to using the fork/join framework to implement custom algorithms for tasks to be performed concurrently on a multiprocessor system (such as the ForkBlur.java example in the previous section), there are some generally useful features in Java SE that are already implemented using the fork/join framework. One such implementation, introduced in Java SE 8, is used by the java.util.Arrays class for its parallelSort() methods. These methods are similar to sort(), but leverage concurrency via the fork/join framework. Parallel sorting of large arrays is faster than sequential sorting when run on multiprocessor systems. However, exactly how these methods use the fork/join framework is beyond the scope of this tutorial. For this information, see the Java API documentation.
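For instance (a sketch assuming Java SE 8; the array contents are random and the class name is illustrative), parallelSort is invoked just like sort:

import java.util.Arrays;
import java.util.Random;

public class ParallelSortExample {
    public static void main(String[] args) {
        int[] data = new Random().ints(1_000_000).toArray();  // one million random ints
        Arrays.parallelSort(data);  // sorts concurrently using the common fork/join pool
        System.out.println("first = " + data[0] + ", last = " + data[data.length - 1]);
    }
}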
Another implementation of the fork/join framework is used by methods in the java.util.streams package, which is part of Project Lambda scheduled for the Java SE 8 release. For more information, see the section on Lambda expressions.
Concurrent collections
The java.util.concurrent package includes a number of additions to the Java Collections Framework. These are most easily categorized by the collection interfaces provided:
- BlockingQueue: defines a first-in-first-out data structure that blocks or times out when you attempt to add to a full queue, or retrieve from an empty queue.
- ConcurrentMap: a subinterface of java.util.Map that defines useful atomic operations. These operations remove or replace a key-value pair only if the key is present, or add a key-value pair only if the key is absent. Making these operations atomic helps avoid synchronization. The standard general-purpose implementation of ConcurrentMap is ConcurrentHashMap, which is a concurrent analog of HashMap.
- ConcurrentNavigableMap: a sub interface of ConcurrentMap, which supports approximate matching. The standard general implementation of ConcurrentNavigableMap is ConcurrentSkipListMap, which is the concurrent simulation of TreeMap.
All these collections help avoid memory consistency errors by defining happens before relationships between the operation of adding an object to the collection and subsequent operations of accessing or deleting the object.
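As a brief sketch of two of these collections (the class name, keys, and values are illustrative, not from this lesson's examples):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

public class ConcurrentCollectionsExample {
    public static void main(String[] args) throws InterruptedException {
        // putIfAbsent atomically adds the entry only if the key is not already present.
        ConcurrentMap<String, Integer> scores = new ConcurrentHashMap<String, Integer>();
        scores.putIfAbsent("alice", 1);
        scores.putIfAbsent("alice", 99);           // ignored; "alice" is already present
        System.out.println(scores.get("alice"));   // prints 1

        // A BlockingQueue blocks on take() until an element is available.
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.put("hello");
        System.out.println(queue.take());
    }
}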
Atomic variable
The java.util.concurrent.atomic package defines classes that support atomic operations on single variables. All classes have get and set methods that work like reads and writes on volatile variables. That is, a set has a happens-before relationship with any subsequent get on the same variable. The atomic compareAndSet method also has these memory consistency features, as do the simple atomic arithmetic methods that apply to integer atomic variables.
To understand how this package is used, let's go back to the Counter class we originally used to demonstrate thread interference:
class Counter {
    private int c = 0;

    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }
}
One way to make Counter safe from thread interference is to make its methods synchronized, as in SynchronizedCounter:
class SynchronizedCounter {
    private int c = 0;

    public synchronized void increment() {
        c++;
    }

    public synchronized void decrement() {
        c--;
    }

    public synchronized int value() {
        return c;
    }
}
For this simple class, synchronization is an acceptable solution. But for a more complicated class, we might want to avoid the liveness impact of unnecessary synchronization. Replacing the int field with an AtomicInteger allows us to prevent thread interference without resorting to synchronization, as in AtomicCounter:
class AtomicCounter {
    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {
        c.incrementAndGet();
    }

    public void decrement() {
        c.decrementAndGet();
    }

    public int value() {
        return c.get();
    }
}
Concurrent random number
In JDK 7, java.util.concurrent includes a convenience class, ThreadLocalRandom, for applications that expect to use random numbers from multiple threads or ForkJoinTasks.
For concurrent access, using ThreadLocalRandom instead of Math.random() results in less contention and, ultimately, better performance.
All you need to do is call ThreadLocalRandom.current(), then call one of its methods to retrieve a random number. Here is one example:
int r = ThreadLocalRandom.current().nextInt(4, 77);
Further references
- Concurrent Programming in Java: Design Principles and Patterns (2nd Edition) by Doug Lea. A comprehensive work by a leading expert, who is also the architect of the Java platform's concurrency framework.
- Java Concurrency in Practice by Brian Goetz, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, and Doug Lea. A practical guide designed to be accessible to the novice.
- Effective Java Programming Language Guide (2nd Edition) by Joshua Bloch. Although this is a general programming guide, its chapter on threads contains the basic "best practices" of concurrent programming.
- Concurrency: State Models & Java Programs (2nd Edition), by Jeff Magee and Jeff Kramer. Concurrent programming is introduced by combining modeling and examples.
- Java Concurrent Animated: an animation that displays the usage of concurrency features. (the link is gone)