Implementation of multithreading
Creating threads is the foundation of concurrent programming: everything that follows depends on having threads to run our work on.
Basic implementation approaches
Runnable
public class ImplementRunable implements Runnable {

    @Override
    public void run() {
        while (true) {
            // Print the current thread's name to show this is not the main thread
            System.out.println(Thread.currentThread().getName());
            try {
                // Sleep for one second
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new ImplementRunable()).start();
    }
}

In essence, this still creates a new Thread; the Runnable object is simply passed to the Thread constructor.
Thread
The most direct approach is to extend the Thread class, or to create a Thread object directly with new Thread(). However, Java supports only single inheritance, so a class that extends Thread cannot extend any other class. This is why Runnable is usually preferred for multithreading: we pass the Runnable object to a Thread constructor instead. Commonly used Thread constructors include Thread(), Thread(Runnable target), Thread(String name), and Thread(Runnable target, String name).
Let's look at an example
public class ExtendsThread extends Thread {

    public ExtendsThread() {
        // Set the name of this thread
        this.setName("MyThread");
    }

    @Override
    public void run() {
        // Print the name of the current thread once per second
        while (true) {
            System.out.println(Thread.currentThread().getName());
            try {
                // Sleep for one second
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        new ExtendsThread().start();
    }
}
Callable
Both Runnable and Callable represent tasks to be executed on another thread. Runnable has been available since JDK 1.0, while Callable was added in JDK 1.5.
The main difference is that Callable's call() method can return a value and throw checked exceptions, whereas Runnable's run() method can do neither. When a Callable is submitted to an executor, the caller gets back a Future object that will hold the result of the computation.
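import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ImplementCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        return new Random().nextInt();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Create a thread pool
        ExecutorService service = Executors.newFixedThreadPool(1);
        try {
            // Submit the task; the Future will hold the result
            Future<Integer> future = service.submit(new ImplementCallable());
            // get() blocks until the result is available
            Integer integer = future.get();
            System.out.println(integer);
        } finally {
            // Shut the pool down so the JVM can exit
            service.shutdown();
        }
    }
}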
Note that a Callable cannot be passed directly to a Thread constructor. It is usually submitted to a thread pool, as above, where we also saw how to obtain the result through a Future object. A Callable can also be wrapped in a FutureTask, which we demonstrate later in the FutureTask section.
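The other half of the difference, exception handling, can be sketched as follows. This is a minimal illustration, not part of the original demo: the class name CallableFailureSketch, the method runFailingTask and the exception message are made up for the example. It shows that a checked exception thrown inside call() reaches the caller wrapped in an ExecutionException when get() is called.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableFailureSketch {

    /** Submits a Callable that always fails and describes the exception seen by the caller. */
    static String runFailingTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // call() is allowed to throw a checked exception; run() is not
            Callable<Integer> failing = () -> {
                throw new IOException("disk unavailable"); // hypothetical failure
            };
            Future<Integer> future = pool.submit(failing);
            try {
                future.get();
                return "no failure";
            } catch (ExecutionException e) {
                // The exception thrown on the worker thread is available as the cause
                Throwable cause = e.getCause();
                return cause.getClass().getSimpleName() + ": " + cause.getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runFailingTask()); // prints "IOException: disk unavailable"
    }
}
```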
TimerTask
import java.util.Timer;
import java.util.TimerTask;

public class TimerTaskDemo {

    /**
     * After a delay of 100 ms, print "hello world" once every second.
     */
    public static void main(String[] args) {
        Timer t = new Timer();
        t.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                System.out.println("hello world");
            }
        }, 100, 1000);
    }
}

This looks like a new way to create threads, but if you look at the source of TimerTask you will find that it implements Runnable, so it is really just another Runnable-based approach. The TimerTask object is placed in the queue of a TaskQueue object, and when its scheduled time arrives, its run method is executed.
Note that when we use the Timer class, the IDE suggests replacing it with ScheduledExecutorService. Why? Because Timer does not use a thread pool: all scheduling for a Timer happens on a single thread. To see this, we need to look at how Timer is implemented.
Let's walk through how a task gets scheduled.
First, we create a Timer object:
public Timer() {
    this("Timer-" + serialNumber());
}

public Timer(String name) {
    thread.setName(name);
    thread.start();
}
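We can see that the constructor starts a thread; this thread is a TimerThread.
The scheduleAtFixedRate method creates and schedules a periodic task. In fact, it only adds the task to the TaskQueue's queue. The listing shows the overload that takes a Date; the overload used in the demo takes an initial delay in milliseconds and ends up in the same sched method.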
public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) {
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, firstTime.getTime(), period);
}

private void sched(TimerTask task, long time, long period) {
    if (time < 0)
        throw new IllegalArgumentException("Illegal execution time.");

    // Constrain value of period sufficiently to prevent numeric
    // overflow while still being effectively infinitely large.
    if (Math.abs(period) > (Long.MAX_VALUE >> 1))
        period >>= 1;

    synchronized(queue) {
        if (!thread.newTasksMayBeScheduled)
            throw new IllegalStateException("Timer already cancelled.");

        synchronized(task.lock) {
            if (task.state != TimerTask.VIRGIN)
                throw new IllegalStateException(
                    "Task already scheduled or cancelled");
            task.nextExecutionTime = time;
            task.period = period;
            task.state = TimerTask.SCHEDULED;
        }

        // Add the task to the timer's queue
        queue.add(task);
        if (queue.getMin() == task)
            queue.notify();
    }
}

So how does Timer actually carry out the scheduling? We have treated this part as a black box so far, but it is not hard to open up. The secret is in TimerThread, the thread that is started when the Timer is created.
private TaskQueue queue;

TimerThread(TaskQueue queue) {
    this.queue = queue;
}

// Thread entry point
public void run() {
    try {
        // Call the core method: the main loop
        mainLoop();
    } finally {
        // Someone killed this Thread, behave as if Timer cancelled
        synchronized(queue) {
            newTasksMayBeScheduled = false;
            queue.clear();  // Eliminate obsolete references
        }
    }
}

/**
 * The main timer loop.  (See class comment.)
 */
private void mainLoop() {
    while (true) {
        try {
            TimerTask task;
            boolean taskFired;
            synchronized(queue) {
                // If the task queue is empty, wait until a task is added
                while (queue.isEmpty() && newTasksMayBeScheduled)
                    queue.wait();
                if (queue.isEmpty())
                    break; // Queue is empty and will forever remain; die

                // Queue nonempty; look at first evt and do the right thing
                long currentTime, executionTime;
                // Get the earliest task from the queue
                task = queue.getMin();
                synchronized(task.lock) {
                    if (task.state == TimerTask.CANCELLED) {
                        queue.removeMin();
                        continue;  // No action required, poll queue again
                    }
                    currentTime = System.currentTimeMillis();
                    executionTime = task.nextExecutionTime;
                    if (taskFired = (executionTime<=currentTime)) {
                        if (task.period == 0) { // Non-repeating, remove
                            queue.removeMin();
                            task.state = TimerTask.EXECUTED;
                        } else { // Repeating task, reschedule
                            queue.rescheduleMin(
                              task.period<0 ? currentTime   - task.period
                                            : executionTime + task.period);
                        }
                    }
                }
                if (!taskFired) // Task hasn't yet fired; wait
                    queue.wait(executionTime - currentTime);
            }
            // The task is not run on a new thread; it runs right here and blocks the timer thread
            if (taskFired)  // Task fired; run it, holding no locks
                task.run();
        } catch(InterruptedException e) {
        }
    }
}

From this we can see that Timer executes all of its tasks on a single thread. A task may therefore not run when its time arrives, because the previous task has not finished yet. Let's modify the earlier example slightly to see this in action.
public static void main(String[] args) throws InterruptedException {
    timer();
}

// We still want the task to run once per second
public static void timer() {
    Timer t = new Timer();
    t.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": hello world");
            // Block inside the task for a very long time
            try {
                TimeUnit.SECONDS.sleep(1000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 100, 1000);
}

If you run this, you will find that the TimerTask is not executed once per second, because the previous execution has not finished. That is why IntelliJ IDEA suggests using ScheduledExecutorService, which belongs to the thread pool family. We will discuss it when we study how thread pools are implemented.
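As a rough preview of why a pool helps, here is a minimal sketch using ScheduledExecutorService. The class name ScheduledPoolSketch, the pool size of 2 and the timings are assumptions chosen for illustration. One task blocks a worker for two seconds while a second, periodic task keeps firing on the other worker. Note that repeated runs of the same periodic task still never overlap; what the pool buys us is that one slow task no longer blocks the other tasks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ScheduledPoolSketch {

    /** Returns true if the fast task ticked three times while the slow task was still blocking. */
    static boolean fastTaskKeepsTicking() {
        // Two worker threads (an assumed size for this sketch)
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        CountDownLatch fastTicks = new CountDownLatch(3);
        try {
            // Slow task: occupies one worker for two seconds
            pool.schedule(() -> sleepQuietly(2_000), 0, TimeUnit.MILLISECONDS);
            // Fast task: fires every 100 ms on the other worker
            pool.scheduleAtFixedRate(fastTicks::countDown, 100, 100, TimeUnit.MILLISECONDS);
            // Wait at most 1.5 s, which is less than the slow task's sleep
            return fastTicks.await(1_500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Stop the periodic task and interrupt the sleeping one
            pool.shutdownNow();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("fast task kept ticking: " + fastTaskKeepsTicking());
    }
}
```

With a single-threaded Timer, the same pair of tasks would leave the fast task waiting until the slow one returned.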
FutureTask
In Java concurrent programming, FutureTask represents a cancellable asynchronous computation. It provides methods to start and cancel the computation, to query whether it has completed, and to retrieve its result.
The result can be retrieved only once the computation has completed; if it has not, the get method blocks until it does.
A FutureTask can wrap either a Callable or a Runnable. Because FutureTask also implements the Runnable interface, it can be submitted to an Executor or passed to a Thread. This raises a question: Runnable does not return a value, so how does a FutureTask built from a Runnable return one? Let's take a look.
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        callableDemo();
        runableDemo();
    }

    public static void callableDemo() throws ExecutionException, InterruptedException {
        Callable<Integer> call = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Calculating results...");
                Thread.sleep(3000);
                return 1;
            }
        };
        FutureTask<Integer> task = new FutureTask<>(call);
        Thread thread = new Thread(task);
        thread.start();
        // get() blocks until the task has finished
        Integer result = task.get();
        System.out.println("callableDemo result: " + result);
    }

    public static void runableDemo() throws ExecutionException, InterruptedException {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                System.out.println("Calculating results...");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // run() cannot throw checked exceptions, so wrap it
                    throw new RuntimeException(e);
                }
            }
        };
        // A Runnable returns nothing, so we supply the result value ourselves
        FutureTask<Integer> task = new FutureTask<>(run, 1);
        Thread thread = new Thread(task);
        thread.start();
        Integer result = task.get();
        System.out.println("runableDemo result: " + result);
    }
}
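Where does the predefined value come from? In the OpenJDK source, the FutureTask(Runnable, V) constructor adapts the Runnable with Executors.callable(runnable, result), which produces a Callable that runs the Runnable and then returns the fixed result. The sketch below applies that adapter by hand; the class name RunnableResultSketch and the log format are invented for the example, and the task is run on the calling thread only to keep the output deterministic.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class RunnableResultSketch {

    /** Wraps a Runnable in a Callable with a fixed result and records what happens. */
    static String demo() {
        StringBuilder log = new StringBuilder();
        Runnable work = () -> log.append("ran ");

        // The adapter: call() runs the Runnable, then returns the value we supplied
        Callable<Integer> adapted = Executors.callable(work, 1);
        FutureTask<Integer> task = new FutureTask<>(adapted);

        log.append("done=").append(task.isDone()).append(' ');
        task.run(); // executed on the calling thread for this sketch
        log.append("done=").append(task.isDone()).append(' ');
        try {
            // The task has completed, so get() returns immediately
            log.append("result=").append(task.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "done=false ran done=true result=1"
    }
}
```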
Advanced implementation
Java 8 lambda expressions
This is really just syntactic sugar; underneath it is the same old approach. A lambda here is simply a compact way to implement a functional interface such as Runnable.
public class LambdaDemo {

    public static void main(String[] args) {
        lambdaThread();
        lambdaRunable();
    }

    public static void lambdaThread() {
        // The lambda is the Runnable passed to the Thread constructor
        Thread t = new Thread(() -> {
            System.out.println("Implemented with lambdaThread");
        });
        t.start();
    }

    public static void lambdaRunable() {
        Runnable r = () -> {
            System.out.println("Implemented with lambdaRunable");
        };
        Thread t1 = new Thread(r);
        // A second lambda that simply delegates to the first
        Thread t2 = new Thread(() -> {
            r.run();
        });
        t1.start();
        t2.start();
    }
}
Java 8 streams
This approach uses the Stream API introduced in Java 8, specifically parallel streams.
import java.util.stream.Stream;

public class StreamDemo {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .parallel()
                .forEach(ele -> System.out.println(Thread.currentThread().getName() + ":" + ele));
    }
}

Output (we can see that multiple threads were used):

ForkJoinPool.commonPool-worker-1:3
ForkJoinPool.commonPool-worker-1:4
ForkJoinPool.commonPool-worker-5:5
ForkJoinPool.commonPool-worker-5:10
ForkJoinPool.commonPool-worker-4:1
ForkJoinPool.commonPool-worker-2:9
ForkJoinPool.commonPool-worker-3:2
ForkJoinPool.commonPool-worker-5:6
ForkJoinPool.commonPool-worker-1:8
main:7
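By default, the common pool used by parallel streams has a parallelism of one less than the number of available processors, and the thread that calls the stream operation also takes part, so the total is roughly the number of processor cores on your machine. You can change this value through a global system property, which must be set before the common pool is first used:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");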
The fork/join framework was introduced in JDK 1.7. Parallel streams in Java 8 do not implement their own threading; they are built on this framework, so to understand parallel streams in depth you need to learn fork/join. The framework's purpose is to recursively split a parallel task into smaller subtasks and then combine the results of the subtasks into the overall result. ForkJoinPool is an implementation of the ExecutorService interface that distributes subtasks to its worker threads. Tasks for this pool are typically written as subclasses of RecursiveTask or, if the task returns no result, RecursiveAction.
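To make the split-and-combine idea concrete, here is a small sketch of a RecursiveTask that sums an array. The names ForkJoinSumSketch and SumTask and the threshold of 1,000 elements are assumptions for illustration; a real threshold would be tuned to the workload.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

class ForkJoinSumSketch {

    /** Sums data[from, to) by splitting the range until it is small enough. */
    static final class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // assumed cut-off for this sketch
        private final long[] data;
        private final int from;
        private final int to;

        SumTask(long[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                // Small enough: compute directly
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                       // run the left half asynchronously
            long rightSum = right.compute();   // compute the right half on this thread
            return left.join() + rightSum;     // combine the two partial results
        }
    }

    static long sum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = LongStream.rangeClosed(1, 10_000).toArray();
        System.out.println(sum(data)); // prints 50005000
    }
}
```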
Thread pool
We will not cover thread pools in detail here. We only want to see how a thread pool achieves multithreading, that is, how it creates threads. The purpose of a thread pool is to avoid creating large numbers of threads by hand: we hand control over to the pool, which reuses its threads.
First, let's look at how a thread pool is used:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Submit a new never-ending task every second (this demo runs until it is killed)
        while (true) {
            executorService.submit(() -> {
                while (true) {
                    System.out.println(Thread.currentThread().getName());
                    try {
                        TimeUnit.SECONDS.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

Here we only need to submit a Runnable object to the thread pool. Given what we saw about implementing threads through Runnable, we can guess that the pool creates threads for us to run the Runnable objects we submit. Indeed, one of the parameters used when creating a thread pool is the factory that creates its threads.
Executors.newCachedThreadPool() is just a convenience method provided by Java. It ends up calling the following ThreadPoolExecutor constructor, which supplies Executors.defaultThreadFactory() as the thread factory.
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
Now we can take a look at this factory
/**
 * The default thread factory
 */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    // This is where the pool's threads are actually created
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

In a thread pool, threads are created through the thread factory. DefaultThreadFactory is used by default, and it sets some default values on the threads it creates, such as the thread name, whether the thread is a daemon thread, and the thread priority. However these properties are set, the thread is ultimately created with new Thread(); the constructor simply receives more parameters here. Creating threads through a thread pool therefore does not depart from the two basic approaches described earlier, because underneath it is still new Thread().
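To see that the pool really does create its threads through the factory, we can plug in our own. The following is a minimal sketch: NamedThreadFactorySketch, NamedThreadFactory and the "demo-worker" prefix are hypothetical names introduced for the example, while Executors.newFixedThreadPool(int, ThreadFactory) is the standard overload that accepts a factory.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactorySketch {

    /** A factory that still uses new Thread(), but with our own naming scheme. */
    static final class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            // r is the pool's internal worker, not the task we submitted
            Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(false);
            return t;
        }
    }

    /** Runs one task in a pool built with the custom factory and returns the worker's name. */
    static String workerName() {
        ExecutorService pool =
                Executors.newFixedThreadPool(1, new NamedThreadFactory("demo-worker"));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName()); // prints "demo-worker-1"
    }
}
```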
Thread startup and status
start method and run method
This is a classic question. Only calling start() actually starts a new thread. If you call run() directly, it is just an ordinary synchronous method call on the current thread.
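The difference is easy to observe by recording which thread executes the body in each case. In this sketch the class name StartVersusRunSketch and the thread names "worker-direct" and "worker-started" are made up for illustration.

```java
class StartVersusRunSketch {

    /** Returns the thread names observed by run() (index 0) and by start() (index 1). */
    static String[] observe() {
        String[] seen = new String[2];

        Thread direct = new Thread(
                () -> seen[0] = Thread.currentThread().getName(), "worker-direct");
        // Plain method call: the body executes on the calling thread
        direct.run();

        Thread started = new Thread(
                () -> seen[1] = Thread.currentThread().getName(), "worker-started");
        // Real thread start: the body executes on the new thread
        started.start();
        try {
            // Wait for the new thread so its write to seen[1] is visible
            started.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println("run() executed on:   " + seen[0]);
        System.out.println("start() executed on: " + seen[1]);
    }
}
```

When run from main, the first line reports the main thread and the second reports worker-started.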
Multiple starts
Let's see what happens when start is called more than once:
public class ThreadStartTimes {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            System.out.println(Thread.currentThread().getName());
        });
        thread.start();
        System.out.println(1);
        // The second call throws IllegalThreadStateException, so nothing after it runs
        thread.start();
        System.out.println(2);
        thread.start();
        System.out.println(3);
    }
}
The output is as follows:
1
Thread-0
Exception in thread "main" java.lang.IllegalThreadStateException
    at java.lang.Thread.start(Thread.java:708)
    at thread.thread.ThreadStartTimes.main(ThreadStartTimes.java:12)

Given this error, let's look at how the start method is implemented:
public synchronized void start() {
    /**
     * Only a thread in the NEW state (threadStatus == 0) may be started;
     * calling start() on a thread in any other state is an error.
     *
     * A zero status value corresponds to state "NEW".
     */
    if (threadStatus != 0)
        throw new IllegalThreadStateException();

    /* Notify the group that this thread is about to be started
     * so that it can be added to the group's list of threads
     * and the group's unstarted count can be decremented. */
    group.add(this);

    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
              it will be passed up the call stack */
        }
    }
}

In other words, once start has been called for the first time, threadStatus is no longer 0, so calling start again throws an exception. Note, however, that you will not find the state change in the Java code of Thread: the state is maintained by native code.
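Although the status field is updated by native code, the resulting state is visible through Thread.getState(). The sketch below (the class name RestartSketch and the log format are assumptions for illustration) shows that a thread cannot be restarted even after it has finished.

```java
class RestartSketch {

    /** Records the thread's state before start, after it finishes, and on a second start. */
    static String lifecycle() {
        Thread t = new Thread(() -> { });
        StringBuilder log = new StringBuilder();

        log.append(t.getState());                    // not started yet
        t.start();
        try {
            t.join();                                // wait until the thread has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.append(" -> ").append(t.getState());     // finished

        try {
            t.start();                               // second start on the same object
            log.append(" -> restarted");
        } catch (IllegalThreadStateException e) {
            log.append(" -> IllegalThreadStateException");
        }
        return log.toString();
    }

    public static void main(String[] args) {
        // prints "NEW -> TERMINATED -> IllegalThreadStateException"
        System.out.println(lifecycle());
    }
}
```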
Correctly understanding the relationship between Thread and Runnable
Understanding this point is important, because it is the reason thread pools can exist. Look at the following code: the core logic of Thread.run() is simply to call target.run(), where target is the Runnable.
@Override
public void run() {
    if (target != null) {
        target.run();
    }
}

Usually a Thread object holds a single Runnable object, which can give the wrong impression that a Runnable is functionally equivalent to a Thread. It is not. Consider the following code, a core method of the thread pool, which we will explain in detail when we study thread pools.
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Here's the point: keep fetching tasks until there are none left
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Here a thread does not simply hold one Runnable, run it, and then terminate. Instead, the thread runs a Runnable and, when it finishes, fetches the next Runnable from the queue. In other words, one thread runs many Runnable objects. This is the principle behind thread pools, and it is the real relationship between Thread and Runnable.
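The same idea can be reduced to a few lines. The following is a deliberately simplified sketch, not how ThreadPoolExecutor is implemented: the class name SingleWorkerSketch, the thread name "solo-worker" and the interrupt-based shutdown are assumptions for illustration. One thread loops over a BlockingQueue and runs every Runnable it takes from it.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class SingleWorkerSketch {

    /** Runs taskCount tasks on one worker thread and returns the thread names they saw. */
    static Set<String> runAll(int taskCount) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(taskCount);

        // The worker: one Thread whose own Runnable is a loop over other Runnables
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    queue.take().run(); // blocks until a task is available
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting for work: treat it as shutdown
            }
        }, "solo-worker");
        worker.start();

        for (int i = 0; i < taskCount; i++) {
            queue.add(() -> {
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            });
        }

        try {
            done.await();       // wait until every task has run
            worker.interrupt(); // then stop the idle worker
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.println(runAll(5)); // prints [solo-worker]
    }
}
```

All five tasks report the same thread name, which is the thread reuse described above in miniature.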
Summary
- Callable and FutureTask, like Runnable, are tasks to be executed, not threads. They can be run in a thread pool: as the code shows, submit() hands the task to the pool, and the pool creates the threads. Whatever approach is used, the work is ultimately executed by a Thread, and creating that thread always comes back to the two basic approaches from the beginning: implementing the Runnable interface or extending the Thread class. Always remember that a thread is a thread and a task is a task; Runnable is a task. Only then does the purpose of a thread pool make sense: we give tasks to the pool, and the pool runs them on its threads. When one task finishes, the thread can run another, which is how threads are reused.
- Callable, FutureTask and Future were all introduced in JDK 1.5. Looking back, there is really only one way to implement multithreading, because every other approach still relies on the Thread class. Classes such as Callable, FutureTask and Runnable are task objects, and a task object must be handed to a Thread to be executed.
- Which is better, Thread or Runnable? Runnable, for three reasons. First, Java supports only single inheritance, so implementing Runnable leaves our class free to extend another class. Second, every Thread that is created and started carries the cost of a real thread, whereas a single Thread can execute many Runnable objects, which is the principle behind thread pools. Third, it is better design: implementing multithreading through Runnable decouples the task from the Thread class. Thread is responsible for starting the thread and setting its properties, while Runnable encapsulates our business logic. This separation of roles is exactly why one Thread can execute many Runnable objects: a Thread is the physical thread, and a Runnable is the business logic that runs on it.