Scala-19: complex WordCount case
1, Case analysis
In the simple case, every input element is a plain string, as shown below:
List( "hello", "hello world", "hello scala", "hello spark from scala", "hello flink from scala" )
If the input is not made of plain strings but of data that has already been counted once, for example:
List( ("hello",1), ("hello world",2), ("hello scala",3), ("hello spark from scala",1), ("hello flink from scala",2) )
In that case, processing is a little more complicated, because the count attached to each phrase must be taken into account. There are two common approaches:
1: First approach
Expand the data directly into the plain (simple-case) form
("hello world",2) => ("hello world hello world")
Once expanded, the data is processed exactly as in the simple WordCount case.
2: Second approach
Convert the data using the pre-counted results
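A minimal sketch of the expansion step, using the same `(phrase.trim + " ") * count` idiom as the code implementation later in this article (Scala's `StringOps.*` repeats a string `count` times). The helper name `expand` is hypothetical, and the final `.trim`, which drops the trailing space, is an addition of this sketch.

```scala
object ExpandSketch {
  // Hypothetical helper: repeat a phrase `count` times, separated by single spaces.
  // (phrase + " ") * count leaves a trailing space, which the final trim removes.
  def expand(phrase: String, count: Int): String =
    ((phrase.trim + " ") * count).trim

  def main(args: Array[String]): Unit = {
    println(expand("hello world", 2)) // hello world hello world
    // After expansion, splitting yields ordinary words, exactly as in the simple case
    println(expand("hello world", 2).split(" ").toList) // List(hello, world, hello, world)
  }
}
```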
("hello world",2) => ("hello", 2), ("world", 2)
Group the (word, count) pairs by word
"hello" => List(("hello", 1), ("hello", 2), ("hello", 3), ("hello", 1), ("hello", 2))
Sum the pre-counted values of each word
("hello", 9)
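The three steps above can be traced for the single word "hello" with a short sketch. It reuses the sample data from the case analysis; the object and value names (`PreCountTrace`, `pairs`, `grouped`, `helloTotal`) are illustrative only.

```scala
object PreCountTrace {
  val data: List[(String, Int)] = List(
    ("hello", 1), ("hello world", 2), ("hello scala", 3),
    ("hello spark from scala", 1), ("hello flink from scala", 2)
  )

  // Step 1: ("hello world", 2) => ("hello", 2), ("world", 2)
  val pairs: List[(String, Int)] =
    data.flatMap { case (phrase, n) => phrase.split(" ").toList.map(word => (word, n)) }

  // Step 2: group the (word, count) pairs by word
  val grouped: Map[String, List[(String, Int)]] = pairs.groupBy(_._1)

  // Step 3: sum the pre-counted values for "hello"
  val helloTotal: Int = grouped("hello").map(_._2).sum

  def main(args: Array[String]): Unit = {
    println(grouped("hello")) // List((hello,1), (hello,2), (hello,3), (hello,1), (hello,2))
    println(helloTotal)       // 9
  }
}
```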
2, Method used
The methods are similar to those in the simple case, except that each approach needs a different output type from its map stage:
1: Break the list up into (word, count) pairs with flatMap
List[(String, Int)] => List[(String, Int)]
2: Sum the pre-counted values of each word
Map[String, List[(String, Int)]] => Map[String, Int]
3, Code implementation
1: Expand directly into the plain form
```scala
object ComplexWordCount {
  def main(args: Array[String]): Unit = {
    val stringList: List[(String, Int)] = List(
      ("hello", 1), ("hello world", 2), ("hello scala", 3),
      ("hello spark from scala", 1), ("hello flink from scala", 2)
    )

    // Idea 1: expand each phrase into the plain form by repeating it count times
    val newStringList: List[String] = stringList.map(
      kv => (kv._1.trim + " ") * kv._2
    )

    // Break the strings up into words and group identical words
    val groupMap: Map[String, List[String]] =
      newStringList.flatMap(_.split(" ")).groupBy(word => word)
    // Count the occurrences of each word
    val countMap: Map[String, Int] = groupMap.map(kv => (kv._1, kv._2.length))

    // Sort by count, descending
    val sortMap: List[(String, Int)] = countMap.toList.sortWith(_._2 > _._2)
    println(sortMap)

    // Take the top three
    val preList: List[(String, Int)] = sortMap.take(3)
    println(preList)
  }
}
```
2: Convert using the pre-counted results
```scala
object ComplexWordCount {
  def main(args: Array[String]): Unit = {
    val stringList: List[(String, Int)] = List(
      ("hello", 1), ("hello world", 2), ("hello scala", 3),
      ("hello spark from scala", 1), ("hello flink from scala", 2)
    )

    // 1. Break each phrase up into (word, pre-counted value) pairs
    val proCountList: List[(String, Int)] = stringList.flatMap(
      tuple => {
        val strings: Array[String] = tuple._1.split(" ")
        strings.map(word => (word, tuple._2))
      }
    )

    // 2. Group the (word, count) pairs by word
    val preCountMap: Map[String, List[(String, Int)]] = proCountList.groupBy(_._1)
    println(preCountMap)

    // 3. Sum the pre-counted values of each word
    val countMap: Map[String, Int] = preCountMap.map(
      tupleList => (tupleList._1, tupleList._2.map(_._2).sum)
    )

    // 4. Convert to a list, sort by count (descending) and take the top three
    val sortList: List[(String, Int)] = countMap.toList.sortWith(_._2 > _._2).take(3)
    println(sortList)
  }
}
```
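On Scala 2.13 or later, steps 2 and 3 (group, then sum) can be fused with the standard library's `groupMapReduce`, which avoids building an intermediate `List` per word. This is a sketch of an alternative, not the article's original method; the object name `GroupMapReduceSketch` is illustrative, and it assumes a 2.13+ standard library.

```scala
object GroupMapReduceSketch {
  val stringList: List[(String, Int)] = List(
    ("hello", 1), ("hello world", 2), ("hello scala", 3),
    ("hello spark from scala", 1), ("hello flink from scala", 2)
  )

  // Step 1 as before: one (word, preCount) pair per word in each phrase.
  // Steps 2 + 3 fused: groupMapReduce(key)(value)(combine) groups by word,
  // keeps each pair's pre-counted value, and sums the values per word.
  val countMap: Map[String, Int] = stringList
    .flatMap { case (phrase, n) => phrase.split(" ").toList.map(word => (word, n)) }
    .groupMapReduce(_._1)(_._2)(_ + _)

  // Step 4: sort by count (descending) and keep the top three
  val top3: List[(String, Int)] = countMap.toList.sortWith(_._2 > _._2).take(3)

  def main(args: Array[String]): Unit =
    println(top3) // List((hello,9), (scala,6), (from,3))
}
```

For this sample the top three counts are all distinct, so the result does not depend on the iteration order of the `Map`; with ties, `sortWith` alone would not give a deterministic order.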
4, Differences between the two methods
The first approach multiplies each phrase by its pre-counted value to rebuild the full string data, then processes it exactly as in the simple case. Strictly speaking, the final count is not a true aggregation of the pre-counted results: it throws them away and recounts every word from scratch, and the expanded data grows with each count.
The second approach aggregates each word directly using its pre-counted value, so it is a genuine aggregation. It is the recommended approach, and its steps resemble the stages of MapReduce: emit (word, count) pairs, group them by word, then reduce each group by summing.
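One way to see the difference is to run both approaches on the sample data and compare the intermediate lists they build. In this sketch, approach 1 expands at the word level with `List.fill` instead of string multiplication (a swapped-in but equivalent expansion), and all names are illustrative. The sizes printed apply to this sample only.

```scala
object CompareApproaches {
  val data: List[(String, Int)] = List(
    ("hello", 1), ("hello world", 2), ("hello scala", 3),
    ("hello spark from scala", 1), ("hello flink from scala", 2)
  )

  // Approach 1: repeat each phrase's words n times, then count individual words
  val expandedWords: List[String] =
    data.flatMap { case (phrase, n) => List.fill(n)(phrase.split(" ").toList).flatten }
  val counts1: Map[String, Int] =
    expandedWords.groupBy(identity).map { case (w, ws) => (w, ws.size) }

  // Approach 2: one (word, n) pair per word occurrence, then sum the n values
  val pairs: List[(String, Int)] =
    data.flatMap { case (phrase, n) => phrase.split(" ").toList.map(w => (w, n)) }
  val counts2: Map[String, Int] =
    pairs.groupBy(_._1).map { case (w, ps) => (w, ps.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    println(counts1 == counts2) // true
    println(expandedWords.size) // 23 words materialized by approach 1
    println(pairs.size)         // 13 pairs materialized by approach 2
  }
}
```

Both approaches give the same counts, but approach 1's intermediate list grows with the pre-counted values (1*1 + 2*2 + 2*3 + 4*1 + 4*2 = 23 words here), while approach 2's list only grows with the number of words in the phrases (13 here).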