The multithreaded master worker is also understandable literally
Master is equivalent to a leader. One is enough. If there are more than one, it's a big problem to listen to whom. Master is responsible for assigning tasks to workers. Then summarize the completion of each person
Worker is the person who works, completes the task assigned by the leader, and then gives the result to the leader
This mode is similar to MapReduce of big data. But it is much simpler than that
Here is an example:
Calculate the result of 1 M2 + 2 M2 +... + 100 m2
If the master worker method is used for calculation, first assume that the square calculation is time-consuming, and here assume that each square calculation takes 100ms
In order to get the calculation results here, it needs at least 100 * 100ms = 10000 ms, that is, 10 s time
If we solve this problem through master worker mode, the time will be greatly reduced
MyTask is used to store the value to be calculated, such as: 1, 2, 3
public class MyTask implements Serializable { private int id; private int num; public MyTask(int id, int num) { this.id = id; this.num = num; } ...... @Override public String toString() { return "MyTask{" + "id=" + id + ", num=" + num + '}'; } }
Since Master is a leader, he must know what tasks have come, which subordinates to be assigned, and what the results of subordinates' work are
public class MyMaster { //1. A container is needed to store the tasks to be performed private final ConcurrentLinkedQueue<MyTask> tasks = new ConcurrentLinkedQueue<>(); //2. A container is needed to store the thread executing the task <Thread name, thread> private HashMap<String, Thread> workThreads = new HashMap<>(); //3. A container is needed to store the results of each thread <task id, Task result> private ConcurrentHashMap<Integer, Object> resMap = new ConcurrentHashMap<>(); //4. Constructor, take Worker afferent, Let each thread perform the same method public MyMaster(AbstractWorker myWorker, int workerCount) { myWorker.setTasks(tasks); myWorker.setResMap(resMap); for (int i = 1; i <= workerCount; i++) { String name = "worker" + i; workThreads.put(name, new Thread(myWorker)); } } //5. Task submitted to container public boolean addTask(MyTask task) { return tasks.add(task); } //6. Task start execution method public void execute(){ for (Map.Entry<String, Thread> worker : workThreads.entrySet()) { worker.getValue().start(); } } //7. Judge whether all threads have completed execution public boolean isComplated(){ for (Map.Entry<String, Thread> worker : workThreads.entrySet()) { if(worker.getValue().getState() != Thread.State.TERMINATED){ return false; } } return true; } //8. Sum up, Get results public int getResult(){ int res = 0; for (Map.Entry<Integer, Object> resItem : resMap.entrySet()) { res += (int) resItem.getValue(); } return res; } }
As a Worker, you also need to know the task list. When you finish one task, you can get another one and roll up your sleeves to work. Then you need to know where to put the work results
public abstract class AbstractWorker implements Runnable { // <Thread name, thread>, Subclass holding Master Task list for, Take the task from it private ConcurrentLinkedQueue<MyTask> tasks; //<task id, Task result>, Subclass holding Master Results list for, Put the calculation results in private ConcurrentHashMap<Integer, Object> resMap; public void setTasks(ConcurrentLinkedQueue<MyTask> tasks) { this.tasks = tasks; } public void setResMap(ConcurrentHashMap<Integer, Object> resMap) { this.resMap = resMap; } @Override public void run() { while (true) { MyTask task = tasks.poll(); if (task == null) { break; } Object res = handle(task); resMap.put(task.getId(), res); System.out.println(Thread.currentThread().getName() + " Calculation " + task.getNum() + " The result is : " + res); } } //Here, the specific implementation logic is put into the subclass, Can increase scalability, In this case, it's Square, That's by passing in different Worker, It can also be calculated public abstract Object handle(MyTask task); } public class SquareWorker extends AbstractWorker { public Object handle(MyTask task) { int res= task.getNum() * task.getNum(); try { //Time consuming of analog operation Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return res; } }
Test:
public static void main(String[] args) throws InterruptedException { SquareWorker worker = new SquareWorker(); MyMaster master = new MyMaster(worker, 10); String outPrint = ""; for (int i = 1; i <= 100; i++) { MyTask task = new MyTask(i, i); master.addTask(task); outPrint += i + "²" ; if(i < 100){ outPrint += " + "; } } System.out.println(outPrint);
//Calculation time under statistics long startTime = System.currentTimeMillis(); master.execute(); while (true){ if(!master.isComplated()){ Thread.sleep(50); continue; } int result = master.getResult(); System.out.println("The result of calculation is : " + result + ", Time consuming : " + (System.currentTimeMillis() - startTime)); break; } }
Result:
It can be seen that the time consumption is only a little more than 1s, which is much shorter than the previous 10s
The output order here is not orderly. This is also a feature of multithreading. Normally, the code writing order and multithreading execution order are not consistent
Of course, there are advantages and disadvantages in any way
The advantage is shorter execution time
The disadvantage is that there are more threads. You need to know that there is a cost to start threads (and Optimization). If you start more threads, you will occupy more space. It's equivalent to using space for time
Here, you can make a small optimization first, and hand over the threads to the thread pool for hosting. You don't need to know the specific work of those people, which means a little outsourcing. The master just needs to publish the task, and then get the results he wants
Rewrite Master first:
public class MyMaster { // <Thread name, thread>, Subclass holding Master Task list for, Take the task from it private final ConcurrentLinkedQueue<MyTask> tasks = new ConcurrentLinkedQueue<>(); //2. Thread pool is required to host threads private ThreadPoolExecutor pool; //3. Specific work standards private AbstractWorker worker; //4. Hire a few people to work private int workerCount ; //3. A container is needed to store the results of each thread <task id, Task result> private ConcurrentHashMap<Integer, Object> resMap = new ConcurrentHashMap<>(); //4. Constructor, take Worker afferent, Let each thread perform the same method public MyMaster(AbstractWorker worker, int workerCount) { this.worker = worker; worker.setResMap(resMap); worker.setTasks(tasks); this.workerCount = workerCount; pool = new ThreadPoolExecutor(workerCount, workerCount, 0, TimeUnit.SECONDS, new LinkedBlockingQueue()); } public void addTask(MyTask task) { tasks.add(task); } public void execute(){ for (int i = 0; i < workerCount; i++) { pool.execute(worker); } } //7. Judge whether all threads have completed execution public void finish() { pool.shutdown(); } public boolean isFinished() { return pool.isTerminated(); } //8. Sum up, Get results public int getResult() { int res = 0; for (Map.Entry<Integer, Object> resItem : resMap.entrySet()) { res += (int) resItem.getValue(); } return res; } }
Test method:
public static void main(String[] args) throws InterruptedException { SquareWorker worker = new SquareWorker(); MyMaster master = new MyMaster(worker, 10); String outPrint = ""; long startTime = System.currentTimeMillis(); for (int i = 1; i <= 100; i++) { MyTask task = new MyTask(i, i); master.addTask(task); outPrint += i + "²"; if (i < 100) { outPrint += " + "; } } System.out.println(outPrint); master.execute(); master.finish(); while (!master.isFinished()){ Thread.sleep(10); } int res = master.getResult(); System.out.println("The result of calculation is : " + res + ", Time consuming : " + (System.currentTimeMillis() - startTime)); }
Result: