- Spark's data reading and saving can be distinguished along two dimensions: file format and file system.
- File formats include text files, JSON files, CSV files, sequence files and object files.
- File systems include the local file system, HDFS and databases.
1. Reading and saving of file data
1.1 Text file
- Data reading: textFile(String)
- Data saving: saveAsTextFile(String)
- code implementation
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Operate_Text {

  def main(args: Array[String]): Unit = {

    //1. Create SparkConf and set the App name
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

    //2. Create SparkContext, the entry point for submitting a Spark App
    val sc: SparkContext = new SparkContext(conf)

    //3.1 Read the input file
    val inputRDD: RDD[String] = sc.textFile("input/1.txt")

    //3.2 Save the data
    inputRDD.saveAsTextFile("output")

    //4. Close the connection
    sc.stop()
  }
}
```
- Note: for a cluster path, use the full HDFS URI, e.g. hdfs://master:9000/input/1.txt
1.2 Json file
- If each line in the JSON file is a JSON record, you can read the JSON file as a text file, and then use the relevant JSON library to JSON parse each piece of data.
- Data preparation: create a user.json file in the input directory with the following contents:
{"username": "zhangsan","age": 20} {"username": "lisi","age": 18} {"username": "wangwu","age": 16}
- code implementation
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Operate_Json {

  def main(args: Array[String]): Unit = {

    //1. Create SparkConf and set the App name
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

    //2. Create SparkContext, the entry point for submitting a Spark App
    val sc: SparkContext = new SparkContext(conf)

    //3.1 Read the JSON input file as a text file
    val jsonRDD: RDD[String] = sc.textFile("input/user.json")

    //3.2 Import the package needed to parse JSON and parse each line
    import scala.util.parsing.json.JSON
    val resultRDD: RDD[Option[Any]] = jsonRDD.map(JSON.parseFull)

    //3.3 Print the results
    resultRDD.collect().foreach(println)

    //4. Close the connection
    sc.stop()
  }
}
```
- Note: parsing JSON files with RDDs is cumbersome. SparkSQL has good built-in support for JSON, so in practice SparkSQL is usually used to process JSON files; a minimal sketch of that approach follows.
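- A minimal sketch of the SparkSQL approach, assuming the same input/user.json file and the spark-sql module on the classpath (the object name is illustrative):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object Operate_Json_SQL {

  def main(args: Array[String]): Unit = {

    // Build a SparkSession, the entry point of Spark SQL
    val spark: SparkSession = SparkSession.builder()
      .appName("SparkCoreTest")
      .master("local[1]")
      .getOrCreate()

    // spark.read.json expects one JSON record per line, the same format as above
    val df: DataFrame = spark.read.json("input/user.json")

    // Print the inferred schema and the parsed rows
    df.printSchema()
    df.show()

    spark.stop()
  }
}
```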
1.3 Sequence file
- A SequenceFile is a flat file designed by Hadoop to store key-value pairs in binary form. In Spark, it can be read by calling sequenceFile[keyClass, valueClass](path) on the SparkContext.
- code implementation
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Operate_Sequence {

  def main(args: Array[String]): Unit = {

    //1. Create SparkConf and set the App name
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

    //2. Create SparkContext, the entry point for submitting a Spark App
    val sc: SparkContext = new SparkContext(conf)

    //3.1 Create an RDD of key-value pairs
    val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1, 2), (3, 4), (5, 6)))

    //3.2 Save the data as a SequenceFile
    dataRDD.saveAsSequenceFile("output")

    //3.3 Read the SequenceFile back
    sc.sequenceFile[Int, Int]("output").collect().foreach(println)

    //4. Close the connection
    sc.stop()
  }
}
```
- Note: SequenceFiles can only be written from pair RDDs (RDD[(K, V)]); a sketch of converting a plain RDD into pairs first is shown below.
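- A minimal sketch, assuming an RDD of plain values: since saveAsSequenceFile is only defined on pair RDDs, the values are first mapped to key-value pairs (the key choice and the output path here are arbitrary):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Operate_Sequence_Pair {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
    val sc: SparkContext = new SparkContext(conf)

    // saveAsSequenceFile is only available on RDDs of key-value pairs,
    // so a plain RDD[Int] is mapped to (value, 1) pairs first; the key is arbitrary
    val valueRDD: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4))
    val pairRDD: RDD[(Int, Int)] = valueRDD.map(v => (v, 1))
    pairRDD.saveAsSequenceFile("output_seq")

    sc.stop()
  }
}
```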
1.4 Object file
- An object file is a file produced by serializing objects with the Java serialization mechanism. objectFile[T](path) takes a path, reads the object file and returns the corresponding RDD; saveAsObjectFile() writes an RDD out as an object file. Because serialization is involved, the element type must be specified when reading.
- code implementation
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Operate_Object {

  def main(args: Array[String]): Unit = {

    //1. Create SparkConf and set the App name
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

    //2. Create SparkContext, the entry point for submitting a Spark App
    val sc: SparkContext = new SparkContext(conf)

    //3.1 Create an RDD
    val dataRDD: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4))

    //3.2 Save the data as an object file
    dataRDD.saveAsObjectFile("output")

    //3.3 Read the data back, specifying the element type
    sc.objectFile[Int]("output").collect().foreach(println)

    //4. Close the connection
    sc.stop()
  }
}
```
2. File system data reading and saving
2.1 HDFS
- Spark's whole ecosystem is fully compatible with Hadoop, so Spark can read all the file types and storage systems that Hadoop supports. Because Hadoop has two API versions, old and new, Spark provides two sets of creation interfaces in order to stay compatible with all Hadoop versions: HadoopRDD and NewHadoopRDD are the two most abstract interfaces for creating RDDs from external storage. A minimal sketch of reading from HDFS follows.
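- A minimal sketch of reading from and writing to HDFS with the same textFile / saveAsTextFile API; the hdfs://master:9000 address is a placeholder for an actual NameNode:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Operate_HDFS {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
    val sc: SparkContext = new SparkContext(conf)

    // Reading from HDFS only requires a full hdfs:// URI instead of a local path
    val hdfsRDD: RDD[String] = sc.textFile("hdfs://master:9000/input/1.txt")

    // Saving works the same way; the output directory must not already exist
    hdfsRDD.saveAsTextFile("hdfs://master:9000/output")

    sc.stop()
  }
}
```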
2.2 MySQL
- Spark supports accessing relational databases through Java JDBC; this is done with JdbcRDD. An example follows:
- Add Maven dependency
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>
```
- Read data from MySQL
```scala
package com.spark.day06

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.DriverManager

/*
 * Read data from a MySQL database with JdbcRDD.
 *
 * JdbcRDD parameters:
 *   sc: SparkContext                  entry point of the Spark program
 *   getConnection: () => Connection   obtains a database connection
 *   sql: String                       SQL statement to execute
 *   lowerBound: Long                  start of the query range
 *   upperBound: Long                  end of the query range
 *   numPartitions: Int                number of partitions
 *   mapRow: (ResultSet) => T          processes the result set
 *
 * Standard JDBC workflow:
 *   register the driver -> get a connection -> create a PreparedStatement
 *   -> execute the SQL -> process the result set -> close the connection
 */
object Spark04_MySQL_read {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Four elements of the database connection
    var driver = "com.mysql.jdbc.Driver"
    var url = "jdbc:mysql://172.23.4.221:3306/test"
    var username = "root"
    var password = "123456"
    var sql = "select * from wyk_csdn where id >= ? and id <= ?"

    val resRDD = new JdbcRDD(
      sc,
      () => {
        // Register the driver
        Class.forName(driver)
        // Get a connection
        DriverManager.getConnection(url, username, password)
      },
      sql,
      1,   // lowerBound
      20,  // upperBound
      2,   // numPartitions
      rs => (rs.getInt(1), rs.getString(2), rs.getString(3))
    )

    resRDD.collect().foreach(println)

    sc.stop()
  }
}
```
- Write data to MySQL
```scala
package com.spark.day06

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{Connection, DriverManager, PreparedStatement}

/*
 * Write data to a MySQL database.
 *
 * Standard JDBC workflow:
 *   register the driver -> get a connection -> create a PreparedStatement
 *   -> execute the SQL -> close the connection
 */
object Spark05_MySQL_write {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Four elements of the database connection
    var driver = "com.mysql.jdbc.Driver"
    var url = "jdbc:mysql://172.23.4.221:3306/test"
    var username = "root"
    var password = "123456"

    // Create the RDD to be written out
    val rdd: RDD[(Int, String, String)] = sc.makeRDD(List((1, "banzhang", "2021-05-20 10:18:35")))

    /*
     * The following approach does not work: the PreparedStatement created on the driver
     * would have to be serialized and shipped to the executors, but it is not a type we
     * define ourselves and it is not serializable. Creating the connection inside
     * rdd.foreach would work, but it creates one connection per element, which is
     * inefficient and not recommended.
     *
     * //Register the driver
     * Class.forName(driver)
     * //Get a connection
     * val conn: Connection = DriverManager.getConnection(url, username, password)
     * //SQL statement for the database operation
     * var sql: String = "insert into wyk_csdn(id, name, ins_ts) values(?, ?, ?)"
     * //Create the database operation object
     * val ps: PreparedStatement = conn.prepareStatement(sql)
     * rdd.foreach {
     *   case (id, name, ins_ts) =>
     *     //Assign values to the parameters
     *     ps.setInt(1, id)
     *     ps.setString(2, name)
     *     ps.setString(3, ins_ts)
     *     //Execute the SQL statement
     *     ps.executeUpdate()
     * }
     * ps.close()
     * conn.close()
     */

    // foreachPartition creates one connection per partition instead of one per element
    rdd.foreachPartition {
      // datas holds the data of one partition of the RDD
      datas => {
        // Register the driver
        Class.forName(driver)

        // Get a connection
        val conn: Connection = DriverManager.getConnection(url, username, password)

        // SQL statement for the database operation
        var sql: String = "insert into wyk_csdn(id, name, ins_ts) values(?, ?, ?)"

        // Create the database operation object
        val ps: PreparedStatement = conn.prepareStatement(sql)

        // Traverse the data in the current partition
        // (this foreach is a Scala collection method, not an RDD operator)
        datas.foreach {
          case (id, name, ins_ts) => {
            // Assign values to the parameters
            ps.setInt(1, id)
            ps.setString(2, name)
            ps.setString(3, ins_ts)

            // Execute the SQL statement
            ps.executeUpdate()
          }
        }

        ps.close()
        conn.close()
      }
    }

    sc.stop()
  }
}
```