Spark Explained (2): Operators
I. wordcount
Building on the wordcount from the previous article, let's write a program that counts the words and then also counts how many words occur with each frequency (a wordcount of the wordcount), and analyze its performance.
package com.littlepage.wc

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wc").setMaster("local")
    val sparkContext = new SparkContext(conf)
    sparkContext.setLogLevel("error")

    val fileRDD: RDD[String] = sparkContext.textFile("data/data")
    val words: RDD[String] = fileRDD.flatMap(_.split(" "))
    val pairWord: RDD[(String, Int)] = words.map((_, 1))
    val res: RDD[(String, Int)] = pairWord.reduceByKey(_ + _)

    println("wordcount:")
    res.foreach(println)

    // count how many distinct words share the same frequency
    val rev: RDD[(Int, Int)] = res.map(x => (x._2, 1))
    val pl: RDD[(Int, Int)] = rev.reduceByKey(_ + _)

    println("\nwordcountcount")
    pl.foreach(println)

    Thread.sleep(100000000) // keep the application alive so the Spark UI can be inspected
  }
}
From the performance chart (the DAG in the Spark UI) we can observe:
1. If the result is never stored or output, Spark does not execute the map or reduce operations at all; transformations are lazy and only run when an action is called (see the sketch below).
2. When the same intermediate RDD feeds several outputs, the shared map/reduce work before the shuffle is executed only once and its shuffle output is reused by the later jobs.
3. By default, every shuffle splits the job into a new stage, which is visible in the chart. Since shuffles are expensive, the number of shuffles should be kept as low as possible when programming.
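A minimal sketch of point 1, assuming an existing SparkContext named sc: nothing runs until an action is invoked.

val nums = sc.parallelize(1 to 10)
val doubled = nums.map(_ * 2)   // only the lineage is recorded, no job is submitted yet
doubled.collect()               // the action triggers the job that actually executes the map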
II. Programming model
Compared with MapReduce, the Spark programming model can run multiple jobs, each made up of multiple stages, within a single application.
For the source-code analysis part, refer to the accompanying video.
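A small sketch of "multiple jobs in one application" (assuming an existing SparkContext sc and the data/data file from the wordcount example): every action submits its own job, so a single Spark program can chain several of them, which a single MapReduce job cannot do.

val lines = sc.textFile("data/data")
val words = lines.flatMap(_.split(" "))
println(words.count())              // action 1 -> job 1
println(words.distinct().count())   // action 2 -> job 2 (distinct adds a shuffle stage)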
III. use of RDD data sets and operators
1. Three necessary operators
When we write a Spark program, three kinds of operators are unavoidable: creation operators, transformation operators and collection (action) operators.
A creation operator builds an RDD data set, either from memory (a collection) or from disk (a file).
A transformation operator processes one RDD data set into another; map- and reduce-style operations fall into this category.
A collection (action) operator collects or outputs the result; without one, no job is triggered and nothing is computed.
For example, the following program uses all three kinds of operators to filter numbers.
package com.littlepage

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object demo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("demo2").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")

    val dataRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 6, 5, 4, 3, 2, 1)) // creation operator
    val filterRDD: RDD[Int] = dataRDD.filter(_ > 3)                                     // transformation operator
    val ints: Array[Int] = filterRDD.collect()  // collection (action) operator; the elements greater than 3: 4, 5, 6, 7, 6, 5, 4

    Thread.sleep(100000)
  }
}
2. Common operators (union, intersection, subtract, cartesian, cogroup, join)
2.1.union operator
Merges two data sets into one directly; no shuffle is generated.
object union {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("union"))
    sc.setLogLevel("error")

    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 6, 7))
    val rdd2: RDD[Int] = sc.parallelize(List(2, 3, 4, 5))
    val uniondata = rdd1.union(rdd2)
    uniondata.foreach(print)

    Thread.sleep(100000)
  }
}
2.2.intersection operator
Takes the intersection of two data sets; a shuffle is generated.
val interdata: RDD[Int] = rdd1.intersection(rdd2) // with rdd1 and rdd2 above, interdata contains 2, 3, 4
2.3.subtract operator
Takes the difference of two data sets (the elements of the first that are not in the second); a shuffle is generated.
val subdata: RDD[Int] = rdd1.subtract(rdd2) // with rdd1 and rdd2 above, subdata contains 1, 6, 7
2.4.cartesian operator
Takes the Cartesian product of two data sets; no shuffle is generated.
val cartesiandata: RDD[(Int, Int)] = rdd1.cartesian(rdd2) // 6 * 4 = 24 pairs of type (Int, Int)
2.5.cogroup operator
Groups two data sets by key: each key in the result maps to a tuple of two Iterables holding that key's values from the first and the second data set respectively; a shuffle is generated.
val rdd1: RDD[(String, Int)] = sc.parallelize(List(
  ("zhangsan", 11),
  ("zhangsan", 12),
  ("lisi", 13),
  ("wangwu", 14)
))
val rdd2: RDD[(String, Int)] = sc.parallelize(List(
  ("zhangsan", 21),
  ("zhangsan", 22),
  ("lisi", 23),
  ("zhaoliu", 28)
))
val cogroupdata: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
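With the data above, collecting cogroupdata yields one entry per key (order not guaranteed), e.g. ("zhangsan", (Seq(11, 12), Seq(21, 22))), ("lisi", (Seq(13), Seq(23))), ("wangwu", (Seq(14), Seq())) and ("zhaoliu", (Seq(), Seq(28))).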
2.6.join, leftOuterJoin, rightOuterJoin, fullOuterJoin operators
val joindata: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
val leftdata: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
val rightdata: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
val fulldata: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)
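With the rdd1 and rdd2 from the cogroup example: join keeps only the keys present in both data sets ("zhangsan" yields 2 * 2 = 4 pairs and "lisi" one), leftOuterJoin additionally keeps ("wangwu", (14, None)), rightOuterJoin additionally keeps ("zhaoliu", (None, 28)), and fullOuterJoin keeps both, wrapping the missing side in None.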
3. Sorting and aggregation calculation
3.1.swap operator
Swaps the key and value of every record in a K-V data set:
data.map(_.swap)
3.2.sort operator
The sortByKey operator sorts all the records of a K-V data set by key:
data.sortByKey()
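As a sketch combining swap and sortByKey (assuming the res: RDD[(String, Int)] wordcount result from the first example), a common pattern is to sort words by their counts: swap so the count becomes the key, sort, then swap back.

// sort the wordcount result by count, descending
val byCount: RDD[(String, Int)] = res
  .map(_.swap)                    // (word, count) -> (count, word)
  .sortByKey(ascending = false)   // sort by count
  .map(_.swap)                    // back to (word, count)
byCount.take(3).foreach(println)  // the 3 most frequent words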
3.3.take operator
Returns the first n records of the data set, where n is an integer:
data.take(n)
3.4.distinct de duplication
Removes duplicate records from the data set (for a K-V data set, records with the same key and value):
val keys: RDD[(String, String)] = map.distinct()
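A small sketch reusing dataRDD from the filter program above: distinct collapses repeated values, leaving each number once; internally it is implemented with reduceByKey, so it generates a shuffle.

val uniq: RDD[Int] = dataRDD.distinct()
uniq.collect().sorted.foreach(println) // prints 1 2 3 4 5 6 7, one per line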