Let's analyze a scenario:
Suppose we have a file with 10 million (1000W) records. The values in it are mostly the numbers 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, and we want to count how many times each number occurs.
In fact, we already know the rough distribution of this data: out of the 10 million records, about 9.1 million have the value 5, and the remaining 9 numbers account for only about 0.9 million records. In other words, the value 5 is heavily concentrated; it is skewed data whose share of the whole dataset is far larger than that of any other value.
Assume the 10-million-record file occupies three blocks, so three InputSplits are generated and three map tasks are started. By default there is only one reduce task, which means all of the data is processed by that single reduce task. The pressure on that reducer is obviously huge, and a lot of time is spent there.
Based on our earlier analysis, we can increase the number of reduce tasks. If we raise it to 10, the 10 million records will be processed by these 10 reduce tasks in parallel, and efficiency will certainly improve. In the end, however, we find that the performance gain is limited; there is no qualitative improvement. Why?
Let's analyze it. We just said that 9.1 million records have the value 5, which is about 90% of the whole dataset. That 90% of the data is sent to a single reduce task; let's assume it is handled by reduce5. reduce5 runs very slowly: the other reduce tasks finished long ago, but it is still not done, because the amount of data it processes is far larger than what any other reducer processes, so reduce5 ends up lagging behind. The elapsed time of a MapReduce job is counted until the last reduce task finishes, so it does not matter that the other reducers completed early; the job as a whole is still not finished.
So what can we do about this situation?
At this point, simply increasing the number of reduce tasks does not help much; starting too many tasks can even be counterproductive. The best approach is to break up the records with the value 5 as much as possible and spread this skewed data across the other reduce tasks, which solves the problem at its root.
This is the data skew problem we want to analyze:
When a MapReduce program runs, most of the reduce tasks finish quickly, but one or a few reduce tasks run very slowly, so the whole job takes a long time to complete.
The typical symptom is that the job appears to be stuck in the Reduce stage.
Based on the analysis above, there are two schemes:
- Increase the number of reduce tasks: this treats the symptom rather than the root cause. It can solve the problem for mildly skewed data, but for severely skewed data it cannot fix the fundamental issue (see the snippet after this list).
- Break up the skewed data: this can cure even severely skewed data.
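As a quick reference for scheme 1, the snippet below is a minimal sketch (not taken from the original job code; the class name is illustrative) of how the number of reduce tasks could be raised when assembling a job:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MoreReducersSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Scheme 1: request 10 reduce tasks instead of the default 1.
        // The keys are spread over more reducers, but every record with the
        // skewed key "5" still hashes to the same reducer, so this only
        // relieves mild skew; it does not fix severe skew.
        job.setNumReduceTasks(10);
    }
}
```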
Let's focus on the implementation of scheme 2
The idea is actually very simple: split the records with the value 5 into 10 pieces by appending a random number from 0 to 9 to the key 5 (for example 5_0, 5_1, ..., 5_9).
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Random;

public class WordCountJobSkewRandKey {

    /**
     * Map stage
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Logger logger = LoggerFactory.getLogger(MyMapper.class);
        Random random = new Random();

        /**
         * The map function receives <k1,v1> and produces <k2,v2>
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // Log the value of k1,v1
            System.out.println("<k1,v1>=<" + k1.get() + "," + v1.toString() + ">");
            logger.info("<k1,v1>=<" + k1.get() + "," + v1.toString() + ">");
            // k1 is the starting offset of each line, v1 is the line content
            // Split each line and extract the words
            String[] words = v1.toString().split(" ");
            // Wrap the word into <k2,v2>
            String key = words[0];
            if ("5".equals(key)) {
                // Break up the skewed key: split it into 10 pieces
                key = "5" + "_" + random.nextInt(10);
            }
            Text k2 = new Text(key);
            LongWritable v2 = new LongWritable(1L);
            // Write out <k2,v2>
            context.write(k2, v2);
        }
    }

    /**
     * Reduce stage
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        Logger logger = LoggerFactory.getLogger(MyReducer.class);

        /**
         * Accumulate and sum the <k2,{v2...}> data, and finally turn it into <k3,v3>
         */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Create a sum variable to hold the sum of v2s
            long sum = 0L;
            // Accumulate the data in v2s
            for (LongWritable v2 : v2s) {
                // Log the value of k2,v2
                //System.out.println("<k2,v2>=<" + k2.toString() + "," + v2.get() + ">");
                //logger.info("<k2,v2>=<" + k2.toString() + "," + v2.get() + ">");
                sum += v2.get();
                // Simulate the time consumed by a complex Reduce-side calculation
                if (sum % 200 == 0) {
                    Thread.sleep(1);
                }
            }
            // Assemble k3,v3
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            // Log the value of k3,v3
            //System.out.println("<k3,v3>=<" + k3.toString() + "," + v3.get() + ">");
            //logger.info("<k3,v3>=<" + k3.toString() + "," + v3.get() + ">");
            // Write out the result
            context.write(k3, v3);
        }
    }

    /**
     * Assemble the Job = Map + Reduce
     */
    public static void main(String[] args) {
        try {
            if (args.length != 3) {
                // If not enough arguments are passed, exit directly
                System.exit(100);
            }
            // Specify the configuration parameters required by the Job
            Configuration conf = new Configuration();
            // Create a Job
            Job job = Job.getInstance(conf);
            job.setJarByClass(WordCountJobSkewRandKey.class);
            // Specify the input path (either a file or a directory)
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            // Specify the output path (must be a directory that does not exist yet)
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Specify the map-related code
            job.setMapperClass(MyMapper.class);
            // Specify the type of k2
            job.setMapOutputKeyClass(Text.class);
            // Specify the type of v2
            job.setMapOutputValueClass(LongWritable.class);
            // Specify the reduce-related code
            job.setReducerClass(MyReducer.class);
            // Specify the type of k3
            job.setOutputKeyClass(Text.class);
            // Specify the type of v3
            job.setOutputValueClass(LongWritable.class);
            // Set the number of reduce tasks
            job.setNumReduceTasks(Integer.parseInt(args[2]));
            // Submit the job
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
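The job can then be packaged and submitted with `hadoop jar`. The jar name and the input path below are illustrative placeholders (only the output path /out10000000 appears in the results that follow); the three arguments are the input path, the output path, and the number of reduce tasks:

```shell
# Hypothetical submit command; the jar name and input path are placeholders.
# args: <input path> <output path> <number of reduce tasks>
hadoop jar wordcount-skew.jar WordCountJobSkewRandKey /input/skewed_data /out10000000 10
```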
View the results after the job finishes successfully:
```
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out10000000/*
1       100000
5_3     1012097
2       100000
5_4     1011163
3       100000
5_5     1010498
4       100000
5_6     1010755
5_7     1010823
5_8     1012394
6       100000
7       100000
5_0     1011274
8       100000
10      100000
5_1     1009972
9       100000
5_2     1011024
```
However, the result we get at this point is only a half-finished product; it needs to be processed one more time.
In fact, by breaking up the skewed data we have only done a local aggregation. Now we need to develop another MapReduce job to perform the global aggregation.
This is also very simple: the second job takes the output of the previous job as its input. After reading each line on the map side, split it on whitespace, then split the first column on the underscore and always keep the first part. This restores the records whose value is 5, as shown in the sketch below.
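The following is a minimal sketch of what the second (global aggregation) job could look like; it is not code from the original article, and the class name WordCountJobSkewStep2 is illustrative. The mapper splits each output line of the first job on whitespace and strips everything after the underscore in the key; the reducer sums the partial counts. The driver (main method) would be assembled the same way as in the first job:

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountJobSkewStep2 {

    /**
     * Map stage of the second job: read the output of the first job,
     * e.g. "5_3    1012097", drop the "_3" suffix and emit <5, 1012097>.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // The first job writes "key<TAB>count" lines; split on whitespace
            String[] fields = v1.toString().split("\\s+");
            // Restore the original key: "5_3" -> "5", "1" stays "1"
            String key = fields[0].split("_")[0];
            long count = Long.parseLong(fields[1]);
            context.write(new Text(key), new LongWritable(count));
        }
    }

    /**
     * Reduce stage of the second job: sum the partial counts so that the
     * broken-up pieces of key 5 are merged back into one global count.
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                sum += v2.get();
            }
            context.write(k2, new LongWritable(sum));
        }
    }
}
```

Pointing this job's input path at /out10000000 would merge the 5_x partial counts back into a single count for the key 5, giving the final global result.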