Spark SQL Foundations: DataFrame and DataSet

Spark-SQL

Summary

Spark SQL is a Spark module for structured data processing.

For developers, Spark SQL simplifies RDD development, improves development efficiency, and executes very quickly, so in practice Spark SQL is what is mostly used. To simplify RDD development and improve development efficiency, Spark SQL provides two programming abstractions, analogous to RDD in Spark Core:
DataFrame
DataSet

SparkSQL features

Easy integration

Seamless integration of SQL query and Spark programming

Unified data access

Connect different data sources in the same way

Compatible with Hive

Run SQL or HiveQL directly on an existing warehouse. In the early days, Hive was the only SQL-on-Hadoop tool running on Hadoop.

Standard data connection

Connect via JDBC or ODBC
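For reference, a minimal sketch (not from the original text; the URL, table name and credentials are placeholders, and the JDBC driver jar must be on the classpath) of reading a table over JDBC in spark-shell:

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")   // placeholder connection URL
  .option("driver", "com.mysql.cj.jdbc.Driver")        // placeholder driver class
  .option("dbtable", "user")                           // placeholder table name
  .option("user", "root")
  .option("password", "******")
  .load()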

What is a DataFrame

In Spark, a DataFrame is a distributed dataset based on RDD, similar to a two-dimensional table in a traditional database.

The main difference between a DataFrame and an RDD is that the former carries schema metadata: every column of the two-dimensional dataset represented by a DataFrame has a name and a type. This lets Spark SQL see more of the data's structure, so it can optimize both the data sources behind the DataFrame and the transformations applied to it, greatly improving runtime efficiency. An RDD, by contrast, exposes nothing about the internal structure of its stored elements, so Spark Core can only perform simple, generic pipeline optimization at the stage level.

A DataFrame also supports nested data types (struct, array, and map). From the standpoint of API usability, the DataFrame API offers a set of high-level relational operations that is friendlier and has a lower barrier to entry than the functional RDD API.

The figure above intuitively shows the difference between DataFrame and RDD.

Although the RDD[Person] on the left takes Person as its type parameter, the Spark framework itself does not understand the internal structure of the Person class. The DataFrame on the right, however, provides detailed structure information, so Spark SQL knows exactly which columns the dataset contains, along with the name and type of each column.

DataFrame is a view that provides a Schema for data. It can be treated as a table in the database.

A DataFrame is also lazily evaluated, but its performance is higher than an RDD's, mainly because of the optimized execution plan: the query plan is optimized by the Spark Catalyst optimizer.
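For instance, a minimal sketch (spark-shell style, assuming a DataFrame df such as the one created from user.json later in this article) of inspecting the plan produced by the Catalyst optimizer:

scala> df.filter($"age" > 30).explain(true)   // prints the parsed, analyzed, optimized and physical plans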

What is a DataSet

A DataSet is a distributed data collection. It was added as a new abstraction in Spark 1.6 and is an extension of DataFrame. It combines the advantages of RDD (strong typing, the ability to use powerful lambda functions) with the advantages of Spark SQL's optimized execution engine. A DataSet also supports functional transformations (map, flatMap, filter, etc.).

DataSet is an extension of the DataFrame API and the latest data abstraction in Spark SQL;

A user-friendly API style, with both type-safety checks and the query-optimization features of DataFrame;

Case classes are used to define the structure of the data in a DataSet; the name of each attribute in the case class maps directly to a field name in the DataSet;

DataSet is strongly typed; for example, there can be DataSet[Car] and DataSet[Person].

DataFrame is a special case of DataSet: DataFrame = DataSet[Row], so a DataFrame can be converted to a DataSet with the as method. Row is a type; like Car and Person, it is used to represent all table-structure information. When reading data out of a Row, the field order must be specified.
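For example, a minimal sketch (values are illustrative) of reading data out of a Row by position:

import org.apache.spark.sql.Row

val row = Row("zhangsan", 20)   // every row of a DataFrame is represented as a Row
val name = row.getString(0)     // fields are read by position, so the column order must be known
val age  = row.getInt(1)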

SparkSQL core programming

In Spark Core, to execute an application you first need to build the context object SparkContext. Spark SQL can be understood as an encapsulation of Spark Core, not only of the data model but also of the context object.

In older versions, SparkSQL provided two starting points for SQL queries: SQLContext, used for SQL queries provided by Spark itself, and HiveContext, used to connect to Hive for queries.

SparkSession is Spark's latest starting point for SQL queries. It is essentially a combination of SQLContext and HiveContext, so the APIs available on SQLContext and HiveContext can also be used on SparkSession. SparkSession encapsulates a SparkContext internally, so the computation is actually carried out by the SparkContext. When we use spark-shell, the Spark framework automatically creates a SparkSession object named spark, just as it automatically provides sc to represent the SparkContext object.
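For reference, a minimal sketch (not from the original text; the app name and master are illustrative) of building a SparkSession in application code:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SparkSQLDemo")
  .master("local[*]")
  // .enableHiveSupport()   // uncomment for the capabilities HiveContext used to provide
  .getOrCreate()

val sc = spark.sparkContext   // the SparkContext encapsulated inside the SparkSession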

DataFrame

The DataFrame API of Spark SQL allows us to use DataFrame without having to register temporary tables or generate SQL expressions. The DataFrame API has both transformation and action operations.

Create DataFrame

In Spark SQL, SparkSession is the entry point for creating DataFrames and executing SQL. There are three ways to create a DataFrame:

Create through Spark data source;

Convert from an existing RDD;

You can also query and return from HiveTable.

1) Create from a Spark data source

View the data source formats that Spark supports for creating a DataFrame

scala> spark.read.
csv format table text jdbc json textFile load option options orc parquet schema

Create a user.json file in Spark's bin/data directory

{"username":"zhangsan","age":20}
{"username":"lisi","age":30}
{"username":"wangwu","age":40}

Read json file to create DataFrame

scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

Note: if the data comes from memory, Spark can tell what the type is, and a number is treated as Int by default. However, the type of a number read from a file cannot be determined in advance, so it is received as bigint, which can be converted to Long but not to Int.
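A quick way to confirm this (a sketch, assuming the df read from user.json above):

val firstAge: Long = df.first().getAs[Long]("age")   // works: bigint maps to Long
// val badAge: Int = df.first().getAs[Int]("age")    // would fail at runtime with a ClassCastException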

Show results

df.show

+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30|    lisi|
| 40|  wangwu|
+---+--------+

2) Convert from RDD

Discussed in subsequent chapters

3) Query from Hive Table and return

Discussed in subsequent chapters

SQL syntax

The SQL syntax style means that we query data with SQL statements. This style of query must be assisted by a temporary view or a global view.

1) Read JSON file and create DataFrame

scala> val df = spark.read.json("data/user.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

2) Create a temporary table for the DataFrame

scala> df.createOrReplaceTempView("people")

3) Query the whole table through SQL statement

scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

4) Result display

scala> sqlDF.show
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30|    lisi|
| 40|  wangwu|
+---+--------+

Note: an ordinary temporary view is scoped to the current Session. To make it effective across the whole application, use a global temporary view. Global temporary views require full-path access, e.g. global_temp.people

5) Create a global table for the DataFrame

scala> df.createGlobalTempView("people")

6) Query the whole table through SQL statement

scala> spark.sql("SELECT * FROM global_temp.people").show()

+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30|    lisi|
| 40|  wangwu|
+---+--------+
scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30|    lisi|
| 40|  wangwu|
+---+--------+

DSL syntax

DataFrame provides a domain-specific language (DSL) to manipulate structured data. The DSL can be used in Scala, Java, Python and R. With the DSL syntax style there is no need to create temporary views.

1) Create a DataFrame

scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

2) View Schema information of DataFrame

scala> df.printSchema
root
|-- age: long (nullable = true)
|-- username: string (nullable = true)

3) View only the "username" column data

scala> df.select("username").show()
+--------+
|username|
+--------+
|zhangsan|
|    lisi|
|  wangwu|
+--------+

4) View the "username" column and "age" + 1

Note: when calculations are involved, every column must be referenced with $, or with the quote expression: a single quote + the field name

scala> df.select($"username",$"age" + 1).show
scala> df.select('username, 'age + 1).show()

scala> df.select('username, 'age + 1 as "newage").show()
+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan|       21|
|    lisi|       31|
|  wangwu|       41|
+--------+---------+

5) View data with age greater than 30

scala> df.filter($"age">30).show
+---+--------+
|age|username|
+---+--------+
| 40|  wangwu|
+---+--------+

6) Group by "age" to view the number of data pieces

scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 20|    1|
| 30|    1|
| 40|    1|
+---+-----+

RDD to DataFrame

When developing a program in IDEA, if RDDs need to interoperate with DataFrames or DataSets, you need to introduce the conversions with import spark.implicits._

The spark here is not a package name in Scala, but the variable name of the SparkSession object you created, so the SparkSession object must be created before the import. The spark object here cannot be declared with var, because Scala only supports importing members of objects declared with val.

No import is required in spark-shell; this is done automatically.
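For the IDEA case described above, a minimal sketch of a standalone program (the object and app names are illustrative):

import org.apache.spark.sql.SparkSession

object RddToDfDemo {
  def main(args: Array[String]): Unit = {
    // must be a val: Scala only allows importing members of a stable (val) identifier
    val spark = SparkSession.builder().appName("RddToDfDemo").master("local[*]").getOrCreate()
    import spark.implicits._   // brings the toDF / toDS conversions into scope

    val rdd = spark.sparkContext.makeRDD(List(("zhangsan", 30), ("lisi", 40)))
    rdd.toDF("name", "age").show()

    spark.stop()
  }
}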

.toDF

scala> val idRDD = sc.textFile("data/id.txt")
scala> idRDD.toDF("id").show
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+

In actual development, an RDD is generally converted to a DataFrame through case classes

scala> case class User(name:String, age:Int)
defined class User

scala> sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1,t._2)).toDF.show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 30|
|    lisi| 40|
+--------+---+

Convert DataFrame to RDD

A DataFrame is actually an encapsulation of an RDD, so the internal RDD can be obtained directly

.rdd

scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1,t._2)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] at rdd at <console>:25

scala> val array = rdd.collect
array: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])

Note: the element type of the RDD obtained this way is Row

scala> array(0)
res28: org.apache.spark.sql.Row = [zhangsan,30]
scala> array(0)(0)
res29: Any = zhangsan
scala> array(0).getAs[String]("name")
res30: String = zhangsan

DataSet

DataSet is a strongly typed data set, and the corresponding type information needs to be provided.

Create DataSet

1) Create a DataSet from a sequence of case classes
scala> case class Person(name: String, age: Long)
defined class Person

scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseClassDS.show
+--------+---+
|    name|age|
+--------+---+
|zhangsan|  2|
+--------+---+
2) Create a DataSet from a sequence of basic types
scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+

Note: in practice, converting a sequence into a DataSet directly is rare; it is more common to obtain a DataSet from an RDD

RDD to DataSet

SparkSQL can automatically convert an RDD containing case classes into a DataSet. The case class defines the structure of the table, and the attributes of the case class become the table's column names through reflection. Case classes can also contain complex structures such as Seq or Array (a sketch follows the example below).

scala> case class User(name:String, age:Int)
defined class User

scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1,t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]
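As noted above, case classes may also contain complex structures such as Seq or Array; a minimal sketch (spark-shell style, the Order class is illustrative):

scala> case class Order(id: Int, items: Seq[String])

scala> sc.makeRDD(List(Order(1, Seq("apple", "pear")))).toDS().printSchema()
// the items attribute becomes an array<string> column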

Convert DataSet to RDD

DataSet is actually an encapsulation of RDD, so you can directly obtain the internal RDD

scala> case class User(name:String, age:Int)
defined class User

scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1,t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> val rdd = res11.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at <console>:25

scala> rdd.collect
res12: Array[User] = Array(User(zhangsan,30), User(lisi,49))

DataFrame and DataSet conversion

DataFrame is actually a special case of DataSet, so they can be converted to each other.

Convert DataFrame to DataSet

scala> case class User(name:String, age:Int)
defined class User

scala> val df = sc.makeRDD(List(("zhangsan",30),("lisi",49))).toDF("name","age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

Convert DataSet to DataFrame

scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

Relationship among RDD, DataFrame and DataSet

In SparkSQL, Spark provides us with two new abstractions: DataFrame and DataSet. First, looking at the versions in which they appeared:

➢ Spark1.0 => RDD

➢ Spark1.3 => DataFrame

➢ Spark1.6 => Dataset

Given the same data, the three data structures produce the same results after computation; the difference lies in their execution efficiency and execution mode.

In later Spark versions, DataSet may gradually replace RDD and DataFrame as the only API interface.

The commonness of the three

RDD, DataFrame and DataSet are all distributed, resilient datasets on the Spark platform, which make it convenient to process very large data;

All three are lazily evaluated: transformations such as map are not executed immediately when they are created or chained; the traversal starts only when an action such as foreach is encountered;

The three share many common functions, such as filter, sorting, etc.;

Many operations on DataFrame and DataSet require this import: import spark.implicits._ (import it right after creating the SparkSession object);

All three automatically cache results according to Spark's memory conditions, so even when the amount of data is large there is no need to worry about memory overflow;

All three have the concept of partition

Both DataFrame and DataSet can use pattern matching to obtain the value and type of each field
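For the last point, a minimal sketch (spark-shell style, assuming the df read from user.json above) of extracting fields by pattern matching on Row:

import org.apache.spark.sql.Row

df.rdd.map { case Row(age: Long, username: String) => s"$username is $age" }.collect()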

The difference between the three

RDD
RDD is generally used together with Spark MLlib;
RDD does not support SparkSQL operations.

DataFrame

Unlike RDD and DataSet, each row of a DataFrame has the fixed type Row. The value of a column cannot be accessed directly; the value of each field can only be obtained by parsing the Row.

DataFrame and DataSet are generally not used together with Spark MLlib;

Both DataFrame and DataSet support SparkSQL operations, such as select and groupBy, and both can be registered as temporary tables/views for SQL statement operations;

DataFrame and DataSet support some particularly convenient saving methods, such as saving as CSV with a header, so that the field name of each column is clear at a glance
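A minimal sketch (the output path is illustrative) of the CSV save with header mentioned above:

df.write
  .option("header", "true")   // write the column names as the first row
  .csv("output/user_csv")     // illustrative output directory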

DataSet

DataSet and DataFrame have exactly the same member functions; the only difference is the data type of each row. DataFrame is actually a special case of DataSet: type DataFrame = Dataset[Row]

A DataFrame can therefore also be written as Dataset[Row]. The type of each row is Row, which is not parsed: it is impossible to know which fields each Row has or what type each field is, and specific fields can only be extracted with the getAs method mentioned above or with the pattern matching mentioned in point 7 of the commonalities. In a DataSet, by contrast, the type of each row is not fixed; after defining a case class, the information of each row can be accessed freely.
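A minimal sketch (spark-shell style, reusing the User case class from earlier) contrasting typed and untyped access:

case class User(name: String, age: Int)

val ds = Seq(User("zhangsan", 30)).toDS()
ds.map(u => u.name).collect()                       // typed: the fields are known at compile time

val df = ds.toDF()
df.map(row => row.getAs[String]("name")).collect()  // untyped: each row is a Row and must be parsed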

The mutual transformation of the three
