A detailed explanation of Spark operators (2)

I. wordcount

Building on the previous wordcount, let's write a program that performs a word count and then counts how many words share each count, and analyze its performance.

package com.littlepage.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wc").setMaster("local")
    val sparkContext = new SparkContext(conf)
    sparkContext.setLogLevel("error")

    // Word count: split each line into words, pair each word with 1, then sum by key
    val fileRDD: RDD[String] = sparkContext.textFile("data/data")
    val words: RDD[String] = fileRDD.flatMap(_.split(" "))
    val pairWord: RDD[(String, Int)] = words.map((_, 1))
    val res: RDD[(String, Int)] = pairWord.reduceByKey(_ + _)
    println("wordcount:")
    res.foreach(println)

    // Second round of counting: how many distinct words share each count
    val rev: RDD[(Int, Int)] = res.map(x => (x._2, 1))
    val pl: RDD[(Int, Int)] = rev.reduceByKey(_ + _)
    println("\nwordcountcount")
    pl.foreach(println)

    // Keep the application alive so the Spark UI can be inspected
    Thread.sleep(100000000)
  }
}

From the performance chart (the job/stage view in the Spark UI), we can observe:

1. Spark is lazy: if a result is never stored or output, the map and reduce transformations are not executed at all.

2. When output is produced repeatedly (several actions over the same lineage), stages whose shuffle output already exists are skipped, so the shared map/reduce work is effectively executed only once (see the caching sketch after this list).

3. By default, every shuffle splits the job into a new stage, which is visible in the chart. To minimize the performance cost, keep the number of shuffles as low as possible when programming.
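
As a related illustration that is not part of the original program, the reuse of shared work can also be made explicit by caching the intermediate RDD. A minimal sketch, assuming the same res RDD as in the wordcount program above:

// Hypothetical addition to the wordcount program: cache the word-count result
// so that both downstream actions reuse it instead of relying only on
// shuffle-output reuse.
val cached: RDD[(String, Int)] = res.cache()

cached.foreach(println)                           // job 1: materializes and caches res
val counts: RDD[(Int, Int)] =
  cached.map(x => (x._2, 1)).reduceByKey(_ + _)   // reads from the cache
counts.foreach(println)                           // job 2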

II. Programming model

Compared with MapReduce, the Spark programming model can run multiple jobs, each made up of multiple stages, within a single application.
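
As a minimal sketch (the object name and data are illustrative), a single Spark application can submit several jobs, each split into stages at shuffle boundaries:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MultiJobDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("multiJob").setMaster("local"))
    sc.setLogLevel("error")
    val nums: RDD[Int] = sc.parallelize(1 to 100)

    // Job 1: a single stage, since no shuffle is involved
    println(nums.sum())

    // Job 2: two stages, because reduceByKey introduces a shuffle boundary
    val byParity: RDD[(Int, Int)] = nums.map(n => (n % 2, n)).reduceByKey(_ + _)
    byParity.foreach(println)

    sc.stop()
  }
}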

For the source-code part, refer to the accompanying video.

III. Use of RDD data sets and operators

1. Three necessary operators

When we write a Spark program, three kinds of operators are unavoidable: creation operators, transformation operators, and action (collection) operators.

Creation operators build an RDD data set, either from an in-memory collection (for example parallelize) or from files on disk (for example textFile).

Transformation operators derive a new RDD from an existing one; map- and reduce-style operations such as map, filter, and reduceByKey belong to this category.

Action (collection) operators such as collect or foreach are what actually trigger the computation; without an action, no job is submitted and nothing runs.

For example, the following number-filtering program uses all three kinds of operators.

package com.littlepage

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object demo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("demo2").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")
    val dataRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 6, 5, 4, 3, 2, 1)) // creation operator
    val filterRDD: RDD[Int] = dataRDD.filter(_ > 3)                                     // transformation operator
    val ints: Array[Int] = filterRDD.collect()                                          // action (collection) operator
    Thread.sleep(100000)
  }
}

2. Common operators (union, intersection, subtract, cartesian, cogroup, join)

2.1. union operator

Merges two data sets into one directly; no shuffle is produced.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object union {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("union"))
    sc.setLogLevel("error")
    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 6, 7))
    val rdd2: RDD[Int] = sc.parallelize(List(2, 3, 4, 5))
    val uniondata: RDD[Int] = rdd1.union(rdd2)
    uniondata.foreach(print)
    Thread.sleep(100000)
  }
}

2.2. intersection operator

Takes the intersection of two data sets; this produces a shuffle. (A combined sketch of the set-style operators follows section 2.4.)

val interdata: RDD[Int] = rdd1.intersection(rdd2)

2.3. subtract operator

Takes the difference of two data sets (the elements of rdd1 that do not appear in rdd2); this produces a shuffle.

val subdata: RDD[Int] = rdd1.subtract(rdd2)

2.4. cartesian operator

Takes the Cartesian product of two data sets; no shuffle is produced.

val cartesiandata: RDD[(Int, Int)] = rdd1.cartesian(rdd2)
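
Putting the set-style operators together, a minimal sketch reusing the rdd1 and rdd2 defined in the union example (output ordering may vary):

// rdd1 = {1,2,3,4,6,7}, rdd2 = {2,3,4,5}, as in the union example
println(rdd1.intersection(rdd2).collect().mkString(","))  // e.g. 2,3,4
println(rdd1.subtract(rdd2).collect().mkString(","))      // e.g. 1,6,7
println(rdd1.cartesian(rdd2).count())                     // 6 * 4 = 24 pairs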

2.5. cogroup operator

Groups two data sets by key: each result key maps to a tuple of two iterables, one holding that key's values from the first data set and one holding its values from the second. This produces a shuffle.

val rdd1: RDD[(String, Int)] = sc.parallelize(List(
  ("zhangsan", 11),
  ("zhangsan", 12),
  ("lisi", 13),
  ("wangwu", 14)
))
val rdd2: RDD[(String, Int)] = sc.parallelize(List(
  ("zhangsan", 21),
  ("zhangsan", 22),
  ("lisi", 23),
  ("zhaoliu", 28)
))
val cogroupdata: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
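
For reference, printing cogroupdata for the two data sets above yields output along these lines (ordering and the concrete Iterable implementation may vary):

cogroupdata.foreach(println)
// (zhangsan,(CompactBuffer(11, 12),CompactBuffer(21, 22)))
// (wangwu,(CompactBuffer(14),CompactBuffer()))
// (lisi,(CompactBuffer(13),CompactBuffer(23)))
// (zhaoliu,(CompactBuffer(),CompactBuffer(28)))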

2.6. join, leftOuterJoin, rightOuterJoin, fullOuterJoin operators

val joindata: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
val leftdata: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
val rightdata: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
val fulldata: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)
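
A brief note on the differences, assuming the rdd1 and rdd2 data from the cogroup example (ordering may vary):

// join:           only keys present on both sides -> zhangsan (2 x 2 = 4 pairs) and lisi
// leftOuterJoin:  all keys of rdd1; a missing right value becomes None -> (wangwu,(14,None))
// rightOuterJoin: all keys of rdd2; a missing left value becomes None  -> (zhaoliu,(None,28))
// fullOuterJoin:  all keys of either side, with None on whichever side is missing
joindata.foreach(println)  // e.g. (lisi,(13,23)), (zhangsan,(11,21)), (zhangsan,(11,22)), ...
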
3. Sorting and aggregation calculation

3.1. swap

Swaps the key and value of each pair in a key-value data set:

data.map(_.swap)

3.2. sortByKey operator

Sorts all items of a key-value data set by key:

data.sortByKey()

3.3. take operator

Returns the first n elements to the driver, where n is an integer:

data.take(n)

3.4. distinct operator (de-duplication)

Removes duplicate elements (for a key-value data set, pairs whose key and value are both identical):

val keys: RDD[(String, String)] = map.distinct()
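
As a combined sketch (the wc name is illustrative, standing for the (word, count) result of the wordcount program in section I), these operators can be chained to find the most frequent words:

// Assume wc: RDD[(String, Int)] holds (word, count) pairs.
val top3: Array[(String, Int)] = wc
  .map(_.swap)                   // (count, word)
  .sortByKey(ascending = false)  // sort by count, highest first
  .map(_.swap)                   // back to (word, count)
  .take(3)                       // first 3 elements

val distinctWords: RDD[String] = wc.map(_._1).distinct()  // unique words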
