Spark custom external data source

Background: sometimes we need to define a custom external data source and process it with Spark SQL. There are two benefits:

(1) Once the external data source is defined, it is very easy to use and the software architecture stays clean: the data can be queried directly through SQL.

(2) It is easy to split the code into layers that build on one another, and implementation details stay hidden behind the data source interface.

In these cases we define an external data source. Doing this in Spark is also quite simple; it only looks hard until you know how.

First, pick a package name and put all the classes under it, for example com.example.hou.

Then define two things: a class named DefaultSource, and a subclass of BaseRelation with TableScan.
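
In a standard sbt or Maven source layout (an assumption; the original post does not show its project structure), the four files in this post would sit together under the package directory:

src/main/scala/com/example/hou/
    DefaultSource.scala
    TextDataSourceRelation.scala
    Utils.scala
    TestApp.scala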

The code for DefaultSource is very simple and should be clear from reading it directly:



package com.example.hou

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType

class DefaultSource extends CreatableRelationProvider with SchemaRelationProvider {

  // Read path: build a relation from the options and the user-supplied schema
  def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation = {
    val path = parameters.get("path")

    path match {
      case Some(x) => new TextDataSourceRelation(sqlContext, x, schema)
      case _ => throw new IllegalArgumentException("path is required...")
    }
  }

  // Write path (CreatableRelationProvider): this example does not actually write data,
  // so it ignores the SaveMode and the DataFrame and simply reuses the read-path relation
  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
    createRelation(sqlContext, parameters, null)
  }

}
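
When you later call spark.read.format("com.example.hou"), Spark resolves the name by appending .DefaultSource to it and loading that class, which is why the class must be called DefaultSource. If you would rather expose a short alias, one option is Spark's DataSourceRegister trait. A minimal sketch, assuming a class name and alias made up for this example (it also requires listing the class in a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file):

package com.example.hou

import org.apache.spark.sql.sources.DataSourceRegister

// Hypothetical variant: mixing in DataSourceRegister lets callers write
// spark.read.format("hou-text") instead of the full package name
class ShortNameDefaultSource extends DefaultSource with DataSourceRegister {
  override def shortName(): String = "hou-text"
}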

Source code of TextDataSourceRelation:



package com.example.hou

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

class TextDataSourceRelation(override val sqlContext: SQLContext, path: String, userSchema: StructType) extends BaseRelation with TableScan with Logging {

  // If a schema was passed in, use it; otherwise fall back to the default schema defined here
  override def schema: StructType = {
    if (userSchema != null) {
      userSchema
    } else {
      StructType(
        StructField("id", LongType, false) ::
          StructField("name", StringType, false) ::
          StructField("gender", StringType, false) ::
          StructField("salary", LongType, false) ::
          StructField("comm", LongType, false) :: Nil
      )
    }
  }

  // Read the data and convert it to RDD[Row]
  override def buildScan(): RDD[Row] = {
    logWarning("this is ruozedata buildScan....")

    // wholeTextFiles returns (fileName, fileContent) pairs; map(_._2) keeps only the content
    val rdd = sqlContext.sparkContext.wholeTextFiles(path).map(_._2)
    // Get the schema fields
    val schemaField = schema.fields

    // Combine the RDD with the schema fields: parse each file and build rows
    val rows = rdd.map(fileContent => {
      // Split the file content into lines, dropping blank lines and "//" comment lines
      val lines = fileContent.split("\n").map(_.trim).filter(line => line.nonEmpty && !line.contains("//"))
      // Each line is comma-separated; split it and trim each field
      val data = lines.map(_.split(",").map(_.trim)).toSeq

      // zipWithIndex pairs each value with its column position so it can be matched to the schema
      val result = data.map(x => x.zipWithIndex.map {
        case (value, index) =>
          val columnName = schemaField(index).name
          // castTo takes two parameters: the raw (possibly remapped) value and the target data type.
          // For the gender column the 0/1 code is first translated to a readable label;
          // every other column is passed through unchanged.
          Utils.castTo(
            if (columnName.equalsIgnoreCase("gender")) {
              if (value == "0") {
                "man"
              } else if (value == "1") {
                "woman"
              } else {
                "unknown"
              }
            } else {
              value
            }, schemaField(index).dataType)
      })

      result.map(x => Row.fromSeq(x))
    })

    rows.flatMap(x => x)
  }

}
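
For reference, buildScan above expects a plain text file of comma-separated rows in the order id, name, gender, salary, comm, with gender encoded as a digit and lines containing "//" skipped as comments. A hypothetical data.txt (the values below are made up purely to illustrate the format) could look like this:

// id, name, gender, salary, comm
1, Tom, 0, 5000, 1000
2, Lucy, 1, 6000, 1200
3, Jack, 2, 4000, 800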

Finally, use it from a main method:

package com.example.hou

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object TestApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TextApp")
      .master("local[2]")
      .getOrCreate()

    // Define the schema
    val schema = StructType(
      StructField("id", LongType, false) ::
        StructField("name", StringType, false) ::
        StructField("gender", StringType, false) ::
        StructField("salary", LongType, false) ::
        StructField("comm", LongType, false) :: Nil)

    // Pass only the package name, com.example.hou, not the full class name com.example.hou.DefaultSource
    val df = spark.read.format("com.example.hou")
      .option("path", "C://code//data.txt").schema(schema).load()

    df.show()
    df.createOrReplaceTempView("test")
    spark.sql("select name,salary from test").show()

    println("Application Ended...")
    spark.stop()
  }

}
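
To compile and run the example, the project only needs a dependency on spark-sql. A minimal build.sbt sketch, assuming sbt with Scala 2.11 and Spark 2.4.x (the versions are assumptions; adjust them to your environment):

// Hypothetical build.sbt for this example; versions are illustrative, not prescribed by the post
name := "spark-custom-datasource"
version := "0.1"
scalaVersion := "2.11.12"

// Mark the dependency as "provided" instead if you package the job for spark-submit
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"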

Data type conversion:

package com.example.hou

import org.apache.spark.sql.types.{DataType, LongType, StringType}

object Utils {
  // Cast a raw string value to the Scala value that matches the target Spark SQL data type
  def castTo(value: String, dataType: DataType): Any = {
    dataType match {
      case _: LongType   => value.toLong
      case _: StringType => value
      case other         => throw new IllegalArgumentException(s"Unsupported data type: $other")
    }
  }
}
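
As a quick illustration of what the helper returns (REPL-style lines, with made-up values):

import org.apache.spark.sql.types.{LongType, StringType}

Utils.castTo("5000", LongType)   // 5000L, used for the LongType columns (id, salary, comm)
Utils.castTo("Tom", StringType)  // "Tom", used for the StringType columns (name, gender)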
