Java concurrent programming basics

Concurrency basics

Principles of concurrent programming

  1. Atomicity

Atomicity means that an operation, or a group of operations, either executes completely without being interrupted partway through, or does not execute at all. The CPU cannot pause in the middle of an atomic operation and schedule another thread, so no other thread ever observes a half-finished state.

  1. Visibility

Java provides the volatile keyword to ensure visibility. When a shared variable is declared volatile, a write to it is made visible to other threads right away, and a read by another thread sees the most recent write instead of a stale copy. An ordinary shared variable has no such guarantee: after one thread modifies it, it is uncertain when the new value is written back to main memory, so another thread may still read the old value. Visibility can also be guaranteed with synchronized and Lock. Both ensure that only one thread at a time holds the lock and executes the synchronized code, and changes made to variables while holding the lock are flushed to main memory before the lock is released, so the next thread to acquire the same lock sees them.

  1. Orderliness

The Java memory model allows the compiler and the processor to reorder instructions. Reordering never changes the result of a single-threaded program, but it can break the correctness of code that runs concurrently on multiple threads.

Runnable and Thread
This section covers only the difference between implementing the Runnable interface and extending the Thread class, using the sale of 10 tickets as an example. If you extend Thread, starting three threads is like opening three windows that each have their own 10 tickets to sell, because every thread object carries its own state. If you implement Runnable and pass the same instance to three threads, the three windows sell from one shared pool of 10 tickets.
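The ticket example can be sketched as follows. This is a minimal illustration, not code from the original article: the names TicketDemo, TicketTask, sharedTask and separateTasks are made up for the sketch. The point it shows is that the ticket counter lives in the task object, so what matters is whether the threads share one task instance or each get their own (which is what happens when each Thread subclass instance keeps its own counter).

```java
import java.util.concurrent.atomic.AtomicInteger;

public class TicketDemo {

    /** One ticket window. The 10 tickets live in the task object, not in the thread. */
    static class TicketTask implements Runnable {
        private int tickets = 10;
        private final AtomicInteger sold; // total sold across all windows

        TicketTask(AtomicInteger sold) {
            this.sold = sold;
        }

        @Override
        public void run() {
            while (true) {
                synchronized (this) { // guard the check-then-decrement
                    if (tickets == 0) {
                        return;
                    }
                    tickets--;
                }
                sold.incrementAndGet();
            }
        }
    }

    /** Starts the threads and waits for all of them to finish. */
    private static void runAll(Thread... windows) {
        for (Thread w : windows) {
            w.start();
        }
        for (Thread w : windows) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    /** Three windows share ONE task, so together they sell the same 10 tickets. */
    static int sharedTask() {
        AtomicInteger sold = new AtomicInteger();
        TicketTask task = new TicketTask(sold);
        runAll(new Thread(task), new Thread(task), new Thread(task));
        return sold.get();
    }

    /** Three windows each get their OWN task, so 30 tickets are sold in total. */
    static int separateTasks() {
        AtomicInteger sold = new AtomicInteger();
        runAll(new Thread(new TicketTask(sold)),
               new Thread(new TicketTask(sold)),
               new Thread(new TicketTask(sold)));
        return sold.get();
    }

    public static void main(String[] args) {
        System.out.println("shared task:    " + sharedTask());    // 10
        System.out.println("separate tasks: " + separateTasks()); // 30
    }
}
```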

synchronized keyword

1. synchronized object lock

Both synchronized(this) and a synchronized instance method lock the current object, while synchronized(obj) locks a specific object that guards the critical section. With synchronized, it is usually best to lock on such a dedicated object. Locking the current object is the simple choice when every access to the object must be made safe, but it is coarse-grained: it blocks every other synchronized method on the same instance, so it is generally avoided.

For example, in the following Sync class, both test_01() and test_02() lock the Sync object on which they are called. test_02() is recommended because a synchronized block allows finer-grained control over what is locked.

public synchronized void test_01() {
    System.out.println("Lock current object");
}
public void test_02() {
    synchronized (this) {
        System.out.println("Lock current object");
    }
}

The following method locks object, a field of the Sync object that guards the critical section:

public void test_03() {
    synchronized (object) {
        System.out.println("Lock critical object");
    }
}

2. synchronized on a static method locks the current class

A static synchronized method locks the Class object of the enclosing type. For example, if synchronized is added to a static method of the Sync class, the lock is Sync.class.

// Both of the following methods lock Sync.class

public static synchronized void test_04() {
    System.out.println("lock Sync.class");
}
public static void test_05() {
    synchronized (Sync.class)     {
        System.out.println("lock Sync.class class");
    }
}

3. The difference between static and non-static synchronized methods

A synchronized non-static method locks a single object, so threads working on different objects do not compete. A synchronized static method locks the class itself, so all callers, whatever instance they hold, compete for the same lock.
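Which monitor each kind of method holds can be checked directly with Thread.holdsLock, a standard JDK method. The sketch below is only an illustration; the class and method names are invented for it.

```java
public class LockTargetDemo {

    /** An instance synchronized method holds the monitor of this. */
    synchronized boolean instanceHoldsThis() {
        return Thread.holdsLock(this);
    }

    /** It does NOT hold the monitor of the Class object. */
    synchronized boolean instanceHoldsClass() {
        return Thread.holdsLock(LockTargetDemo.class);
    }

    /** A static synchronized method holds the monitor of the Class object. */
    static synchronized boolean staticHoldsClass() {
        return Thread.holdsLock(LockTargetDemo.class);
    }

    public static void main(String[] args) {
        LockTargetDemo demo = new LockTargetDemo();
        System.out.println(demo.instanceHoldsThis());  // true
        System.out.println(demo.instanceHoldsClass()); // false
        System.out.println(staticHoldsClass());        // true
    }
}
```

Because the two monitors are different objects, a thread inside a static synchronized method does not block a thread inside an instance synchronized method of the same class.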

4. An exception thrown inside a synchronized block releases the lock

In the following example, the first thread to enter run() throws an ArithmeticException when i reaches 5. The exception leaves the synchronized method, which releases that thread's lock, and the second thread then enters the method.

import java.util.concurrent.TimeUnit;

public class Test {
    static class Test02 implements Runnable {
        private int i = 0;
        @Override
        public synchronized void run() {
            while (true) {
                System.out.println(Thread.currentThread().getName() + "_" + i++);
                if (i == 5) { // Exception thrown when i==5, lock released
                    i = 1 / 0;
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                }catch (InterruptedException ignored) { }
            }
        }
    }

    public static void main(String[] args) {
        Test02 test02 = new Test02();
        new Thread(test02, "LQ").start();
        new Thread(test02, "WH").start();
    }
}

5. Example analysis

In the following code, the LQ thread holds the lock on object, so the WH thread blocks.

import java.util.concurrent.TimeUnit;

public class Test {
    static Object object = new Object();

    void m() {
        System.out.println(Thread.currentThread().getName() + " start...");
        synchronized (object) {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception ignored) {}
                System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
            }
        }
    }

    static class Test01 implements Runnable {
        @Override
        public void run() {
            new Test().m();
        }
    }

    static class Test02 implements Runnable {
        @Override
        public void run() {
            new Test().m();
        }
    }

    public static void main(String[] args) {
        Test01 test01 = new Test01();
        Thread thread = new Thread(test01, "LQ");
        thread.start();
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (Exception ignored) {}
        Test02 test02 = new Test02();
        thread = new Thread(test02, "WH");
        thread.start();
    }
}

In the next version, the WH thread assigns a new Object to object before calling m(), and WH runs normally.

import java.util.concurrent.TimeUnit;

public class Test {
    static Object object = new Object();
    void m() {
        System.out.println(Thread.currentThread().getName() + " start...");
        synchronized (object) {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception ignored){}
                System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
            }
        }
    }
    static class Test01 implements Runnable {
        @Override
        public void run() {
            new Test().m();
        }
    }
    static class Test02 implements Runnable {
        @Override
        public void run() {
            object = new Object();
            new Test().m();
        }
    }

    public static void main(String[] args) {
        Test01 test01 = new Test01();
        Thread thread = new Thread(test01, "LQ");
        thread.start();
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (Exception ignored) {}
        Test02 test02 = new Test02();
        thread = new Thread(test02, "WH");
        thread.start();
    }
}

In the first version, the WH thread blocks after it starts because object is locked by the LQ thread. In the second version, the WH thread assigns a new Object() to object before locking, so it runs normally. The reason is that a synchronized lock belongs to the object in memory, not to the variable that refers to it: LQ locks the object created by the first new, and WH locks the object created by the second new.

 

The same reasoning applies to constants: String a = "aaa" and String b = "aaa" refer to the same object. Therefore, if method a() locks on string a and method b() locks on string b, and the LQ thread calls a() before the WH thread calls b(), then WH waits until LQ has finished. For this reason, do not use constants as the lock object of a synchronized block.
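The pitfall with string constants can be verified with a short sketch. The names here (StringLockDemo, LOCK_A, PRIVATE_A and so on) are invented for the illustration; Thread.holdsLock is a standard JDK method.

```java
public class StringLockDemo {
    // Two separately declared constants with the same literal value
    static final String LOCK_A = "aaa";
    static final String LOCK_B = "aaa";

    // Dedicated lock objects, one per critical section
    static final Object PRIVATE_A = new Object();
    static final Object PRIVATE_B = new Object();

    /** Locks LOCK_A and reports whether LOCK_B's monitor is held as a result. */
    static boolean literalsShareMonitor() {
        synchronized (LOCK_A) {
            return Thread.holdsLock(LOCK_B);
        }
    }

    /** Locks PRIVATE_A and reports whether PRIVATE_B's monitor is held as a result. */
    static boolean privateLocksShareMonitor() {
        synchronized (PRIVATE_A) {
            return Thread.holdsLock(PRIVATE_B);
        }
    }

    public static void main(String[] args) {
        System.out.println(literalsShareMonitor());     // true: both names refer to one interned String
        System.out.println(privateLocksShareMonitor()); // false: two distinct objects
    }
}
```

A private final Object created with new is the usual way to get a lock that no unrelated code can accidentally share.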

volatile keyword
A computer has a CPU, main memory and caches. While running, the CPU works on data in its cache by default. When the CPU is interrupted, depending on how the operating system manages the CPU, the cache may be cleared and reloaded from memory, or it may be kept and reused for subsequent calculations. If the CPU is not interrupted, it keeps reading the cached data. The volatile keyword does not change how caches behave; it changes how the variable itself is accessed. Declaring a field volatile tells the JVM that every read of the field must observe the latest value written by any thread, rather than a stale copy. This is memory visibility, and guaranteeing it is the purpose of volatile. Note that volatile does not make compound operations such as count++ atomic.

The following code may never terminate: the thread running m() can keep seeing the stale value b == true and loop forever.

import java.util.concurrent.TimeUnit;

public class Volatile01 {
    private static boolean b = true;
    private void m() {
        System.out.println("start...");
        while (b) {}
        System.out.println("end...");
    }
    static class Volatile_01 implements Runnable {
        @Override
        public void run() {
            new Volatile01().m();
        }
    }
    public static void main(String[] args) {
        Volatile_01 volatile_01 = new Volatile_01();
        new Thread(volatile_01).start();
        try {
            TimeUnit.SECONDS.sleep(1);
        }catch (InterruptedException ignored) {}
        b = false;
    }
}

When the shared variable b is declared volatile, which guarantees visibility, the loop sees the update and exits.

import java.util.concurrent.TimeUnit;

public class Volatile01 {
    private static volatile boolean b = true;
    private void m() {
        System.out.println("start...");
        while (b){}
        System.out.println("end...");
    }
    static class Volatile_01 implements Runnable {
        @Override
        public void run(){
            new Volatile01().m();
        }
    }
    public static void main(String[] args) {
        Volatile_01 volatile_01 = new Volatile_01();
        new Thread(volatile_01).start();
        try{
            TimeUnit.SECONDS.sleep(1);
        }catch (InterruptedException ignored){}
        b = false;
    }
}

join() method
join() blocks the calling thread until the thread on which join() was called has finished executing.

If you don't use join() in the following example, the main thread may print the count before the worker threads have finished, so the result is usually less than 100000. join() can be used to wait for a group of threads to finish before running subsequent logic, which ensures the data is complete.

import java.util.ArrayList;
import java.util.List;

public class Test {
    private static volatile int count = 0;

    private void m() {
        for (int i = 0; i < 10000; i++) {
            count++;
        }
    }

    static class Test02 implements Runnable {
        @Override
        public synchronized void run() {
            new Test().m();
        }
    }

    public static void main(String[] args) {
        Test02 test02 = new Test02();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(test02));
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(count);
    }
}

In the above code, the synchronized keyword makes the increments atomic (volatile alone would not, because count++ is a read-modify-write sequence). An AtomicInteger can be used instead of synchronized, because its operations are atomic. The code is as follows.

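import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class Test{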
    private static AtomicInteger count = new AtomicInteger();
    private void m(){
        for (int i = 0; i < 10000; i++){
            count.incrementAndGet();
        }
    }
    static class Test02 implements Runnable{
        @Override
        public void run(){
            new Test().m();
        }
    }
    public static void main(String[] args){
        Test02 test02 = new Test02();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; i++){
            threads.add(new Thread(test02));
        }
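        for (Thread thread : threads){
            thread.start();
        }
        // Join only after every thread has been started; joining inside the
        // start loop would run the threads one at a time.
        for (Thread thread : threads){
            try{
                thread.join();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }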
        System.out.println(count);
    }
}

CountDownLatch object
CountDownLatch works like a latch. When creating a latch, you specify a count. A thread that calls await() on the latch blocks at that call until the count reaches zero, that is, until the latch opens, and then continues. Each call to countDown() reduces the count by one.

In the following example, m1 calls await() and m2 calls countDown() in a loop. Once m2 has brought the count down to 0, m1 continues.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class Test {
    private CountDownLatch countDownLatch = new CountDownLatch(5);

    private void m1() {
        try {
            countDownLatch.await(); // Wait for the latch to open
        } catch (Exception ignored) {
        }
        System.out.println("method m1.");
    }

    private void m2() {
        while (countDownLatch.getCount() != 0) {
            countDownLatch.countDown(); // Reduce the lock on the latch
            System.out.println("method m2");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    }

    public static void main(String[] args) {
        Test count01 = new Test();
        new Thread(count01::m2).start();
        new Thread(count01::m1).start();
    }
}

A latch can be combined with locks, or it can replace a lock where threads only need to wait for a one-time event: threads wait before the latch opens and proceed once it is fully open, without contending for a lock afterwards.
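A common use of this pattern is to let one thread wait for a group of workers to finish. The following is a minimal sketch under that assumption; LatchWaitDemo and runWorkers are names made up for the illustration, and the increment stands in for real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchWaitDemo {

    /** Starts the given number of workers and returns how many had finished when the latch opened. */
    static int runWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // stand-in for real work
                done.countDown();           // one count fewer on the latch
            }).start();
        }

        try {
            done.await(); // blocks until every worker has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(5)); // 5
    }
}
```

Each worker increments the counter before it counts down, so once await() returns all increments are guaranteed to be visible.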

wait(), notify(), and notifyAll()
wait(): calling wait() on an object puts the current thread into a waiting state until another thread calls notify() or notifyAll() on the same object.

notify(): wakes up one of the threads waiting on the object.

notifyAll(): wakes up all threads waiting on the object.

All three methods must be called while holding the object's monitor, that is, inside a synchronized block or method on that object.

Producer-consumer example: a custom synchronized container that holds at most 10 elements, can be used from multiple threads, and keeps its data thread-safe.

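import java.util.ArrayList;
import java.util.List;

import lombok.Data; // third-party Lombok annotation used by Device below

public class DeviceSingleton<E> {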
    private DeviceSingleton() {
    }

    private final int max = 10;
    private int count = 0;
    private static final DeviceSingleton DEVICE_SINGLETON = new DeviceSingleton();

    public static DeviceSingleton getInstance() {
        return DEVICE_SINGLETON;
    }

    private final List<E> devices = new ArrayList<>();

    /**
     * Adds an element, waiting while the container is full.
     */
    public synchronized void add(E data) {
        // Wait when container is full
        while (devices.size() == max) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("add: " + data);
        ThreadUtils.sleep(1000);
        devices.add(data);
        count++;
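        // Producers and consumers wait on the same monitor, so wake them all;
        // notify() could wake another producer instead of a consumer.
        this.notifyAll();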
    }

    /**
     * Removes and returns the first element, waiting while the container is empty.
     */
    public synchronized E get() {
        E data = null;
        while (devices.size() == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        ThreadUtils.sleep(1000);
        data = devices.remove(0);
        count--;
        this.notifyAll();
        return data;
    }

    /**
     * Get length
     */
    public synchronized int size() {
        return count;
    }

    @Data
    static class Device {
        private int id;
        private String name;

        public Device(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    static class ThreadUtils {
        public static void sleep(int millis) {
            try {
                Thread.sleep(millis);
            } catch (Exception ignore) {}
        }
    }

}
public class Test {
    public static void main(String[] args) throws InterruptedException {
        DeviceSingleton deviceSingleton = DeviceSingleton.getInstance();
        for (int i = 0; i < 10; i++) {
            new Thread(() ->
            {
                for (int j = 0; j < 5; j++) {
                    System.out.println(deviceSingleton.get());
                }
            }, "consumer-" + i).start();
        }
        Thread.sleep(2000);

        for (int i = 0; i < 2; i++) {
            new Thread(() ->
            {
                for (int j = 0; j < 25; j++) {
                    deviceSingleton.add(new DeviceSingleton.Device(j, "device " + j));
                }
            }, "producer").start();
        }
    }

}

ReentrantLock

  1. Reentrant lock

ReentrantLock is an explicit lock offered as an alternative to the synchronized keyword and synchronized methods. It was proposed as a more efficient and more flexible mechanism than synchronized. Unlike synchronized, a ReentrantLock must be released manually with lock.unlock(), ideally in a finally block so that it is released even when an exception is thrown. An example is as follows:

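import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// ThreadUtils.sleep is the helper class defined inside DeviceSingleton above.
public class ReentrantLockTest {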
    private final Lock lock = new ReentrantLock();

    private void m1() {
        lock.lock(); // Lock
        try {
            for (int i = 0; i < 10; i++) {
                System.out.println("method m1() " + i);
                ThreadUtils.sleep(1000);
            }
        } finally {
            lock.unlock(); // Always unlock, even if an exception is thrown
        }
    }

    private void m2() {
        lock.lock(); // Lock
        try {
            System.out.println("method m2()");
        } finally {
            lock.unlock(); // Unlock
        }
    }

    public static void main(String[] args) {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
        new Thread(reentrantLockTest::m1).start();
        new Thread(reentrantLockTest::m2).start();
    }

}

  1. Try to lock: tryLock()

tryLock() attempts to acquire the lock without waiting. If the lock is held by another thread, it returns false immediately and the current thread carries on without the lock. If the lock is acquired, it returns true and the current thread executes while holding the lock. An example is as follows:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTest {
    private Lock lock = new ReentrantLock();

    private void m1() {
        lock.lock(); // Lock
        try {
            for (int i = 0; i < 10; i++) {
                ThreadUtils.sleep(1000);
                System.out.println("method m1() " + i);
            }
        } finally {
            lock.unlock(); // Unlock
        }
    }

    private void m2() {
        boolean isLocked = false;
        try {
            /*
            Try to acquire the lock. tryLock() returns true if the lock was acquired and
            false if another thread holds it. The no-argument form never waits: it
            returns immediately either way.
            The timed form waits up to the given time. For example,
            isLocked = lock.tryLock(5, TimeUnit.SECONDS) continues as soon as the lock is
            acquired within 5 seconds and returns false if the wait times out.
             */
            isLocked = lock.tryLock();
            System.out.println(isLocked ? "m2() synchronized" : "m2() unsynchronized");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Unlock only if the lock was actually acquired.
            if (isLocked) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
        new Thread(reentrantLockTest::m1).start();
        new Thread(reentrantLockTest::m2).start();
    }
}
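The difference between the no-argument tryLock() and the timed tryLock(long, TimeUnit) can be sketched deterministically as follows. TryLockDemo, runAndJoin and demo are hypothetical names for this illustration only.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {

    /** Runs the task on a new thread and waits for it to finish. */
    private static void runAndJoin(Runnable task) {
        Thread t = new Thread(task);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns two flags: [0] is the result of tryLock() while another thread holds
     * the lock, [1] is whether a timed tryLock succeeded once the lock was free.
     */
    static boolean[] demo() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] results = new boolean[2];

        lock.lock(); // the calling thread holds the lock
        try {
            // Another thread tries without waiting; it fails and returns at once.
            runAndJoin(() -> results[0] = lock.tryLock());
        } finally {
            lock.unlock();
        }

        // The lock is free now, so a timed attempt succeeds well within the timeout.
        runAndJoin(() -> {
            try {
                if (lock.tryLock(1, TimeUnit.SECONDS)) {
                    try {
                        results[1] = true;
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return results;
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // false true
    }
}
```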

  1. Interruptible lock: lockInterruptibly()

When interrupt() is called on a thread that is blocked in lock(), the call only sets the thread's interrupt flag; no exception is thrown and the thread keeps waiting. A thread blocked in lockInterruptibly() responds to the interrupt request by throwing InterruptedException and giving up the wait. An example is as follows:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTest {
    private Lock lock = new ReentrantLock();

    private void m1() {
        lock.lock(); // Lock up
        for (int i = 0; i < 5; i++) {
            ThreadUtils.sleep(1000);
            System.out.println("method m1() " + i);
        }
        lock.unlock(); // Unlock
    }

    private void m2() {
        boolean acquired = false;
        try {
            // Block waiting for the lock, but allow other threads to interrupt the wait
            lock.lockInterruptibly();
            acquired = true;
            System.out.println("method m2()");
        } catch (InterruptedException e) {
            System.out.println("The lock was interrupted.");
        } finally {
            // Unlock only if the lock was actually acquired
            if (acquired) {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
        Thread thread1 = new Thread(reentrantLockTest::m1);
        thread1.start();
        ThreadUtils.sleep(1000);
        Thread thread2 = new Thread(reentrantLockTest::m2);
        thread2.start();
        ThreadUtils.sleep(1000);
        thread2.interrupt(); // Interrupt thread2 while it waits for the lock
    }
}

Note: a thread waiting in lockInterruptibly() is released by interrupting the thread itself, whereas a thread waiting in wait() is woken through the object it waits on (interrupt: thread.interrupt(); wake up: object.notifyAll()).

What's the use of thread interruption?

When using Windows, we often run into software that hangs, and we usually end the process from the task manager. Ending the process can be thought of as breaking the blocked state of the lock (that is, an abnormal end).

  1. Fair lock

A fair lock grants the lock first come, first served. Unless there is a special requirement, fair locks are not recommended. As a rule of thumb, use them only when concurrency is low (generally fewer than 10 threads); if concurrency is high and a strict access order is required, other approaches are recommended.

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTest {
    static class TestReentrantLock extends Thread {
        // When creating ReentrantLock object, passing parameter to true means creating fair lock.
        private ReentrantLock lock = new ReentrantLock(true);

        public void run() {
            for (int i = 0; i < 5; i++) {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " get lock.");
                    ThreadUtils.sleep(1000);
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        TestReentrantLock lock = new TestReentrantLock();
        lock.start();
        new Thread(lock).start();
        new Thread(lock).start();
    }
}

  1. Condition

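A Condition adds wait/notify-style coordination to a Lock: a thread holding the lock can wait on a condition with await(), which releases the lock while waiting, and is woken by signal() or signalAll() once the condition is met. The following example implements the producer-consumer pattern with Condition.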

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class DeviceContainer<T> {
    private DeviceContainer() {
    }

    private static final DeviceContainer DEVICE_CONTAINER = new DeviceContainer<>();

    public static DeviceContainer getInstance() {
        return DEVICE_CONTAINER;
    }

    private final List<T> list = new LinkedList<>();

    private final int max = 10;
    private int count = 0;
    private Lock lock = new ReentrantLock();
    private Condition producer = lock.newCondition();
    private Condition consumer = lock.newCondition();

    public void add(T t) {
        lock.lock();
        try {
            while (this.size() == max) {
                System.out.println(Thread.currentThread().getName() + " waiting");
                // When the container is full, the producer releases the lock
                // and enters the waiting queue of the producer condition.
                producer.await();
            }
            System.out.println(Thread.currentThread().getName() + " add");
            list.add(t);
            count++;
            // Wake up all threads waiting on the consumer condition
            consumer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public T get() {
        T t = null;
        lock.lock();
        try {
            while (this.size() == 0) {
                System.out.println(Thread.currentThread().getName() + " waiting");
                // When the container is empty, the consumer enters the
                // waiting queue of the consumer condition.
                consumer.await();
            }
            System.out.println(Thread.currentThread().getName() + " get");
            t = list.remove(0);
            count--;
            // Wake up all threads waiting on the producer condition
            producer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return t;
    }

    private int size() {
        return count;
    }
}

// Device and ThreadUtils are the helper classes defined inside DeviceSingleton above.
public class Test {
    public static void main(String[] args) throws InterruptedException {
        DeviceContainer<Device> deviceSingleton = DeviceContainer.getInstance();
        for (int i = 0; i < 10; i++) {
            new Thread(() ->
            {
                for (int j = 0; j < 5; j++) {
                    System.out.println(deviceSingleton.get());
                }
            }, "consumer-" + i).start();
        }
        ThreadUtils.sleep(1000);
        for (int i = 0; i < 2; i++) {
            new Thread(() ->
            {
                for (int j = 0; j < 25; j++) {
                    deviceSingleton.add(new Device(j, "device " + j));
                }
            }, "producer-" + i).start();
        }
    }
}

Concurrent containers in Java

  1. Map/Set

ConcurrentHashMap: a hash-based Map that is efficient and thread-safe. Thread safety is implemented internally with techniques that are lighter-weight than locking the whole map with synchronized. Neither keys nor values may be null (unlike HashMap and HashSet). The JDK has no ConcurrentHashSet class; a concurrent hash-based Set can be obtained with ConcurrentHashMap.newKeySet().

ConcurrentSkipListMap/ConcurrentSkipListSet: Map/Set backed by a skip list. They keep their elements sorted and are thread-safe, but are less efficient than ConcurrentHashMap.

CopyOnWriteArraySet: backed by an array, thread-safe. Additions and removals are slow because each one copies the array; reads and iteration are fast.
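The ConcurrentHashMap properties described above (null rejection, atomic updates, and a Set view) can be sketched as follows. ConcurrentMapDemo and its method names are invented for this illustration; put, merge and newKeySet are standard ConcurrentHashMap methods.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentMapDemo {

    /** ConcurrentHashMap rejects null values (and null keys) with a NullPointerException. */
    static boolean rejectsNull() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        try {
            map.put("key", null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** Several threads bump one counter; merge() performs each update atomically. */
    static int countHits(int threads, int hitsPerThread) {
        ConcurrentHashMap<String, Integer> hits = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < hitsPerThread; j++) {
                    hits.merge("page", 1, Integer::sum);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return hits.get("page");
    }

    /** A thread-safe hash Set backed by a ConcurrentHashMap. */
    static Set<String> newConcurrentSet() {
        return ConcurrentHashMap.newKeySet();
    }

    public static void main(String[] args) {
        System.out.println(rejectsNull());         // true
        System.out.println(countHits(4, 1000));    // 4000
        System.out.println(newConcurrentSet().add("a")); // true
    }
}
```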

  1. List

CopyOnWriteArrayList: backed by an array, thread-safe. Additions and removals are slow because each one copies the array; reads and iteration are fast.
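One consequence of copying on every write is that an iterator works on a snapshot of the array and never throws ConcurrentModificationException. A minimal sketch, with the hypothetical names CopyOnWriteDemo and iterateWhileAdding:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo {

    /** Returns {number of elements the iterator saw, final size of the list}. */
    static int[] iterateWhileAdding() {
        List<Integer> list = new CopyOnWriteArrayList<>(Arrays.asList(1, 2, 3));
        int seen = 0;
        // The for-each loop iterates over the array as it was when the loop started.
        for (Integer value : list) {
            list.add(value * 10); // each add copies the array; the iterator is unaffected
            seen++;
        }
        return new int[] { seen, list.size() };
    }

    public static void main(String[] args) {
        int[] result = iterateWhileAdding();
        System.out.println(result[0] + " seen, size " + result[1]); // 3 seen, size 6
    }
}
```

The same loop over an ArrayList would fail with ConcurrentModificationException on the second iteration.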

  1. Queue

ConcurrentLinkedQueue/ConcurrentLinkedDeque: non-blocking concurrent queues based on linked lists. ConcurrentLinkedQueue uses a singly linked list and ConcurrentLinkedDeque a doubly linked list; both are unbounded.

ArrayBlockingQueue/LinkedBlockingQueue: blocking queues. Inserting blocks when the queue is full, and taking blocks when the queue is empty. ArrayBlockingQueue is backed by an array and is bounded; LinkedBlockingQueue is backed by a linked list and is unbounded by default. Both behave differently depending on which API is called when the queue is full: add throws an exception; put blocks and waits; the one-argument offer does not block and returns false (true on success); the three-argument offer blocks for up to the given duration, adds the element and returns true if space becomes available in that time, and otherwise discards the new element and returns false.
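The behavior of the insertion methods on a full queue can be sketched as follows. BlockingQueueDemo and describeFullQueue are names made up for the illustration; put is left out because it would block forever without a consumer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {

    /** Fills a queue of capacity 1 and reports how each insertion method reacts. */
    static String describeFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // the queue is now full

        StringBuilder out = new StringBuilder();

        // offer(e): does not block, returns false when full
        out.append("offer=").append(queue.offer("second"));

        // add(e): throws IllegalStateException when full
        try {
            queue.add("second");
            out.append(", add=ok");
        } catch (IllegalStateException e) {
            out.append(", add=IllegalStateException");
        }

        // offer(e, timeout, unit): waits up to the timeout, then gives up with false
        try {
            out.append(", timedOffer=")
               .append(queue.offer("second", 100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // offer=false, add=IllegalStateException, timedOffer=false
        System.out.println(describeFullQueue());
    }
}
```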

PriorityQueue: priority queue, backed by an array, unbounded. Note that PriorityQueue itself is not thread-safe.

PriorityBlockingQueue: priority blocking queue, backed by an array, unbounded.

LinkedTransferQueue: a transfer queue, which uses the transfer method to hand data to consumers in real time. Storing data with add does not block. transfer is specific to TransferQueue: it requires a consumer (a caller of take()), and blocks until a thread consumes the element. It is generally used to process instant messages.
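The hand-off behavior can be sketched as follows. TransferDemo and handOff are hypothetical names; tryTransfer, transfer and take are standard TransferQueue methods. tryTransfer is used to show what happens when no consumer is waiting, since transfer itself would block.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferDemo {

    /** Returns "tryTransferResult:receivedElement:queueSizeAfterwards". */
    static String handOff() {
        TransferQueue<String> queue = new LinkedTransferQueue<>();

        // No consumer is waiting, so tryTransfer gives up at once
        // and does NOT leave the element in the queue.
        boolean accepted = queue.tryTransfer("dropped");

        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            queue.transfer("message"); // blocks until the consumer has taken the element
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted + ":" + received[0] + ":" + queue.size();
    }

    public static void main(String[] args) {
        System.out.println(handOff()); // false:message:0
    }
}
```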

SynchronousQueue: a blocking synchronous queue with a capacity of 0, which makes it a special kind of TransferQueue. A consuming thread must be waiting before an element can be handed over. add does not block: if no consuming thread is waiting for data, it throws an exception. put blocks until a consuming thread takes the element.

DelayQueue: a delay blocking queue, unbounded. An element can be taken only after its delay has expired, which resembles a polling mechanism. It is generally used for timed tasks. Example business scenarios: caches with an expiration time, automatic cancellation of expired orders, etc.
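The order-expiration scenario can be sketched as follows. ExpiringOrder and DelayQueueDemo are hypothetical names for the illustration; the Delayed interface with getDelay and compareTo is what DelayQueue requires of its elements.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {

    /** An element that becomes available once its deadline has passed. */
    static class ExpiringOrder implements Delayed {
        final String id;
        final long deadlineNanos;

        ExpiringOrder(String id, long delayMillis) {
            this.id = id;
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time; zero or negative means the element has expired
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Returns "nothingExpiredAtFirst:firstTaken:secondTaken". */
    static String demo() {
        DelayQueue<ExpiringOrder> queue = new DelayQueue<>();
        queue.add(new ExpiringOrder("late", 400));
        queue.add(new ExpiringOrder("early", 200));

        // poll() does not wait: nothing has expired yet, so it returns null
        boolean nothingExpiredAtFirst = queue.poll() == null;
        try {
            String first = queue.take().id;  // blocks until the earliest deadline passes
            String second = queue.take().id; // then until the later one passes
            return nothingExpiredAtFirst + ":" + first + ":" + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true:early:late
    }
}
```

Elements come out in deadline order regardless of the order in which they were added.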

 

Thread pool
A thread pool is a process-level resource. By default its life cycle matches that of the JVM: it lasts from the moment the pool is started until the JVM exits. If shutdown() is called explicitly, the pool stops accepting new tasks and shuts down once all submitted tasks have been executed.
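The explicit shutdown described above can be sketched as follows. PoolShutdownDemo is a made-up name; the sketch uses awaitTermination, a standard ExecutorService method, to wait for the submitted tasks instead of sleeping for a fixed time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolShutdownDemo {

    /** Returns "completedTasks:isShutdown:awaitTerminationResult:isTerminated". */
    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < 4; i++) {
            pool.execute(completed::incrementAndGet);
        }

        // Stop accepting new tasks; tasks already submitted still run.
        pool.shutdown();

        boolean finished = false;
        try {
            // Wait (up to 5 seconds) for the submitted tasks to complete.
            finished = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get() + ":" + pool.isShutdown() + ":" + finished + ":" + pool.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 4:true:true:true
    }
}
```

Without the call to shutdown(), the pool's worker threads would keep the JVM alive after main returns.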

Executor interface
The top-level interface of the thread pool framework. Executor has a single method, execute, which is the service method used to process tasks. The caller supplies an implementation of Runnable, and the executor runs it on a thread of its choosing.

import java.util.concurrent.Executor;

public class Executor01 {
    public static void main(String[] args) {
        new Executor_01().execute(() ->
            System.out.println(Thread.currentThread().getName() + " test executor.")
        );
    }

    static class Executor_01 implements Executor {
        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }
}

ExecutorService
A sub-interface of Executor. In addition to execute, it provides the service method submit, which returns a Future.

Executors tool class
Executors is a utility class with factory methods that quickly create thread pools. The pools it creates implement the ExecutorService interface, which is what gives them their thread pool capabilities. Common ExecutorService methods are: void execute(Runnable), Future submit(Callable), Future submit(Runnable), void shutdown(), boolean isShutdown(), boolean isTerminated().

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        // Create a thread pool with 5 threads
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 6; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " executor.");
                ThreadUtils.sleep(1000);
            });
        }
        System.out.println(executorService);

        // Graceful shutdown
        executorService.shutdown();
        // isTerminated reports whether all tasks have finished after shutdown.
        // The tasks are still sleeping at this point, so it is false.
        System.out.println(executorService.isTerminated());
        // isShutdown reports whether the shutdown method has been called
        System.out.println(executorService.isShutdown());
        System.out.println(executorService);

        ThreadUtils.sleep(5000);

        // true: after sleeping for 5 seconds, all tasks have run and the resources have been reclaimed.
        System.out.println(executorService.isTerminated());
        System.out.println(executorService.isShutdown());
        System.out.println(executorService);
    }
}
Future
A Future represents the result of an asynchronous task. The result is retrieved through the get method.

Common methods: get(), get(long, TimeUnit), and isDone().

get(): blocks until the task has finished executing and then returns its result;

get(long, TimeUnit): blocks for at most the given time while waiting for the result. If the task has not finished within that time, a TimeoutException is thrown.

isDone(): reports whether the task, that is, the call method, has completed. Pay special attention to the difference between isDone and isShutdown in ExecutorService: isShutdown reports whether shutdown has been called on the thread pool, not whether a task has finished.
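
The timed variant of get can be sketched as follows. This is only an illustration: the class name FutureTimeoutDemo, the method getWithTimeout and the 500 ms task duration are assumptions made for the example, not part of the original article.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; names and timings are illustrative only.
class FutureTimeoutDemo {

    // Submits a task that takes about 500 ms and waits at most timeoutMillis for it.
    // Returns the task's result, or "timeout" if get(long, TimeUnit) gave up first.
    static String getWithTimeout(long timeoutMillis) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        Future<String> future = service.submit(() -> {
            Thread.sleep(500);
            return "done";
        });
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The task is still running; cancel it so the pool can shut down quickly.
            future.cancel(true);
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(getWithTimeout(50));   // shorter than the task
        System.out.println(getWithTimeout(5000)); // longer than the task
    }
}
```

A timeout does not stop the task by itself, which is why the sketch cancels the Future explicitly before shutting the pool down.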

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        testExecutorService();
    }

    private static void testExecutorService() throws ExecutionException, InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(1);
        Future<String> future = service.submit(() -> {
            ThreadUtils.sleep(1000);
            return Thread.currentThread().getName() + " submit.";
        });

        // Check whether the task has completed, that is, whether the call method has returned.
        // Note the difference from isShutdown in ExecutorService, which reports whether shutdown has been called on the pool.
        System.out.println(future.isDone()); // false, because the task is still sleeping
        // get blocks until the task finishes and then returns the value of the call method
        System.out.println(future.get());

        System.out.println(future.isDone()); // true
        System.out.println(future.get());

        // Shut down the thread pool
        service.shutdown();
    }
}
Callable interface
Callable is a task interface. Like Runnable, it describes a task that can be handed to a thread pool for execution.

Interface method: call(), which is equivalent to the run method in Runnable. The difference is that the call method has a return value and may throw a checked exception.

Choosing between Callable and Runnable: use Callable when the task needs to return a value or throw a checked exception; in any other case, either one will do.
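
The difference can be seen in a small sketch. The class CallableVsRunnableDemo and its methods are hypothetical names chosen for this illustration; only the JDK types Callable, Runnable, Future and ExecutorService come from the text above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names and messages are illustrative only.
class CallableVsRunnableDemo {

    // Runs a Callable on a pool thread and reports either its value or its failure.
    static String parseOnPool(String text) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                if (text.isEmpty()) {
                    // call() is declared with "throws Exception", so a checked exception is allowed
                    throw new Exception("empty input");
                }
                return Integer.parseInt(text);
            };
            Future<Integer> future = service.submit(task);
            return "value=" + future.get();
        } catch (ExecutionException e) {
            // get() wraps whatever the task threw in an ExecutionException
            return "error=" + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            service.shutdown();
        }
    }

    // Submits a Runnable; its Future completes with null because run() returns nothing.
    static Object resultOfRunnable() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Runnable task = () -> System.out.println("running");
            return service.submit(task).get();
        } catch (ExecutionException | InterruptedException e) {
            return e;
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOnPool("42"));
        System.out.println(parseOnPool(""));
        System.out.println(resultOfRunnable());
    }
}
```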

ThreadPoolExecutor creating thread pool
A thread pool can also be created directly with new ThreadPoolExecutor(...). ThreadPoolExecutor has four overloaded constructors; the most complete one takes all of the parameters described below.

Parameter Description:

corePoolSize (int): number of core threads

maximumPoolSize (int): maximum number of threads

keepAliveTime (long): maximum idle time of a thread

unit (TimeUnit): time unit of keepAliveTime

workQueue (BlockingQueue<Runnable>): queue of tasks waiting for execution

threadFactory (ThreadFactory): factory that creates the threads

handler (RejectedExecutionHandler): rejection policy
 

Number of core threads and maximum threads:

When a new task is submitted to the thread pool, the pool first checks whether the number of worker threads has reached corePoolSize. If it has not, a new worker thread is created to execute the task. Otherwise, the pool checks whether the work queue workQueue is full. If the queue is not full, the newly submitted task is stored in it. If the queue is full, the pool checks whether the number of threads has reached maximumPoolSize. If it has not, a new worker thread is created to execute the task; if it has, the task is handed to the saturation (rejection) policy. When the number of threads in the pool is greater than corePoolSize and a thread has been idle for longer than keepAliveTime, that thread is terminated, until the number of threads is no greater than corePoolSize.
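
The decision order described above can be observed with a deliberately tiny pool. The sketch below assumes a pool with 1 core thread, at most 2 threads and a queue of capacity 1; the class name PoolFlowDemo and these sizes are chosen only for illustration. All tasks block on a latch, so none of them finishes while the four submissions are made.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the pool sizes are illustrative only.
class PoolFlowDemo {

    // Submits four blocking tasks and reports where each one ended up.
    static String submitFourTasks() {
        AtomicInteger rejected = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                              // corePoolSize
                2,                              // maximumPoolSize
                10, TimeUnit.SECONDS,           // keepAliveTime and its unit
                new ArrayBlockingQueue<>(1),    // workQueue with capacity 1
                (task, executor) -> rejected.incrementAndGet()); // count rejected tasks

        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Task 1 gets the core thread, task 2 goes to the queue,
        // task 3 gets an extra thread, task 4 is rejected.
        for (int i = 0; i < 4; i++) {
            pool.execute(blocker);
        }
        String summary = "threads=" + pool.getPoolSize()
                + " queued=" + pool.getQueue().size()
                + " rejected=" + rejected.get();

        release.countDown();
        pool.shutdown();
        return summary;
    }

    public static void main(String[] args) {
        System.out.println(submitFourTasks()); // threads=2 queued=1 rejected=1
    }
}
```

Note that the extra thread is created only after the queue is full, so a pool with a large or unbounded queue rarely grows beyond corePoolSize.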

Custom thread pool

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorThreadPoolTest {
    public static void main(String[] args) {
        testExecutorThreadPool();
    }

    private static void testExecutorThreadPool() {
        // Create a thread pool with 2 core threads, at most 4 threads and a maximum idle time of 10 seconds
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
                4,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),
                new MyThreadFactory(),
                new MyIgnorePolicy());
        // Start all core threads in advance so that they are already waiting for tasks
        executor.prestartAllCoreThreads();

        // Create and execute tasks
        for (int i = 1; i <= 10; i++) {
            MyTask task = new MyTask(String.valueOf(i));
            executor.execute(task);
        }
        // Let the accepted tasks finish, then release the threads so the JVM can exit
        executor.shutdown();
    }

    // Thread factory that names each thread and logs its creation
    static class MyThreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable runnable) {
            Thread t = new Thread(runnable, "Thread [" + mThreadNum.getAndIncrement() + "]");
            System.out.println(t.getName() + " created");
            return t;
        }
    }

    // Rejection policy that only logs the rejected task and then drops it
    public static class MyIgnorePolicy implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
            doLog(runnable, executor);
        }

        private void doLog(Runnable runnable, ThreadPoolExecutor executor) {
            System.err.println(runnable.toString() + " rejected");
        }
    }

    static class MyTask implements Runnable {
        private final String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(this.toString() + " running");
            ThreadUtils.sleep(1000);
        }

        @Override
        public String toString() {
            return "Thread [" + name + "]";
        }
    }
}
FixedThreadPool thread pool
A thread pool with a fixed capacity, created through Executors.newFixedThreadPool(int nThreads). The number of active threads never exceeds the pool capacity, and the thread pool must be shut down manually.

The factory method sets both the number of core threads and the maximum number of threads to the parameter nThreads and the maximum idle time to 0. The task queue is a LinkedBlockingQueue, whose default capacity is Integer.MAX_VALUE.
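
As a rough illustration of these settings, the sketch below builds the same configuration by hand with ThreadPoolExecutor and compares it with the pool returned by the factory method. The class FixedPoolEquivalentDemo and its helper methods are hypothetical. The comparison relies on the factory returning a ThreadPoolExecutor instance, which is an implementation detail of the JDK rather than a documented guarantee, so the code checks it with instanceof.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only inspects pool settings and runs no tasks.
class FixedPoolEquivalentDemo {

    // A hand-built pool with the settings described for newFixedThreadPool(nThreads).
    static ThreadPoolExecutor fixedPoolByHand(int nThreads) {
        return new ThreadPoolExecutor(
                nThreads, nThreads,          // core size == maximum size
                0L, TimeUnit.MILLISECONDS,   // no idle timeout
                new LinkedBlockingQueue<>()); // capacity defaults to Integer.MAX_VALUE
    }

    static String describe(ThreadPoolExecutor pool) {
        return "core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " keepAliveMs=" + pool.getKeepAliveTime(TimeUnit.MILLISECONDS)
                + " queue=" + pool.getQueue().getClass().getSimpleName()
                + " capacity=" + pool.getQueue().remainingCapacity();
    }

    // Compares the hand-built pool with the one from Executors (assumes a ThreadPoolExecutor is returned).
    static boolean sameAsFactory(int nThreads) {
        ExecutorService fromFactory = Executors.newFixedThreadPool(nThreads);
        ThreadPoolExecutor byHand = fixedPoolByHand(nThreads);
        try {
            return fromFactory instanceof ThreadPoolExecutor
                    && describe((ThreadPoolExecutor) fromFactory).equals(describe(byHand));
        } finally {
            fromFactory.shutdown();
            byHand.shutdown();
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = fixedPoolByHand(10);
        System.out.println(describe(pool));
        pool.shutdown();
        System.out.println(sameAsFactory(10));
    }
}
```

Because the queue is effectively unbounded, a fixed pool never rejects tasks on its own; a producer that submits faster than the pool can work will instead grow the queue.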

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
    public static void main(String[] args) {
        new Test().test();
    }

    public void test() {
        // Create a FixedThreadPool thread pool with a capacity of 10
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            service.execute(() -> System.out.println(Thread.currentThread().getName()));
        }
        // Shut down the thread pool
        service.shutdown();
    }
}
CachedThreadPool thread pool
A cached thread pool, created through Executors.newCachedThreadPool(). Its maximum capacity is Integer.MAX_VALUE. It grows automatically as tasks arrive, and its idle threads are destroyed automatically (this is different from FixedThreadPool, whose threads stay alive until the shutdown method is called manually).

The factory method sets the number of core threads to 0, the maximum number of threads to Integer.MAX_VALUE and the maximum idle time to 60 seconds. The task queue is a SynchronousQueue.
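
Why this configuration grows on demand can be sketched with a hand-built pool that uses the same settings. The class CachedPoolGrowthDemo is a hypothetical name. A SynchronousQueue holds no tasks, so while every existing thread is busy, each new task forces the pool to create another thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it mirrors the settings described for the cached pool.
class CachedPoolGrowthDemo {

    // Submits the given number of blocking tasks and returns how many threads the pool created.
    static int threadsFor(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,      // no core threads, practically unlimited maximum
                60L, TimeUnit.SECONDS,     // idle threads are reclaimed after 60 seconds
                new SynchronousQueue<>()); // hands tasks over directly, stores nothing

        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep every thread busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize();

        release.countDown();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println(threadsFor(5)); // one thread per concurrently running task
    }
}
```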

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
    public static void main(String[] args) {
        new Test().test();
    }

    public void test() {
        // Create cache thread pool
        ExecutorService service = Executors.newCachedThreadPool();
        System.out.println(service);
        for (int i = 0; i < 5; i++) {
            service.execute(() -> {
                ThreadUtils.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " executor.");
            });
        }
        System.out.println(service);
        // Wait longer than the 60-second idle time so that the idle threads are reclaimed
        ThreadUtils.sleep(65000);
        System.out.println(service);
    }
}
ScheduledThreadPool thread pool
A thread pool for scheduled tasks, which runs tasks automatically after a delay or at a fixed period. It is created through Executors.newScheduledThreadPool(int corePoolSize) and must be shut down manually. Choose it for planned tasks, such as tidying up data regularly or having a server clear invalid files periodically.

The number of core threads is the construction parameter, the maximum number of threads is Integer.MAX_VALUE, the maximum idle time is 0 (in JDK 8), and the task queue is a DelayedWorkQueue.

Common methods include: scheduleAtFixedRate, schedule, execute, etc.
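
The two scheduling styles named above can be sketched as follows: schedule runs a task once after a delay, and scheduleAtFixedRate runs it repeatedly. The class ScheduleDemo, its method names and the chosen delays are assumptions made for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; delays and periods are illustrative only.
class ScheduleDemo {

    // One-shot task: runs once after delayMillis and returns how long it actually waited.
    static long runOnceAfter(long delayMillis) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        try {
            long start = System.nanoTime();
            Callable<Long> task = () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            ScheduledFuture<Long> future = service.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
    }

    // Periodic task: waits until it has run the requested number of times, then cancels it.
    static boolean runPeriodically(int runs, long periodMillis) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        CountDownLatch remaining = new CountDownLatch(runs);
        ScheduledFuture<?> handle = service.scheduleAtFixedRate(
                remaining::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            handle.cancel(false); // a periodic task runs until it is cancelled or the pool shuts down
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("waited at least " + runOnceAfter(200) + " ms");
        System.out.println("three periodic runs completed: " + runPeriodically(3, 50));
    }
}
```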

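import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {
        new Test().test();
    }

    public void test() {
        // Create scheduled task thread pool
        ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
        System.out.println(service);
        // Periodic task: the first run starts 500 milliseconds after submission, and later runs are scheduled every 300 milliseconds.
        // Each run takes 1 second, which is longer than the period, so the next run starts as soon as the previous one ends.
        service.scheduleAtFixedRate(() -> {
            ThreadUtils.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " executor.");
        }, 500, 300, TimeUnit.MILLISECONDS);
        System.out.println(service);
        // Let the task run for a while. Calling shutdown immediately would cancel the periodic task before its first run.
        ThreadUtils.sleep(5000);
        service.shutdown();
    }
}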
SingleThreadExecutor thread pool
A thread pool with a capacity of one thread, created through Executors.newSingleThreadExecutor(). It must be shut down manually. Choose it when tasks must be executed in order, such as public channel chat in a game lobby or a flash sale of a fixed number of goods.

The number of core threads and the maximum number of threads are both 1, and the task queue is a LinkedBlockingQueue.
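
The ordering guarantee can be sketched as follows: tasks submitted to a single-thread pool run one at a time in submission order. The class SingleThreadOrderDemo and the method runInOrder are hypothetical names used for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it records the order in which tasks actually ran.
class SingleThreadOrderDemo {

    static List<Integer> runInOrder(int count) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        // Synchronized list so that the main thread reads the worker's writes safely
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < count; i++) {
            final int id = i;
            service.execute(() -> seen.add(id)); // queued behind all earlier tasks
        }
        service.shutdown(); // submitted tasks still run after shutdown
        try {
            service.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // [0, 1, 2, 3, 4]
    }
}
```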

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
    public static void main(String[] args) {
        new Test().test();
    }

    public void test() {
        // Create a single capacity thread pool
        ExecutorService service = Executors.newSingleThreadExecutor();
        System.out.println(service);
        for (int i = 0; i < 5; i++) {
            service.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " executor.");
                ThreadUtils.sleep(1000);
            });
        }
        service.shutdown();
    }
}
ForkJoinPool thread pool
A fork/join (branch and merge) thread pool, suitable for complex tasks that can be split into smaller ones. By default, the number of threads equals the number of CPU cores.

ForkJoinPool has no fixed capacity in the sense of the pools above. A task splits itself into subtasks as needed, the worker threads of the pool execute the subtasks, and the results are merged when the subtasks finish. Splitting and merging are done by calling two methods manually: fork and join.

A divide-and-conquer task running in this thread pool must be a subtype of ForkJoinTask, which provides the fork and join capabilities.

ForkJoinTask has two commonly used abstract subtypes: RecursiveTask and RecursiveAction. RecursiveTask is a fork/join task that returns a result, and RecursiveAction is a fork/join task that returns no result (similar to the difference between Callable and Runnable).

Both subtypes declare an abstract compute method, which holds the execution logic of the task.

This thread pool is mainly used for the calculation and analysis of large amounts of data.
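
As a counterpart to a task that returns a result, a RecursiveAction can be sketched as follows. The class RecursiveActionDemo, the nested DoubleAction and the threshold of 1000 elements are assumptions chosen for this illustration. The action doubles every element of an array in place, and invokeAll is used here as a shorthand for forking both halves and waiting for them.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical demo class; the threshold is an arbitrary illustrative value.
class RecursiveActionDemo {
    private static final int THRESHOLD = 1_000;

    // Doubles data[begin..end) in place and returns nothing.
    static class DoubleAction extends RecursiveAction {
        private final long[] data;
        private final int begin;
        private final int end;

        DoubleAction(long[] data, int begin, int end) {
            this.data = data;
            this.begin = begin;
            this.end = end;
        }

        @Override
        protected void compute() {
            if (end - begin <= THRESHOLD) {
                // Small enough: do the work directly
                for (int i = begin; i < end; i++) {
                    data[i] *= 2;
                }
            } else {
                // Too large: split in two and wait for both halves
                int middle = begin + (end - begin) / 2;
                invokeAll(new DoubleAction(data, begin, middle),
                          new DoubleAction(data, middle, end));
            }
        }
    }

    // Fills an array with ones, doubles it in the pool and returns the sum of the elements.
    static long sumAfterDoubling(int size) {
        long[] data = new long[size];
        Arrays.fill(data, 1L);
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleAction(data, 0, size)); // blocks until the whole action is done
        } finally {
            pool.shutdown();
        }
        long sum = 0L;
        for (long value : data) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumAfterDoubling(10_000)); // 20000
    }
}
```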

public class Test {

public static void main(String[] args) throws ExecutionException, InterruptedException {
    long result = 0L;
    for (int NUMBER : NUMBERS) {
        result += NUMBER;
    }
    System.out.println(result);

    ForkJoinPool pool = new ForkJoinPool();
    // Branch merge task
    AddTask task = new AddTask(0, NUMBERS.length);
    // Submit tasks
    Future<Long> future = pool.submit(task);
    System.out.println(future.get());
}

private static final int[] NUMBERS = new int[1000000];
private static final int MAX_SIZE = 50000;
private static final Random RANDOM = new Random();

static {
    for (int i = 0; i < NUMBERS.length; i++) {
        NUMBERS[i] = RANDOM.nextInt(1000);
    }
}

static class AddTask extends RecursiveTask<Long> {
    int begin, end;

    AddTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if ((end - begin) < MAX_SIZE) {
            long sum = 0L;
            for (int i = begin; i < end; i++) {
                sum += NUMBERS[i];
            }
            return sum;
        } else {
            // Split the range in two when its length is not below the threshold
            int middle = begin + (end - begin) / 2;
            AddTask task1 = new AddTask(begin, middle);
            AddTask task2 = new AddTask(middle, end);
            // fork hands the subtask to the pool for asynchronous execution
            task1.fork();
            task2.fork();
            // join waits for the subtask to finish and returns its result. It is a blocking method.
            return task1.join() + task2.join();
        }
    }
}

}

Thread group

A thread group is a collection of threads. It may also contain other thread groups, so the groups form a tree. Threads that work on the same batch of tasks can be put into one group and managed together. A thread may access information about its own thread group, but not about the parent group or any other group. Note that putting threads into a group does not by itself make the code they run thread-safe.

public class Test {

public static void main(String[] args) {
    new Test().test();
}

public void test() {
    ThreadGroup group = new ThreadGroup("LQ");
    Thread thread = new Thread(group, () ->
            System.out.println("group is " + Thread.currentThread().getThreadGroup().getName())
    );
    thread.start();
}

}
If you find any mistakes, shortcomings, or open questions in the content, please leave a message to point them out so that we can learn together.

Keywords: Java Windows less jvm

Added by Bladescope on Thu, 24 Oct 2019 21:14:31 +0300