Scala-19: complex WordCount case

1, Case analysis

In the simple WordCount case, every input element is a plain string, as shown below:

List(
      "hello",
      "hello world",
      "hello scala",
      "hello spark from scala",
      "hello flink from scala"
    )

If the input is not plain strings but pre-counted data, where each string is paired with the number of times it occurs, for example:

List(
      ("hello",1),
      ("hello world",2),
      ("hello scala",3),
      ("hello spark from scala",1),
      ("hello flink from scala",2)
    )

Processing then becomes slightly more complicated, because the count attached to each string must be taken into account. There are two common methods.

1: First method

Expand each entry directly into the plain-string form by repeating the string as many times as its count

("hello world",2) => ("hello world hello world")

Once the data is in plain-string form, processing is the same as in the earlier simple WordCount case
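
The expansion and the simple flow it feeds into can be sketched roughly as follows. This is only an illustrative sketch, not the code from the earlier article: the names `SimpleWordCountSketch`, `expand` and `wordCount` are made up here, and `List.fill` is used for the repetition so that no trailing space is produced.

```scala
object SimpleWordCountSketch {
  // Repeat each string `count` times, joined by single spaces,
  // so the data looks like plain-string input again.
  def expand(data: List[(String, Int)]): List[String] =
    data.map { case (line, count) => List.fill(count)(line.trim).mkString(" ") }

  // Plain WordCount: split into words, group identical words, count each group.
  def wordCount(lines: List[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" ").toList)
      .filter(_.nonEmpty) // drop empty tokens left by blank or repeated spaces
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.length) }

  def main(args: Array[String]): Unit = {
    val expanded = expand(List(("hello world", 2)))
    println(expanded)            // List(hello world hello world)
    println(wordCount(expanded)) // each of "hello" and "world" is counted twice
  }
}
```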

2: Second method

Convert based on the pre-counted results: split each string into words and attach the string's count to every word

("hello world",2) => ("hello", 2), ("world", 2)

Group the (word, count) tuples by word

"hello" => List(("hello", 1), ("hello", 2), ("hello", 3), ("hello", 1), ("hello", 2))

Sum the pre-counted values for each word

("hello", 9)

2, Methods used

The collection methods are the same as in the simple case; what differs between the two methods is the element type that is passed from one stage to the next. For the second method, the stages are:

1: Use flatMap to break each entry into (word, count) pairs

List[(String, Int)] => List[(String, Int)]

2: After groupBy, sum the pre-counted values for each word

Map[String, List[(String, Int)]] => Map[String, Int]
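
To make the two signatures concrete, here is a small sketch that writes each stage as a function with exactly that type. The object and function names (`StageTypesSketch`, `splitWithCounts`, `sumCounts`) are hypothetical and chosen only for illustration.

```scala
object StageTypesSketch {
  // Stage 1 (flatMap): List[(String, Int)] => List[(String, Int)]
  // Every word inherits the count of the string it came from.
  def splitWithCounts(data: List[(String, Int)]): List[(String, Int)] =
    data.flatMap { case (line, count) =>
      line.split(" ").toList.filter(_.nonEmpty).map(word => (word, count))
    }

  // Stage 2 (after groupBy): Map[String, List[(String, Int)]] => Map[String, Int]
  // Keep only the counts in each group and add them up.
  def sumCounts(grouped: Map[String, List[(String, Int)]]): Map[String, Int] =
    grouped.map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val pairs = splitWithCounts(List(("hello world", 2), ("hello scala", 3)))
    println(pairs)                          // List((hello,2), (world,2), (hello,3), (scala,3))
    println(sumCounts(pairs.groupBy(_._1))) // hello -> 5, world -> 2, scala -> 3 (map order may vary)
  }
}
```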

3, Code implementation

1: Expand directly into the plain-string form

object ComplexWordCount {
  def main(args: Array[String]): Unit = {

    val stringList: List[(String, Int)] = List(
      ("hello",1),
      ("hello world",2),
      ("hello scala",3),
      ("hello spark from scala",1),
      ("hello flink from scala",2)
    )

    //Method 1: expand each entry into a plain string by repeating it kv._2 times
    val newStringList: List[String] = stringList.map(
      kv => {
        (kv._1.trim + " ") * kv._2
      }
    )

    //Split into words and group identical words together
    val groupMap: Map[String, List[String]] = newStringList.flatMap(_.split(" ")).groupBy(word => word)
    //The size of each group is the number of occurrences of that word
    val countMap: Map[String, Int] = groupMap.map(kv => (kv._1, kv._2.length))
    //Sort by count, descending
    val sortMap: List[(String, Int)] = countMap.toList.sortWith(_._2 > _._2)
    println(sortMap)
    //Take the top three
    val preList: List[(String, Int)] = sortMap.take(3)
    println(preList)
  }

}

2: Convert based on the pre-counted results

object ComplexWordCount {
  def main(args: Array[String]): Unit = {

    val stringList: List[(String, Int)] = List(
      ("hello",1),
      ("hello world",2),
      ("hello scala",3),
      ("hello spark from scala",1),
      ("hello flink from scala",2)
    )

    //1. Split each string into words and pair every word with the string's count
    val preCountList: List[(String, Int)] = stringList.flatMap(
      tuple => {
        val strings: Array[String] = tuple._1.split(" ")
        strings.map(word => (word,tuple._2))
      }
    )

    //2. Group the (word, count) tuples by word
    val preCountMap: Map[String, List[(String, Int)]] = preCountList.groupBy(_._1)
    println(preCountMap)

    //3. Sum the pre-counted values of each word
    val countMap: Map[String, Int] = preCountMap.map(
      tupleList => (tupleList._1, tupleList._2.map(_._2).sum)
    )

    //4. Convert to list, sort and take the top three
    val sortList: List[(String, Int)] = countMap.toList.sortWith(_._2 > _._2).take(3)
    println(sortList)

  }

}

4, Differences between the two methods

The first method repeats each string by its pre-counted value to rebuild the full plain-string data, then processes it exactly as in the simple case. Strictly speaking, this is not a true aggregation, because the pre-counted values are expanded back into text and every word is counted again from scratch.

The second method keeps the pre-counted values and sums them per word, so it is a genuine aggregation. This method is recommended; it resembles the way MapReduce combines partial counts for each key.
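
As an aside, the grouping and summing steps of the second method can be written as a single call. The sketch below assumes Scala 2.13 or later, where the standard collections provide `groupMapReduce`; the names `PreCountedWordCountSketch` and `countWords` are illustrative only and do not come from the code above.

```scala
object PreCountedWordCountSketch {
  // Split every string into (word, count) pairs, then group by word and
  // add up the counts in one step. Assumes Scala 2.13+ for groupMapReduce.
  def countWords(data: List[(String, Int)]): Map[String, Int] =
    data
      .flatMap { case (line, count) =>
        line.split(" ").toList.filter(_.nonEmpty).map(word => (word, count))
      }
      .groupMapReduce(_._1)(_._2)(_ + _) // key = word, value = count, combine = +

  def main(args: Array[String]): Unit = {
    val data = List(
      ("hello", 1),
      ("hello world", 2),
      ("hello scala", 3),
      ("hello spark from scala", 1),
      ("hello flink from scala", 2)
    )
    // Sort by count, descending, and keep the three most frequent words.
    val top3 = countWords(data).toList.sortBy(-_._2).take(3)
    println(top3) // List((hello,9), (scala,6), (from,3))
  }
}
```

Here the key function plays the role of the map-side key, and `_ + _` plays the role of the reducer, which is why this form reads much like a MapReduce job.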

Keywords: Scala

Added by asa_carter on Sun, 19 Dec 2021 03:21:56 +0200