Java multithreading combat series notes - Implementation of multithreading

Implementation of multithreading

Implementing threads is the basis of concurrent programming, because we must implement threads before we can continue a series of subsequent operations.

Basic implementation mode

Runable

public class ImplementRunable implements Runnable {
    @Override
    public void run() {
        while (true) {
            // The name of the output thread, which is distinguished from the name of the main thread
            System.out.println(Thread.currentThread().getName());
            try {
                // Thread sleep for one second
                Thread.sleep(1000);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new ImplementRunable()).start();
    }
}
Copy code

In essence, it is a new Thread, just passing the Runable object as a constructor

Thread

The most common method is to inherit the Thread class or directly implement the new Thread object. However, we know that Java inherits from the Thread class, that is, if we inherit the Thread class, we can't inherit other classes. Therefore, this is why we often use Runable to implement multithreading. At this time, we can pass the Runable object to the Thread constructor. Here are the common constructors for Thread

Let's look at an example

public class ExtendsThread extends Thread {
    
    public ExtendsThread() {
        // Sets the name of the current thread
        this.setName("MyThread");
    }

    @Override
    public void run() {
        // Output the name of the current thread every 1s
        while (true) {
            // The name of the output thread, which is distinguished from the name of the main thread
            System.out.println(Thread.currentThread().getName());
            try {
                // Thread sleep for one second
                Thread.sleep(1000);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        new ExtendsThread().start();
    }

}

Callable

Both runnable and Callable represent tasks to be executed in different threads. Runnable has been available since JDK 1.0, and Callable is added in JDK 1.5.

The main difference between them is that Callable's call() method can return values and throw exceptions, while Runnable's run() method does not have these functions, and Callable can return Future objects loaded with calculation results.

public class ImplementCallable implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        return new Random().nextInt();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //Create thread pool
        ExecutorService service = Executors.newFixedThreadPool(1);
        //Submit the task and submit the returned results with Future
        Future<Integer> future = service.submit(new ImplementCallable());
        Integer integer = future.get();
        System.out.println(integer);
    }
}

However, it should be noted that Callable cannot be used directly with Thread. It needs to be used with Thread pool. We also demonstrated how to obtain results through Future object. In fact, we can also use Callable with FutureTask here. We will demonstrate it later when we demonstrate FutureTask

TimerTask

public class TimerTaskDemo {
    /**
     * After a delay of 100ms, print out hello world at an interval of 1s
     *
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        Timer t = new Timer();
        t.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                System.out.println("hello world");
            }
        }, 100, 1000);
    }
}

This looks like a new implementation method, but when you look at the implementation of TimerTask, you will find that this class actually inherits from Runnable, that is, it is actually implemented through Runnable, that is, put the TimerTask object into the queue of a TaskQueue object, and then wait until the scheduling time comes, Then execute the run method of TimerTask.

It should be noted that when we use the Timer class, we are prompted to use ScheduledExecutorService to replace it. Why? This is because the Timer does not use thread pool and the scheduling of the whole Timer is single thread, so we still need to look at the specific implementation principle of Timer

Let's sort out a task scheduling process

We created a Timer object

 public Timer() {
     this("Timer-" + serialNumber());
 }
 public Timer(String name) {
     thread.setName(name);
     thread.start();
}

We can see that a thread is started in the constructor, which is TimerThread

The scheduleAtFixedRate method is used to create and schedule a scheduled task. In fact, the task is only added to the queue of the TaskQueue

public void scheduleAtFixedRate(TimerTask task, Date firstTime,
                                long period) {
    if (period <= 0)
        throw new IllegalArgumentException("Non-positive period.");
    sched(task, firstTime.getTime(), period);
}
private void sched(TimerTask task, long time, long period) {
    if (time < 0)
        throw new IllegalArgumentException("Illegal execution time.");

    // Constrain value of period sufficiently to prevent numeric
    // overflow while still being effectively infinitely large.
    if (Math.abs(period) > (Long.MAX_VALUE >> 1))
        period >>= 1;

    synchronized(queue) {
        if (!thread.newTasksMayBeScheduled)
            throw new IllegalStateException("Timer already cancelled.");

        synchronized(task.lock) {
            if (task.state != TimerTask.VIRGIN)
                throw new IllegalStateException(
                    "Task already scheduled or cancelled");
            task.nextExecutionTime = time;
            task.period = period;
            task.state = TimerTask.SCHEDULED;
        }
      	// Add the task to the queue of the timer,
        queue.add(task);
        if (queue.getMin() == task)
            queue.notify();
    }
}

Timer implements its NB scheduling function. We are not involved in this part. It belongs to the black box. In fact, it is not black. Let's uncover it. The secret is in TimerThread. We know that this thread object is started when timer is created

private TaskQueue queue;

TimerThread(TaskQueue queue) {
    this.queue = queue;
}
// Thread entry, run method
public void run() {
    try {
      	// Call the core method -- loop
        mainLoop();
    } finally {
        // Someone killed this Thread, behave as if Timer cancelled
        synchronized(queue) {
            newTasksMayBeScheduled = false;
            queue.clear();  // Eliminate obsolete references
        }
    }
}

/**
 * The main timer loop.  (See class comment.)
 */
private void mainLoop() {
    while (true) {
        try {
            TimerTask task;
            boolean taskFired;
            synchronized(queue) {
                // If the task queue is empty, wait until
                while (queue.isEmpty() && newTasksMayBeScheduled)
                    queue.wait();
                if (queue.isEmpty())
                    break; // Queue is empty and will forever remain; die

                // Queue nonempty; look at first evt and do the right thing
                long currentTime, executionTime;
              	// Get task from task queue
                task = queue.getMin();
                synchronized(task.lock) {
                    if (task.state == TimerTask.CANCELLED) {
                        queue.removeMin();
                        continue;  // No action required, poll queue again
                    }
                    currentTime = System.currentTimeMillis();
                    executionTime = task.nextExecutionTime;
                    if (taskFired = (executionTime<=currentTime)) {
                        if (task.period == 0) { // Non-repeating, remove
                            queue.removeMin();
                            task.state = TimerTask.EXECUTED;
                        } else { // Repeating task, reschedule
                            queue.rescheduleMin(
                              task.period<0 ? currentTime   - task.period
                                            : executionTime + task.period);
                        }
                    }
                }
                if (!taskFired) // Task hasn't yet fired; wait
                    queue.wait(executionTime - currentTime);
            }
          	// When executing a task, we see that instead of starting a new thread, it is blocking execution
            if (taskFired)  // Task fired; run it, holding no locks
                task.run();
        } catch(InterruptedException e) {
        }
    }
}

From here, we know that Timer executes timed tasks in a single thread, that is, your task may not be executed when the time is up, because the last task has not been executed. Here we write an example to see the above example, which is simple to change

public static void main(String[] args) throws InterruptedException {
    timer();
}
// On the whole, we still want to 1s implement it once
public static void timer(){
    Timer t = new Timer();
    t.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+ ": hello world");
            // We waited in this task
            try {
                TimeUnit.SECONDS.sleep(1000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 100, 1000);
}

In fact, after you execute it, you will find that your task TimerTask object 1s cannot be guaranteed to execute once, because the last task has not been completed. That's why idea suggests you use the scheduled executorservice, which essentially belongs to the category of thread pool. We'll discuss it when we learn about the implementation of thread pool.

FutureTask

In Java Concurrent Programs, FutureTask represents an asynchronous operation that can be cancelled. It has the methods of starting and canceling the operation, querying whether the operation is completed and retrieving the operation results.

The result can be retrieved only when the operation is completed. If the operation is not completed, the get method will be blocked.

A FutureTask object can wrap Callable and Runnable objects. Because FutureTask also implements the Runnable interface, it can be submitted to the Executor for execution or used with Thread. However, there is a problem here. We know that Runnable does not return a value. How does FutureTask return a value? Let's take a look at it below

public class FutureTaskDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        callableDemo();
        runableDemo();
    }

