Big data - Summary of common operators of Spark RDD

The core of Spark is based on the same abstract Resilient Distributed Datasets (RDD), which enables the components of Spark to integrate seamlessly and complete big data processing in the same application

1. Basic concepts of RDD

RDD is the most important abstract concept provided by spark. It is a special data set with fault-tolerant mechanism. It can be distributed on the nodes of the cluster to carry out various parallel operations in the way of functional operation set

RDD can be understood as a collection of distributed objects, which is essentially a read-only collection of partition records. Each RDD can be divided into multiple partitions, and each partition is a data set fragment. Different partitions of an RDD can be saved to different nodes in the cluster, so parallel computing can be carried out on different nodes in the cluster.

Elasticity:

Storage elasticity: automatic switching between memory and disk

Fault tolerant resilience: data loss can be recovered automatically

Computational flexibility: computational error retry mechanism

Elasticity of slicing: it can be sliced again as needed

Distributed:

Data is stored on different nodes of the big data cluster

Dataset:

RDD encapsulates computing logic and does not save data

Data abstraction:

RDD is an abstract class, which needs the concrete implementation of subclasses

Immutable:

RDD encapsulates the computing logic and cannot be changed. If you want to change, you can only generate a new RDD. The computing logic is encapsulated in the new RDD (directed acyclic graph DAG)

Divisible:

Implement parallel computing

2. Core attributes of RDD

Partition list

There is a partition list in RDD data structure, which is used for parallel computing when executing tasks. It is an important attribute to realize distributed computing

Partition calculation function

Spark uses partition function to calculate each partition during calculation

Dependencies between RDD S

RDD is the encapsulation of computing models. When multiple computing models need to be combined in requirements, it is necessary to establish dependencies on multiple RDDS

Partition manager (optional)

When the data is KV type data, you can customize the partition of the data by setting the divider

Preferred location (optional)

When calculating data, you can select different node locations for calculation according to the status of the calculation node

3. Working principle of RDD

1) start the Yan cluster environment

2) Spark creates scheduling nodes and computing nodes by applying for resources

3) Spark framework divides the computing logic into different tasks according to the requirements

4) the scheduling node sends the task to the corresponding computing node for calculation according to the status of the computing node

It can be seen from the above process that RD is mainly used for logical encapsulation in the whole process, and generates tasks and sends them to the Executor node for calculation

4.RDD creation

There are four ways to create RDD S in Spark:

1) Create RDD from collection (memory)

Spark mainly provides two methods for creating RDD S from collections: parallelize and makeRDD

    val conf = new SparkConf().setMaster("local[*]").setAppName("test1")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(List(1, 2, 3, 4))
    val rdd2 = sc.makeRDD(List(1, 2, 3, 4))
    println(rdd1.collect().mkString(","))
    println(rdd2.collect().mkString(","))
    sc.stop()

In terms of the underlying code implementation, the makeRDD method is actually the parallelize method

  /** Distribute a local Scala collection to form an RDD.
   *
   * This method is identical to `parallelize`.
   * @param seq Scala collection to distribute
   * @param numSlices number of partitions to divide the collection into
   * @return RDD representing distributed collection
   */
    def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }

2) Create RDD from external storage (file)

RDD S are created from data sets of external storage systems, including local file systems and all data sets supported by Hadoop, such as HDFS, HBase, etc

    val conf = new SparkConf().setMaster("local[*]").setAppName("test1")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("file:///D:\\Study\\13_spark\\cha01\\file\\tweets.txt")
    rdd.collect().foreach(println)
    sc.stop()

3) Create from other RDD S

It mainly generates a new RDD after an RDD operation

4) Create RDD directly (new)

RDD is directly constructed by using new, which is generally used by Spark framework itself

5.RDD operator

Value type

1)map

Function: returns a new RDD, which is composed of each input element converted by func function
Requirement: create an RDD and make each element * 2 form a new RDD


object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD
    val listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5))
    //4. Each number is doubled
    val rsRdd: RDD[Int] = listRdd.map(_ * 2)
    //5. Print
    rsRdd.foreach(println(_))
  }
}

2)mapPartitions

It is similar to map, but runs independently on each fragment of RDD. Therefore, when running on an RDD of type T, the function type of func must be iterator [t] = > iterator [u]. Assuming that there are N elements and M partitions, the map function will be called N times, while mapPartitions will be called M times. One function will process all partitions at once.
Requirement: create an RDD and make each element * 2 form a new RDD

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD
    val listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5))
    //4. Each number is doubled
    val rsRdd: RDD[Int] = listRdd.mapPartitions(i=>{i.map(_*2)})
    //5. Print
    rsRdd.foreach(println)
  }
}

3)mapPartitionsWithIndex(func)

