Know what it is before you know why it is so
Getting started with MapReduce
MapReduce definition
MapReduce is a programming framework for writing distributed computing programs on top of Hadoop.
Its core function is to combine the business logic code written by the user with the framework's own built-in components into a complete distributed computing program that runs concurrently on a Hadoop cluster.
MapReduce benefits
- Easy to program: implementing a few interfaces is enough to produce a distributed program, which can then be spread across a large number of cheap PC machines for execution.
- Good scalability: when computing resources run short, capacity can be extended simply by adding machines.
- High fault tolerance: if a machine in the system fails, the framework automatically transfers its tasks to another machine and keeps running, with no human intervention required.
- Suitable for offline processing of PB-level data: it handles massive data sets stably in batch mode.
MapReduce disadvantages
- Not good at real-time computing: MapReduce cannot return results within milliseconds or seconds the way MySQL can.
- Not good at streaming computing: the input of a streaming job is dynamic and arrives continuously, while the data MapReduce processes must be static; this is a consequence of its design.
- Not good at DAG computing, where multiple jobs depend on each other and the input of one job is the output of another: MapReduce can express this, but each job writes its results to disk, so chaining jobs causes heavy disk I/O and badly degraded performance.
MapReduce word counting process
By default, the input data is split into 128 MB slices (matching the HDFS block size), and each slice is processed by one MapTask.
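As a sketch of where that number comes from: the split size computed by FileInputFormat is bounded by two optional properties and defaults to the HDFS block size. A minimal illustration in plain Java follows; the property names in the comments are the standard Hadoop 2+ keys, and the arithmetic mirrors FileInputFormat.computeSplitSize():

```java
public class SplitSizeDemo {
    public static void main(String[] args) {
        long minSize = 1L;                   // mapreduce.input.fileinputformat.split.minsize
        long maxSize = Long.MAX_VALUE;       // mapreduce.input.fileinputformat.split.maxsize
        long blockSize = 128L * 1024 * 1024; // dfs.blocksize, 128 MB by default

        // Same formula as FileInputFormat.computeSplitSize()
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
        System.out.println(splitSize);       // 134217728 bytes = 128 MB
    }
}
```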
A complete MapReduce run involves three kinds of processes:
- MrAppMaster: responsible for scheduling the whole program's processes and coordinating their state
- MapTask: responsible for the whole data processing flow of the Map phase
- ReduceTask: responsible for the whole data processing flow of the Reduce phase
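To make the KV flow concrete, here is a minimal sketch that simulates the three stages of word counting in plain Java, with no Hadoop involved; the two input lines are made-up sample data:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountFlowDemo {
    public static void main(String[] args) {
        // A hypothetical two-line input split
        List<String> split = List.of("hello world", "hello hadoop");

        // Map stage: emit (word, 1) for every word
        List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
        for (String line : split) {
            for (String word : line.split(" ")) {
                mapOutput.add(Map.entry(word, 1));
            }
        }

        // Shuffle stage: group values by key, sorted by key
        // (the framework does this between Map and Reduce)
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : mapOutput) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }

        // Reduce stage: sum the grouped values for each key
        grouped.forEach((word, ones) ->
                System.out.println(word + "\t" + ones.stream().mapToInt(Integer::intValue).sum()));
        // Prints: hadoop 1, hello 2, world 1
    }
}
```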
MapReduce programming routine
The code we write is basically divided into three parts: the Mapper, the Reducer, and the Driver.
Map phase
(1) The user-defined Mapper must extend the framework's Mapper parent class
(2) The Mapper's input data is in KV-pair form (the types of K and V are customizable)
(3) The Mapper's business logic is written in the map() method, which is called once for each input KV pair
(4) The Mapper's output data is also in KV-pair form
Reducer phase
(1) The user-defined Reducer must extend the framework's Reducer parent class
(2) The Reducer's input data type corresponds to the Mapper's output data type and is also a KV pair
(3) The Reducer's business logic is written in the reduce() method, which is called once for each group of KV pairs sharing the same key
Driver phase
It acts as the client of the YARN cluster: after the program is written, the Driver submits the whole job to the YARN cluster for execution.
Hello World case
Requirement
Count the frequency of each word in a given input file.
Mapper code
```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 1. Read one line
        String line = value.toString();

        // 2. Split the line into words
        String[] words = line.split(" ");

        // 3. Emit (word, 1) for every word
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
```
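A note on the design: k and v are created once as instance fields and reused on every map() call. Because context.write() serializes the pair immediately, reusing the Writable containers instead of allocating new ones per record avoids creating huge numbers of short-lived objects on large inputs.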
Reducer code
```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        // 1. Sum up all the counts for this word
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        // 2. Emit (word, total)
        v.set(sum);
        context.write(key, v);
    }
}
```
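Two details worth knowing here: the framework calls reduce() once per key, with all of that key's values grouped into the Iterable, and that Iterable can only be traversed once. Hadoop also reuses the underlying Writable objects while iterating, so if you need to keep individual values beyond the loop, you must copy them.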
Driver code
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {

        // The input/output paths must be set according to the actual paths
        // on your machine. Note that these are Windows paths for a local
        // run; change them when running on a cluster.
        args = new String[] { "e:/input/inputword", "e:/output1" };

        // 1. Get configuration info and create the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. Set the jar loading path
        job.setJarByClass(WordcountDriver.class);

        // 3. Set the Mapper and Reducer classes
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 4. Set the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the final (reduce) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
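One caveat when running: the output directory must not already exist; FileOutputFormat checks for it at submission time and fails the job if it is there, to avoid overwriting earlier results. On a cluster you would drop the hard-coded args, package the classes into a jar, and launch it with something like `hadoop jar wordcount.jar WordcountDriver /input/inputword /output1` (the jar name here is hypothetical).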