    public static void callableDemo() throws ExecutionException, InterruptedException {
        Callable<Integer> call = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Calculating results...");
                Thread.sleep(3000);
                return 1;
            }
        };
        FutureTask<Integer> task = new FutureTask<>(call);
        Thread thread = new Thread(task);
        thread.start();
        Integer result = task.get();
        System.out.println("callableDemo The results are:" + result);
    }


    public static void runableDemo() throws ExecutionException, InterruptedException {
        Runnable run = new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                System.out.println("Calculating results...");
                Thread.sleep(3000);
            }
        };
      	// The return value is predefined by ourselves
        FutureTask<Integer> task = new FutureTask(run,1);
        Thread thread = new Thread(task);
        thread.start();
        Integer result = task.get();
        System.out.println("runableDemo The results are:" + result);
    }
}

Advanced implementation

Java8 lambda expression

This is actually a syntax sugar, which is still an old routine in essence. Let's take a brief look at what this is, which is essentially a functional interface

public class LambdaDemo {
    public static void main(String[] args) {
        lambdaThread();
        lambdaRunable();
    }


    public static void lambdaThread() {
        Thread t = new Thread(() -> {
            System.out.println("lambdaThread Implementation of");
        });
        t.start();
    }

    public static void lambdaRunable() {
        Runnable r = () -> {
            System.out.println("lambdaRunable Implementation of");
        };
        Thread t1 = new Thread(r);
        Thread t2 = new Thread(() -> {
            r.run();
        });

        t1.start();
        t2.start();
    }
}

Java8 stream

This mainly uses the stream api in Java 8

public class StreamDemo {
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9,10).parallel().forEach(ele->{
            System.out.println(Thread.currentThread().getName()+":"+ele);
        });
    }
}

Output: we see multiple threads started

ForkJoinPool.commonPool-worker-1:3
ForkJoinPool.commonPool-worker-1:4
ForkJoinPool.commonPool-worker-5:5
ForkJoinPool.commonPool-worker-5:10
ForkJoinPool.commonPool-worker-4:1
ForkJoinPool.commonPool-worker-2:9
ForkJoinPool.commonPool-worker-3:2
ForkJoinPool.commonPool-worker-5:6
ForkJoinPool.commonPool-worker-1:8
main:7

The default number of threads used by concurrent streams is equal to the number of processor cores on your machine. Through this method, you can modify this value, which is a global property, system.setproperty(“
java.util.concurrent.ForkJoinPool.common.parallelism", "12");

The fork/join framework was introduced by jdk1.7. Stream multithreading in java8 is not a stream. It is based on this framework. Therefore, if you want to deeply understand concurrent streams, you need to learn the fork/join framework. The purpose of the fork/join framework is to recursively split parallel tasks into smaller tasks, and then combine the results of each subtask to generate the overall results. It is executorservic An implementation of the e interface, which allocates subtasks to worker threads in the ForkJoinPool. To submit tasks to this thread pool, you must create a subclass of RecursiveTask. If the task does not return results, it is a subclass of RecursiveAction.

Thread pool

Here, we will not explain in detail the things related to thread pool. We will say how thread pool realizes multi thread, that is, how thread pool creates threads. We know that the purpose of using thread pool is to avoid manually creating a large number of threads and hand over control to thread pool, so as to achieve the purpose of thread reuse.

First, let's take a look at how we use thread pools

public class ThreadPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();

        while (true) {
            executorService.submit(() -> {
                while (true) {
                    System.out.println(Thread.currentThread().getName());
                    try {
                        TimeUnit.SECONDS.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            TimeUnit.SECONDS.sleep(1);
        }

    }
}

From here, we can see that we only need to submit a Runnable object to the thread pool. In fact, according to the way we implemented threads through Runnable, we can guess that the thread pool uses the Runnable object we submitted to create threads for us. When we create a thread pool, we actually have such a parameter, which is the factory used to create threads,  
Executors.newCachedThreadPool() newCachedThreadPool method is just a convenient method provided by java. In fact, the following constructor will be called in the end.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}

Now we can take a look at this factory

/**
 * The default thread factory
 */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
  	// Here is how we create threads
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

For thread pools, threads are essentially created through the thread factory. DefaultThreadFactory is adopted by default. It will set some default values for the threads created by the thread pool, such as the name of the thread, whether it is a guard thread, and the priority of the thread. However, no matter how these properties are set, the thread is finally created through new Thread(), but there are more parameters passed in by the constructor here. It can be seen that creating a thread through the thread pool does not deviate from the first two basic creation methods, because it is essentially realized through new Thread().

Thread startup and status

start method and run method

This is actually a very old question. That is to say, we only call the start method to start a thread for us. If you call the run method directly, it is actually a synchronous call.

Multiple starts

Let's take a look at what happens after multiple starts

public class ThreadStartTimes {
    public static void main(String[] args) {
        Runnable target;
        Thread thread = new Thread(()->{
            System.out.println(Thread.currentThread().getName());
        });

        thread.start();
        System.out.println(1);
        thread.start();
        System.out.println(2);
        thread.start();
        System.out.println(3);
    }
}

The output is as follows:

1
Thread-0
Exception in thread "main" java.lang.IllegalThreadStateException
	at java.lang.Thread.start(Thread.java:708)
	at thread.thread.ThreadStartTimes.main(ThreadStartTimes.java:12)

When we see the error report, let's take a look at the implementation of this method

public synchronized void start() {
    /**
     * In other words, if our thread is in the new state (NEW), the threadStatus is 0, and threads in other states cannot call the start method
     * A zero status value corresponds to state "NEW".
     */
    if (threadStatus != 0)
        throw new IllegalThreadStateException();

    /* Notify the group that this thread is about to be started
     * so that it can be added to the group's list of threads
     * and the group's unstarted count can be decremented. */
    group.add(this);

    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
              it will be passed up the call stack */
        }
    }
}

