Multithreading notes: Master-Worker

The name of the multithreaded Master-Worker pattern can be taken literally.

The Master is like a team leader. One is enough; with more than one, it is unclear whose instructions to follow. The Master assigns tasks to the Workers and then collects and summarizes the results from each of them.

A Worker is the one who does the work: it completes the tasks assigned by the leader and hands the results back to the leader.

The pattern is similar to MapReduce in big data, but much simpler.

Here is an example:

Calculate the result of 1² + 2² + ... + 100²

To make the benefit of the Master-Worker pattern visible, assume that computing a square is expensive: here, each square calculation takes 100 ms.

Computed one after another, the result needs at least 100 * 100 ms = 10000 ms, that is, 10 s.

Solving the problem with the Master-Worker pattern reduces this time considerably.
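As a point of comparison, the sequential calculation can be sketched as follows. This is only an illustration: the class name `SequentialBaseline` and the method `sumOfSquares` are made up here and are not part of the original code, and the per-square delay is the 100 ms assumption stated above.

```java
// Sequential baseline (illustrative; class and method names are hypothetical).
class SequentialBaseline {

    // Sums 1² + 2² + ... + n², sleeping delayMillis before each square
    // to simulate an expensive calculation.
    static int sumOfSquares(int n, long delayMillis) {
        int sum = 0;
        for (int i = 1; i <= n; i++) {
            try {
                Thread.sleep(delayMillis); // simulated cost of one square
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted during calculation", e);
            }
            sum += i * i;
        }
        return sum;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int result = sumOfSquares(100, 100);
        long elapsed = System.currentTimeMillis() - start;
        // The sum is 338350; the elapsed time is at least 100 * 100 ms = 10000 ms.
        System.out.println("Result: " + result + ", elapsed ms: " + elapsed);
    }
}
```

With, say, 10 workers sharing the 100 tasks, each worker handles about 10 squares, so the ideal parallel time would be roughly 10 * 100 ms = 1 s, ignoring thread startup and scheduling overhead.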

MyTask stores a value to be calculated, such as 1, 2 or 3.

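import java.io.Serializable;

// A unit of work: an id plus the number to be squared
public class MyTask implements Serializable {
    private int id;
    private int num;

    public MyTask(int id, int num) {
        this.id = id;
        this.num = num;
    }

    // ...... (elided in the original; getId() and getNum() are used by the worker code)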
    @Override
    public String toString() {
        return "MyTask{" +
                "id=" + id +
                ", num=" + num +
                '}';
    }
}

 

Since the Master is the leader, it must know which tasks have come in, which workers they can be assigned to, and what results the workers have produced.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MyMaster {
    // 1. Queue of tasks waiting to be executed
    private final ConcurrentLinkedQueue<MyTask> tasks = new ConcurrentLinkedQueue<>();

    // 2. Worker threads, keyed by name: <thread name, thread>
    private final HashMap<String, Thread> workThreads = new HashMap<>();

    // 3. Results produced by the workers: <task id, task result>
    private final ConcurrentHashMap<Integer, Object> resMap = new ConcurrentHashMap<>();

    // 4. Constructor: pass in the Worker so that every thread runs the same logic
    public MyMaster(AbstractWorker myWorker, int workerCount) {
        myWorker.setTasks(tasks);
        myWorker.setResMap(resMap);

        for (int i = 1; i <= workerCount; i++) {
            String name = "worker" + i;
            // Name the thread so that Thread.currentThread().getName() in the worker's output is meaningful
            workThreads.put(name, new Thread(myWorker, name));
        }
    }

    // 5. Submit a task to the queue
    public boolean addTask(MyTask task) {
        return tasks.add(task);
    }

    // 6. Start all worker threads
    public void execute() {
        for (Map.Entry<String, Thread> worker : workThreads.entrySet()) {
            worker.getValue().start();
        }
    }

    // 7. Check whether all worker threads have finished
    public boolean isComplated() {
        for (Map.Entry<String, Thread> worker : workThreads.entrySet()) {
            if (worker.getValue().getState() != Thread.State.TERMINATED) {
                return false;
            }
        }
        return true;
    }

    // 8. Sum up the individual results
    public int getResult(){
        int res = 0;
        for (Map.Entry<Integer, Object> resItem : resMap.entrySet()) {
            res += (int) resItem.getValue();
        }
        return res;
    }
}

 

A Worker also needs to know the task list: as soon as it finishes one task, it takes the next one and gets back to work. It also needs to know where to put its results.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public abstract class AbstractWorker implements Runnable {
    // Reference to the Master's task queue; the worker takes tasks from it
    private ConcurrentLinkedQueue<MyTask> tasks;

    // <task id, task result>: reference to the Master's result map; the worker puts its results in it
    private ConcurrentHashMap<Integer, Object> resMap;

    public void setTasks(ConcurrentLinkedQueue<MyTask> tasks) {
        this.tasks = tasks;
    }

    public void setResMap(ConcurrentHashMap<Integer, Object> resMap) {
        this.resMap = resMap;
    }

    @Override
    public void run() {
        while (true) {
            MyTask task = tasks.poll();
            if (task == null) {
                break;
            }
            Object res = handle(task);
            resMap.put(task.getId(), res);
            System.out.println(Thread.currentThread().getName() + " calculated " + task.getNum() + ", result: " + res);
        }
    }

    // The concrete logic lives in a subclass, which keeps the design extensible.
    // Here it is squaring, but passing in a different Worker allows other calculations.
    public abstract Object handle(MyTask task);
}


public class SquareWorker extends AbstractWorker {

    @Override
    public Object handle(MyTask task) {
        int res = task.getNum() * task.getNum();
        try {
            // Simulate a time-consuming operation
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption
            Thread.currentThread().interrupt();
        }
        return res;
    }
}

 

Test:

public static void main(String[] args) throws InterruptedException {
    SquareWorker worker = new SquareWorker();
    MyMaster master = new MyMaster(worker, 10);
    String outPrint = "";
    for (int i = 1; i <= 100; i++) {
        MyTask task = new MyTask(i, i);
        master.addTask(task);

        outPrint += i + "²" ;
        if(i < 100){
            outPrint  += " + ";
        }
    }
    System.out.println(outPrint);
    // Time the calculation
    long startTime = System.currentTimeMillis();
    master.execute();
    while (true) {
        if (!master.isComplated()) {
            Thread.sleep(50);
            continue;
        }
        int result = master.getResult();
        System.out.println("The result of calculation is : " + result + ", Time consuming : " + (System.currentTimeMillis() - startTime));
        break;
    }
}

 

Result:

The run takes only a little more than 1 s, much shorter than the 10 s of the sequential calculation.

The lines of output do not appear in order. That is characteristic of multithreading: the order in which threads execute generally does not match the order in which the code was written.
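The test above polls isComplated() every 50 ms. One alternative, assuming the caller holds references to the worker threads, is to wait with Thread.join(). The sketch below is self-contained and does not use the classes from this article; `JoinDemo` and its `sumOfSquares` method are hypothetical names, and the 100 ms delay is left out to keep it short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Waiting for workers with Thread.join() instead of polling (illustrative sketch).
class JoinDemo {

    // Starts workerCount threads that drain a shared queue holding 1..n,
    // store each square under its number, and returns the sum once all threads have ended.
    static int sumOfSquares(int n, int workerCount) {
        ConcurrentLinkedQueue<Integer> tasks = new ConcurrentLinkedQueue<>();
        ConcurrentHashMap<Integer, Integer> results = new ConcurrentHashMap<>();
        for (int i = 1; i <= n; i++) {
            tasks.add(i);
        }

        // Every thread runs the same loop: take a task, compute, store the result.
        Runnable worker = () -> {
            Integer num;
            while ((num = tasks.poll()) != null) {
                results.put(num, num * num);
            }
        };

        List<Thread> threads = new ArrayList<>();
        for (int i = 1; i <= workerCount; i++) {
            Thread t = new Thread(worker, "worker" + i);
            threads.add(t);
            t.start();
        }

        for (Thread t : threads) {
            try {
                t.join(); // blocks until this worker has terminated
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }

        int sum = 0;
        for (int value : results.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(100, 10)); // prints 338350
    }
}
```

The order in which the workers pick up tasks still varies from run to run, but because each result is stored under its task number, the final sum does not depend on that order.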

Of course, every approach has advantages and disadvantages.

The advantage is a shorter execution time.

The disadvantage is that more threads are needed. Starting threads has a cost, and the more threads are started, the more memory they occupy. In effect, space is traded for time.

A small optimization is possible here: hand the threads over to a thread pool. The Master then does not need to manage the individual threads, a bit like outsourcing. It only publishes the tasks and collects the results it wants.

First, rewrite the Master:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyMaster {

    // 1. Queue of tasks waiting to be executed
    private final ConcurrentLinkedQueue<MyTask> tasks = new ConcurrentLinkedQueue<>();

    // 2. Thread pool that hosts the worker threads
    private ThreadPoolExecutor pool;

    // 3. The worker logic that every pool thread runs
    private AbstractWorker worker;

    // 4. Number of workers to hire
    private int workerCount;

    // 5. Results produced by the workers: <task id, task result>
    private ConcurrentHashMap<Integer, Object> resMap = new ConcurrentHashMap<>();

    // 6. Constructor: pass in the Worker so that every thread runs the same logic
    public MyMaster(AbstractWorker worker, int workerCount) {

        this.worker = worker;
        worker.setResMap(resMap);
        worker.setTasks(tasks);

        this.workerCount = workerCount;

        pool = new ThreadPoolExecutor(workerCount, workerCount, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    public void addTask(MyTask task) {
        tasks.add(task);
    }

    public void execute(){
        for (int i = 0; i < workerCount; i++) {
            pool.execute(worker);
        }
    }

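    // 7. Stop accepting new work; workers already submitted still run to completion
    public void finish() {
        pool.shutdown();
    }

    // 8. Check whether the pool has shut down and all workers have finished
    public boolean isFinished() {
        return pool.isTerminated();
    }

    // 9. Sum up the individual results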
    public int getResult() {
        int res = 0;
        for (Map.Entry<Integer, Object> resItem : resMap.entrySet()) {
            res += (int) resItem.getValue();
        }
        return res;
    }
}
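Instead of having the caller poll isFinished() in a loop, a pool-based master could block with awaitTermination() after shutdown(). The following is a minimal, self-contained sketch of that idea rather than a drop-in replacement for the class above: `AwaitDemo`, `sumOfSquares` and the timeout parameter are hypothetical, and Executors.newFixedThreadPool is used as a shorthand for a fixed-size pool.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Blocking until a pool has drained with awaitTermination() (illustrative sketch).
class AwaitDemo {

    // Runs workerCount pool workers over the numbers 1..n and returns the sum of squares.
    // Throws IllegalStateException if the workers do not finish within timeoutSeconds.
    static int sumOfSquares(int n, int workerCount, long timeoutSeconds) {
        ConcurrentLinkedQueue<Integer> tasks = new ConcurrentLinkedQueue<>();
        ConcurrentHashMap<Integer, Integer> results = new ConcurrentHashMap<>();
        for (int i = 1; i <= n; i++) {
            tasks.add(i);
        }

        // Same worker loop as before: take a task, compute, store the result.
        Runnable worker = () -> {
            Integer num;
            while ((num = tasks.poll()) != null) {
                results.put(num, num * num);
            }
        };

        ExecutorService pool = Executors.newFixedThreadPool(workerCount);
        for (int i = 0; i < workerCount; i++) {
            pool.execute(worker);
        }

        pool.shutdown(); // no new work; submitted workers run to completion
        try {
            // Blocks until all workers are done or the timeout elapses.
            if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pool", e);
        }

        int sum = 0;
        for (int value : results.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(100, 10, 30)); // prints 338350
    }
}
```

The timeout is a design choice: it keeps the caller from waiting forever if a worker hangs, at the price of having to pick a sensible upper bound.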

Test method:

public static void main(String[] args) throws InterruptedException {
    SquareWorker worker = new SquareWorker();
    MyMaster master = new MyMaster(worker, 10);

    String outPrint = "";
    long startTime = System.currentTimeMillis();
    for (int i = 1; i <= 100; i++) {
        MyTask task = new MyTask(i, i);
        master.addTask(task);

        outPrint += i + "²";
        if (i < 100) {
            outPrint += " + ";
        }
    }
    System.out.println(outPrint);

    master.execute();

    master.finish();

    while (!master.isFinished()){
        Thread.sleep(10);
    }

    int res  = master.getResult();
    System.out.println("The result of calculation is : " + res + ", Time consuming : " + (System.currentTimeMillis() - startTime));
}

Result:

Keywords: Java Big Data

Added by Pudgemeister on Wed, 26 Feb 2020 15:47:12 +0200