Shenzhen big data training: demonstration of Transformation operators
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(conf)

// Create an RDD by parallelizing a collection
val rdd = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 10))

// map: multiply each element by 2, then sort ascending
val rdd2: RDD[Int] = rdd.map(_ * 2).sortBy(x => x, true)
// collect returns all elements of the dataset as an array (it is an Action operator)
println(rdd2.collect().toBuffer)

// filter: the resulting RDD keeps only the elements for which the function returns true
val rdd3: RDD[Int] = rdd2.filter(_ > 10)
println(rdd3.collect().toBuffer)

val rdd4 = sc.parallelize(Array("a b c", "b c d"))
// flatMap: split each element and flatten the results
val rdd5: RDD[String] = rdd4.flatMap(_.split(" "))
println(rdd5.collect().toBuffer)
// For a nested collection such as List(List("a b", "b c"), List("e c", "i o")),
// flatten it with flatMap(_.flatMap(_.split(" ")))

// sample: random sampling
// withReplacement: whether sampled elements are put back; true = sampling with replacement, false = without
// fraction: expected sampling ratio, e.g. 0.3 for 30%; it is only an expectation, not an exact count
// seed: seed for the random number generator; if not passed, a default is used
val rdd5_1 = sc.parallelize(1 to 10)
val sample = rdd5_1.sample(false, 0.5)
println(sample.collect().toBuffer)

// union
val rdd6 = sc.parallelize(List(5, 6, 7, 8))
val rdd7 = sc.parallelize(List(1, 2, 5, 6))
val rdd8 = rdd6 union rdd7
println(rdd8.collect.toBuffer)

// intersection
val rdd9 = rdd6 intersection rdd7
println(rdd9.collect.toBuffer)

// distinct: remove duplicates
println(rdd8.distinct.collect.toBuffer)

// join: pairs with the same key are merged
val rdd10_1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd10_2 = sc.parallelize(List(("jerry", 2), ("tom", 2), ("dog", 10)))
val rdd10_3 = rdd10_1 join rdd10_2
println(rdd10_3.collect().toBuffer)

// Left and right outer joins
// The non-base side is wrapped in Option because its value may be missing (None)
val rdd10_4 = rdd10_1 leftOuterJoin rdd10_2   // every key from the left side is kept
val rdd10_5 = rdd10_1 rightOuterJoin rdd10_2  // every key from the right side is kept
println(rdd10_4.collect().toList)
println(rdd10_5.collect().toBuffer)

val rdd11_1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd11_2 = sc.parallelize(List(("jerry", 2), ("tom", 2), ("dog", 10)))
// Cartesian product
val rdd11_3 = rdd11_1 cartesian rdd11_2
println(rdd11_3.collect.toBuffer)
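To make the Option wrapping concrete, the shapes these joins produce for the tom/jerry data above are sketched below (the order of the collected elements may vary between runs; the pattern match is just one illustrative way of consuming the Option):

// join keeps only keys present on both sides, e.g. ("tom", (1, 2)), ("jerry", (3, 2))
// leftOuterJoin keeps every left key and wraps the right value in an Option, e.g. ("kitty", (2, None))
// rightOuterJoin keeps every right key and wraps the left value in an Option, e.g. ("dog", (None, 10))
rdd10_4.collect().foreach {
  case (k, (l, Some(r))) => println(s"$k matched on both sides: left=$l right=$r")
  case (k, (l, None))    => println(s"$k exists only on the left: left=$l")
}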
// groupBy: group according to the function passed in (here, the key of each tuple)
// rdd11_4 is assumed to be the union of rdd11_1 and rdd11_2 so the grouping examples are runnable
val rdd11_4 = rdd11_1 union rdd11_2
val rdd11_5_1 = rdd11_4.groupBy(_._1)
println(rdd11_5_1.collect().toList)

// groupByKey: group by the same key; a partitioner / number of partitions can be specified
val rdd11_5_2 = rdd11_4.groupByKey
println(rdd11_5_2.collect().toList)

// cogroup: group by the same key across two pair RDDs
// Difference between cogroup and groupByKey:
//   cogroup does not need the data merged first; for each key the result keeps one data set per input RDD
//   groupByKey needs the data merged first (e.g. via union) and then groups by the same key
val rdd11_6: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1 cogroup rdd11_2
println(rdd11_6.collect().toList)
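For the same input data, the two groupings differ in the shape of the result; a rough illustration (the order of elements inside each Iterable is not guaranteed):

// groupByKey on the union gives one Iterable per key, e.g. ("tom", Iterable(1, 2)), ("dog", Iterable(10))
// cogroup gives one Iterable per source RDD per key, e.g. ("tom", (Iterable(1), Iterable(2))), ("dog", (Iterable(), Iterable(10)))
rdd11_6.collect().foreach { case (k, (fromLeft, fromRight)) =>
  println(s"$k -> from rdd11_1: ${fromLeft.toList}, from rdd11_2: ${fromRight.toList}")
}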
Action operator demonstration
val conf = new SparkConf().setAppName("Test").setMaster("local[*]")
val sc = new SparkContext(conf)
/* Action operators */
// Aggregation functions
val rdd1 = sc.parallelize(List(2,1,3,6,5),2)
val rdd1_1 = rdd1.reduce(_ + _)
println(rdd1_1)
//Returns all elements of a dataset as an array
println(rdd1.collect().toBuffer)
//count returns the number of elements in the RDD
println(rdd1.count())
//top takes the given number of elements, sorted in descending order by default; top(0) returns an empty array
println(rdd1.top(3).toBuffer)
//take returns the first n elements in their original order
println(rdd1.take(3).toBuffer)
//takeOrdered returns the smallest n elements, sorted in ascending order by default
println(rdd1.takeOrdered(3).toBuffer)
//first returns the first element, equivalent to take(1)
println(rdd1.first())
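As a quick sanity check of the differences between top, take, takeOrdered and first on the data above (a small sketch; it assumes rdd1 exactly as defined, List(2,1,3,6,5) in two partitions):

assert(rdd1.top(3).toList == List(6, 5, 3))         // descending order
assert(rdd1.take(3).toList == List(2, 1, 3))        // original element order
assert(rdd1.takeOrdered(3).toList == List(1, 2, 3)) // ascending order
assert(rdd1.first() == rdd1.take(1).head)           // first is equivalent to take(1)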
//Write the processed data as a file (stored in HDFS or local file system)
//rdd1.saveAsTextFile("dir/file1")
//countByKey counts the occurrences of each key and returns a Map: the key is the key name, the value is its count
val rdd2 = sc.parallelize(List(("key1",2),("key2",1),("key3",3),("key4",6),("key5",5)),2)
val rdd2_1: collection.Map[String, Long] = rdd2.countByKey()
println(rdd2_1)
//foreach traverses the data
rdd1.foreach(x => println(x))
/* Other operators */

// countByValue counts the occurrences of each value, treating each whole element of the collection as one value
val value: collection.Map[(String, Int), Long] = rdd2.countByValue
println(value)

// filterByRange: filter the elements of a pair RDD with ordered keys, returning those within the given key range
val rdd3 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c", "e") // both endpoints are included
println(rdd3_1.collect.toList)

// flatMapValues flattens the value part of each pair, leaving the keys untouched
val rdd3_2 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
println(rdd3_2.flatMapValues(_.split(" ")).collect.toList)

// foreachPartition iterates over the data one partition at a time
// It is generally used for persistence: writing each partition's data to a database in one batch
val rdd4 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
rdd4.foreachPartition(x => println(x.reduce(_ + _)))

// keyBy uses the return value of the given function as the key; the original RDD element becomes the value of a new tuple
val rdd5 = sc.parallelize(List("dog", "cat", "pig", "wolf", "bee"), 3)
val rdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length)
println(rdd5_1.collect.toList)

// keys returns all keys, values returns all values
println(rdd5_1.keys.collect.toList)
println(rdd5_1.values.collect.toList)

// collectAsMap converts a pair RDD into a Map
val map: collection.Map[String, Int] = rdd2.collectAsMap()
println(map)
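The persistence use case mentioned for foreachPartition typically follows the pattern sketched below; openConnection, conn.save and conn.close are hypothetical placeholders for whatever database client is actually used, the point being that one connection is opened per partition rather than per element:

rdd4.foreachPartition { partition =>
  val conn = openConnection()                      // hypothetical: one connection per partition
  partition.foreach(record => conn.save(record))   // hypothetical batched writes for this partition
  conn.close()                                     // release the connection once the partition is done
}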