In other words, if the threadStatus of our Thread is not 0 after calling the start method for the first time, you will report an error if you call this method again at this time. However, it should be noted that you cannot see the state change in the Thread code, that is, the state change is maintained by the local method

Correctly understand the relationship between thread and Runnable

It is important to understand this point, which is why the thread pool exists. Let's look at the following code. We can see that the core logic of the thread is actually to call target.run(), and the target here is runnable

@Override
public void run() {
    if (target != null) {
        target.run();
    }
}

In general, our run method or our thread object only holds one runnable object. At this time, it often gives people a wrong situation, that is, runnable is functionally equivalent to thread. In fact, it is not. We can look at the following code, a core code of the thread pool, which will be explained in detail later when learning the thread pool

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
      	// Here's the point
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

We can see that a thread does not simply hold a Runnable object and then run. After running, the thread ends. The implementation method here is that the thread runs a Runnable object and obtains a Runnable object from the queue after running. That is, a thread runs multiple Runnable objects. This is the principle of thread pool, This is the relationship between thread and Runnable

summary

  1. Both Callable and FutureTask, like Runnable, are tasks that need to be executed, not threads themselves. They can be executed in the Thread pool. As shown in the code, the submit() method puts the task into the Thread pool and creates a Thread by the Thread pool. No matter what method is used, it is ultimately executed by the Thread, and the creation method of sub threads is still inseparable from the two basic methods mentioned at the beginning, that is, implementing the Runnable interface and inheriting the Thread class. Always remember that a Thread is a Thread, a task is a task, a Thread is a Thread, and a task is Runnable. Only in this way can you understand the meaning of the Thread pool, that is, we give the task to the Thread pool, and the Thread pool uses the threads in the pool to run our tasks. When a task runs, the Thread can run other tasks, So as to achieve the reuse of threads.
  2. Callable, FutureTask and Future were introduced in JDK1.5. Therefore, when we look back, there is only one implementation method of multithreading. Because other implementation methods must also use Thread class to implement multithreading, we can understand that other classes such as callable, FutureTask and Runnable are task objects, Then the task object needs to be put into the Thread to execute.
  3. Which is better between thread and runnable? Runnable is good first because Java can only inherit, so we can use runnable to implement functions. Our objects can also inherit other required classes. Second, every creation of thread is a real thread overhead, so we can use one thread object to execute multiple runnable, This is very interesting. This is also the principle of thread pool; The last part is the design of code. Multithreading is realized through runnable, which decouples runnable from thread class. Thread class is responsible for thread startup and property setting. Runnable encapsulates our business logic, which is actually why we can use thread to execute multiple runnable objects, Because their positioning and original design intention are different. Thread is a physical thread, and runnable is the business logic to be executed on the thread.

 

 

Keywords: Java Database Back-end Multithreading

Added by ttmt on Tue, 07 Dec 2021 10:25:18 +0200