Background: sometimes we need to define an external data source and process it with Spark SQL. This has two benefits:
(1) Once the external data source is defined, it is very simple to use and the software architecture stays clear: the data can be queried directly through SQL.
(2) It makes it easy to split modules into layers, build them up layer by layer, and hide implementation details.
In such cases we define a custom external data source. Doing this in Spark is also quite simple once you know how.
First, pick a package name and put all the classes under it, for example com.example.hou.
Then define two things: a DefaultSource class, and a subclass of BaseRelation that mixes in TableScan.
The code of DefaultSource is very simple and needs little explanation; just read it directly:
package com.example.hou

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType

class DefaultSource extends CreatableRelationProvider with SchemaRelationProvider {

  // Read path: called when the reader supplies a schema (SchemaRelationProvider)
  def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation = {
    val path = parameters.get("path")
    path match {
      case Some(x) => new TextDataSourceRelation(sqlContext, x, schema)
      case _ => throw new IllegalArgumentException("path is required...")
    }
  }

  // Write path: called by DataFrameWriter.save() (CreatableRelationProvider);
  // it simply delegates to the read path and ignores the incoming DataFrame
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    createRelation(sqlContext, parameters, null)
  }
}
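Note that the second createRelation is the one Spark reaches on the write path. The following is a hypothetical sketch, not part of the original code: it assumes Spark 2.x and made-up column values, and only illustrates that DataFrameWriter.save() on this format ends up in the four-argument createRelation, which ignores the incoming DataFrame and therefore does not actually persist anything.

package com.example.hou

import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical demo: shows which call reaches the four-argument createRelation.
// Because that method ignores the incoming DataFrame, nothing is actually written;
// the save just resolves back to a TextDataSourceRelation over the existing path.
object WritePathDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("WritePathDemo").master("local[2]").getOrCreate()
    import spark.implicits._

    // Made-up data matching the default schema (id, name, gender, salary, comm)
    val someDf = Seq((1L, "zhangsan", "man", 10000L, 2000L))
      .toDF("id", "name", "gender", "salary", "comm")

    someDf.write
      .format("com.example.hou")            // resolved to com.example.hou.DefaultSource
      .option("path", "C://code//data.txt")
      .mode(SaveMode.Append)
      .save()                               // invokes createRelation(sqlContext, mode, parameters, data)

    spark.stop()
  }
}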
Source code of TextDataSourceRelation:
package com.example.hou

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

class TextDataSourceRelation(override val sqlContext: SQLContext, path: String, userSchema: StructType)
  extends BaseRelation with TableScan with Logging {

  // If a schema was passed in, use it; otherwise fall back to the default schema defined here
  override def schema: StructType = {
    if (userSchema != null) {
      userSchema
    } else {
      StructType(
        StructField("id", LongType, false) ::
        StructField("name", StringType, false) ::
        StructField("gender", StringType, false) ::
        StructField("salary", LongType, false) ::
        StructField("comm", LongType, false) :: Nil
      )
    }
  }

  // Read the data and convert it to RDD[Row]
  override def buildScan(): RDD[Row] = {
    logWarning("this is ruozedata buildScan....")

    // wholeTextFiles returns (fileName, fileContent) pairs; map(_._2) keeps only the content
    val rdd = sqlContext.sparkContext.wholeTextFiles(path).map(_._2)

    // Get the schema fields
    val schemaField = schema.fields

    // rdd.collect().foreach(println)

    // Combine the rdd with the schema fields: parse each file into rows
    val rows = rdd.map(fileContent => {
      // Split the file content into lines
      val lines = fileContent.split("\n")

      // Drop comment lines (containing "//"), split each line on commas,
      // trim the values, and convert the result to a Seq
      val data = lines.filter(line => !line.trim().contains("//")).map(_.split(",").map(_.trim)).toSeq

      // zipWithIndex pairs each value with its column position
      val result = data.map(x => x.zipWithIndex.map {
        case (value, index) => {
          val columnName = schemaField(index).name
          // castTo takes two parameters. The first needs special handling: if the column is
          // gender, map the 0/1 code to a label first; otherwise pass the value through as-is
          Utils.castTo(if (columnName.equalsIgnoreCase("gender")) {
            if (value == "0") {
              "man"
            } else if (value == "1") {
              "woman"
            } else {
              "unknown"
            }
          } else {
            value
          }, schemaField(index).dataType)
        }
      })

      result.map(x => Row.fromSeq(x))
    })

    rows.flatMap(x => x)
  }
}
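For reference, here is a hypothetical sample input together with a tiny helper that writes it; neither is part of the original post. The only real constraints come from buildScan above: comma-separated values in the order id,name,gender,salary,comm, gender encoded as 0/1 (anything else becomes "unknown"), and lines containing "//" are skipped as comments. The file path and row values are made up; adjust the path to whatever you pass as the path option.

package com.example.hou

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Writes a sample data.txt that buildScan above can parse.
// The location and row values are hypothetical.
object SampleData {
  def main(args: Array[String]): Unit = {
    val content =
      """// id,name,gender,salary,comm   (this header line is dropped by the comment filter)
        |1,zhangsan,0,10000,2000
        |2,lisi,1,15000,2500
        |3,wangwu,2,12000,2200
        |""".stripMargin
    Files.write(Paths.get("C:/code/data.txt"), content.getBytes(StandardCharsets.UTF_8))
  }
}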
Finally, use it from the main method:
package com.example.hou

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object TestApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TextApp")
      .master("local[2]")
      .getOrCreate()

    // Define the schema
    val schema = StructType(
      StructField("id", LongType, false) ::
      StructField("name", StringType, false) ::
      StructField("gender", StringType, false) ::
      StructField("salary", LongType, false) ::
      StructField("comm", LongType, false) :: Nil)

    // Pass only the package name (com.example.hou); do not write com.example.hou.DefaultSource
    val df = spark.read.format("com.example.hou")
      .option("path", "C://code//data.txt")
      .schema(schema)
      .load()

    df.show()

    df.createOrReplaceTempView("test")
    spark.sql("select name,salary from test").show()

    println("Application Ended...")
    spark.stop()
  }
}
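As promised in the background section, the same source can also be wired up purely through SQL. The snippet below is a hedged sketch rather than part of the original post: it reuses the spark session from TestApp, relies on the Spark 2.x CREATE TEMPORARY VIEW ... USING syntax, and declares the columns inline because DefaultSource only implements SchemaRelationProvider and therefore needs a schema from the caller.

// Hedged sketch: register the data source as a temporary view directly in SQL.
// The view name test_sql and the path are hypothetical.
spark.sql(
  """CREATE TEMPORARY VIEW test_sql (id BIGINT, name STRING, gender STRING, salary BIGINT, comm BIGINT)
    |USING com.example.hou
    |OPTIONS (path "C://code//data.txt")
    |""".stripMargin)

spark.sql("select name, salary from test_sql").show()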
Data type conversion:
package com.example.hou

import org.apache.spark.sql.types.{DataType, LongType, StringType}

object Utils {
  // Cast a raw string value to the Catalyst data type declared in the schema
  def castTo(value: String, dataType: DataType) = {
    dataType match {
      case _: LongType => value.toLong
      case _: StringType => value
    }
  }
}
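castTo only needs to cover the two types used in the default schema. If the schema ever grows other column types, the match can be extended; the following is an assumed sketch (the extra types and the fallback case are not in the original Utils).

package com.example.hou

import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType}

// Hypothetical extended version of Utils.castTo with a few more primitive types
// and a fallback that keeps the raw string for anything unhandled.
object UtilsExtended {
  def castTo(value: String, dataType: DataType): Any = {
    dataType match {
      case _: IntegerType => value.toInt
      case _: LongType    => value.toLong
      case _: DoubleType  => value.toDouble
      case _: StringType  => value
      case _              => value
    }
  }
}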