MapReduce Performance Optimization -- data skew problem

Let's analyze a scenario:
Suppose we have a file with 10 million records. The values in it are mainly the numbers 1,2,3,4,5,6,7,8,9,10, and we want to count the number of occurrences of each number.

In fact, we already know the rough distribution of this data: of the 10 million records, about 9.1 million have the value 5, and the remaining nine numbers account for only about 0.9 million records in total. In other words, the value 5 is highly concentrated; it is skewed data, taking up a far larger share of the whole dataset than any other value.
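
For readers who want to reproduce the scenario, here is a rough sketch (not part of the original setup) of how such a skewed test file could be generated locally; the file name and exact counts are only illustrative.

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

public class GenerateSkewedData {
    public static void main(String[] args) throws IOException {
        // Illustrative counts: ~9.1 million lines with the value 5 and
        // 100,000 lines for each of the other nine numbers, one value per line
        try (BufferedWriter writer = new BufferedWriter(new FileWriter("skewed_data.txt"))) {
            int[] normal = {1, 2, 3, 4, 6, 7, 8, 9, 10};
            for (int n : normal) {
                for (int i = 0; i < 100_000; i++) {
                    writer.write(n + "\n");
                }
            }
            for (int i = 0; i < 9_100_000; i++) {
                writer.write("5\n");
            }
        }
    }
}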

Assume the 10-million-record file occupies three blocks, so three InputSplits are generated and, in the end, three Map tasks. By default there is only one Reduce task, which means all of the data is processed by that single Reduce task. The pressure on it is bound to be huge, and a great deal of time is spent there.

Based on our earlier analysis, we can increase the number of Reduce tasks. If we raise it to 10, the 10 million records are processed in parallel by those 10 Reduce tasks, and efficiency certainly improves. In the end, though, we find that the improvement is limited; there is no qualitative leap. Why is that?

Let's analyze it. We just said that 9.1 million records have the value 5, roughly 90% of the whole dataset, and that 90% ends up being processed by a single Reduce task; let's assume it is reduce5. reduce5 runs noticeably slower than the rest: the other Reduce tasks finish long before it does, because the amount of data reduce5 handles is vastly larger than what any other Reduce task handles, so it lags behind. The elapsed time of a MapReduce job is measured until the last Reduce task completes, so it does not matter that the other Reduce tasks finished long ago; the job as a whole is still not done.

What can we do about this?
At this point, simply increasing the number of Reduce tasks no longer helps much; starting too many tasks may even be counterproductive. The best approach is to break up the records with the value 5 as much as possible and spread that skewed data across the other Reduce tasks, which solves the problem at its root.

This is the data skew problem we want to analyze:
when a MapReduce program runs, most of the Reduce tasks finish quickly, but one or a few Reduce tasks run very slowly, dragging out the processing time of the whole job.
The typical symptom is that the job appears to be stuck in the Reduce stage.

According to the analysis just now, there are two schemes:

  1. Increase the number of Reduce tasks: this is a stopgap rather than a cure. For mildly skewed data it can solve the problem, but for severely skewed data it cannot fix the root cause (a one-line sketch follows this list).
  2. Break up the skewed keys: this can cure even severely skewed data.
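
As a reference, scheme 1 amounts to a single configuration call when assembling the job (a minimal sketch, assuming a Job object named job as in the driver code further below):

// Scheme 1: simply request more Reduce tasks when assembling the Job
job.setNumReduceTasks(10);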

Let's focus on the implementation of scheme 2.
The idea is simple: split the records with the value 5 into 10 roughly equal parts by appending a random number from 0 to 9 to the key, producing keys 5_0 through 5_9.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Random;

public class WordCountJobSkewRandKey {
    /**
     * Map stage
     */
    public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{
        Logger logger = LoggerFactory.getLogger(MyMapper.class);
        Random random = new Random();
        
        /**
         * Implement the map function
         * It receives <k1,v1> and produces <k2,v2>
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // Print the value of <k1,v1>
            System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
            logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
            // k1 is the byte offset of each line of data, v1 is the content of the line
            // Split each line of data to get the words
            String[] words = v1.toString().split(" ");
            // Wrap the words into <k2,v2>
            String key = words[0];
            if("5".equals(key)){
                // Break up the skewed key: split it into 10 parts
                key = "5"+"_"+random.nextInt(10);
            }
            Text k2 = new Text(key);
            LongWritable v2 = new LongWritable(1L);
            // Write out <k2,v2>
            context.write(k2,v2);
        }
    }
	
    /**
     * Reduce stage
     */
    public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        Logger logger = LoggerFactory.getLogger(MyReducer.class);
        
        /**
         * Accumulate and sum the data in <k2,{v2...}> and finally convert it into <k3,v3>
         */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Create a sum variable to hold the total of v2s
            long sum = 0L;
            // Accumulate the data in v2s
            for(LongWritable v2: v2s){
                // Print the value of <k2,v2>
                //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                sum += v2.get();
                // Simulate the time consumed by a complex Reduce computation
                if(sum % 200 == 0){
                    Thread.sleep(1);
                }
            }
			
            // Assemble <k3,v3>
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            // Print the value of <k3,v3>
            //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
            //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
            // Write the result out
            context.write(k3,v3);
        }
    }

    /**
     * Assemble the Job = Map + Reduce
     */
    public static void main(String[] args) {
        try {
            if(args.length != 3){
                // If not enough parameters are passed in, exit directly
                System.exit(100);
            }

            // Specify the configuration parameters required by the Job
            Configuration conf = new Configuration();
            // Create a Job
            Job job = Job.getInstance(conf);

            job.setJarByClass(WordCountJobSkewRandKey.class);
            // Specify the input path (either a file or a directory)
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            // Specify the output path (must be a directory that does not exist yet)
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
            // Specify the code related to the map stage
            job.setMapperClass(MyMapper.class);
            // Specify the type of k2
            job.setMapOutputKeyClass(Text.class);
            // Specify the type of v2
            job.setMapOutputValueClass(LongWritable.class);

            // Specify the code related to the reduce stage
            job.setReducerClass(MyReducer.class);
            // Specify the type of k3
            job.setOutputKeyClass(Text.class);
            // Specify the type of v3
            job.setOutputValueClass(LongWritable.class);
            // Set the number of reduce tasks
            job.setNumReduceTasks(Integer.parseInt(args[2]));

            // Submit the job
            job.waitForCompletion(true);
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}

View the results after the job completes successfully:

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out10000000/*
1       100000
5_3     1012097
2       100000
5_4     1011163
3       100000
5_5     1010498
4       100000
5_6     1010755
5_7     1010823
5_8     1012394
6       100000
7       100000
5_0     1011274
8       100000
10      100000
5_1     1009972
9       100000
5_2     1011024

However, the result we get at this point is only a half-finished product; it needs one more round of processing.
Breaking up the skewed key is, in effect, a local aggregation. We now need a second MapReduce job to do the global aggregation.
That job is also very simple: it takes the output of the first job as its input. On the map side it splits each line on whitespace, then splits the first column on the underscore and keeps only the part before it, so the records whose value is 5 are restored to their original key; the reduce side then sums the counts as before.
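
Below is a minimal sketch of that second-stage job. It is not from the original article: the class name WordCountJobSkewRandKeyStep2 is made up here, and it assumes the first job's output lines look like "5_3<TAB>1012097", which is the default TextOutputFormat layout shown above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class WordCountJobSkewRandKeyStep2 {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // Each input line is one output line of the first job: "<key><TAB><count>"
            String[] words = v1.toString().split("\\s+");
            // Strip the random suffix: "5_3" -> "5"; keys without "_" stay unchanged
            String key = words[0].split("_")[0];
            context.write(new Text(key), new LongWritable(Long.parseLong(words[1])));
        }
    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Sum the partial counts produced by the first job
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
            }
            context.write(k2, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.exit(100);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountJobSkewRandKeyStep2.class);
        // args[0] is the output directory of the first job, args[1] a new output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.waitForCompletion(true);
    }
}

Since the first job has already collapsed the data to at most a few dozen lines, the default single Reduce task is enough for this second pass.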
