java Concurrent programming-Executor framework

Links to the original text: https://my.oschina.net/u/249393/blog/268066

Executor framework refers to a series of functions related to executor in concurrent libraries introduced in java 5, including thread pool, Executor, Executors, Executor Service, Completion Service, Future, Callable, etc. Their relationship is:


One way of programming concurrent programming is to break tasks into columns of small tasks, namely Runnable, and then submit them to an Executor for execution, Executor.execute(Runnalbe). Executor uses an internal thread pool to perform operations during execution.

1. Creating thread pools

The Executors class provides a series of factory methods for pioneering thread pools, and the returned thread pools all implement the ExecutorService interface.

public static ExecutorService newFixedThreadPool(int nThreads)

Create a pool of threads with a fixed number of threads.

public static ExecutorService newCachedThreadPool()

Create a cacheable thread pool and call execute to reuse previously constructed threads if they are available. If no existing thread is available, create a new thread and add it to the pool. Terminate and remove threads that have not been used for 60 seconds from the cache.

public static ExecutorService newSingleThreadExecutor()

Create a single threaded Executor.

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

Create a thread pool that supports timed and periodic task execution, which in most cases can be used to replace the Timer class.

  1. Executor executor = Executors.newFixedThreadPool(10);  
  2. Runnable task = new Runnable() {  
  3.  @Override  
  4.  public void run() {  
  5.   System.out.println("task over");  
  6.  }  
  7. };  
  8. executor.execute(task);  
  9.   
  10. executor = Executors.newScheduledThreadPool(10);  
  11. ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;  
  12. scheduler.scheduleAtFixedRate(task, 1010, TimeUnit.SECONDS);  

Executor Service and Life Cycle

Executor Service extends Executor and adds some lifecycle management methods. An Executor's life cycle has three states: running, closing, and terminating. Executor was running when it was created. When ExecutorService.shutdown() is called, it is closed, and the isShutdown() method returns true. At this point, you shouldn't think about adding tasks to Executor. After all the added tasks have been executed, Executor is terminated and isTerminated() returns true.

If Executor is closed, submitting a task to Executor throws unchecked exception Rejected Execution Exception.

  1. ExecutorService executorService = (ExecutorService) executor;  
  2. while (!executorService.isShutdown()) {  
  3.  try {  
  4.   executorService.execute(task);  
  5.  } catch (RejectedExecutionException ignored) {  
  6.     
  7.  }  
  8. }  
  9. executorService.shutdown();  

3. Using Callable, Future returns the result

Future < V > represents an asynchronous operation. The result of the operation can be obtained by the get() method. If the asynchronous operation has not been completed, get() will block the current thread. FutureTask < V > implements Future < V > and Runable < V >. Callable represents a return worth operation.

  1. Callable<Integer> func = new Callable<Integer>(){  
  2.    public Integer call() throws Exception {  
  3.     System.out.println("inside callable");  
  4.     Thread.sleep(1000);  
  5.     return new Integer(8);  
  6.    }    
  7.   };    
  8.   FutureTask<Integer> futureTask  = new FutureTask<Integer>(func);  
  9.   Thread newThread = new Thread(futureTask);  
  10.   newThread.start();  
  11.     
  12.   try {  
  13.    System.out.println("blocking here");  
  14.    Integer result = futureTask.get();  
  15.    System.out.println(result);  
  16.   } catch (InterruptedException ignored) {  
  17.   } catch (ExecutionException ignored) {  
  18.   }  

ExecutoreService provides a submit() method that passes a Callable, or Runnable, back to Future. If the Executor background thread pool has not completed the Callable calculation, the call returns the get() method of the Future object, blocking until the calculation is completed.

Example: Parallel computing of the sum of arrays.

  1. package executorservice;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.List;  
  5. import java.util.concurrent.Callable;  
  6. import java.util.concurrent.ExecutionException;  
  7. import java.util.concurrent.ExecutorService;  
  8. import java.util.concurrent.Executors;  
  9. import java.util.concurrent.Future;  
  10. import java.util.concurrent.FutureTask;  
  11.   
  12. public class ConcurrentCalculator {  
  13.   
  14.  private ExecutorService exec;  
  15.  private int cpuCoreNumber;  
  16.  private List<Future<Long>> tasks = new ArrayList<Future<Long>>();  
  17.   
  18.  //Internal category  
  19.  class SumCalculator implements Callable<Long> {  
  20.   private int[] numbers;  
  21.   private int start;  
  22.   private int end;  
  23.   
  24.   public SumCalculator(final int[] numbers, int start, int end) {  
  25.    this.numbers = numbers;  
  26.    this.start = start;  
  27.    this.end = end;  
  28.   }  
  29.   
  30.   public Long call() throws Exception {  
  31.    Long sum = 0l;  
  32.    for (int i = start; i < end; i++) {  
  33.     sum += numbers[i];  
  34.    }  
  35.    return sum;  
  36.   }  
  37.  }  
  38.   
  39.  public ConcurrentCalculator() {  
  40.   cpuCoreNumber = Runtime.getRuntime().availableProcessors();  
  41.   exec = Executors.newFixedThreadPool(cpuCoreNumber);  
  42.  }  
  43.   
  44.  public Long sum(final int[] numbers) {  
  45.   //Create FutureTask and submit it to Executor based on the number of CPU core tasks split  
  46.   for (int i = 0; i < cpuCoreNumber; i++) {  
  47.    int increment = numbers.length / cpuCoreNumber + 1;  
  48.    int start = increment * i;  
  49.    int end = increment * i + increment;  
  50.    if (end > numbers.length)  
  51.     end = numbers.length;  
  52.    SumCalculator subCalc = new SumCalculator(numbers, start, end);  
  53.    FutureTask<Long> task = new FutureTask<Long>(subCalc);  
  54.    tasks.add(task);  
  55.    if (!exec.isShutdown()) {  
  56.     exec.submit(task);  
  57.    }  
  58.   }  
  59.   return getResult();  
  60.  }  
  61.   
  62.  /** 
  63.   * Iterate each task, get partial sum, add back 
  64.   *  
  65.   * @return  
  66.   */  
  67.  public Long getResult() {  
  68.   Long result = 0l;  
  69.   for (Future<Long> task : tasks) {  
  70.    try {  
  71.     //Blocking if calculation is not completed  
  72.     Long subSum = task.get();  
  73.     result += subSum;  
  74.    } catch (InterruptedException e) {  
  75.     e.printStackTrace();  
  76.    } catch (ExecutionException e) {  
  77.     e.printStackTrace();  
  78.    }  
  79.   }  
  80.   return result;  
  81.  }  
  82.   
  83.  public void close() {  
  84.   exec.shutdown();  
  85.  }  
  86. }  

Main

Java code
  1. int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 };   
  2. ConcurrentCalculator calc = new ConcurrentCalculator();   
  3. Long sum = calc.sum(numbers);   
  4. System.out.println(sum);   
  5. calc.close();  

Reproduced in: https://my.oschina.net/u/249393/blog/268066

Keywords: Java Programming

Added by collette on Mon, 09 Sep 2019 11:14:00 +0300