A Spark case study: computing a mobile number's dwell time at each base station and the phone's current location

1. Business requirements
Given the logs of a mobile phone number's stays at base stations, together with the base station information, compute for each phone number its (base station, dwell time) pairs and the corresponding (current longitude, current latitude) of those stations.

The log records generated when a mobile phone connects to (or disconnects from) a base station look like this:

18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1
18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0
18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0

The fields are: mobile phone number, timestamp, base station ID, event type (1: connect to the base station, 0: disconnect from it).
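
For example, the code below turns each log record into a signed timestamp: a connect event (type 1) is negated and a disconnect event (type 0) stays positive, so that summing the values per (phone number, base station) yields a dwell-time score. A minimal REPL-style sketch of this trick on the first pair of sample records (the variable names are just for illustration, and the score lives in the raw yyyyMMddHHmmss space, so it is a relative ranking value rather than real seconds):

	val connectTs    = -20160327082400L          // event type 1: connect, negate the timestamp
	val disconnectTs =  20160327170000L          // event type 0: disconnect, keep it positive
	val dwellScore   = connectTs + disconnectTs  // 87600: the dwell-time score for this stay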

Base station information:

9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6

The fields are: base station ID, longitude, latitude, access network type (0: unknown, 1: 3G, 2: 2G, 6: 4G).

The Scala code:

	package com.Hive
	
	import org.apache.spark.rdd.RDD
	import org.apache.spark.{SparkConf, SparkContext}
	
	object FD {
	
	  def main(args: Array[String]): Unit = {
	
	    val conf = new SparkConf().setAppName("FD").setMaster("local[2]")
	    val sc  = new SparkContext(conf)
	
	    //1. Read the data files
	    val user = sc.textFile("src/main/data/log/")          // user connection logs
	    val base = sc.textFile("src/main/data/base_info.txt") // base station data
	
	
	    //2. Data cleaning: extract the fields we need
	    //   User log cleaning: ((phone, station), signed timestamp)
	    val splited = user.map(line => {
	      val fields = line.split(",")
	      val phone = fields(0)
	      val station = fields(2)
	      val event = fields(3).toInt

	      val time = {
	        if (event == 1) {
	          -fields(1).toLong // connect event: negate the timestamp
	        } else {
	          fields(1).toLong // disconnect event: keep the timestamp positive
	        }
	      }

	      ((phone, station), time)
	    })
	
	//   splited.collect().foreach(println(_))
	
	    //    Base station data cleaning: (station ID, (longitude, latitude))
	    val alcsplited = base.map(line => {
	      val fields = line.split(",")
	      val id = fields(0)
	      val longitude = fields(1)
	      val latitude = fields(2)
	      (id, (longitude, latitude))
	    })

	    // alcsplited.collect().foreach(println(_))
	
	    //3. Total time each user stays at each base station:
	    //   summing the signed timestamps gives (disconnect - connect) per (phone, station)
	    val reducted = splited.reduceByKey(_ + _)
	
	   // reducted.collect().foreach(println(_))
	
	    // Re-key by base station ID: ((phone, station), totalTime) becomes (station, (phone, totalTime))
	    // x._1._1 is the phone number, x._1._2 is the station ID, x._2 is the total time
	    val pmt = reducted.map(x => {
	      (x._1._2, (x._1._1, x._2))
	    })
	
	
	
	    // Join on the base station ID:
	    // (station ID, ((phone, totalTime), (longitude, latitude)))
	    val joined: RDD[(String, ((String, Long), (String, String)))] = pmt.join(alcsplited)

	    // Group by phone number:
	    // _._2._1._1 reaches into ((phone, totalTime), (longitude, latitude)) and picks the phone
	    val mobileGroupByKey = joined.groupBy(_._2._1._1)

	    // For each phone, sort its stations by dwell time (descending) and keep the top 2:
	    // (phone, List((station, ((phone, totalTime), (longitude, latitude)))))
	    val result = mobileGroupByKey.mapValues(_.toList.sortBy(_._2._1._2).reverse.take(2))
	
	    println(result.collect().toBuffer)
	
	    sc.stop()
	
	
	  }
	
	}
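
With only the sample records shown above in the two input paths, each phone connects to and disconnects from a single base station, so the collected result should contain two records roughly like the following (the order may vary between runs, and longitude/latitude remain strings because the code never converts them):

	(18688888888,List((16030401EAFB68F1E3CDF819735E1C66,((18688888888,87600),(116.296302,40.032296)))))
	(18611132889,List((16030401EAFB68F1E3CDF819735E1C66,((18611132889,97500),(116.296302,40.032296)))))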

Keywords: Big Data, Mobile network, Apache Spark