Similar to mapPartitions, but func takes an integer parameter to represent the index value of the partition. Therefore, when running on an RDD of type T, the function type of func must be (int, interlator [t]) = > iterator [u];
Requirement: create an RDD so that each element forms a tuple with its partition to form a new RDD

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD for 2 partitions
    val listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 2)
    //4. Each number is doubled
    val rsRdd: RDD[(Int, Int)] = listRdd.mapPartitionsWithIndex((index, item) => {
      item.map((index, _))
    })
    //5. Print
    rsRdd.foreach(println)
  }
}

4)flatMap(func) 

Each element in the collection is manipulated and then flattened.
Requirement: extract the elements in the set according to '

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD for 2 partitions
     val listRdd: RDD[String] = sc.makeRDD(List("Hello World","Hello Spark"))
    //4. Flattening
    val rsRdd: RDD[String] = listRdd.flatMap(_.split(" "))
    //5. Print
    rsRdd.foreach(println)
  }
}

5)glom

Function: form each partition into an array to form a new RDD type [array [t]]
Requirements: create an RDD with three partitions and put the data of each partition into an array

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD S for 3 partitions
    val listRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8), 3)
    //4,glom
    val rsRdd: RDD[Array[Int]] = listRdd.glom()
    //5. Print
    rsRdd.foreach(item => {
      println(item.mkString(","))
    })
  }
}

6)groupBy

Grouping: grouping according to the return value of the incoming function. Put the value corresponding to the same key into an iterator.
Requirement: create an RDD and group it with the value of 2 according to the element module.

def main(args: Array[String]): Unit = {
  //1. Create local spark profile
  val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  //2. Create Spark context object
  val sc = new SparkContext(config)
  //3. Create RDD
  val listRdd: RDD[Int] = sc.makeRDD(1 to 20)
  //4. Grouping
  val rsRdd: RDD[(Int, Iterable[Int])] = listRdd.groupBy(_%2)
  //5. Print
  rsRdd.foreach(t=>println(t._1,t._2))
}

7)filter

Function: filtering. Returns a new RDD, which is composed of input elements whose return value is true after being calculated by func function.
Requirement: create an RDD, filter out the number with 2 and the remainder is 0

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD
    val listRdd: RDD[Int] = sc.makeRDD(1 to 10)
    //4. Filter
    val rsRdd: RDD[Int] = listRdd.filter(_%2==0)
    //5. Print
    rsRdd.foreach(println)
  }
}

8)distinct

De duplication partition

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD
    val listRdd: RDD[Int] = sc.makeRDD(List(1,2,1,2,1,2))
    //4. Save results with 2 partitions
    val rsRdd: RDD[Int] = listRdd.distinct(2)
    //5. Print
    rsRdd.foreach(println)
  }
}

9)repartition(numPartitions)

Repartition: randomly shuffle all data through the network according to the number of partitions.
Requirement: create an RDD with 4 partitions and repartition it

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create Rdd for 4 partitions
    val listRdd: RDD[Int] = sc.makeRDD(1 to 10, 4)
    println("repartition before:" + listRdd.partitions.size)
    //4. Repartition
    val rsRdd: RDD[Int] = listRdd.repartition(2)
    //5. Print
    println("repartition after:" + rsRdd.partitions.size)
  }
}

10)sortBy(func,[ascending], [numTasks])

effect; Use func to process the data first and sort according to the processed data comparison results. The default is positive order.
Requirements: create an RDD and sort according to different rules

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create Rdd for 4 partitions
    val listRdd: RDD[Int] = sc.makeRDD(1 to 10)
    //4. Ascending order
    val sortAsc: Array[Int] = listRdd.sortBy(x => x).collect()
    println(sortAsc.mkString(","))
    //5. Descending order
    val sortDesc: Array[Int] = listRdd.sortBy(x => x, false).collect()
    println(sortDesc.mkString(","))
  }
}

Double Value type interaction

1) union(otherDataset)

Function: returns a new RDD after combining the source RDD and the parameter RDD
Requirements: create two RDD S and combine them

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create 2 list s
    val list1: RDD[Int] = sc.makeRDD(1 to 2)
    val list2: RDD[Int] = sc.makeRDD(2 to 4)
    //4. Union level
    val rsRdd: Array[Int] = list1.union(list2).collect()
    //5. Print
    rsRdd.foreach(println)
  }
}

2)subtract (otherDataset)

Difference set, a function for calculating the difference. Remove the same elements in two RDDS, and different RDDS will be retained. Create two RDDS and find the difference set between the first RDD and the second RDD

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create 2 list s
    val list1: RDD[Int] = sc.makeRDD(1 to 10)
    val list2: RDD[Int] = sc.makeRDD(5 to 10)
    //4. Union level
    val rsRdd: Array[Int] = list1.subtract(list2).collect()
    //5. Print
    rsRdd.foreach(println)
  }
}

3)intersection(otherDataset)

