I have to say that the process of reading the source code is extremely painful. Dream Car town building ~!

Although the whole MapReduce process only has the Map stage and Reduce stage, think carefully about what to do in the Map stage? This stage should include data input, data calculation and data output. The division of these three steps is very consistent with thinking habits.
Starting with the hello world case of big data development, the following is a map program of word count case
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Text k = new Text(); private IntWritable v = new IntWritable(1); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1. Get a row of data String str = value.toString(); //2. Separate words according to the space String[] words = str.split(" "); //3. Traverse the set and assemble it into the form of (word,one) for (String word : words) { this.k.set(word); context.write(k, v); } } }
The custom WcMapper class inherits the Mapper class and rewrites the map() method. In this method, we write the corresponding business logic according to the requirements. Enter Mapper class to view.
This class does not contain many methods, and it is more in line with the thinking law of seeing the name and knowing the meaning. You can roughly understand its specific functions according to the method auxiliary annotation. The header of this class also includes a descriptive comment on the class, which roughly means what is done in the map stage. Try to simply translate the core content
- Map input key / value pairs to a set of intermediate key / value pairs.
- Mapping is a single task that transforms an input record into an intermediate record. The converted intermediate record does not need to be of the same type as the input record. A given input pair can be mapped to zero or more output pairs.
- The Hadoop map reduce framework generates a mapping task for each InputSplit generated by InputFormat for the job. Mapper can be implemented through jobcontext Getconfiguration() accesses the Configuration of the job.
- The framework first calls setup(Mapper.Context), and then calls map(Object, Object, Mapper.Context) for each key / value pair in InputSplit. Finally, call cleanup(Mapper.Context).
- All intermediate values associated with a given output key are then grouped by the framework and passed to Reducer to determine the final output. Users can control sorting and grouping by specifying two key RawComparator classes.
- Mapper output is partitioned by Reducer. Users can control which keys (and records) go to which Reducer by implementing a custom Partitioner.
- Users can choose to use job Setcombinerclass (class) specifies combiner to perform local aggregation of intermediate output, which helps to reduce the amount of data transferred from Mapper to Reducer.
- The application can specify whether and how to compress intermediate output and which compressioncodecs to use through Configuration.
If the job has a zero reduction, Mapper's output will be written directly to OutputFormat without pressing the key to sort.
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {} protected void setup(Context context ) throws IOException, InterruptedException {} protected void map(KEYIN key, VALUEIN value,Context context) throws IOException, InterruptedException {context.write((KEYOUT) key, (VALUEOUT) value);} protected void cleanup(Context context) throws IOException, InterruptedException {} public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } }
When you see the run (context) method, you feel more like a framework. In this method, you call setup(context) and cleanup(context) once, while for the map method, you call once for each key / value pair in the input split.
Even if this class sees this, it is over, and the others can't see anything. Enter the MapTask class, which contains a large number of core business logic methods. This class will be called run method by Yan reflection to instantiate MapTask. Directly enter the run method and delete some non core code, which is refreshing.
@Override public void run(final JobConf job, final TaskUmbilicalProtocol umbilical){ this.umbilical = umbilical; if (isMapTask()) { // The number of reduce is 0, so the whole task has only map stage if (conf.getNumReduceTasks() == 0) { mapPhase = getProgress().addPhase("map", 1.0f); } else { // If there is a reduce phase, the progress will be allocated mapPhase = getProgress().addPhase("map", 0.667f); // The sorting phase makes the subsequent reduce phase easier to complete. You only need to pull the file once to reduce I/O sortPhase = getProgress().addPhase("sort", 0.333f); } } TaskReporter reporter = startReporter(umbilical); boolean useNewApi = job.getUseNewMapper(); initialize(job, getJobID(), reporter, useNewApi); // check if it is a cleanupJobTask ......... if (useNewApi) { // Selection of old and new API s // Enter this method runNewMapper(job, splitMetaInfo, umbilical, reporter); } else { runOldMapper(job, splitMetaInfo, umbilical, reporter); } done(umbilical, reporter); }
Continue to the runnewmapper (job, splitmetainfo, mathematical, reporter) method. It's a little long inside. It's hard to find the key point. Common sense: put important things in try catch!! So first look at the try catch block.
private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job,final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical,TaskReporter reporter) { ............Delete it first and skip it............ // Go through the method name with human thinking try { // 1. Initialize input stream input.initialize(split, mapperContext); // 2. Intuitively call the run() method and eventually call the custom map method mapper.run(mapperContext); // 3. Complete the map calculation phase mapPhase.complete(); // 4. Start of sorting stage setPhase(TaskStatus.Phase.SORT); // 5. Update or transfer of status information (guessed) statusUpdate(umbilical); // 6. Close input stream input.close(); input = null; // 7. Enter the out phase and output map data output.close(mapperContext); output = null; } finally { // Quiet, do something silently closeQuietly(input); closeQuietly(output, mapperContext); } }
In this way, the whole idea is very smooth. Looking back at the deleted code fragments, the original annotation information is also very easy to understand.
// 1. make a task context so we can get the classes encapsulates the context of the task. There is configuration in the job // Common sense: in the framework, context objects are indispensable. Some information shuttles back and forth in the business line. Encapsulated in the context, it can be obtained at any time // Recall: the client uploads the task to the resource layer, including Jar package, configuration file and slice. The container can instantiate the job org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(),reporter); // 2. make a mapper: instantiate a mapper object according to taskContext + job org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>) // It is the WCMapper object written by yourself, which corresponds to the mapper below run(mapperContext). Silky ~! ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // 3. make the input format: input format. Why do you need this? split is a piece of data, so it's necessary to read a piece of data org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) // When writing job configuration, you can specify InputFormat. The default is TextInputFormat ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); // 4. rebuild the input split. Each map should determine which split to move to org.apache.hadoop.mapreduce.InputSplit split = null; // Each mapper should find out which split to read split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); // 5. input = split + inputFormat (the parent class is RecordReader) org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> // So input has the ability to read out records in split (split, inputFormat, reporter, taskContext); // Summary: steps 3, 4 and 5 are to find a way to read a piece of data in Split //--------------------NewTrackingRecordReader() begin------------------------------- private final org.apache.hadoop.mapreduce.RecordReader<K,V> real; NewTrackingRecordReader(...){ ..... // Call the createRecordReader of TextInputFormat and return a LineRecordReader object // So input is a LineRecordReader object this.real = inputFormat.createRecordReader(split, taskContext); ..... } //--------------------NewTrackingRecordReader() end-------------------------------- ...........Skip the output section first........... // 6. The above is the task context, and here is the map context, including input, output and split org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split); // 7. It also wraps a layer of mapercontext for the map context, including input, output and split // This is the input parameter of run (context) in Mapper class ~!! org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext( mapContext); //-------------Mapper::run(Context context) begin ---------------------------------- public void run(Context context) throws IOException, InterruptedException { setup(context); try { // Judge whether there is next data from the context of mapper while (context.nextKeyValue()) { // Take out the next piece of data in the slice for calculation map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } // How to get a piece of data from the context information of map? The ~ ~ layers of encapsulation returned by LineRecordReader are amazing //-------------Mapper::run(Context context) end ----------------------------------
Now you can look back at the input in the try block Initialize (split, mapercontext) method. Go in and see the details of the method implementation. Data will be cut in the HDFS layer, so how can it be calculated correctly? There is a corresponding implementation code here. The code is not complex, but there are small highlights.
Only retain the core business logic, or delete what should be deleted, clear and refreshing, and read the source code happily~
// Remember that this is the initialization method of Recordreader public void initialize(InputSplit genericSplit,TaskAttemptContext context) { // map task calculation is slice oriented. First get the slice, and then get the beginning of the slice start = split.getStart(); // Start + slice size to get the end end = start + split.getLength(); // Get the file path from the slice final Path file = split.getPath(); // open the file and seek to the start of the split // Gets an object of the file system final FileSystem fs = file.getFileSystem(job); //Open the file and you will get a file oriented input stream // Each map is executed in parallel, so it will not be read from the file header, so it needs to be matched with a seek() method fileIn = fs.open(file); if (...) { ...... } else { // Each map will seek to the position of its slice offset and start reading data fileIn.seek(start); // SplitLineReader: a line record reader in a slice. The name is object-oriented at first sight in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes); filePosition = fileIn; } // If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. // If this is not the first split, we always discard the first record. // Because we always read an extra line in the next() method (except for the last split). // This prevents the calculation error caused by the split of hello into he llo if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start; }
in.readLine(new Text(), 0, maxBytesToConsume(start)) this method gives a read line of data to a Text object, and the return value is a numeric value of type int, indicating how many bytes have been read.
Note that the method passes a parameter to the new Text() object. When the method is executed, this object will be recycled by GC because there is no reference. So since there is no reference, what is it doing?
Recall: slice is a logical segmentation. The default size is the size of a block block. If a split is less than a block, the block will be cut into multiple parts. If it is NIMA's size, the two slices of hello are split into two parts of he llo, which will lead to calculation errors. At this time, read down one more line. Alas, this problem will be solved.
Back to: computing moves to data. What if the extra read line is at other nodes? A: transfer this line of data without moving the calculation.
In fact, you can see here that in the input link of the whole Map, the LineRecordReader is really engaged in reading data, and the key is the line oriented byte offset. The following code has appeared many times
public void run(Context context) throws IOException, InterruptedException { setup(context); try { // Judge whether there is next data from the context of mapper while (context.nextKeyValue()) { // Take out the next piece of data in the slice for calculation map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } }
By reading the source code above, we already know that the Context passed here is actually a MapContextImpl object, Context The nextkeyvalue() method is calling the LineRecordReader::nextKeyValue() method. Inside this method: the key value will be assigned, and the boolean value will be returned, indicating whether the assignment is successful. Overall, it feels very silky~
Conclusion: (I can understand it myself ~)
MapTask: input -> map -> output
intput: (Split + format) the record reader object returned from the input format class
TextInputFormat - > LineRecordReader:
Split has three dimensions: file, offset and length
init(): in = fs.open(file). seek. Except for the first slice, I will read one more line down.
nextKeyValue():
1. Read the key and value assignment corresponding to a record in the data;
2. Returns a Boolean value;
getCurrentKey()
getCurrentValue()