1, Overview
1) Advantages and disadvantages of compression
Advantages of compression: it reduces disk IO and disk storage space.
Disadvantages of compression: it increases CPU overhead.
2) Compression principles
(1) For compute-intensive (CPU-bound) jobs, use compression sparingly.
(2) For IO-intensive jobs, use compression more heavily.
2, Compression encodings supported by MR
1) Comparison and introduction of compression algorithms
Compression format | Built into Hadoop? | Algorithm | File extension | Splittable? | Does the original program need changes after switching to this format? |
---|---|---|---|---|---|
DEFLATE | Yes, usable directly | DEFLATE | .deflate | No | No; handled the same way as plain text |
Gzip | Yes, usable directly | DEFLATE | .gz | No | No; handled the same way as plain text |
bzip2 | Yes, usable directly | bzip2 | .bz2 | Yes | No; handled the same way as plain text |
LZO | No, it must be installed separately | LZO | .lzo | Yes | Yes; an index must be created and the input format specified |
Snappy | Yes, usable directly | Snappy | .snappy | No | No; handled the same way as plain text |
3, Choosing a compression method
When choosing a compression method, consider three key factors: compression/decompression speed, compression ratio (the size of the data after compression), and whether the compressed file can still be split.
Gzip compression
Advantages: high compression ratio.
Disadvantages: does not support Split; average compression/decompression speed.
Bzip2 compression
Advantages: high compression ratio; supports Split.
Disadvantages: slow compression/decompression speed.
Lzo compression
Advantages: fast compression/decompression speed; supports Split.
Disadvantages: average compression ratio; an additional index must be created to support slicing.
Snappy compression (most commonly used)
Advantages: fast compression and decompression speed.
Disadvantages: does not support Split; average compression ratio.
4, Compression position selection
Compression can be applied at any of three points in a MapReduce job: the job input (where Hadoop detects the codec from the file extension), the map output (the intermediate data written to local disk and shuffled to the reducers), and the reduce output (the final result files).
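A minimal sketch of these three positions in code, assuming Snappy for the map output and Bzip2 for the reduce output (the class name CompressionPositionsSketch is made up for illustration); the full, runnable WordCount cases follow in section 6:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressionPositionsSketch {

    public static Job configure() throws Exception {
        Configuration conf = new Configuration();

        // 1. Input: nothing to set on the job -- Hadoop picks the codec from the
        //    input file's extension (.gz, .bz2, ...), see the tables in section 5.

        // 2. Map output (intermediate data written to disk and shuffled to the reducers):
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);

        Job job = Job.getInstance(conf);

        // 3. Reduce output (the final result files):
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);

        return job;
    }
}
```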
5, Compression parameter configuration
1) To support a variety of compression/decompression algorithms, Hadoop provides encoder/decoder (codec) classes
Compression format | Corresponding encoder / decoder |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
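These codec classes are usually not constructed by hand; Hadoop looks them up from the file extension through org.apache.hadoop.io.compress.CompressionCodecFactory. A minimal sketch of that lookup, assuming a compressed file path is passed as the first argument (the class name CodecLookup is hypothetical):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookup {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path(args[0]); // e.g. a file ending in .gz or .bz2

        // Map the file extension to one of the codec classes in the table above
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
        if (codec == null) {
            System.out.println("No codec matches " + file + "; treating it as plain text");
            return;
        }

        // Read the file through the codec's decompressing stream
        FileSystem fs = file.getFileSystem(conf);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(codec.createInputStream(fs.open(file))))) {
            System.out.println(reader.readLine()); // first decompressed line
        }
    }
}
```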
2) To enable compression in Hadoop, you can configure the following parameters
Parameter | Default value | Stage | Recommendation |
---|---|---|---|
io.compression.codecs (configured in core-site.xml) | None; run hadoop checknative on the command line to see which codecs are available | Input compression | Hadoop uses the file extension to determine whether a matching codec is available |
mapreduce.map.output.compress (configured in mapred-site.xml) | false | Map output | Set this parameter to true to enable compression |
mapreduce.map.output.compress.codec (configured in mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | Map output | Enterprises often use LZO or Snappy codecs to compress data at this stage |
mapreduce.output.fileoutputformat.compress (configured in mapred-site.xml) | false | Reduce output | Set this parameter to true to enable compression |
mapreduce.output.fileoutputformat.compress.codec (configured in mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | Reduce output | Use standard codecs such as gzip or bzip2 |
6, Compression practice cases
6.1 Compressing the Map output
Even if the input and output files of your MapReduce job are uncompressed, you can still compress the intermediate output of the Map tasks, because that data is written to local disk and transferred over the network to the Reduce nodes. Compressing it can significantly improve performance. Only two properties need to be set; the code below shows how.
1) Driver (the codecs that Hadoop supports out of the box for this purpose include BZip2Codec and DefaultCodec)
```java
package com.atguigu.mapreduce.compress;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();

        // Enable map output compression
        conf.setBoolean("mapreduce.map.output.compress", true);
        // Set the map side output compression codec
        conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(conf);

        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}
```
2) Mapper remains unchanged
```java
package com.atguigu.mapreduce.compress;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 Get one line
        String line = value.toString();

        // 2 Split the line into words
        String[] words = line.split(" ");

        // 3 Loop over the words and write each one out
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
```
3) Reducer remains unchanged
```java
package com.atguigu.mapreduce.compress;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int sum = 0;

        // 1 Sum the values
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);

        // 2 Output the result
        context.write(key, v);
    }
}
```
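Note that in this case only the intermediate (shuffle) data is compressed: the files in the job's output directory are still ordinary text, and the gain comes from the reduced local disk writes and network traffic between the Map and Reduce stages described above.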
6.2 Compressing the Reduce output
This case is based on the WordCount example.
1) Modify the Driver
```java
package com.atguigu.mapreduce.compress;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Enable compression of the reduce output
        FileOutputFormat.setCompressOutput(job, true);

        // Set the compression codec
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
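With FileOutputFormat.setCompressOutput enabled, the part files in the output directory carry the default extension of the chosen codec (for example .bz2 for BZip2Codec), so downstream jobs can detect and decompress them from the extension alone, as described in section 2.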
2) Mapper and Reducer remain unchanged