Hadoop principles and tuning

Hadoop principles

1. HDFS write process

1. The client asks the NameNode, through the DistributedFileSystem module, for permission to upload a file. The NameNode checks whether the target file already exists, whether the path is valid, and whether the user has permission.
2. The NameNode replies to the client whether the upload is allowed and, at the same time, returns three DataNodes close to the client, recorded as DN1/DN2/DN3.
3. The client splits the data through DFSOutputStream.
4. Chunks of 512 bytes of data plus a 4-byte checksum are assembled into 64 KB binary packets.
5. The file is transferred in units of packets, which are loaded into the data queue.
6. After DN1 receives a packet there are two data flows: one is DN1 -> ack queue, the other is DN1 -> DN2 -> DN3.
7. DN1, DN2 and DN3 establish a pipeline between them for the transfer.
8. After each packet has been transferred, DN3 and DN2 return a success flag to DN1.
9. Based on the flags returned by DN2/DN3, DN1 either removes the packet from the ack queue or appends it back to the end of the data queue.
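
On the client side all of these steps are hidden behind the FileSystem API. A minimal write sketch against this standard API; the NameNode address and file path are illustrative assumptions:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsWriteExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Example NameNode address and target path
            FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
            try (FSDataOutputStream out = fs.create(new Path("/tmp/data/input/hello.txt"))) {
                // The stream splits the data into chunks/packets and drives
                // the DataNode pipeline described above
                out.writeUTF("hello hdfs");
            }
            fs.close();
        }
    }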

2. HDFS read process

1. The client asks the NameNode, through the DistributedFileSystem module, to download a file. The NameNode checks whether the target file exists, whether the path is valid, and whether the user has permission.
2. The NameNode returns to the client which DataNodes store the blocks of this file.
3. The client creates an FSDataInputStream (which wraps a DFSInputStream) and, following the nearest-node principle, reads the blocks from the DataNodes packet by packet.
4. The client receives the data in units of packets, caches them locally first, then writes them to the target file, and finally merges the blocks into the complete file.
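
The read path looks symmetrical from the client side; a minimal sketch with the same caveats (the NameNode address and path are assumptions):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    public class HdfsReadExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Example NameNode address and source path
            FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
            try (FSDataInputStream in = fs.open(new Path("/tmp/data/input/hello.txt"))) {
                // The underlying DFSInputStream asks the NameNode for block locations
                // and reads from the nearest DataNode, packet by packet
                IOUtils.copyBytes(in, System.out, 4096, false);
            }
            fs.close();
        }
    }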

3. MapReduce workflow

1. Obtain the path of the data to be processed and the job path, examine the input details, and generate the slice information and job configuration files.
2. The client submits the job to the ResourceManager (RM); the RM assigns the task to NodeManager NM1, which starts an MrAppMaster.
3. The MrAppMaster downloads the job resources from the submission path to the local node, applies for resources to run on NM2/NM3, and distributes the input split information and related resources to NM2/NM3.
4. NM2/NM3 start MapTasks, which read the data with a RecordReader (TextInputFormat by default).
5. The RecordReader cuts the data into key-value pairs and hands them to the mapper.
6. After the mapper finishes its computation, it writes its output to the OutputCollector.
7. The OutputCollector writes the data into the ring buffer, which is 100 MB by default. Writing starts from the "equator": data and index are written in opposite directions, and when the buffer is 80% full, writing reverses direction.
8. Data in the ring buffer is partitioned by key and sorted within each partition (quick sort).
9. When the ring buffer reaches the threshold it spills to disk, and the spill files are merged by partition and key.
10. Each ReduceTask pulls the data of its partition to the local node.
11. The ReduceTask merges the files with a merge sort.
12. The results are output to the specified location through the OutputFormat (TextOutputFormat by default).
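
The classic word count illustrates how these steps map onto user code: the RecordReader delivers key-value pairs to the mapper, the mapper writes to the collector (via Context), and the reducer merge-sorts and aggregates its partition. A minimal sketch (not taken from the original):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    // Map side: receives <offset, line> pairs from TextInputFormat's RecordReader
    class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, one); // goes to the OutputCollector / ring buffer
                }
            }
        }
    }

    // Reduce side: pulls its partition, merge-sorts it, and sums the counts per key
    class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum)); // written by TextOutputFormat
        }
    }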

4. YARN job submission process

1. The client applies to the RM for an application.
2. The RM returns the resource submission path and the application ID.
3. The client submits the resources required to run the job to the application path.
4. After the resources are submitted, the client asks the RM to run the ApplicationMaster.
5. The RM wraps the user's request into a Task and puts it into the scheduler.
6. The RM notifies an idle NM to take the Task.
7. That NM creates a Container and starts an MrAppMaster in it.
8. That NM downloads the job resources to the local node.
9. The MrAppMaster requests resources from the RM to run multiple MapTasks.
10. The RM assigns the MapTasks to other NMs.
11. The MrAppMaster sends the program start scripts to the NMs that run the MapTasks.
12. After the MapTasks finish, the MrAppMaster applies to the RM for resources to run the ReduceTasks.
13. Each ReduceTask pulls the data of its partition from the MapTasks and performs the reduce operation.
14. After the program finishes, the MrAppMaster asks the RM to unregister itself.
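
All of these steps are triggered by a single call in the client driver; a minimal sketch that reuses the word-count classes from the sketch above (the input and output paths are assumptions):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "wordcount");
            job.setJarByClass(WordCountDriver.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path("/tmp/data/input"));   // example paths
            FileOutputFormat.setOutputPath(job, new Path("/tmp/data/output"));
            // waitForCompletion() performs steps 1-4 above: it asks the RM for an application,
            // uploads the job resources to the submission path, and requests the AppMaster
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }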

5. YARN schedulers

YARN has three schedulers:

  • FIFO scheduler: first in, first out; a single queue, and only one task in the queue executes at a time.
  • Capacity scheduler (the default): multiple queues; within each queue only one task executes at a time, so the overall parallelism equals the number of queues.
  • Fair scheduler: multiple queues; each queue allocates resources to start its tasks according to the size of their vacancy, and multiple tasks execute in a queue at the same time, so the overall parallelism is greater than the number of queues.
    • Vacancy: the gap between the computing resources a job would obtain in the ideal case and the computing resources it actually obtains. The larger the vacancy, the higher the priority.
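
Whichever scheduler is configured, a MapReduce job chooses the queue it is submitted to through a job-level property. A minimal sketch; the queue name "hive" is purely an illustrative assumption:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class QueueSelectionExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Submit to a specific Capacity/Fair scheduler queue ("default" is used if unset);
            // the queue name "hive" is only an example
            conf.set("mapreduce.job.queuename", "hive");
            Job job = Job.getInstance(conf, "queue-demo");
            // ... configure mapper/reducer/input/output as usual, then submit the job ...
        }
    }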

Tuning

1. HDFS small file archiving

HDFS keeps the metadata of every file in the NameNode's memory, and each small file occupies roughly 150 bytes of it. A NameNode is typically deployed on a node with 128 GB of memory, which can hold metadata for only about 910 million small files (128 GB / 150 bytes ≈ 9.1 × 10^8). Small files therefore need to be archived to reduce the amount of metadata held in memory.

  • HAR archiving

    - Archive
    hadoop archive -archiveName dataName.har -p /tmp/data/input /tmp/data/output
    - Un-archive
    hadoop fs -cp har:///tmp/data/output/dataName.har/* /tmp/data/input
    - Note
    Archiving does not delete the source files; they need to be deleted manually
    
  • Use CombineTextInputFormat

    // Combine many small files into fewer input splits
    job.setInputFormatClass(CombineTextInputFormat.class);
    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4 MB
    CombineTextInputFormat.setMinInputSplitSize(job, 2097152); // 2 MB
    
  • JVM reuse

    <property>
        <name>mapreduce.job.jvm.numtasks</name>
        <!-- Increase this value; 10 to 20 is a common range -->
        <value>10</value>
        <description>How many tasks to run per JVM. If set to -1, there is no limit.</description>
    </property>
    

2. NameNode heartbeat concurrency configuration

The number of NameNode handler threads (dfs.namenode.handler.count) can be sized with the formula: 20 × ln(cluster size).
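
For a concrete value, the formula can be evaluated directly; a small sketch in which the cluster size of 20 nodes is just an example:

    public class NameNodeHandlerCount {
        public static void main(String[] args) {
            int clusterSize = 20;                                   // example cluster size
            long handlers = Math.round(20 * Math.log(clusterSize)); // 20 * ln(20) ≈ 60
            // Suggested value for dfs.namenode.handler.count in hdfs-site.xml
            System.out.println("dfs.namenode.handler.count = " + handlers);
        }
    }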

3. Principle of erasure coding

Erasure coding addresses the steep rise in storage cost caused by storing multiple HDFS replicas.

Consider a 300 MB file. Stored on HDFS 2.x with 3-way replication, it needs 900 MB of space.

Stored on HDFS 3.x with erasure coding, it needs only about 500 MB: the 300 MB of data is split into three 100 MB data units, and two 100 MB parity units are added (the RS-3-2 layout). Even if any two of the five units are lost, the missing data can be reconstructed from the remaining units.
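
In Hadoop 3.x an erasure coding policy is applied per directory. A sketch under the assumption that the Hadoop 3 DistributedFileSystem#setErasureCodingPolicy API is available and that the RS-3-2-1024k policy (three data units plus two parity units, as in the example above) has already been enabled on the cluster; the NameNode address and path are illustrative:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class EcPolicyExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Example NameNode address; the policy must already be enabled on the cluster
            DistributedFileSystem dfs = (DistributedFileSystem)
                    FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
            // Store new files under /input with 3 data units + 2 parity units
            dfs.setErasureCodingPolicy(new Path("/input"), "RS-3-2-1024k");
            dfs.close();
        }
    }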

4. Hot and cold data separation

HDFS storage type:

Storage type | Description
RAM_DISK     | Memory-mirrored file system; data is stored in memory
SSD          | Solid-state disk
DISK         | Ordinary disk; the default storage type
ARCHIVE      | No specific storage medium; mainly used for archiving

HDFS storage policy:

Policy ID | Policy name   | Replica distribution  | Explanation
15        | Lazy_Persist  | RAM_DISK:1, DISK:n-1  | One replica in memory, the rest on disk
12        | All_SSD       | SSD:n                 | All replicas on SSD
10        | One_SSD       | SSD:1, DISK:n-1       | One replica on SSD, the rest on disk
7         | Hot (default) | DISK:n                | All replicas on disk
5         | Warm          | DISK:1, ARCHIVE:n-1   | One replica on disk, the rest archived
2         | Cold          | ARCHIVE:n             | All replicas archived

Common commands:

# View what storage policies are currently available
hdfs storagepolicies -listPolicies

# Sets the specified storage policy for the specified path (data storage directory)
hdfs storagepolicies -setStoragePolicy -path xxx -policy xxx

# Gets the storage policy of the specified path (data storage directory or file)
hdfs storagepolicies -getStoragePolicy -path xxx

# Unset the storage policy; afterwards the directory or file follows its parent directory's policy, and if it is the root directory, the policy is HOT
hdfs storagepolicies -unsetStoragePolicy -path xxx

# View the distribution of file blocks
bin/hdfs fsck xxx -files -blocks -locations

# View cluster nodes
hadoop dfsadmin -report

# Common configuration: specify the storage type of each data directory in hdfs-site.xml
<property>
    <name>dfs.datanode.data.dir</name>
    <value>[SSD]file:///opt/module/hadoop-3.1.4/hdfsdata/ssd,[DISK]file:///opt/module/hadoop-3.1.4/hdfsdata/disk</value>
</property>

5. HDFS load balancing

Load balancing between nodes

Command to start data balancing:

start-balancer.sh -threshold 10

The parameter 10 means that the difference in disk-space utilization between the nodes of the cluster should be no more than 10%; it can be adjusted according to the actual situation.
Command to stop data balancing:

stop-balancer.sh

Inter disk load balancing

(1) Generate a balancing plan (a node with only one disk will not generate a plan)

hdfs diskbalancer -plan <hostname>

(2) Execute the balancing plan

hdfs diskbalancer -execute <hostname>.plan.json

(3) View the execution of the current balancing task

hdfs diskbalancer -query <hostname>

(4) Cancel balancing task

hdfs diskbalancer -cancel <hostname>.plan.json

6. HDFS benchmark

(1) Test HDFS write performance

# Test: write 10 files of 128 MB each to the HDFS cluster
hadoop jar /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB

(2) Test HDFS read performance

# Test: read 10 files of 128 MB each from the HDFS cluster
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB

(3) Delete test generated data

 hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean

(4) Evaluate MR with Sort program

# RandomWriter generates random data: each node runs 10 Map tasks and each Map produces about 1 GB of random binary data
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar randomwriter random-data
# Sort the generated data with the Sort example
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar sort random-data sorted-data
# Verify that the data is really sorted
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data

7. Hadoop LZO compression

  1. Build the jar package

    1. Install Maven
    
    2. Prepare the build environment
    yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool
    
    3. Build LZO
    # Download lzo
    wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz
    tar -zxvf lzo-2.10.tar.gz
    cd lzo-2.10
    ./configure --prefix=/usr/local/hadoop/lzo/
    make
    make install
    
    4. Build hadoop-lzo
    wget https://github.com/twitter/hadoop-lzo/archive/master.zip
    unzip master.zip
    cd hadoop-lzo-master
    # Set the Hadoop version number
    vi pom.xml
    <hadoop.current.version>3.1.3</hadoop.current.version>
    
    # Declare two temporary environment variables
    export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include
    export LIBRARY_PATH=/usr/local/hadoop/lzo/lib
    
    # Build
    mvn package -Dmaven.test.skip=true
    cd target  # the compiled hadoop-lzo jar is in this directory
    
  2. Configure core-site.xml

    cp hadoop-lzo-0.4.20.jar hadoop3/share/hadoop/common/
    
    vi /hadoop3/etc/hadoop/core-site.xml
    
    <configuration>
        <property>
            <name>io.compression.codecs</name>
            <value>
                org.apache.hadoop.io.compress.GzipCodec,
                org.apache.hadoop.io.compress.DefaultCodec,
                org.apache.hadoop.io.compress.BZip2Codec,
                org.apache.hadoop.io.compress.SnappyCodec,
                com.hadoop.compression.lzo.LzoCodec,
                com.hadoop.compression.lzo.LzopCodec
            </value>
        </property>
    
        <property>
            <name>io.compression.codec.lzo.class</name>
            <value>com.hadoop.compression.lzo.LzoCodec</value>
        </property>
    </configuration>
    
    
    Synchronize the jar package and the configuration to the other nodes.
    
  3. Configure the LZO index

    Splitting an LZO-compressed file depends on its index, so an index needs to be created for each LZO file; without an index, the whole LZO file becomes a single split.

    hadoop jar /opt/module/hadoop3/share/hadoop/common/hadoop-lzo-0.4.20.jar  com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo
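
  4. Use LZO output compression in a job

    Once the codec is registered in core-site.xml, a job can compress its final output with LZO through the standard FileOutputFormat hooks. A minimal driver-side sketch; the paths are illustrative assumptions and the rest of the job setup is omitted:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import com.hadoop.compression.lzo.LzopCodec;

    public class LzoOutputExample {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "lzo-output-demo");
            // ... set mapper/reducer/input format and input path as usual ...
            // Compress the job output with the LzopCodec registered in core-site.xml
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
            FileOutputFormat.setOutputPath(job, new Path("/output/lzo"));   // example path
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }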
    

8. MapReduce optimization

# Reduce the number of spill files
mapreduce.task.io.sort.mb                the size of the shuffle ring buffer; can be increased appropriately
mapreduce.task.io.sort.spill.percent     the spill threshold of the ring buffer; can be increased appropriately

# Increase the number of files merged in each merge pass
mapreduce.task.io.sort.factor            the number of streams merged at once during the merge phase

# Adjust MapTask memory, heap size and CPU cores
mapreduce.map.memory.mb                  as a rule of thumb, allow about 1 GB of memory per 128 MB of data
mapreduce.map.java.opts                  increase the MapTask heap size
mapreduce.map.cpu.vcores                 increase the number of CPU cores per MapTask

# Increase the number of parallel copies a ReduceTask uses to pull data
mapreduce.reduce.shuffle.parallelcopies  can be increased appropriately

# Increase the proportion of ReduceTask memory used for the shuffle buffer
mapreduce.reduce.shuffle.input.buffer.percent   default 0.7; can be increased appropriately

# Adjust how full the buffer may get before data is written to disk
mapreduce.reduce.shuffle.merge.percent   default 0.66; can be increased appropriately

# Adjust ReduceTask memory, heap size and CPU cores
mapreduce.reduce.memory.mb               as a rule of thumb, allow about 1 GB of memory per 128 MB of data
mapreduce.reduce.java.opts               increase the ReduceTask heap size
mapreduce.reduce.cpu.vcores              increase the number of CPU cores per ReduceTask

# Adjust when ReduceTasks start
mapreduce.job.reduce.slowstart.completedmaps   the fraction of MapTasks that must complete before ReduceTasks are started (default 0.05)

# Set the task timeout
mapreduce.task.timeout                   default 10 minutes; a task that gives no response within this time is killed
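
Most of the mapreduce.* parameters above can also be overridden per job from the driver; a small sketch with purely illustrative values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ShuffleTuningExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Example values only; tune them to the cluster and the job
            conf.setInt("mapreduce.task.io.sort.mb", 200);                // larger ring buffer
            conf.setFloat("mapreduce.task.io.sort.spill.percent", 0.90f); // spill later
            conf.setInt("mapreduce.task.io.sort.factor", 20);             // merge more streams per pass
            conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);   // more parallel fetchers
            Job job = Job.getInstance(conf, "shuffle-tuning-demo");
            // ... configure mapper/reducer/input/output as usual, then submit the job ...
        }
    }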

9. YARN optimization

# The scheduler to use; the Capacity scheduler by default
yarn.resourcemanager.scheduler.class

# Number of threads the ResourceManager uses to handle scheduler requests; default 50
yarn.resourcemanager.scheduler.client.thread-count

# Whether YARN should detect the hardware and configure itself; default false
yarn.nodemanager.resource.detect-hardware-capabilities

# Whether to count hardware threads (logical processors) as CPU cores; default false
yarn.nodemanager.resource.count-logical-processors-as-cores

# Multiplier from physical cores to virtual cores, e.g. 2 for a 4-core 8-thread CPU; default 1.0
yarn.nodemanager.resource.pcores-vcores-multiplier

# Memory available to the NodeManager; default 8 GB
yarn.nodemanager.resource.memory-mb

# Memory the NodeManager reserves for the operating system
yarn.nodemanager.resource.system-reserved-memory-mb

# Number of CPU cores available to the NodeManager; default 8
yarn.nodemanager.resource.cpu-vcores

# Whether to enforce the physical-memory limit on containers; enabled by default
yarn.nodemanager.pmem-check-enabled

# Whether to enforce the virtual-memory limit on containers; enabled by default
yarn.nodemanager.vmem-check-enabled

# Ratio of virtual memory to physical memory; default 2.1
yarn.nodemanager.vmem-pmem-ratio

# The minimum memory of the container is 1G by default
yarn.scheduler.minimum-allocation-mb

# The maximum memory of the container is 8G by default
yarn.scheduler.maximum-allocation-mb

# The minimum number of CPU cores of the container is 1 by default
yarn.scheduler.minimum-allocation-vcores

# The maximum number of CPU cores of the container is 4 by default
yarn.scheduler.maximum-allocation-vcores
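
These yarn.* parameters live in yarn-site.xml on the cluster nodes. To check which values are actually in effect on a node, a small sketch that prints a few of them from the loaded configuration (the selection of parameters is arbitrary):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class YarnSettingsDump {
        public static void main(String[] args) {
            // YarnConfiguration loads yarn-default.xml and yarn-site.xml from the classpath
            Configuration conf = new YarnConfiguration();
            String[] keys = {
                "yarn.resourcemanager.scheduler.class",
                "yarn.nodemanager.resource.memory-mb",
                "yarn.nodemanager.resource.cpu-vcores",
                "yarn.scheduler.maximum-allocation-mb",
                "yarn.nodemanager.vmem-pmem-ratio"
            };
            for (String key : keys) {
                System.out.println(key + " = " + conf.get(key));
            }
        }
    }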
