A Detailed Look at RDD Lineage in the Source Code

1. RDD Dependencies

RDD dependencies fall into two categories: narrow dependencies and wide dependencies. We can think of them as follows:

  • (1) Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD.
  • (2) Wide dependency: each partition of the parent RDD is used by multiple partitions of the child RDD.

With a narrow dependency, each child RDD partition can be computed in parallel, whereas a wide dependency requires the shuffle output of all parent RDD partitions before the computation can proceed.
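To make the distinction concrete, here is a minimal sketch (the object name DependencyDemo, the local master setting, and the sample data are illustrative assumptions, not part of Spark) that inspects the dependencies of an RDD produced by a narrow operator (map) and a wide operator (reduceByKey):

// DependencyDemo.scala (illustrative, not part of Spark)
import org.apache.spark.{SparkConf, SparkContext}

object DependencyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("dep-demo"))
    val nums = sc.parallelize(1 to 10, 4)

    // map: each child partition reads exactly one parent partition -> narrow dependency
    val doubled = nums.map(_ * 2)
    println(doubled.dependencies.head.getClass.getSimpleName)   // OneToOneDependency

    // reduceByKey: each child partition reads from many parent partitions -> wide dependency
    val sums = nums.map(n => (n % 2, n)).reduceByKey(_ + _)
    println(sums.dependencies.head.getClass.getSimpleName)      // ShuffleDependency

    sc.stop()
  }
}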

2. Source Code Analysis of org.apache.spark.Dependency.scala

Dependency is an abstract class:

// Dependency.scala
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

It has two subclasses, NarrowDependency and ShuffleDependency, which correspond to narrow and wide dependencies respectively.

(1) NarrowDependency is also an abstract class

It defines the abstract method getParents, which takes the partitionId of a child RDD partition and returns all partitions of the parent RDD that the child partition depends on.

// Dependency.scala
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

Narrow dependency has two concrete implementations: OneToOneDependency and RangeDependency.
(a) OneToOneDependency means that a partition of the child RDD depends on exactly one partition of the parent RDD. Operators such as map, filter, and flatMap generate a OneToOneDependency. Its getParents implementation is very simple: it takes a partitionId and returns that same id wrapped in a List.

// Dependency.scala
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
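As a quick check, a map produces exactly this dependency, and getParents maps a child partition index to the same parent partition index. This sketch assumes the sc and nums defined in the earlier example (for instance in the same main method or a spark-shell session):

// assumes the sc and nums from the earlier sketch
import org.apache.spark.OneToOneDependency

val mapped = nums.map(_ + 1)
val oneToOne = mapped.dependencies.head.asInstanceOf[OneToOneDependency[Int]]
println(oneToOne.getParents(3))   // List(3): child partition 3 reads only parent partition 3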
(b) RangeDependency means that a partition of the child RDD depends one-to-one on a partition of the parent RDD within a certain range; it is mainly used for union.

// Dependency.scala
// inStart is the start index within the parent RDD; outStart is the start index within the child RDD
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      // map the child partition index back to the corresponding parent partition index
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
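A small sketch of how union builds these dependencies, again assuming the sc from the first example (the RDD names a, b, and u are illustrative):

// assumes the sc from the earlier sketch
import org.apache.spark.RangeDependency

val a = sc.parallelize(1 to 4, 2)   // 2 partitions
val b = sc.parallelize(5 to 9, 3)   // 3 partitions
val u = a.union(b)                  // 5 partitions: 0-1 come from a, 2-4 come from b

// union creates one RangeDependency per parent; the second one covers b,
// with inStart = 0, outStart = 2, length = 3
val rangeDep = u.dependencies(1).asInstanceOf[RangeDependency[Int]]
println(rangeDep.getParents(2))   // List(0): child partition 2 maps back to b's partition 0
println(rangeDep.getParents(0))   // List(): child partition 0 lies outside this dependency's range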
(2) ShuffleDependency corresponds to a wide dependency

It indicates that a partition of the parent RDD is used by multiple partitions of the child RDD, which requires a shuffle.

// Dependency.scala
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {
  // shuffle operates on pair RDDs, so the incoming RDD must be of key-value type
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  // obtain a new shuffleId
  val shuffleId: Int = _rdd.context.newShuffleId()

  // register the shuffle with the shuffle manager
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

Because a shuffle involves network transfer, the data must be serialized, hence the serializer. To reduce the amount of data transferred, map-side aggregation can be applied, controlled by mapSideCombine and aggregator; keyOrdering governs how keys are sorted; partitioner decides how the shuffled output is partitioned; and the class-name fields record type information. The one-to-one relationship between partitions is broken at a shuffle, so shuffles are the basis for dividing stages.
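A short sketch of what these fields look like at runtime, again assuming the sc from the first example (the variable names pairs, reduced, and shuffleDep are illustrative):

// assumes the sc from the earlier sketch
import org.apache.spark.ShuffleDependency

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
val reduced = pairs.reduceByKey(_ + _)

val shuffleDep = reduced.dependencies.head.asInstanceOf[ShuffleDependency[String, Int, Int]]
println(shuffleDep.shuffleId)        // the id registered with the shuffle manager
println(shuffleDep.partitioner)      // decides how the shuffled output is partitioned
println(shuffleDep.mapSideCombine)   // true: reduceByKey combines values on the map side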

3. The Distinction Between the Two Dependencies

First, a narrow dependency allows the computation on a cluster node to be pipelined across its parent partitions; for example, map followed by filter is executed element by element. A wide dependency, by contrast, requires the data of all parent partitions to be computed first and then shuffled between nodes, similar to MapReduce. Second, narrow dependencies make it easier to recover from node failure: only the parent partitions of the lost RDD partition need to be recomputed, and this recomputation can run in parallel on different nodes. In a lineage graph with wide dependencies, a single node failure may require partitions from all ancestor RDDs to be recomputed, so recovery is effectively a recomputation of the whole lineage.
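One way to see the stage boundary implied by a shuffle is toDebugString, which prints an RDD's lineage. This sketch again assumes the sc from the first example:

// assumes the sc from the earlier sketch
val lines = sc.parallelize(Seq("a b", "b c"), 2)
val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

// the indentation change in the output marks the ShuffleDependency,
// which is exactly where the DAGScheduler cuts the job into stages
println(wordCounts.toDebugString)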
