Hadoop data compression: supported algorithms and a MapReduce code example

To see which native compression libraries Hadoop can load, run:

[lqs@bdc112 hadoop-3.1.3]$ bin/hadoop checknative
2021-12-15 16:20:12,342 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native
2021-12-15 16:20:12,345 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
2021-12-15 16:20:12,348 WARN zstd.ZStandardCompressor: Error loading zstandard native libraries: java.lang.InternalError: Cannot load libzstd.so.1 (libzstd.so.1: Cannot open shared object file: There is no such file or directory)!
2021-12-15 16:20:12,353 WARN erasurecode.ErasureCodeNative: ISA-L support is not available in your platform... using builtin-java codec where applicable
Native library checking:
hadoop:  true /home/lqs/module/hadoop-3.1.3/lib/native/libhadoop.so.1.0.0
zlib:    true /lib64/libz.so.1
zstd  :  false 
snappy:  true /lib64/libsnappy.so.1
lz4:     true revision:10301
bzip2:   true /lib64/libbz2.so.1
openssl: true /lib64/libcrypto.so
ISA-L:   false libhadoop was built without ISA-L support

Introduction to Hadoop data compression

Benefits of data compression:
It reduces disk IO and saves disk storage space.
Drawback of compression:
It increases CPU overhead.

Compression principle:
a. For compute-intensive (CPU-bound) jobs, use compression sparingly.
b. For IO-intensive jobs, use compression as much as possible.

Compression encodings supported by MapReduce

Basic comparison of compression algorithms

Compression format | Built into Hadoop? | Algorithm | File extension | Splittable? | Program changes needed after switching to this format?
DEFLATE | Yes, use directly | DEFLATE | .deflate | No | No, use directly
Gzip | Yes, use directly | DEFLATE | .gz | No | No, use directly
bzip2 | Yes, use directly | bzip2 | .bz2 | Yes | No, use directly
LZO | No, requires installation | LZO | .lzo | Yes | Yes, an index must be created and the input format specified
Snappy | Yes, use directly | Snappy | .snappy | No | No, use directly

Compression performance comparison

Compression algorithm | Original file size | Compressed file size | Compression speed (MB/s) | Decompression speed (MB/s)
gzip | 8.3 | 1.8 | 17.5 | 58
bzip2 | 8.3 | 1.1 | 2.4 | 9.5
LZO | 8.3 | 2.9 | 49.3 | 74.6

Criteria for selecting a compression format

When selecting a compression format, focus on the following: compression/decompression speed, compression ratio (storage size after compression), and whether the compressed file supports splitting. Also take the network, IO, disks, and cluster resources into account.

Comparison of several compression algorithms

Compression format | Advantages | Disadvantages
Gzip | High compression ratio | Does not support splitting; average compression/decompression speed
Bzip2 | High compression ratio; supports splitting | Slow compression/decompression speed
Lzo | Relatively fast compression/decompression; supports splitting | Average compression ratio; an extra index must be created to support splitting
Snappy | Fast compression/decompression speed | Does not support splitting; average compression ratio

Selection of compression position

Compression can be applied at three positions: on the job input (before Map), on the Mapper output (between Map and Reduce), and on the Reducer output.

Input (before Map):
There is no need to explicitly specify the codec. Hadoop automatically checks the file extension; if the extension matches a supported codec, it compresses and decompresses the file with that codec (see the CompressionCodecFactory sketch after this section).
Enterprise development considerations:
a. If the data volume is smaller than the block size, focus on LZO/Snappy, which compress and decompress quickly.
b. If the data volume is very large, focus on Bzip2 and LZO, which support splitting.

Mapper output (between Map and Reduce), i.e. the data written out after MapTask:
To reduce network IO between MapTask and ReduceTask, focus on LZO and Snappy, which compress and decompress quickly.

Reducer output:
It depends on the requirements:
a. If the data is saved permanently, consider Bzip2 and Gzip, which have a high compression ratio.
b. If it is used as the input of the next MapReduce job, consider the data volume and whether splitting is supported.
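
The extension-based codec detection mentioned under the input position can be tried out on its own. The sketch below is not part of the word count example; the class name and the file path are made-up samples. It uses org.apache.hadoop.io.compress.CompressionCodecFactory, the class Hadoop's input formats rely on, to resolve a codec from a file extension:

package com.lqs.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecByExtensionDemo {

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(configuration);

        // Resolve the codec from the extension, e.g. .gz -> GzipCodec, .bz2 -> BZip2Codec
        // The path is a hypothetical example; replace it with a real input file
        CompressionCodec codec = factory.getCodec(new Path("/demo/word.txt.bz2"));
        if (codec == null) {
            System.out.println("No matching codec, the file will be read as plain text");
        } else {
            System.out.println("Detected codec: " + codec.getClass().getName());
        }
    }
}

If no codec matches the extension, getCodec returns null and the file is read as uncompressed input.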

Configuration of compression parameters

Compression format | Corresponding encoder/decoder
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec
gzip | org.apache.hadoop.io.compress.GzipCodec
bzip2 | org.apache.hadoop.io.compress.BZip2Codec
LZO | com.hadoop.compression.lzo.LzopCodec
Snappy | org.apache.hadoop.io.compress.SnappyCodec
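
Each codec class in the table can also be used directly, outside of a MapReduce job. The following is a minimal sketch, assuming a local word.txt exists under the F:\hdpData folders used by the drivers later in this article (the class name and paths are assumptions for illustration). It compresses a file with BZip2Codec, instantiated through ReflectionUtils just as MapReduce does internally:

package com.lqs.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class CodecCompressDemo {

    public static void main(String[] args) throws IOException {
        Configuration configuration = new Configuration();

        // Instantiate the codec via ReflectionUtils, the same mechanism MapReduce uses internally
        CompressionCodec codec = ReflectionUtils.newInstance(BZip2Codec.class, configuration);

        // Wrap a plain output stream; getDefaultExtension() appends ".bz2" to the output file name
        // The paths below are assumptions reusing the local folders from the drivers in this article
        try (FileInputStream in = new FileInputStream("F:\\hdpData\\Input\\inputword\\word.txt");
             CompressionOutputStream out = codec.createOutputStream(
                     new FileOutputStream("F:\\hdpData\\Output\\word.txt" + codec.getDefaultExtension()))) {
            IOUtils.copyBytes(in, out, 4096);
        }
    }
}

Swapping BZip2Codec for another class from the table (for example GzipCodec) changes the output format and file extension accordingly.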

To enable compression, configure the following parameters in Hadoop:

Parameter | Default value | Stage | Recommendation
io.compression.codecs (configured in core-site.xml) | Empty; available codecs can be checked on the command line with hadoop checknative | Input compression | Hadoop uses file extensions to determine whether a codec is supported
mapreduce.map.output.compress (configured in mapred-site.xml) | false | Mapper output | Set this parameter to true to enable compression
mapreduce.map.output.compress.codec (configured in mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | Mapper output | Enterprises often use LZO or Snappy codecs to compress data at this stage
mapreduce.output.fileoutputformat.compress (configured in mapred-site.xml) | false | Reducer output | Set this parameter to true to enable compression
mapreduce.output.fileoutputformat.compress.codec (configured in mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | Reducer output | Use standard codecs such as gzip and bzip2

Compressing the Map output

Mapper class

package com.lqs.mapreduce.zip;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:48
 */

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k;

    IntWritable v;

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context) {
        k = new Text();
        v = new IntWritable(1);
    }

    /**
     * @param key     offset of the line
     * @param value   one line of input data
     * @param context context object used to pass data on to the Reducer
     * @throws IOException          IO exception
     * @throws InterruptedException interruption exception
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //1. Get a line and convert the string
        String line = value.toString();

        //2. Cut
        String[] words = line.split(" ");

        //3. Output, iterative write out
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

Reducer class

package com.lqs.mapreduce.zip;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:52
 */

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Encapsulates the summed count for each key
     */
    IntWritable v;

    @Override
    protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) {
        v = new IntWritable();
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

        //1. Cumulative summation (sum must be reset for each key)
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        //2. Output
        v.set(sum);
        context.write(key, v);

    }
}

Driver class

package com.lqs.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:55
 */

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        //1. Get configuration information and get job object
        Configuration configuration = new Configuration();

        // Enable map output compression
        configuration.setBoolean("mapreduce.map.output.compress", true);
        // Set the map side output compression codec
        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        //2. The jar or Driver class associated with this Driver program
        job.setJarByClass(WordCountDriver.class);

        //3. jar associated with Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //4. Sets the kv type of Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5. Set final output kv type
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6. Set input and output paths
        FileInputFormat.setInputPaths(job, new Path("F:\\hdpData\\Input\\inputword"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hdpData\\Output\\outputWord"));

        //7. Submit job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

}

Compressing the Reducer output
As in the case above, only the Driver class needs to change:

package com.lqs.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:55
 */

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        //1. Get configuration information and get job object
        Configuration configuration = new Configuration();

//        // Enable map output compression
//        configuration.setBoolean("mapreduce.map.output.compress", true);
//        // Set the map side output compression codec
//        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        //2. The jar or Driver class associated with this Driver program
        job.setJarByClass(WordCountDriver.class);

        //3. jar associated with Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //4. Sets the kv type of Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5. Set final output kv type
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6. Set input and output paths
        FileInputFormat.setInputPaths(job, new Path("F:\\hdpData\\Input\\inputword"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hdpData\\Output\\outputWord"));

        // Set the output compression on the reduce side
        FileOutputFormat.setCompressOutput(job, true);
        // Set compression mode
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        //7. Submit job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

}

The most common approach in development is to set the compression parameters directly on the Configuration object in the Driver class. The code is as follows:

package com.lqs.mapreduce.zip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author qingSong liu
 * @version 1.0
 * @time 2021/12/8 19:55
 */

public class WordCountDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        //1. Get configuration information and get job object
        Configuration configuration = new Configuration();

        //Map
        configuration.set("mapreduce.map.output.compress","true");
        configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");

        //Reducer
        configuration.set("mapreduce.output.fileoutputformat.compress","true");
        configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

//        // Enable map output compression
//        configuration.setBoolean("mapreduce.map.output.compress", true);
//        // Set the map side output compression codec
//        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        //2. The jar or Driver class associated with this Driver program
        job.setJarByClass(WordCountDriver.class);

        //3. jar associated with Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //4. Sets the kv type of Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5. Set final output kv type
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6. Set input and output paths
        FileInputFormat.setInputPaths(job, new Path("F:\\hdpData\\Input\\inputword"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hdpData\\Output\\outputWord"));

        // Reducer output compression is already enabled via the Configuration above, so the
        // FileOutputFormat.setCompressOutput / setOutputCompressorClass calls from the previous
        // driver are not needed here; calling them would override the codec chosen above

        //7. Submit job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

}
