java Concurrent Programming (1) cancelling concurrent programs from closing

1. Mission cancelled and closed

1. Interrupt Thread

1. Each thread has a boolean interrupt state. true is in the interrupt state

Interrupt: issue interrupt request; isInterrupt: return interrupt status; interrupted: clear interrupt status

2. The blocking method in JVM checks the interrupt status of threads. Its response method is: clearing the interrupt status, throwing an InterruptedException exception, indicating that the blocking operation is terminated; however, JVM does not guarantee when the blocking method detects the interrupt status of threads.

3. Interrupt Understanding: It does not really interrupt a running thread, but only makes a request. The specific interrupt is handled by the task itself.

Eliminating threads by interrupting is usually the best way

public class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;
    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }
    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* Allow thread to exit */
            //If an interrupt exception is caught, the thread exits itself
        }
    }
    public void cancel() {
        interrupt();
    }
}

 

2. Interruption of uninterruptible blocking

For example, the Socket I/O operation will not interrupt even if the interrupt request is set, but the close socket will throw an exception to achieve the interrupt effect; therefore, we need to rewrite the interrupt method.

  

//custom callable Implementation class
public abstract class SocketUsingTask <T> implements CancellableTask<T> {
    private Socket socket;

    protected synchronized void setSocket(Socket s) {
        socket = s;
    }
    //Cancellation method
    public synchronized void cancel() {
        try {
            if (socket != null)
                socket.close();
        } catch (IOException ignored) {
        }
    }
    //Method of New Instances
    public RunnableFuture<T> newTask() {
        return new FutureTask<T>(this) {
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    SocketUsingTask.this.cancel();
                } finally {
                    return super.cancel(mayInterruptIfRunning);
                }
            }
        };
    }
}

//custom callable Interface
interface CancellableTask <T> extends Callable<T> {
    void cancel();
    RunnableFuture<T> newTask();
}
//Custom execution pool
class CancellingExecutor extends ThreadPoolExecutor {
    ......
    //By rewriting newTaskFor Return to your own Callable
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (callable instanceof CancellableTask)
            return ((CancellableTask<T>) callable).newTask();
        else
            return super.newTaskFor(callable);
    }
}

 

 

3. Cancel Timing Tasks by Customizing

private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);
    /**
     *
     * @param r task
     * @param timeout timeout
     * @param unit TimeUnit
     * @throws InterruptedException
     */
    public static void timedRun(final Runnable r,long timeout, TimeUnit unit) throws InterruptedException {
        class RethrowableTask implements Runnable {
            //Through a volatile Variable to store thread exceptions
            private volatile Throwable t;
            public void run() {
                try {
                    r.run();
                } catch (Throwable t) {
                    this.t = t;
                }
            }
            private void rethrow() {
                if (t != null)
                    throw launderThrowable(t);
            }
        }
        RethrowableTask task = new RethrowableTask();
        final Thread taskThread = new Thread(task);
        taskThread.start();
        //delayed timeout individual unit Thread interruption after unit execution
        cancelExec.schedule(() -> taskThread.interrupt(), timeout, unit);
        //Wait anyway;If the thread does not respond to interrupts, it passes through join Waiting Task Thread timeout No longer wait after time, go back to the caller thread
        taskThread.join(unit.toMillis(timeout));
        //If there is an exception in the task thread, it throws
        task.rethrow();
    }

Note: Dependent on join, task timeout join exit and normal join launch cannot be judged.

4. Canceling Timing Tasks by Futrue

private static final ExecutorService taskExec = Executors.newCachedThreadPool();
    public static void timedRun(Runnable r,long timeout, TimeUnit unit) throws InterruptedException {
        Future<?> task = taskExec.submit(r);
        try {
            //adopt Futrue.get(timeout),Capture exceptions to handle timing runs and cancel tasks
            task.get(timeout, unit);
        } catch (TimeoutException e) {
            // task will be cancelled below
        } catch (ExecutionException e) {
            // exception thrown in task; rethrow
            throw launderThrowable(e.getCause());
        } finally {
            // Harmless if task already completed
            task.cancel(true); // interrupt if running
        }
    }

2. Stop Thread-based Services

1. Usually, services cannot be interrupted directly, resulting in loss of service data.

2. Thread pool services cannot be interrupted directly either

1. Logging Service

Standard producer, consumer model

public class LogService {
    private final BlockingQueue<String> queue;
    private final LoggerThread loggerThread;
    private final PrintWriter writer;
    private boolean isShutdown;
    private int reservations;

    public LogService(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>();
        this.loggerThread = new LoggerThread();
        this.writer = new PrintWriter(writer);
    }

    public void start() {
        loggerThread.start();
    }

    public void stop() {
        synchronized (this) {
            isShutdown = true;
        }
        loggerThread.interrupt();   //Issue interruption
    }

    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            if (isShutdown){
                throw new IllegalStateException(/*...*/);
            }
            ++reservations; //The correct number of logs saved in the queue
        }
        queue.put(msg);     //Queuing logs
    }

    private class LoggerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    try {
                        synchronized (LogService.this) {
                            if (isShutdown && reservations == 0) {
                                break;
                            }
                        }
                        String msg = queue.take();
                        synchronized (LogService.this) {
                            --reservations;
                        }
                        writer.println(msg);
                    } catch (InterruptedException e) { /* retry */
                        //Interrupt requests were captured, but in order to output the remaining logs, no processing was done until the counter == 0 When, close
                    }
                }
            } finally {
                writer.close();
            }
        }
    }
}

 

2. ExecutorService interruption

shutDown and shutDownNow

Typically, ExecetorService is encapsulated; LogService, for example, gives it its own lifecycle approach

Limitations of shutDownNow: Without knowing the thread status in the current pool, you can return tasks that have not started, but tasks that have started and not ended.

  

public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<Runnable>());

    public TrackingExecutor() {
        exec = Executors.newSingleThreadExecutor();
    }

    /*public TrackingExecutor(ExecutorService exec) {
        this.exec = exec;
    }*/

    public void shutdown() {
        exec.shutdown();
    }

    public List<Runnable> shutdownNow() {
        return exec.shutdownNow();
    }

    public boolean isShutdown() {
        return exec.isShutdown();
    }

    public boolean isTerminated() {
        return exec.isTerminated();
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated())
            throw new IllegalStateException(/*...*/);
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                    if (isShutdown()
                            && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }

    @Test
    public void test() throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        TrackingExecutor trackingExecutor = new TrackingExecutor();
        trackingExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                    System.err.println("123123");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); //Set the status or continue to throw, in the ____________ execute Intermediate treatment
                    e.printStackTrace();
                } finally {

                }
            }
        });
        List<Runnable> runnables = trackingExecutor.shutdownNow();
        trackingExecutor.awaitTermination(10,TimeUnit.SECONDS);
        List<Runnable> cancelledTasks = trackingExecutor.getCancelledTasks();
        System.err.println(cancelledTasks.size());
    }
}

 

3. Dealing with Abnormal Thread Termination

1. Thread termination caused by uncovered Exception

Handling uncovered exceptions manually

2. Through Thread's API UncaughException Handler, it can detect that a thread encounters Uncaptured and causes abnormal termination.

Note: By default, the exception stack information is exported to the console; the custom Hadler: implements Thread. UncaughException Handler override method

You can set up a global ThreadGroup for each thread

    Thread.setUncaughtExceptionHandler/Thread.setDefaultUncaughtExceptionHandler

2.JVM exit, daemon thread, etc.

Keywords: Java socket jvm

Added by scnjl on Fri, 28 Jun 2019 03:39:24 +0300