MapReduce program 3 (Maven project): counting the total salary of employees in each department (optimized)

This article builds on the earlier program that counts the total salary of employees in each department. If you have not implemented it yet, see the previous post: Counting the total salary of employees in each department.

Optimizations covered:

1. Use serialization

2. Implement a custom Partitioner

3. Use a Combiner on the map side

Use serialization

This case builds on Counting the total salary of employees in each department.

Serialization and deserialization:

Serialization is the process of converting a Java object into a byte stream so that it can be transmitted over the network;

Deserialization is the reverse process of converting that byte stream back into a Java object.
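The write/readFields pattern described above can be sketched with plain java.io streams. This is a toy illustration only (the `RoundTripDemo`/`Emp` names and the in-memory byte array are my own stand-ins, not Hadoop's API); Hadoop's `Writable` interface adds the type plumbing on top of exactly this idea.

```java
import java.io.*;

// Minimal sketch of the Writable-style round trip using only java.io.
public class RoundTripDemo {

    static class Emp {
        int empno;
        String ename;
        int sal;

        //"Serialization": fields -> bytes
        void write(DataOutput out) throws IOException {
            out.writeInt(empno);
            out.writeUTF(ename);
            out.writeInt(sal);
        }

        //"Deserialization": bytes -> fields
        void readFields(DataInput in) throws IOException {
            empno = in.readInt();
            ename = in.readUTF();
            sal = in.readInt();
        }
    }

    //Serialize an employee into an in-memory byte array (standing in for the
    //network), deserialize it into a fresh object, and return its fields.
    public static String roundTrip() {
        try {
            Emp e = new Emp();
            e.empno = 7654; e.ename = "MARTIN"; e.sal = 1250;

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            e.write(new DataOutputStream(bytes));

            Emp restored = new Emp();
            restored.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return restored.empno + "," + restored.ename + "," + restored.sal;
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip()); // 7654,MARTIN,1250
    }
}
```

The restored object carries the same field values as the original, which is all MapReduce needs to ship map output values between machines.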

MapReduce programming model and programming ideas:

Compared with the original program, this case adds one more class. The Employee class code is as follows:

Code

Employee.java

package com.wang.employee;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author CreazyHacking
 * @version 1.0
 * @date 12/4/2021 10:12 PM
 */
public class Employee implements Writable {

    //Field names empno, ename, job, Mgr, hiredate, Sal, comm, deptno
    //Data type: Int, Char, Char, Int, Date, Int, Int, Int
    //Data: 7654, MARTIN, SALESMAN, 7698, 1981/9/28, 1250, 1400, 30

    //Variables defined above

    private int empno; //Employee number
    private String ename; //full name
    private String job; //position
    private int mgr; //manager
    private String hiredate; //time
    private int sal;  //wages
    private int comm; //bonus
    private int deptno; //Department number (required by the Mapper's setDeptno/getDeptno calls)

    //Serialization method: a technique for converting java objects into data streams (binary strings / bytes) that can be transmitted across machines
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.empno);
        dataOutput.writeUTF(this.ename);
        dataOutput.writeUTF(this.job);
        dataOutput.writeInt(this.mgr);
        dataOutput.writeUTF(this.hiredate);
        dataOutput.writeInt(this.sal);
        dataOutput.writeInt(this.comm);
        dataOutput.writeInt(this.deptno);

    }
    //Deserialization method: a technology that converts a data stream (binary string) that can be transmitted across machines into java objects
    public void readFields(DataInput dataInput) throws IOException {
        this.empno = dataInput.readInt();
        this.ename = dataInput.readUTF();
        this.job = dataInput.readUTF();
        this.mgr = dataInput.readInt();
        this.hiredate = dataInput.readUTF();
        this.sal = dataInput.readInt();
        this.comm = dataInput.readInt();
        this.deptno = dataInput.readInt();
    }
    //Other classes operate variables through the set/get method: source -- > generator getters and setters

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}

SalaryTotalMapper.java

package com.wang.employee;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author CreazyHacking
 * @version 1.0
 * @date 12/4/2021 8:36 PM
 */
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Sample record: 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
        String data = value.toString();

        String[] words = data.split(",");
        //Create employee object
        Employee employee = new Employee();
        //Set employee properties
        employee.setEmpno(Integer.parseInt(words[0]));//Employee number
        employee.setEname(words[1]); //full name
        employee.setJob(words[2]); //position
        try {
            employee.setMgr(Integer.parseInt(words[3])); //Boss number
        } catch (NumberFormatException e) {
            employee.setMgr(-1);  //There may be no boss number
        }
        employee.setHiredate(words[4]); //Entry date
        employee.setSal(Integer.parseInt(words[5])); //a monthly salary
        try {
            employee.setComm(Integer.parseInt(words[6])); //bonus
        } catch (NumberFormatException e) {
            employee.setComm(0); //There may be no bonus
        }
        employee.setDeptno(Integer.parseInt(words[7])); //Department number
        //Take out the department number words[7], convert String to Int, Int to IntWritable object, and assign k2
//        IntWritable k2 = new IntWritable(employee.getDeptno());  // Department number

        //Output k2, v2=employee object
        context.write(new IntWritable(employee.getDeptno()), employee);
    }
}

SalaryTotalReducer.java

package com.wang.employee;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author CreazyHacking
 * @version 1.0
 * @date 12/4/2021 8:36 PM
 */
public class SalaryTotalReducer extends Reducer<IntWritable,Employee,IntWritable,IntWritable> {
    @Override
    protected void reduce(IntWritable k3, Iterable<Employee> v3, Context context) throws IOException, InterruptedException {
       //Take out the data of each employee in v3 for salary summation

        IntWritable k4 = k3;
        int total = 0;
        for (Employee v : v3){
            total += v.getSal();
        }
        context.write(k4, new IntWritable(total));
    }
}

SalaryTotalMain.java

package com.wang.employee;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author CreazyHacking
 * @version 1.0
 * @date 12/4/2021 8:35 PM
 */
@SuppressWarnings("all")
public class SalaryTotalMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1. Create a job and task entry (specify the main class)
        Job job = Job.getInstance();
        job.setJarByClass(SalaryTotalMain.class);
        //2. Specify the mapper of the job and the type of output < K2 V2 >
        job.setMapperClass(SalaryTotalMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Employee.class);
        //3. Specify the reducer and output type of the job < K4 V4 >
        job.setReducerClass(SalaryTotalReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        //4. Specify the input and output paths of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //5. Execute job
        job.waitForCompletion(true);
    }

}

Packaging project

Submit the jar to Hadoop, run it, and view the results. The packaging process is the same as in the previous post, Counting the total salary of employees in each department, and is not repeated here.
Note: if the package path changes, update the mainClass entry in pom.xml accordingly, replacing it with:

<mainClass>com.wang.employee.SalaryTotalMain</mainClass>

Result

Execute the commands:

hadoop jar ~/soft/SalTotal-yh-1.0-SNAPSHOT.jar /EMP.csv /output_yh
hdfs dfs -ls /output_yh
hdfs dfs -cat /output_yh/part-r-00000

Implement partitioning with a custom Partitioner

This case builds on the serialization case: by specifying the number of reduce tasks, the results can be stored in separate partitioned output files.

By default, MapReduce produces a single reduce output file, such as part-r-00000.

With a custom Partitioner, the output can be split across multiple reduce files, such as part-r-00000, part-r-00001, and part-r-00002.
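The department-to-partition rule used in this case can be checked in isolation as a plain function. This is a simplified stand-in (the `PartitionDemo`/`partitionFor` names are mine, not Hadoop's API) mirroring the modulo arithmetic of the partitioner class shown below:

```java
public class PartitionDemo {
    //Mirrors the partitioning rule: dept 10 -> 1%n, dept 20 -> 2%n, others -> 3%n
    public static int partitionFor(int deptno, int numPartitions) {
        if (deptno == 10) {
            return 1 % numPartitions;
        } else if (deptno == 20) {
            return 2 % numPartitions;
        } else {
            return 3 % numPartitions;
        }
    }

    public static void main(String[] args) {
        //With 3 reduce tasks: dept 10 -> part-r-00001, dept 20 -> part-r-00002,
        //dept 30 -> part-r-00000
        System.out.println(partitionFor(10, 3)); // 1
        System.out.println(partitionFor(20, 3)); // 2
        System.out.println(partitionFor(30, 3)); // 0
    }
}
```

Each department lands in its own reduce task, so each output file contains exactly one department's records.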

MapReduce programming model and programming ideas:

Compared with the serialization case, this case adds one partitioner class. The code is as follows:

SalaryTotalPartitioner.java

package com.wang.patitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

//                          map output: k2, v2 --> IntWritable, Employee
public class SalaryTotalPartitioner extends Partitioner<IntWritable, Employee> {

    @Override
    public int getPartition(IntWritable k2, Employee v2, int numPartitions) {

        //Partitioning rule: each department goes into its own partition
        if (v2.getDeptno() == 10) {
            //Put it in partition 1
            return 1 % numPartitions; // 1%3=1
        } else if (v2.getDeptno() == 20) {
            //Put it in partition 2
            return 2 % numPartitions; // 2%3=2
        } else {
            //Put it in partition 0
            return 3 % numPartitions; // 3%3=0
        }
    }
}

Employee.java

package com.wang.patitioner;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author CreazyHacking
 * @version 1.0
 * @date 12/4/2021 10:12 PM
 */
public class Employee implements Writable {

    //Field names empno, ename, job, Mgr, hiredate, Sal, comm, deptno
    //Data type: Int, Char, Char, Int, Date, Int, Int, Int
    //Data: 7654, MARTIN, SALESMAN, 7698, 1981/9/28, 1250, 1400, 30

    //Variables defined above

    private int empno; //Employee number
    private String ename; //full name
    private String job; //position
    private int mgr; //manager
    private String hiredate; //time
    private int sal;  //wages
    private int comm;//bonus
    private int deptno; //Department number



    public Employee() {
    }

    //Serialization method: a technique for converting java objects into data streams (binary strings / bytes) that can be transmitted across machines
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.empno);
        dataOutput.writeUTF(this.ename);
        dataOutput.writeUTF(this.job);
        dataOutput.writeInt(this.mgr);
        dataOutput.writeUTF(this.hiredate);
        dataOutput.writeInt(this.sal);
        dataOutput.writeInt(this.comm);

    }
    //Deserialization method: a technology that converts a data stream (binary string) that can be transmitted across machines into java objects
    public void readFields(DataInput dataInput) throws IOException {
        this.empno = dataInput.readInt();
        this.ename = dataInput.readUTF();
        this.job = dataInput.readUTF();
        this.mgr = dataInput.readInt();
        this.hiredate = dataInput.readUTF();
        this.sal = dataInput.readInt();
        this.comm = dataInput.readInt();
    }
    //Other classes operate variables through the set/get method: source -- > generator getters and setters

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }
    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}

SalaryTotalMapper.java

package com.wang.patitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author CreazyHacking
 * @version 1.0
 * @date 12/4/2021 8:36 PM
 */
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Sample record: 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
        String data = value.toString();

        String[] words = data.split(",");
        //Create employee object
        Employee employee = new Employee();
        //Set employee properties
        employee.setEmpno(Integer.parseInt(words[0]));//Employee number
        employee.setEname(words[1]); //full name
        employee.setJob(words[2]); //position
        try {
            employee.setMgr(Integer.parseInt(words[3])); //Boss number
        } catch (NumberFormatException e) {
            employee.setMgr(-1);  //There may be no boss number
        }
        employee.setHiredate(words[4]); //Entry date
        employee.setSal(Integer.parseInt(words[5])); //a monthly salary
        try {
            employee.setComm(Integer.parseInt(words[6])); //bonus
        } catch (NumberFormatException e) {
            employee.setComm(0); //There may be no bonus
        }
        employee.setDeptno(Integer.parseInt(words[7])); //Department number
        //Take out the department number words[7], convert String to Int, Int to IntWritable object, and assign k2
        IntWritable k2 = new IntWritable(employee.getDeptno());  //Department number
        Employee v2 = employee; //Employee object
        //Output k2, v2=employee object
        context.write(k2, v2);
    }
}

SalaryTotalReducer.java

package com.wang.patitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author CreazyHacking
 * @version 1.0
 * @date 12/4/2021 8:36 PM
 */
public class SalaryTotalReducer extends Reducer<IntWritable, Employee,IntWritable,Employee> {
    @Override
    protected void reduce(IntWritable k3, Iterable<Employee> v3, Context context) throws IOException, InterruptedException {
       //Emit each employee record under its department key (one output file per partition)
        IntWritable k4 = k3;
        for (Employee e : v3){
            context.write(k4, e);
        }
    }
}

PartEmployeelMain.java

package com.wang.patitioner;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author CreazyHacking
 * @version 1.0
 * @date 12/4/2021 8:35 PM
 */
@SuppressWarnings("all")
public class PartEmployeelMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1. Create a job and task entry (specify the main class)
        Job job = Job.getInstance();
        job.setJarByClass(PartEmployeelMain.class);
        //2. Specify the mapper of the job and the type of output < K2 V2 >
        job.setMapperClass(SalaryTotalMapper.class);
        job.setMapOutputKeyClass(IntWritable.class); //Department number
        job.setMapOutputValueClass(Employee.class); // staff
        //Specifies the partition rule for the task
        job.setPartitionerClass(SalaryTotalPartitioner.class);
        //Specify the number of reduce tasks (one per partition)
        job.setNumReduceTasks(3);
        //3. Specify the reducer and output type of the job < K4 V4 >
        job.setReducerClass(SalaryTotalReducer.class);
        job.setOutputKeyClass(IntWritable.class);//Department number
        job.setOutputValueClass(Employee.class);// staff
        //4. Specify the input and output paths of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //5. Execute job
        job.waitForCompletion(true);
    }

}

Package upload execution

Packaging project

Submit the jar to Hadoop, run it, and view the results. The packaging process is the same as in the previous post, Counting the total salary of employees in each department, and is not repeated here.
Note: if the package path changes, update the mainClass entry in pom.xml accordingly, replacing it with:

<mainClass>com.wang.patitioner.PartEmployeelMain</mainClass>

Result

Execute the commands:

hadoop jar SalTotal-Patition-1.0-SNAPSHOT.jar /EMP.csv /ouput-patition 
hdfs dfs -ls /ouput-patition 
hdfs dfs -cat /ouput-patition/part-r-00000
hdfs dfs -cat /ouput-patition/part-r-00001
hdfs dfs -cat /ouput-patition/part-r-00002

Using a Combiner on the map side

This case is based on the function of counting the total salary of employees in each department.

Using a Combiner means running a reduce step on each mapper's output to merge intermediate results locally, which reduces the amount of data transferred over the network during the shuffle and improves efficiency. Note, however, that a Combiner cannot be used for every operation; computing an average, for example, produces wrong results.
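Why summation is combiner-safe while averaging is not can be checked with plain arithmetic, independent of Hadoop. This toy example (the class and method names are mine) treats two map-side splits of one department's salaries as the partial results a Combiner would merge:

```java
public class CombinerPitfall {
    //Two map tasks produce these salary lists for the same department:
    static int[] split1 = {1000, 2000, 3000};
    static int[] split2 = {4000};

    static int sum(int[] a) { int s = 0; for (int x : a) s += x; return s; }

    //Sum: combining per-split partial sums gives the same answer
    //as summing all values at the reducer.
    public static int sumWithCombiner() {
        return sum(split1) + sum(split2);
    }

    //Average: averaging the per-split averages is NOT the true average.
    public static double avgOfAvgs() {
        double a1 = (double) sum(split1) / split1.length; // 2000.0
        double a2 = (double) sum(split2) / split2.length; // 4000.0
        return (a1 + a2) / 2;                             // 3000.0 (wrong)
    }

    //True average over all values.
    public static double trueAvg() {
        return (double) (sum(split1) + sum(split2))
                / (split1.length + split2.length);        // 2500.0
    }
}
```

Since this case only sums salaries, reusing the existing reducer as the Combiner is safe.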

MapReduce programming model and programming ideas:

Using a Combiner is simple: add a single line to the main class, alongside the mapper configuration in step 2:

   job.setCombinerClass(SalaryTotalReducer.class);

Package and execute

The steps are the same as above; the results are unchanged.


Keywords: Hadoop Maven mapreduce

Added by baw on Sun, 05 Dec 2021 11:00:33 +0200