Spark performance optimization guide - train of thought

Preface

Spark job optimization is really a broad topic: a job can be slow for many different reasons, and the fix is different each time. I want to cover every aspect of optimization here so that an overall optimization plan can be put together systematically.

Sorting out optimization ideas

How should we approach the so-called "slow job" problem? Here is how I break it down:

Topics
Resource optimization
Parallelism optimization
Code optimization
Shuffle optimization
Memory optimization
Off-heap memory optimization
Data skew processing
Storage media optimization

Resource optimization

Most slow jobs are in fact caused by a shortage of resources: nothing else has changed, and when you go to investigate you cannot find a cause. In other words, adding resources solves the problem for most jobs, and even for some other kinds of problems adding resources can tide things over. Resource optimization has two parts: resource optimization of the cluster, and resource allocation within a job.

Cluster resource optimization

1. Choose Standalone or YARN as the resource manager. In most cases we see YARN, because YARN takes on the task of allocating resources uniformly across the whole enterprise, and historically most Spark jobs were migrated over from Hive. YARN's scheduling approach is reasonable, but allocating resources through YARN carries a latency overhead. Production workloads that involve high-frequency scheduling often need to eliminate the latency of YARN resource requests, so a separate Standalone cluster is frequently built for them.
The upper limit of resource usage is set in the configuration.
Resource configuration in Standalone:

SPARK_WORKER_CORES
SPARK_WORKER_MEMORY

Resource configuration in Yarn:

yarn.nodemanager.resource.cpu-vcores
yarn.nodemanager.resource.memory-mb
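
For example (the values are purely illustrative and depend on each node's hardware):

In spark-env.sh (Standalone):
	SPARK_WORKER_CORES=16
	SPARK_WORKER_MEMORY=32g
In yarn-site.xml (YARN), the equivalent properties:
	yarn.nodemanager.resource.cpu-vcores=16
	yarn.nodemanager.resource.memory-mb=32768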

2. Resource allocation when submitting a task. These parameters are specified at submission time:

./spark-submit \
--master spark://xxx \
--executor-cores xxx \
--executor-memory xx

--executor-cores: how many cores each Executor is started with
--executor-memory: how much memory each Executor is started with
--total-executor-cores: how many cores an Application may use in total (Standalone cluster)
--num-executors: how many Executors to start for an Application (YARN cluster)
It is recommended that the above parameters be set in the task submission command

In addition, Spark supports dynamic resource allocation. It is recommended to turn it off temporarily while testing a job and turn it back on once the job is actually in production:
spark.dynamicAllocation.enabled
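
As a hedged example, a submit command combining the resource flags above with the dynamic-allocation switch might look like this (the class name, jar and values are placeholders, not from the original text):

./spark-submit \
--master yarn \
--num-executors 10 \
--executor-cores 4 \
--executor-memory 8g \
--conf spark.dynamicAllocation.enabled=false \
--class com.example.MyJob \
my-job.jar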

Parallelism optimization

Parallelism optimization really comes down to two failure modes: parallelism that is too low, producing a small number of large, slow tasks and shuffle spill; and parallelism that is too high, producing a huge number of tiny tasks, heavy resource consumption and high scheduling cost. The common control points are the parallelism parameters of the RDD (a sketch follows the list):

sc.textFile(path, minPartitions)
sc.parallelize(seq, numSlices)
sc.makeRDD(seq, numSlices)
sc.parallelizePairs(List[Tuple2<String,String>], numSlices)
reduceByKey(func, numPartitions), distinct(numPartitions), join(other, numPartitions), groupByKey(numPartitions), repartition(numPartitions)
spark.default.parallelism : adjusts the default parallelism
spark.sql.shuffle.partitions = 200 : partitions used by Spark SQL shuffles
Custom partitioner
Spark Streaming in Direct mode: the parallelism matches the number of partitions of the Kafka topic being read
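
A minimal Scala sketch of these knobs; the path, the numbers and the RDD logic are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parallelism-demo")
  .config("spark.default.parallelism", "200")     // default parallelism for RDD shuffles
  .config("spark.sql.shuffle.partitions", "200")  // partitions for Spark SQL shuffles
  .getOrCreate()
val sc = spark.sparkContext

val lines  = sc.textFile("hdfs:///data/input", 200)                           // explicit minimum partitions
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 200)  // explicit shuffle partitions
val wider  = counts.repartition(400)                                          // more partitions for a large, under-partitioned RDD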

Code optimization

This is the basic skill required by Spark programmers, and there are rules to follow:

RDD reuse

We all know that an RDD is computed through a series of lineage operations. Reuse simply means not repeating that computation, which naturally reduces cost. Two methods are generally used:
1. Reuse the same RDD wherever possible and avoid creating duplicate RDDs
2. Persist frequently used RDDs so that later computations can pull data directly from the already computed result. The common code is as follows:

cache() = persist() = persist(StorageLevel.MEMORY_ONLY)
persist storage-level enumeration:
	MEMORY_ONLY
	MEMORY_AND_DISK
	MEMORY_ONLY_SER
	MEMORY_AND_DISK_SER
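
A minimal Scala sketch of reusing and persisting a single RDD (the path and the parsing logic are placeholders):

import org.apache.spark.storage.StorageLevel

val logs   = sc.textFile("hdfs:///data/logs")     // one source RDD, reused below
val parsed = logs.map(_.split("\t")).persist(StorageLevel.MEMORY_AND_DISK)

val total  = parsed.count()                       // both actions reuse the cached result
val errors = parsed.filter(fields => fields(0) == "ERROR").count()

parsed.unpersist()                                // release the cache when finished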

Avoid using shuffle operators as much as possible

Shuffle itself is hugely expensive, so we try not to shuffle at all where we can. The common technique is to replace a join with a map-class operator plus a broadcast variable (a sketch follows).
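
A hedged Scala sketch of that map-side join; the RDD names and the String key/value types are assumptions:

// small RDD: pull it back to the driver and broadcast it to every Executor
val smallMap = smallRdd.collectAsMap()
val smallBc  = sc.broadcast(smallMap)

// large RDD: look the key up in the broadcast map locally, so no shuffle is triggered
val joined = largeRdd.map { case (k, v) =>
  (k, (v, smallBc.value.get(k)))   // Option value; None when the key is absent
}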

Use map-side pre-aggregation

Map-side pre-aggregation has the following benefits:
1. Less shuffle data is written on the map side
2. Less data is read on the reduce side
3. Fewer aggregations are performed on the reduce side
It is still necessary to make sure the computation result stays the same. Common operators (a sketch follows the list):

	reduceByKey
	aggregateByKey
	combineByKey
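
For instance, replacing groupByKey with reduceByKey gives the same sum while combining on the map side first (a minimal sketch):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

val slow = pairs.groupByKey().mapValues(_.sum)   // shuffles every record, sums on the reduce side
val fast = pairs.reduceByKey(_ + _)              // pre-aggregates per key on the map side before the shuffle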

Using high-performance operators

When saving data or inserting it into a database, you can do the following (a sketch follows the list):
1. Use mapPartitions instead of map, so that data can be written in batches per partition, reducing the number of JDBC connections
2. Use foreachPartition instead of foreach, which is likewise a per-partition batch operation
3. When processing a large number of small files, first use coalesce to reduce the number of partitions
4. After filtering away a large amount of data, consider using coalesce to reduce the number of partitions
5. Use repartition to increase the number of partitions when there is a lot of data in too few partitions
6. Use reduceByKey instead of groupByKey
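
A hedged sketch of points 1 and 2, writing each partition to a database in one batch; the JDBC URL, the table and the RDD's (Long, String) element type are assumptions:

import java.sql.DriverManager

rdd.foreachPartition { iter =>
  // one connection per partition instead of one per record
  val conn = DriverManager.getConnection("jdbc:mysql://db-host:3306/demo", "user", "pass")
  val stmt = conn.prepareStatement("INSERT INTO events(id, payload) VALUES (?, ?)")
  iter.foreach { case (id: Long, payload: String) =>
    stmt.setLong(1, id)
    stmt.setString(2, payload)
    stmt.addBatch()
  }
  stmt.executeBatch()   // write the whole partition in one round trip
  stmt.close()
  conn.close()
}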

Using broadcast variables

When the Executor side uses a variable defined on the Driver side, a broadcast variable reduces the memory footprint on the Executor side.
Without a broadcast variable, each Executor holds as many copies of the Driver-side variable as it runs tasks; with one, there is a single copy per Executor.
Be careful:
1. The Executor side needs enough memory to hold the broadcast value
2. A broadcast variable cannot be modified on the Executor side

Using Kryo serialization

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); // set the serializer
conf.registerKryoClasses(new Class[]{KryoBean.class});                      // register the classes that will be serialized with Kryo

Kryo takes effect in the following places:
1. RDDs of custom types
2. Data persistence with serialized storage levels such as StorageLevel.MEMORY_AND_DISK_SER
3. Data sent between nodes by tasks
Note: a class that is serialized with Java serialization cannot also be serialized with Kryo

Optimize data structure

Optimizing data structures is really about reducing storage. For example, the int values 0 and 1 take far less space than the strings "0" and "1", and the cost grows again when they are serialized and transmitted. The following suggestions therefore apply when defining data structures (a small illustration follows the list):
1. Prefer primitive types over strings
2. Prefer strings over objects
3. Prefer arrays over Map collections
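
A small illustration of the three suggestions (the field names are made up):

// 1. primitive instead of string:  val userId: Long = 10001L        rather than  "10001"
// 2. string instead of object:     val record = "10001_2022-01-14"  rather than a small case class
// 3. array instead of Map:         val scores = Array(1.0, 2.0)     rather than  Map("math" -> 1.0, "english" -> 2.0)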

In short, the quality of code optimization shows up directly in performance. The goal is to be efficient: reduce memory usage, reduce shuffle, reduce data transfer between nodes, and achieve the desired result in the most economical way.

Shuffle optimization

Shuffle optimization here means making adjustments to the shuffle process itself. We track shuffle metrics and tune the following parameters:

spark.reducer.maxSizeInFlight (48m): size of the buffer used to pull shuffle data in one request
spark.shuffle.io.maxRetries (3): number of retries when a task fails to pull shuffle data
spark.shuffle.io.retryWait (5s): wait interval between retries when pulling shuffle data
spark.shuffle.sort.bypassMergeThreshold (200): one of the conditions for enabling the bypass mechanism; the other is that the operator performs no map-side pre-aggregation
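
A minimal sketch of adjusting these parameters through SparkConf; the values simply double the defaults for illustration:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.reducer.maxSizeInFlight", "96m")            // larger buffer for each pull
  .set("spark.shuffle.io.maxRetries", "6")                // more retries for failed pulls
  .set("spark.shuffle.io.retryWait", "10s")               // longer wait between retries
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")  // raise the bypass threshold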

Memory optimization

Spark has always been known for in-memory computing, but when the data volume is large, memory becomes a burden; if memory cannot be used optimally, data is exchanged through disk instead and overall performance drops. Memory allocation follows these principles:
1. Give tasks enough memory to run in, to reduce spilling to disk
2. Adjust Spark's memory layout reasonably
Static memory management - the legacy mode; it used memory unreasonably and wasted a great deal of it
Unified memory management - introduced in 2.x with the goal of managing memory as a unified pool
3. Off-heap memory adjustment: spark.executor.memoryOverhead=2048M
Examples are as follows:

300M is reserved; the rest is split according to the following configuration:
			(total - 300M) * 0.6  -- spark.memory.fraction (unified memory)
						 0.5 : RDD cache and broadcast variables -- spark.memory.storageFraction
						 0.5 : shuffle aggregation (execution) memory
			(total - 300M) * 0.4  -- task running memory
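
As a worked example, assuming spark.executor.memory=4g and the default fractions above:

usable memory  = 4096M - 300M  = 3796M
unified memory = 3796M * 0.6   ≈ 2278M
    storage    = 2278M * 0.5   ≈ 1139M  (RDD cache, broadcast variables)
    execution  = 2278M * 0.5   ≈ 1139M  (shuffle aggregation)
task memory    = 3796M * 0.4   ≈ 1518M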

Data skew processing

Data skew shows up in the following scenarios:
MapReduce: one reduce task processes far more data than the other tasks - caused by the shuffle
Hive: one column contains many rows with the same key; such a Hive table is skewed
Spark: one partition of an RDD holds far more data than the other partitions - caused by the shuffle

Solutions

1. Preprocess with Hive ETL
2. Filter out the few skewed keys
3. Increase the parallelism
	Scenario: many different keys but few partitions; simply increasing the parallelism helps
4. Two-stage (double) aggregation - see the sketch after this list
	Scenario: many identical keys, few partitions
	Solution: scatter the keys (add a random prefix), aggregate, strip the prefix, then aggregate again
5. Use a map join instead of a reduce join
	Scenario: two RDDs need to join; one RDD is large and skewed, the other RDD is small
	Solution: collect the small RDD to the driver and broadcast it, then process the skewed RDD directly with a map-class operator
6. Find the skewed keys and split the join
	Scenario: two RDDs need to join; one RDD is large with a small number of skewed keys, the other RDD is smaller but option 5 cannot be used
	Solution: find the skewed keys and split the data into a skewed RDD and a non-skewed RDD; add random prefixes to the skewed RDD and expand the other side for it, join the normal RDD normally, and finally union the results together
7. Join with random prefixes and an expanded RDD
	Scenario: two RDDs need to join; one RDD is large with a large number of skewed keys, the other RDD is smaller and option 5 cannot be used
	Solution: add random prefixes to the whole RDD and expand the other side, then join
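
A hedged Scala sketch of the two-stage aggregation from option 4; the number of prefixes and the RDD's (String, Long) element type are assumptions:

import scala.util.Random

// stage 1: scatter the skewed keys with a random prefix, then aggregate the scattered data
val salted  = skewedRdd.map { case (k, v) => (s"${Random.nextInt(10)}_$k", v) }
val partial = salted.reduceByKey(_ + _)

// stage 2: strip the prefix and aggregate again to get the final result
val result = partial
  .map { case (saltedKey, v) => (saltedKey.split("_", 2)(1), v) }
  .reduceByKey(_ + _)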

Storage media optimization

For Spark's in-memory computing, RPC requests to the storage layer are often the bottleneck. Within a Spark job the goal is to optimize read speed, so we place data on different media according to how frequently it is read and written. In our production practice, data is cached directly in memory using Alluxio. Storage optimization can be summarized as follows:

	StoragePolicies
		Storage type
			ARCHIVE
			DISK
			SSD
			RAM_DISK
		Storage strategy
			Hot
			Cold
			Warm
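
On HDFS, storage policies can be listed and assigned per path roughly as follows (the path is a placeholder):

hdfs storagepolicies -listPolicies
hdfs storagepolicies -setStoragePolicy -path /data/hot_table -policy HOT
hdfs storagepolicies -getStoragePolicy -path /data/hot_table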

Resource isolation

We see a lot of this in production; the point is to find hot nodes. The common symptom is that tasks on one particular node are always very slow, and at that point you should suspect the machine: hot data, long-running tasks and high I/O can all affect it. Here we can consider resource isolation schemes, using YARN node labels to mark nodes and achieve a grey-release mechanism at the big-data level.

yarn.scheduler.capacity.root.accessible-node-labels=*
yarn.scheduler.capacity.root.xxxx.accessible-node-labels=label_xx
yarn.scheduler.capacity.root.yyyy.accessible-node-labels=label_xx,label_yy
yarn.scheduler.capacity.root.default.default-node-label-expression=
yarn.scheduler.capacity.root.xxxx.default-node-label-expression=label_xx
yarn.scheduler.capacity.root.yyyy.default-node-label-expression=label_yy
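
The labels themselves are created and attached to nodes on the YARN side, roughly as follows (the label and host names are placeholders):

yarn rmadmin -addToClusterNodeLabels "label_xx(exclusive=false),label_yy(exclusive=false)"
yarn rmadmin -replaceLabelsOnNode "node1.example.com=label_xx"
yarn cluster --list-node-labels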

Postscript

In fact, optimization at the raw RDD level is not something most people touch very much, because it is relatively difficult and the engine keeps improving at its own level. In the SparkSQL era there are many intelligent tuning mechanisms such as AQE, but tuning at the Spark kernel level is more broadly applicable, and I hope this brings some useful help.

Keywords: Big Data Spark Optimize
