Hadoop data compression

1, Overview

1) Advantages and disadvantages of compression

Advantages of compression: less disk IO and less disk storage space used.
Disadvantages of compression: higher CPU overhead.

2) Compression principle

(1) For compute-intensive jobs, use compression sparingly.
(2) For IO-intensive jobs, use compression liberally.

2, Compression encodings supported by MR

1) Comparison and introduction of compression algorithms

Compression format | Built into Hadoop?     | Algorithm | File extension | Splittable? | Does the original program need changes after switching to this format?
-------------------|------------------------|-----------|----------------|-------------|------------------------------------------------------------------------
DEFLATE            | Yes, usable directly   | DEFLATE   | .deflate       | No          | No; handled like plain text
Gzip               | Yes, usable directly   | DEFLATE   | .gz            | No          | No; handled like plain text
bzip2              | Yes, usable directly   | bzip2     | .bz2           | Yes         | No; handled like plain text
LZO                | No, must be installed  | LZO       | .lzo           | Yes         | Yes; an index must be built and the input format specified
Snappy             | Yes, usable directly   | Snappy    | .snappy        | No          | No; handled like plain text
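
As the table indicates, Hadoop selects a codec for an input file from the file's extension. Below is a minimal sketch of that lookup using CompressionCodecFactory; the class name CodecByExtensionDemo and the file names are only illustrative.

package com.atguigu.mapreduce.compress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecByExtensionDemo {

	public static void main(String[] args) {
		Configuration conf = new Configuration();
		CompressionCodecFactory factory = new CompressionCodecFactory(conf);

		// Hypothetical file names; only the extension is used for the lookup
		for (String name : new String[]{"web.log.gz", "web.log.bz2", "web.log.snappy", "web.log"}) {
			CompressionCodec codec = factory.getCodec(new Path(name));
			// getCodec returns null when no registered codec matches the extension
			System.out.println(name + " -> " + (codec == null ? "no codec" : codec.getClass().getName()));
		}
	}
}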

3, Compression mode selection

When choosing a compression method, three key factors should be considered: compression/decompression speed, compression ratio (the size of the data after compression), and whether the compressed file can still be split.

Gzip compression

Advantages: high compression ratio.
Disadvantages: does not support splitting; average compression/decompression speed.

Bzip2 compression

Advantages: high compression ratio; supports splitting.
Disadvantages: slow compression/decompression speed.

Lzo compression

Advantages: fast compression/decompression speed; supports splitting.
Disadvantages: average compression ratio; an additional index must be built to support splitting.

Snappy compression (most commonly used)

Advantages: fast compression and decompression speed.
Disadvantages: does not support splitting; average compression ratio.

4, Compression position selection

Compression can be enabled at any stage of a MapReduce job: on the input data, on the intermediate map output, and on the final reduce output.

5, Compression parameter configuration

1) To support a variety of compression/decompression algorithms, Hadoop provides encoder/decoder (codec) classes

Compression format | Corresponding encoder/decoder
-------------------|------------------------------------------------
DEFLATE            | org.apache.hadoop.io.compress.DefaultCodec
gzip               | org.apache.hadoop.io.compress.GzipCodec
bzip2              | org.apache.hadoop.io.compress.BZip2Codec
LZO                | com.hadoop.compression.lzo.LzopCodec
Snappy             | org.apache.hadoop.io.compress.SnappyCodec
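
To illustrate how such a codec class is used on its own, the sketch below compresses a local file with BZip2Codec through the codec's stream API; the class name StreamCompressDemo and the input path web.log are assumptions for the example.

package com.atguigu.mapreduce.compress;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamCompressDemo {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		// Instantiate the codec via ReflectionUtils, as Hadoop does internally
		CompressionCodec codec = ReflectionUtils.newInstance(BZip2Codec.class, conf);

		// Hypothetical local input file; the codec supplies the matching extension (.bz2)
		try (InputStream in = new FileInputStream("web.log");
		     OutputStream out = codec.createOutputStream(
		             new FileOutputStream("web.log" + codec.getDefaultExtension()))) {
			// Copy the raw bytes through the compressing output stream
			IOUtils.copyBytes(in, out, 4096);
		}
	}
}

Decompression is symmetric: wrap the file's input stream with codec.createInputStream instead.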

2) To enable compression in Hadoop, you can configure the following parameters

Parameter | Default value | Stage | Recommendation
----------|---------------|-------|---------------
io.compression.codecs (in core-site.xml) | none; run hadoop checknative on the command line to see which codecs are available | Input compression | Hadoop uses the file extension to decide whether a codec is supported
mapreduce.map.output.compress (in mapred-site.xml) | false | Mapper output | Set to true to enable compression
mapreduce.map.output.compress.codec (in mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | Mapper output | Enterprises often use LZO or Snappy at this stage for their speed
mapreduce.output.fileoutputformat.compress (in mapred-site.xml) | false | Reducer output | Set to true to enable compression
mapreduce.output.fileoutputformat.compress.codec (in mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | Reducer output | Use a standard codec such as gzip or bzip2
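
These parameters can be set cluster-wide in the XML files named above, or per job on the driver's Configuration object. Below is a minimal per-job sketch; the class name CompressionConfDemo and the codec choices (SnappyCodec for the map side, BZip2Codec for the reduce side) are only examples, and Snappy additionally requires the native library that hadoop checknative reports.

package com.atguigu.mapreduce.compress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

public class CompressionConfDemo {

	public static void main(String[] args) {
		Configuration conf = new Configuration();

		// Compress the intermediate map output (a fast codec is typical here)
		conf.setBoolean("mapreduce.map.output.compress", true);
		conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);

		// Compress the final reducer output
		conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
		conf.setClass("mapreduce.output.fileoutputformat.compress.codec", BZip2Codec.class, CompressionCodec.class);

		// Pass this conf to Job.getInstance(conf) in the driver, as in the cases below
	}
}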

6, Compression practice case

6.1 Map output is compressed

Even if the input and output files of your MapReduce job are uncompressed, you can still compress the intermediate output of the map tasks. Because that intermediate data is written to disk and then sent over the network to the reduce nodes, compressing it can noticeably improve performance. Only two properties need to be set; the driver below shows how.

1) The compression formats supported by the Hadoop source that can be used here are BZip2Codec and DefaultCodec.

package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;	
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();

		// Enable map output compression
		conf.setBoolean("mapreduce.map.output.compress", true);

		// Set the map side output compression mode
		conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,CompressionCodec.class);

		Job job = Job.getInstance(conf);

		job.setJarByClass(WordCountDriver.class);

		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean result = job.waitForCompletion(true);

		System.exit(result ? 0 : 1);
	}
}
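
Map-output compression is transparent to the rest of the job: the final output files are unchanged, but the shuffle traffic shrinks, which can be seen in the job counters (for example, Map output materialized bytes becomes much smaller than Map output bytes).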

2) Mapper remains unchanged

package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

	Text k = new Text();
	IntWritable v = new IntWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {

		// 1 get a row
		String line = value.toString();

		// 2 cutting
		String[] words = line.split(" ");

		// 3 cycle write
		for(String word:words){
			k.set(word);
			context.write(k, v);
		}
	}
}

3) Reducer remains unchanged

package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

	IntWritable v = new IntWritable();

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		
		int sum = 0;

		// 1 Summary
		for(IntWritable value:values){
			sum += value.get();
		}
		
		v.set(sum);

		// 2 output
		context.write(key, v);
	}
}

6.2 Reduce output is compressed

This case is also based on WordCount.
1) Modify the driver

package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		
		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(WordCountDriver.class);
		
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// Set the output compression on the reduce side
		FileOutputFormat.setCompressOutput(job, true);

		// Set compression mode
	    FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); 
//	    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); 
//	    FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); 
	    
		boolean result = job.waitForCompletion(true);
		
		System.exit(result?0:1);
	}
}

2) Mapper and Reducer remain unchanged
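
After the job finishes, the files in the output directory carry the extension of the chosen codec (for example, part-r-00000.bz2 for BZip2Codec), and hadoop fs -text can still print their contents because it looks up the matching codec by extension.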
