Reading and saving Spark Core data

  • Spark's data reading and saving can be distinguished along two dimensions: file format and file system.

  • File formats include text files, JSON files, CSV files, sequence files, and object files.

  • File systems include the local file system, HDFS, and databases.

1, Reading and saving of file data

1.1 Text file

  • Data reading: textFile(String)
  • Data saving: saveAsTextFile(String)
  • code implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Operate_Text {

    def main(args: Array[String]): Unit = {

        //1. Create SparkConf and set App name
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2. Create SparkContext, which is the entry to submit Spark App
        val sc: SparkContext = new SparkContext(conf)

        //3.1 reading input files
        val inputRDD: RDD[String] = sc.textFile("input/1.txt")

        //3.2 saving data
        inputRDD.saveAsTextFile("output")

        //4. Close the connection
        sc.stop()
    }
}
  • Note: for a file on the cluster, use the full HDFS URI, e.g. hdfs://master:9000/input/1.txt, as sketched below.
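  • A minimal sketch, reusing the sc from the example above (master:9000 is a placeholder for your NameNode host and port):
// Hypothetical cluster paths; replace master:9000 with your NameNode host and port
val hdfsRDD: RDD[String] = sc.textFile("hdfs://master:9000/input/1.txt")
hdfsRDD.saveAsTextFile("hdfs://master:9000/output")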

1.2 Json file

  • If each line of the JSON file is a complete JSON record, you can read the file as a text file and then parse each line with a JSON library.
  • Data preparation: create a user.json file in the input directory with the following contents
{"username": "zhangsan","age": 20}
{"username": "lisi","age": 18}
{"username": "wangwu","age": 16}
  • code implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Operate_Json {

    def main(args: Array[String]): Unit = {

        //1. Create SparkConf and set App name
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2. Create SparkContext, which is the entry to submit Spark App
        val sc: SparkContext = new SparkContext(conf)

        //3.1 read Json input file
        val jsonRDD: RDD[String] = sc.textFile("input/user.json")

        //3.2 import the package needed to parse Json and parse Json
        import scala.util.parsing.json.JSON
        val resultRDD: RDD[Option[Any]] = jsonRDD.map(JSON.parseFull)

        //3.3 printing results
        resultRDD.collect().foreach(println)

        //4. Close the connection
        sc.stop()
    }
}
  • Note: parsing JSON files with RDDs is cumbersome. SparkSQL has built-in support for JSON, so in practice SparkSQL is usually used to process JSON files; see the sketch below.
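  • As a rough sketch of that approach (assuming the spark-sql dependency is on the classpath; the object name here is illustrative), the same file can be read far more simply with SparkSQL:
import org.apache.spark.sql.{DataFrame, SparkSession}

object Operate_Json_SQL {

    def main(args: Array[String]): Unit = {

        //1. Create SparkSession, the entry point for SparkSQL
        val spark: SparkSession = SparkSession.builder()
            .appName("SparkCoreTest")
            .master("local[1]")
            .getOrCreate()

        //2. Read the JSON file directly into a DataFrame
        val userDF: DataFrame = spark.read.json("input/user.json")
        userDF.show()

        //3. Close the connection
        spark.stop()
    }
}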

1.3 Sequence file

  • A SequenceFile is a flat file designed by Hadoop to store key-value pairs in binary form. In SparkContext, call sequenceFile[keyClass, valueClass](path) to read one.
  • code implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Operate_Sequence {

    def main(args: Array[String]): Unit = {

        //1. Create SparkConf and set App name
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2. Create SparkContext, which is the entry to submit Spark App
        val sc: SparkContext = new SparkContext(conf)

        //3.1 create rdd
        val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2),(3,4),(5,6)))

        //3.2 save data as SequenceFile
        dataRDD.saveAsSequenceFile("output")

        //3.3 reading SequenceFile file
        sc.sequenceFile[Int,Int]("output").collect().foreach(println)

        //4. Close the connection
        sc.stop()
    }
}
  • Note: SequenceFiles can only be written from pair RDDs (RDD[(K, V)]); see the sketch below.
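  • For example, a minimal sketch reusing the sc above (the key chosen here is arbitrary): a non-pair RDD must first be mapped to key-value pairs before it can be saved as a SequenceFile.
// Minimal sketch: an RDD[Int] cannot be saved directly as a SequenceFile
val intRDD: RDD[Int] = sc.makeRDD(Array(1, 2, 3))

// Map each element to a (key, value) pair first, then save
intRDD.map(x => (x, 1)).saveAsSequenceFile("outputSeq")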

1.4 Object file

  • An object file stores serialized objects using the Java serialization mechanism. objectFile[T](path) reads an object file and returns the corresponding RDD, and saveAsObjectFile() writes one. Because serialization is involved, the element type must be specified when reading.
  • code implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Operate_Object {

    def main(args: Array[String]): Unit = {

        //1. Create SparkConf and set App name
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2. Create SparkContext, which is the entry to submit Spark App
        val sc: SparkContext = new SparkContext(conf)

        //3.1 create RDD
        val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4))

        //3.2 saving data
        dataRDD.saveAsObjectFile("output")

        //3.3 reading data
        sc.objectFile[Int]("output").collect().foreach(println)

        //4. Close the connection
        sc.stop()
    }
}

2, File system data reading and saving

2.1 HDFS

  • Spark's ecosystem is fully compatible with Hadoop, so Spark supports the file and database types that Hadoop supports. Because Hadoop has both an old and a new API, Spark provides two sets of creation interfaces to stay compatible with all Hadoop versions: HadoopRDD and NewHadoopRDD are the two most fundamental abstractions for creating RDDs from external storage; a sketch using the new API follows.
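  • A minimal sketch of reading a text file on HDFS through the new Hadoop API (the host, port and path are placeholders, and sc is an existing SparkContext):
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.rdd.RDD

// Minimal sketch: read a text file through the new Hadoop API (host and path are placeholders)
val hadoopRDD: RDD[(LongWritable, Text)] =
    sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs://master:9000/input/1.txt")

// The key is the byte offset of each line; the value is the line itself
hadoopRDD.map(_._2.toString).collect().foreach(println)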

2.2 MySQL

  • Spark supports accessing relational databases through Java JDBC via JdbcRDD. The examples are as follows:
  • Add Maven dependency
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>
  • Read data from MySQL
package com.spark.day06

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.DriverManager


/*
 * Read data from a MySQL database with JdbcRDD.
 *
 * JdbcRDD parameters:
 *   sc: SparkContext                  entry point of the Spark program (context object)
 *   getConnection: () => Connection   obtains a database connection
 *   sql: String                       SQL statement to execute
 *   lowerBound: Long                  lower bound of the query range
 *   upperBound: Long                  upper bound of the query range
 *   numPartitions: Int                number of partitions
 *   mapRow: (ResultSet) => T          maps each row of the result set
 *
 * Typical JDBC workflow:
 *   register the driver
 *   get a connection
 *   create a PreparedStatement
 *   execute the SQL
 *   process the result set
 *   close the connection
 */

object Spark04_MySQL_read {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Create RDD
    // Four elements of database connection:
    var driver = "com.mysql.jdbc.Driver"
    var url = "jdbc:mysql://172.23.4.221:3306/test"
    var username = "root"
    var password = "123456"
    var sql = "select * from wyk_csdn where id >= ? and id <= ?"
    val resRDD = new JdbcRDD(
      sc,
      () => {
        // Register driver
        Class.forName(driver)

        // Get connection
        DriverManager.getConnection(url, username, password)
      },
      sql,
      1,
      20,
      2,
      rs => (rs.getInt(1), rs.getString(2), rs.getString(3))
    )

    resRDD.collect().foreach(println)

    sc.stop()
  }
}
  • Write data to MySQL
package com.spark.day06

import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{Connection, DriverManager, PreparedStatement}


/*
 * Write data to a MySQL database.
 *
 * Typical JDBC workflow:
 *   register the driver
 *   get a connection
 *   create a PreparedStatement
 *   execute the SQL
 *   process the result set
 *   close the connection
 */

object Spark05_MySQL_write {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Create RDD
    // Four elements of database connection:
    var driver = "com.mysql.jdbc.Driver"
    var url = "jdbc:mysql://172.23.4.221:3306/test"
    var username = "root"
    var password = "123456"

    val rdd: RDD[(Int, String, String)] = sc.makeRDD(List((1, "banzhang", "2021-05-20 10:18:35")))

///
//  The following code would require ps (the PreparedStatement) to be serialized and shipped to the executors,
//  but PreparedStatement is not serializable and is not a type we define, so this approach cannot work
//    //Register driver
//    Class.forName(driver)
//
//    //Get connection
//    val conn: Connection = DriverManager.getConnection(url, username, password)
//
//    //SQL statement declaring database operations
//    var sql:String = "insert into wyk_csdn(id, name, ins_ts) values(?, ?, ?)"
//
//    //Create database operation object
//    val ps: PreparedStatement = conn.prepareStatement(sql)
//
//
//    //Alternatively, a connection could be created inside the loop body, but then every element of the RDD creates a new connection object, which is inefficient and not recommended
//    rdd.foreach{
//      case (id, name, ins_ts) => {
//
//
//        //Assign values to parameters
//        ps.setInt(1, id)
//        ps.setString(2, name)
//        ps.setString(3,ins_ts)
//
//        //Execute SQL statement
//        ps.executeUpdate()
//      }
//    }
//    //Close connection
//    ps.close()
//    conn.close()
///

    rdd.foreachPartition{
      // datas holds the elements of one partition of the RDD
      datas => {
        //  Register driver
        Class.forName(driver)

        // Get connection
        val conn: Connection = DriverManager.getConnection(url, username, password)

        // SQL statement declaring database operations
        var sql:String = "insert into wyk_csdn(id, name, ins_ts) values(?, ?, ?)"

        // Create database operation object
        val ps: PreparedStatement = conn.prepareStatement(sql)

        // Traverse the data in the current partition
        // This foreach is not an RDD operator but a method on the local collection (the partition's iterator)
        datas.foreach{
          case (id, name, ins_ts) => {


            // Assign values to parameters
            ps.setInt(1, id)
            ps.setString(2, name)
            ps.setString(3,ins_ts)

            // Execute SQL statement
            ps.executeUpdate()
          }
        }
        ps.close()
        conn.close()
      }
    }


    sc.stop()
  }
}

 
