Shenzhen big data training: demonstration of Transformation operator

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("Test").setMaster("local")

val sc = new SparkContext(conf)

//Generate an RDD via parallelize

val rdd = sc.parallelize(List(5,6,4,7,3,8,2,9,10))

//map: multiply each element in the RDD by 2

val rdd2: RDD[Int] = rdd.map(_ * 2)

//collect returns all elements of the dataset as an array (it is an Action operator)

println(rdd2.collect().toBuffer)

//filter: returns an RDD containing only the elements for which the function returns true

val rdd3: RDD[Int] = rdd2.filter(_ > 10)

println(rdd3.collect().toBuffer)

val rdd4 = sc.parallelize(Array("a b c","b c d"))

//flatMap: split the elements in rdd4 and flatten the results

val rdd5: RDD[String] = rdd4.flatMap(_.split(" "))

println(rdd5.collect().toBuffer)

//For nested data such as List(List("a b", "b c"), List("e c", "i o")),

//flatten it with a nested flatMap: flatMap(_.flatMap(_.split(" "))), as in the sketch below
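A minimal, self-contained sketch of that nested flatten (the variable names below are illustrative, not from the original listing):

val nested = sc.parallelize(List(List("a b", "b c"), List("e c", "i o")))

//The outer flatMap unwraps each inner List; the inner flatMap splits every string into words
val flattened: RDD[String] = nested.flatMap(_.flatMap(_.split(" ")))

println(flattened.collect().toBuffer) //ArrayBuffer(a, b, b, c, e, c, i, o)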

//sample: random sampling

//withReplacement indicates whether sampled elements are put back: true means sampling with replacement, false means without.

//fraction is the expected sampling ratio, e.g. 0.3 for 30%; it is only approximate, not an exact count.

//seed is the random number generator seed; if it is not passed, a random default is used. (A variant using all three parameters follows the first sample below.)

val rdd5_1 = sc.parallelize(1 to 10)

val sample = rdd5_1.sample(false, 0.5)

println(sample.collect().toBuffer)
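A variant that also exercises the other two parameters, sampling with replacement and with a fixed seed (the values 0.3 and 42 are arbitrary):

val sampleWithSeed = rdd5_1.sample(withReplacement = true, fraction = 0.3, seed = 42)

println(sampleWithSeed.collect().toBuffer) //the same seed yields the same sample across runs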

//union: union of the two RDDs (duplicates are not removed)

val rdd6 = sc.parallelize(List(5,6,7,8))

val rdd7 = sc.parallelize(List(1,2,5,6))

val rdd8 = rdd6 union rdd7

println(rdd8.collect.toBuffer)

//Intersection: intersection

val rdd9 = rdd6 intersection rdd7

println(rdd9.collect.toBuffer)

//distinct: remove duplicate elements

println(rdd8.distinct.collect.toBuffer)

//join: pairs with the same key are joined together

val rdd10_1 = sc.parallelize(List(("tom",1),("jerry" ,3),("kitty",2)))

val rdd10_2 = sc.parallelize(List(("jerry" ,2),("tom",2),("dog",10)))

val rdd10_3 = rdd10_1 join rdd10_2

println(rdd10_3.collect().toBuffer)

//Left outer join and right outer join

//The side that is not the base may have no matching value, so it is wrapped in Option (see the typed sketch after the two prints below).

val rdd10_4 = rdd10_1 leftOuterJoin rdd10_2 //based on the left side, so the left values are never missing

val rdd10_5 = rdd10_1 rightOuterJoin rdd10_2 //based on the right side, so the right values are never missing

println(rdd10_4.collect().toList)

println(rdd10_5.collect().toBuffer)
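The element shapes of the two joins above, written out with explicit types (the extra val names are only for illustration):

//leftOuterJoin keeps every left key; the right value becomes an Option (None when there is no match)
val leftShaped: RDD[(String, (Int, Option[Int]))] = rdd10_1 leftOuterJoin rdd10_2 //e.g. ("kitty", (2, None))

//rightOuterJoin keeps every right key; the left value becomes an Option
val rightShaped: RDD[(String, (Option[Int], Int))] = rdd10_1 rightOuterJoin rdd10_2 //e.g. ("dog", (None, 10))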

val rdd11_1 = sc.parallelize(List(("tom",1),("jerry" ,3),("kitty",2)))

val rdd11_2 = sc.parallelize(List(("jerry" ,2),("tom",2),("dog",10)))

//Cartesian product

val rdd11_3 = rdd11_1 cartesian rdd11_2

println(rdd11_3.collect.toBuffer)

//groupBy: group by the value returned by the passed-in function

val rdd11_4 = rdd11_1 union rdd11_2 //merge the two pair RDDs first

val rdd11_5_1 = rdd11_4.groupBy(_._1)

println(rdd11_5_1.collect().toList)

//groupByKey: group values that share the same key (the number of partitions can also be specified)

val rdd11_5_2 = rdd11_4.groupByKey

println(rdd11_5_2.collect().toList)

//cogroup: group by the same key across two pair RDDs [it takes two RDDs of tuples]

//The difference between cogroup and groupByKey:

//cogroup does not need the data to be merged first; the result pairs each key with the separate value collections from each RDD.

//groupByKey needs the data to be merged (e.g. with union) first and then groups by the same key. The sketch after the cogroup println below compares the two result shapes.

val rdd11_6: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1 cogroup rdd11_2

println(rdd11_6.collect().toList)
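For the sample data above, the two approaches should produce results of the following shape (element ordering and the concrete Iterable implementation may vary):

//groupByKey after union: one collection per key, with the values from both sources mixed together
//  ("tom", [1, 2]), ("jerry", [3, 2]), ("kitty", [2]), ("dog", [10])
//cogroup: one pair of collections per key, keeping the two sources separate
//  ("tom", ([1], [2])), ("jerry", ([3], [2])), ("kitty", ([2], [])), ("dog", ([], [10]))
rdd11_5_2.collect().foreach(println)
rdd11_6.collect().foreach(println)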

Action operator demonstration
val conf = new SparkConf().setAppName("Test").setMaster("local[*]")
val sc = new SparkContext(conf)
/*Action operators*/
//reduce: aggregate all the elements
val rdd1 = sc.parallelize(List(2,1,3,6,5),2)
val rdd1_1 = rdd1.reduce(_ + _)
println(rdd1_1)
//Returns all elements of a dataset as an array
println(rdd1.collect().toBuffer)
//Return the number of elements of RDD
println(rdd1.count())
//top: take the given number of elements, sorted in descending order by default; top(0) returns an empty array
println(rdd1.top(3).toBuffer)
//take: take the given number of elements in the order they appear in the RDD
println(rdd1.take(3).toBuffer)
//takeOrdered: take the given number of elements, sorted in ascending order by default
println(rdd1.takeOrdered(3).toBuffer)
//first: get the first element, similar to take(1)
println(rdd1.first())
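For rdd1 = List(2,1,3,6,5), the four calls above should print contents along these lines (a sketch of the expected results):

//top(3)         -> 6, 5, 3   (descending order)
//take(3)        -> 2, 1, 3   (the RDD's existing order)
//takeOrdered(3) -> 1, 2, 3   (ascending order)
//first()        -> 2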
//Write the processed data as a file (stored in HDFS or local file system)
//rdd1.saveAsTextFile("dir/file1")
//countByKey: count the occurrences of each key and return a Map where k is the key and v is its count
val rdd2 = sc.parallelize(List(("key1",2),("key2",1),("key3",3),("key4",6),("key5",5)),2)
val rdd2_1: collection.Map[String, Long] = rdd2.countByKey()
println(rdd2_1)
//foreach: iterate over the data
rdd1.foreach(x => println(x))

/*Other operators*/
//countByValue: count the occurrences of each value, where each whole element (here the entire tuple) is treated as the value
val value: collection.Map[(String, Int), Long] = rdd2.countByValue
println(value)
//filterByRange: filter the elements of a pair RDD, returning only those whose keys fall within the specified range
val rdd3 = sc.parallelize(List(("e",5),("c",3),("d",4),("c",2),("a",1)))
val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c","e") //the range is inclusive of both bounds
println(rdd3_1.collect.toList)
//flatMapValues flattens only the values: each value is passed through the function and every resulting element is paired with the original key
val rdd3_2 = sc.parallelize(List(("a","1 2"),("b","3 4")))
println( rdd3_2.flatMapValues(_.split(" ")).collect.toList)
//foreachPartition iterates over the data one partition at a time
//foreachPartition is typically used for persistence, e.g. writing each partition's data to a database in one batch, as in the sketch after this snippet
val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
rdd4.foreachPartition(x => println(x.reduce(_+_)))
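A minimal sketch of that persistence pattern, with a println standing in for the database write; in a real job a connection would be opened once per partition, used for the whole batch, and then closed:

rdd4.foreachPartition { iter =>
  //In a real job a database connection would be opened here, once per partition rather than once per element
  val batch = iter.toList //gather this partition's records into one batch
  println(s"writing batch of ${batch.size} records: $batch") //stand-in for the actual database write
}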
//keyBy uses the return value of the passed-in function as the key and the original RDD element as the value, forming a new tuple
val rdd5 = sc.parallelize(List("dog","cat","pig","wolf","bee"),3)
val rdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length)
println(rdd5_1.collect.toList)
//keys gets all the keys; values gets all the values
println(rdd5_1.keys.collect.toList)
println(rdd5_1.values.collect.toList)
//collectAsMap converts a pair (two-element tuple) RDD into a Map
val map: collection.Map[String, Int] = rdd2.collectAsMap()
println(map)