Intersection: find the intersection between the source RDD and the parameter RDD and return a new RDD

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create 2 list s
    val list1: RDD[Int] = sc.makeRDD(1 to 10)
    val list2: RDD[Int] = sc.makeRDD(5 to 10)
    //4. Intersection
    val rsRdd: Array[Int] = list1.intersection(list2).collect()
    //5. Print
    rsRdd.foreach(println)
  }
}

Key value type

1)groupByKey

groupByKey also operates on each key, but only generates a sequence.
Requirement: wordcount

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create list
    val list: RDD[String] = sc.makeRDD(List("Hello World","Hello Scala","Spark Spark Spark"))
    //4,wordcount
     val mapRdd: RDD[(String, Int)] = list.flatMap(_.split(" ")).map((_,1))
    //5,groupByKey
    val groupByKeyRdd: RDD[(String, Iterable[Int])] = mapRdd.groupByKey()
    groupByKeyRdd.foreach(println)
    //6. Aggregate
    val wcRdd: RDD[(String, Int)] = groupByKeyRdd.map(t=>(t._1,t._2.size))
    wcRdd.foreach(println)
  }
}

2)reduceByKey

Call on a (K,V) RDD and return a (K,V) RDD. Use the specified reduce function to aggregate the values of the same key. The number of reduce tasks can be set through the second optional parameter.
Requirement: wordcount

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create list
    val list: RDD[String] = sc.makeRDD(List("Hello World","Hello Scala","Spark Spark Spark"))
    //4. Flattening
     val mapRdd: RDD[(String, Int)] = list.flatMap(_.split(" ")).map((_,1))
    //5,wordcount
    val reduceMapRdd: RDD[(String, Int)] = mapRdd.reduceByKey(_+_)
    //6. Print
    reduceMapRdd.foreach(println)
  }
}

3)sortByKey([ascending], [numTasks])

Function: when called on a (K,V) RDD, K must implement the Ordered interface and return a (K,V) RDD sorted by key
Create a pairRDD and sort according to the positive and reverse order of key s

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create list
    val listRdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 3), (1, 2), (1, 4), (2, 3), (3, 6), (3, 8)))
    //4. Ascending order
    val sortAsc: Array[(Int, Int)] = listRdd.sortByKey(true).collect()
    println("Ascending order:" + sortAsc.mkString(","))
    //5. Descending order
    val sortDesc: Array[(Int, Int)] = listRdd.sortByKey(false).collect()
    println("Descending order:" + sortDesc.mkString(","))
  }
}

4)join(otherDataset, [numTasks])

Call on RDDS of types (K,V) and (K,W) to return the RDD of (K,(V,W)) of all element pairs corresponding to the same key

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    val r1: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val r2: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rsRdd: Array[(Int, (String, String))] = r1.join(r2).collect()
    rsRdd.foreach(println)
  }
}

Action operator

1) reduce, function: gather all elements in RDD through func function, first aggregate data in partitions, and then aggregate data between partitions.
Requirement: create an RDD and aggregate all elements to get the result.

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 5)
    //4,reduce
    val rs: Int = rdd.reduce(_+_)
    //5. Results
    println(rs)
  }
}

2) collect()

In the driver, all elements of the dataset are returned as an array.
Create an RDD and collect the RDD content to the Driver side for printing

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD
    val lisrRdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5))
    val ints: Array[Int] = lisrRdd.collect()
    println(ints.mkString(","))
  }
}

3) count()

Returns the number of elements in the RDD

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD
     val rdd: RDD[Int] = sc.makeRDD(1 to 10)
    val rddCount: Long = rdd.count()
    println(rddCount)
  }
}

4)first()

Returns the first element in the RDD

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 10)
    //4. Returns the first element in the RDD
    val firstNum: Int = rdd.first()
    println(firstNum)
  }
}

5) take(n)

Returns an array of the first n elements of an RDD

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 10)
    val top3: Array[Int] = rdd.take(3)
    top3.foreach(println)
  }
}

6) takeOrdered(n)

Returns an array of the first n elements sorted by the RDD

def main(args: Array[String]): Unit = {
  //1. Create local spark profile
  val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  //2. Create Spark context object
  val sc = new SparkContext(config)
  //3. Create RDD
  val rdd: RDD[Int] = sc.makeRDD(List(5, 3, 2, 1, 7, 6))
  val top3: Array[Int] = rdd.takeOrdered(3)
  println(top3.mkString(","))
}

7)countByKey()

For RDD S of type (K,V), a (K,Int) map is returned, indicating the number of elements corresponding to each key.

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD
    val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)))
    //4. Statistics key
    val countKey: collection.Map[Int, Long] = rdd.countByKey()
    //5. Print
    println(countKey)
  }
}

8)foreach

Print

object SparkRdd {
  def main(args: Array[String]): Unit = {
    //1. Create local spark profile
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //2. Create Spark context object
    val sc = new SparkContext(config)
    //3. Create RDD
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5))
    rdd.foreach(println)
  }
}

Keywords: Big Data Spark

Added by cainfool on Mon, 10 Jan 2022 21:37:05 +0200