1. Business requirements
For each cell phone number, use its base-station connection logs together with the base station information to find the two base stations where the phone stayed the longest, reporting (base station, dwell time) and the station's (longitude, latitude).
Each time a phone connects to or disconnects from a base station, a log record like the following is generated:
18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1
18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0
18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0
The fields are: cell phone number, timestamp (yyyyMMddHHmmss), base station ID, and event type (1: the phone connected to the base station, 0: the phone disconnected from it).
Base station information:
9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6
The fields are: base station ID, longitude, latitude, and access network type (0: unknown, 1: 3G, 2: 2G, 6: 4G).
Scala code:
package com.Hive

import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Finds, for every phone number, the base stations where the phone stayed the longest.
 *
 * Input files (comma-separated text):
 *  - user log: phone number, timestamp (yyyyMMddHHmmss), base station ID,
 *    event (1 = connected; any other value is treated as a disconnect)
 *  - base station info: base station ID, longitude, latitude, network type
 *
 * Output (printed to stdout): per phone, up to [[TopN]] entries of the form
 * `(stationId, ((phone, dwellSeconds), (longitude, latitude)))`, longest stay first.
 */
object FD {

  /** Number of stations reported per phone. */
  private val TopN = 2

  /** Timestamp layout used in the user log, e.g. `20160327082400`. */
  private val TimestampFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

  /**
   * Runs the job locally and prints the result.
   *
   * @param args optional `args(0)`: user log path (default `src/main/data/log/`);
   *             optional `args(1)`: base station file (default `src/main/data/base_info.txt`)
   */
  def main(args: Array[String]): Unit = {
    val logPath = args.lift(0).getOrElse("src/main/data/log/")
    val basePath = args.lift(1).getOrElse("src/main/data/base_info.txt")

    val conf = new SparkConf().setAppName("FD").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // 1. Read the data files.
      val user = sc.textFile(logPath)  // user connection logs
      val base = sc.textFile(basePath) // base station data

      // 2. Clean the data and extract the fields we need.
      val visits = parseUserLog(user)
      val stations = parseBaseInfo(base)

      // 3. Dwell time per (phone, station), joined with coordinates, top stations per phone.
      val result = topStationsPerPhone(visits, stations, TopN)
      println(result.collect().toBuffer)
    } finally {
      // Release the context even if a stage fails.
      sc.stop()
    }
  }

  /**
   * Turns raw log lines into `((phone, stationId), signedSeconds)` pairs.
   *
   * A connect event (1) contributes `-t` and any other event contributes `+t`. Summing the
   * values of one key therefore yields the sum of (disconnect - connect), i.e. the total dwell
   * time in seconds, provided each connect has a matching disconnect.
   * NOTE(review): an unmatched connect/disconnect produces a meaningless (huge) value.
   * Lines with fewer than four fields (e.g. blank lines) are skipped.
   */
  private def parseUserLog(lines: RDD[String]): RDD[((String, String), Long)] =
    lines
      .map(_.split(","))
      .filter(_.length >= 4)
      .map { fields =>
        val phone = fields(0)
        val stationId = fields(2)
        val seconds = toEpochSeconds(fields(1))
        // trim() tolerates a trailing '\r' from Windows line endings on the last field.
        val signed = if (fields(3).trim.toInt == 1) -seconds else seconds
        ((phone, stationId), signed)
      }

  /**
   * Turns base station lines into `(stationId, (longitude, latitude))` pairs.
   * Lines with fewer than three fields are skipped.
   */
  private def parseBaseInfo(lines: RDD[String]): RDD[(String, (String, String))] =
    lines
      .map(_.split(","))
      .filter(_.length >= 3)
      .map(fields => (fields(0), (fields(1), fields(2))))

  /**
   * Aggregates dwell time per (phone, station), attaches station coordinates and keeps the
   * `n` longest stays per phone. Stations missing from `stations` are dropped by the inner join.
   */
  private def topStationsPerPhone(
      visits: RDD[((String, String), Long)],
      stations: RDD[(String, (String, String))],
      n: Int): RDD[(String, List[(String, ((String, Long), (String, String)))])] = {
    val dwell = visits.reduceByKey(_ + _)
    // Re-key by station ID so the totals can be joined with the station coordinates.
    val byStation = dwell.map { case ((phone, stationId), time) => (stationId, (phone, time)) }
    // (stationId, ((phone, time), (longitude, latitude)))
    val joined: RDD[(String, ((String, Long), (String, String)))] = byStation.join(stations)
    joined
      .groupBy { case (_, ((phone, _), _)) => phone }
      .mapValues(_.toList.sortBy { case (_, ((_, time), _)) => time }(Ordering[Long].reverse).take(n))
  }

  /**
   * Converts a `yyyyMMddHHmmss` timestamp to epoch seconds.
   * The log carries no zone; UTC is used so that differences are plain elapsed seconds
   * (NOTE(review): DST transitions in the real source zone are therefore ignored).
   */
  private def toEpochSeconds(timestamp: String): Long =
    LocalDateTime.parse(timestamp.trim, TimestampFormat).toEpochSecond(ZoneOffset.UTC)
}