1. Spark overview
1.1 what is Spark
Spark is a fast, general-purpose, and scalable in-memory big data analytics framework.
1.2 Hadoop and Spark
- Hadoop (MapReduce): a disk-based, one-pass computing framework that is not well suited to iterative computation. For every job it reads the data from the storage device, applies the processing logic, and writes the results back to storage.
- Spark: memory-based and well suited to iterative computation, with fast task startup, although the amount of available memory becomes the limiting factor (see the sketch after this list).
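To make the contrast concrete, here is a minimal sketch (not from the original text) of an iterative job that caches its input in memory so later iterations do not re-read the data from disk. The `datas` input directory and the `IterativeSketch` object name are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object IterativeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("IterativeSketch"))

    // Read once and keep the parsed numbers in memory.
    // "datas" is a hypothetical directory of numeric lines.
    val numbers = sc.textFile("datas").map(_.trim.toDouble).cache()

    // Each iteration reuses the cached RDD instead of re-reading from disk,
    // which is where the in-memory model beats a disk-based one-pass job.
    var threshold = 0.0
    for (_ <- 1 to 5) {
      threshold = numbers.filter(_ > threshold).mean()
      println(s"current threshold: $threshold")
    }

    sc.stop()
  }
}
```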
1.3 Spark core modules
- Spark Core: provides the basic functionality of Spark and the rich APIs on which the other modules are built.
- Spark Streaming: the streaming computing component provided by Spark.
- Spark SQL: a component for working with structured data, Hive, and relational databases.
- Spark MLlib: a machine learning library.
- Spark GraphX: a framework and algorithm library for graph computing.
2. Get started quickly
Environment used in this setup:
- Scala: 2.12.11
- Spark: 3.0.0
2.1 project directory structure
```
spark-learning
└── spark-core
```
2.2 three ways to implement WordCount
- First approach: group the words and count the size of each group
```scala
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create a connection
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    // 2. Execute the business logic
    val lines = sc.textFile("datas")
    // Flatten lines into words
    val words = lines.flatMap(_.split(" "))
    // Group identical words together
    val wordGroup = words.groupBy(word => word)
    // Count each group
    val value = wordGroup.map {
      case (word, list) => (word, list.size)
    }
    // Print the result
    value.foreach(println)

    // 3. Close the connection
    sc.stop()
  }
}
```
- Second approach: map each word to (word, 1), group by word, then reduce within each group
```scala
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_WordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    val lines = sc.textFile("datas")
    val words = lines.flatMap(_.split(" "))
    // Map each word to a (word, 1) pair
    val wordToOne = words.map(word => (word, 1))
    // Group by the word itself
    val wordGroup = wordToOne.groupBy(t => t._1)
    // Reduce each group by summing the counts
    val value = wordGroup.map {
      case (word, list) =>
        list.reduce((t1, t2) => (t1._1, t2._2 + t1._2))
    }
    // Print the result
    value.foreach(println)

    // Close the connection
    sc.stop()
  }
}
```
- Third approach: map to (word, 1) and aggregate with reduceByKey
```scala
import org.apache.spark.{SparkConf, SparkContext}

object Spark03_WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create a connection
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    // 2. Execute the business logic
    val lines = sc.textFile("datas")
    // Flatten lines into words
    val words = lines.flatMap(_.split(" "))
    // Map each word to a (word, 1) pair
    val wordGroup = words.map(word => (word, 1))
    // Sum the counts for each key
    val value = wordGroup.reduceByKey(_ + _)
    // Print the result
    value.foreach(println)

    // 3. Close the connection
    sc.stop()
  }
}
```
3. Spark operating environment
Since the Hadoop environment has already been configured, the Hadoop-related configuration is not repeated here.
3.1 Local environment
The local environment is one that can run without any other nodes; it is not the same as the development environment, which runs on its own inside IDEA.
3.1.1 environmental installation
```bash
# Decompress
[hadoop@hadoop102 module]$ tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/
# Rename
[hadoop@hadoop102 module]$ cd /opt/module/
[hadoop@hadoop102 module]$ mv spark-3.0.0-bin-hadoop3.2/ spark-local
```
3.1.2 test whether the installation is successful
The following output indicates that the installation succeeded.
```
[hadoop@hadoop102 spark-local]$ bin/spark-shell
21/10/18 13:44:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop102:4040
Spark context available as 'sc' (master = local[*], app id = local-1634535874212).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
```
3.1.3 submission of work
```bash
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
```
3.2 Standalone mode
Standalone (independent deployment) mode uses the classic master-slave architecture.
- Cluster planning
| | hadoop102 | hadoop103 | hadoop104 |
|---|---|---|---|
| Spark | Worker, Master | Worker | Worker |
3.2.1 environmental installation
```bash
# Decompress
[hadoop@hadoop102 spark]$ tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/
# Rename
[hadoop@hadoop102 module]$ cd /opt/module/
[hadoop@hadoop102 module]$ mv spark-3.0.0-bin-hadoop3.2/ spark-standalone
```
3.2.2 configuration
```bash
# Rename
[hadoop@hadoop102 conf]$ mv slaves.template slaves
# Configure slaves
[hadoop@hadoop102 conf]$ vim slaves
hadoop102
hadoop103
hadoop104

# Rename
[hadoop@hadoop102 conf]$ mv spark-env.sh.template spark-env.sh
# Add the following configuration
export JAVA_HOME=/opt/module/jdk1.8.0_212
SPARK_MASTER_HOST=hadoop102
SPARK_MASTER_PORT=7077
```
3.2.3 distribution
```bash
[hadoop@hadoop102 conf]$ xsync /opt/module/spark-standalone/
```
3.2.4 start service
One problem I ran into: the Spark installation from earlier is configured through environment variables, so when starting the services only the Spark installation in the original directory would start. After reading the logs I found that SPARK_HOME pointed to the wrong location; the original environment-variable configuration needs to be commented out.
```bash
[hadoop@hadoop102 spark-standalone]$ sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark-standalone/logs/spark-hadoop-org.apache.spark.deploy.master.Master-1-hadoop102.out
hadoop102: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-hadoop-org.apache.spark.deploy.worker.Worker-1-hadoop102.out
hadoop104: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-hadoop-org.apache.spark.deploy.worker.Worker-1-hadoop104.out
hadoop103: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-hadoop-org.apache.spark.deploy.worker.Worker-1-hadoop103.out
[hadoop@hadoop102 spark-standalone]$ xcall.sh jps
============hadoop102================
1700 Jps
1560 Master
1644 Worker
============hadoop103================
1249 Worker
1301 Jps
============hadoop104================
1249 Jps
1195 Worker
```
3.2.5 viewing web pages
http://hadoop102:8080/
3.2.6 submission of work
```bash
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
```
- --class: the class containing the application's main method
- --master: specifies where the application runs: local, a standalone Master (spark://hadoop102:7077), or yarn
- --executor-memory 1g: the amount of memory available to each executor
- --total-executor-cores 2: the total number of CPU cores available to all executors (a programmatic SparkConf equivalent is sketched after this list)
- application-jar: the packaged application jar; this is a positional argument, so no flag name is written, only the path
- application-arguments: arguments passed to the main method
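For reference, the resource-related flags can also be supplied programmatically through SparkConf. The sketch below is illustrative only (the object name and the trivial job are made up); it uses the standard configuration keys `spark.executor.memory` and `spark.cores.max`, the latter being, to my understanding, the standalone-mode counterpart of `--total-executor-cores`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SubmitConfExample {
  def main(args: Array[String]): Unit = {
    // The same resources requested in code instead of on the command line:
    //   spark.executor.memory  <->  --executor-memory
    //   spark.cores.max        <->  --total-executor-cores (standalone mode)
    val conf = new SparkConf()
      .setMaster("spark://hadoop102:7077") // same as --master
      .setAppName("SubmitConfExample")
      .set("spark.executor.memory", "1g")
      .set("spark.cores.max", "2")

    val sc = new SparkContext(conf)
    // Trivial job just to exercise the executors
    println(sc.parallelize(1 to 100).sum())
    sc.stop()
  }
}
```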
3.2.7 configure the history service
```bash
# Rename the config file
[hadoop@hadoop102 conf]$ mv spark-defaults.conf.template spark-defaults.conf
# Add the following content
spark.eventLog.enabled  true
spark.eventLog.dir      hdfs://hadoop102:8020/spark-log

# Start HDFS
[hadoop@hadoop102 conf]$ start-dfs.sh
# Create the log directory
[hadoop@hadoop102 conf]$ hadoop fs -mkdir /spark-log

# Modify spark-env.sh
[hadoop@hadoop102 spark-standalone]$ vim conf/spark-env.sh
# History server
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/spark-log
-Dspark.history.retainedApplications=30"

# Restart the services
[hadoop@hadoop102 spark-standalone]$ sbin/stop-all.sh
[hadoop@hadoop102 spark-standalone]$ sbin/start-all.sh
[hadoop@hadoop102 spark-standalone]$ sbin/start-history-server.sh

# Access the history server web UI
# http://hadoop102:18080/
```
3.2.8 configure high availability (HA)
```bash
# Modify spark-env.sh
[hadoop@hadoop102 spark-standalone]$ vim conf/spark-env.sh
# Comment out the following two lines
# SPARK_MASTER_HOST=hadoop102
# SPARK_MASTER_PORT=7077
# Add the following configuration
export SPARK_MASTER_WEBUI_PORT=8899
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104
-Dspark.deploy.zookeeper.dir=/spark"

# Distribute the configuration
[hadoop@hadoop102 spark-standalone]$ xsync conf/
# Start ZooKeeper
[hadoop@hadoop102 spark-standalone]$ zk.sh start
# Restart the services
[hadoop@hadoop102 spark-standalone]$ sbin/stop-all.sh
[hadoop@hadoop102 spark-standalone]$ sbin/start-all.sh
[hadoop@hadoop102 spark-standalone]$ sbin/stop-history-server.sh
[hadoop@hadoop102 spark-standalone]$ sbin/start-history-server.sh
# Start a second Master on hadoop103
[hadoop@hadoop103 spark-standalone]$ sbin/start-master.sh
# Check the status of the two Masters: one active, one standby
# http://hadoop103:8899/
# http://hadoop102:8899/
```
3.3 Yarn mode
In Standalone mode the Spark framework supplies its own computing resources without integrating with any other framework, which reduces coupling. However, Spark is a computing framework, not a resource-scheduling framework, so resource scheduling is not its strong point, and it is better to integrate with a framework that specializes in it. In China the YARN mode is used more, while Mesos is used more abroad.
3.3.1 installation
```bash
# Decompress
[hadoop@hadoop102 spark]$ tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module/
# Rename
[hadoop@hadoop102 spark]$ cd /opt/module/
[hadoop@hadoop102 module]$ mv spark-3.0.0-bin-hadoop3.2/ spark-yarn
```
3.3.2 configuration
```bash
# Rename
[hadoop@hadoop102 conf]$ mv spark-env.sh.template spark-env.sh
# Add the following content
export JAVA_HOME=/opt/module/jdk1.8.0_212
export HADOOP_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
```
3.3.3 start service
There is no need to start Spark's own cluster services here; just start HDFS and YARN.
```bash
[hadoop@hadoop102 spark-yarn]$ sbin/start-all.sh
```
3.3.4 submitting tasks
```bash
[hadoop@hadoop102 spark-yarn]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
```
3.3.5 configure the history service
```bash
# Rename the config file
[hadoop@hadoop102 conf]$ mv spark-defaults.conf.template spark-defaults.conf
# Add the following content
spark.eventLog.enabled  true
spark.eventLog.dir      hdfs://hadoop102:8020/spark-log

# Modify spark-env.sh
[hadoop@hadoop102 conf]$ vim spark-env.sh
# History server
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/spark-log
-Dspark.history.retainedApplications=30"

# Resubmit the task
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

# View the history server
# http://hadoop102:18080/
# View the YARN task status
# http://hadoop103:8088/cluster
```
3.3.6 two modes of yarn
- Client: used for testing. Since the Driver runs on the local client machine and is responsible for scheduling the application, it interacts heavily with the YARN cluster, which generates a lot of network communication and a surge in network traffic. The advantage is that the logs are visible locally, which makes debugging easy.
- Cluster: used for production. The Driver runs inside the ApplicationMaster on a NodeManager in the cluster, so there is no surge in client network traffic. However, the logs are all on the servers, so viewing them requires logging in to the cluster machines.
Therefore only one deploy mode needs to be used on the cluster at a time; there is no need to enable both, which would only lead to resource contention.
3.4 comparison of three deployment modes
| Mode | Machines with Spark installed | Processes to start | Owner | Application scenario |
|---|---|---|---|---|
| Local | 1 | None | Spark | Testing |
| Standalone | 3 | Master, Worker | Spark | Standalone deployment |
| Yarn | 1 | YARN, HDFS | Hadoop | Mixed deployment |
3.5 port number
- 4040: Web UI of a running Spark application (for example, the spark-shell)
- 7077: internal communication port of the Spark Master
- 8080: Spark Master Web UI (resource monitoring) in Standalone mode
- 18080: history server port
- 8088: YARN port for querying the status of running tasks
4. Spark core concepts
The core of Spark is a computing engine. As a whole it adopts a standard master-slave structure: the Driver plays the role of the master and is responsible for scheduling the jobs and tasks of the whole cluster, while the Executors play the role of the slaves and are responsible for actually executing the tasks.
4.2 Driver
The Spark Driver node executes the main method of a Spark application and is responsible for running the actual code. The Driver's main responsibilities while Spark is running:
- Converting the user program into jobs
- Scheduling tasks across the Executors
- Tracking the execution status of the Executors
- Displaying the state of running tasks through the UI
4.3 Executor
Core functions:
- Running the tasks that make up a Spark application and returning the results to the Driver
- Providing in-memory storage for RDDs through their own Block Manager: RDDs are cached directly inside the Executors, so tasks can be sped up at run time
4.4 Master&Worker
- Master: mainly responsible for resource scheduling, allocation, and cluster monitoring, similar to the ResourceManager in YARN
- Worker: similar to the NodeManager in YARN
4.5 ApplicationMaster
It acts as a bridge between the Driver and the Master (the resource manager).
5. Spark core programming
To process data with high concurrency and high throughput, the Spark computing framework encapsulates three data structures for different application scenarios (a sketch of the latter two follows this list):
- RDD: Resilient Distributed Dataset
- Accumulator: a distributed, shared, write-only variable
- Broadcast variable: a distributed, shared, read-only variable
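A minimal sketch of the latter two shared variables (not from the original text; the object name and data are made up): the accumulator is only added to on the executor side and read back on the Driver, while the broadcast variable ships a read-only map to the executors once instead of with every task.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SharedVariableSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("SharedVariableSketch"))

    val data = sc.parallelize(Seq("a", "b", "a", "c"))

    // Accumulator: distributed shared write-only variable.
    // Executors only add to it; the Driver reads the final value.
    val counter = sc.longAccumulator("matched")

    // Broadcast variable: distributed shared read-only variable.
    val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

    data.foreach { key =>
      if (lookup.value.contains(key)) counter.add(1)
    }

    println(s"matched = ${counter.value}") // 3: two "a"s and one "b"
    sc.stop()
  }
}
```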
The way an RDD processes data is similar to an IO stream: it also follows a decorator-like pattern, with each RDD wrapping the previous one.
An RDD executes the business logic only when an action such as collect is called; everything before that merely wraps and records the operations.
Unlike an IO stream, which buffers some data, an RDD itself does not store data.
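A small sketch of this lazy, decorator-like behavior (not from the original text; the object name and data are made up): the transformations only wrap the previous RDD, and nothing is computed until the collect action is called.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyEvaluationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("LazyEvaluationSketch"))

    // Transformations only wrap the previous RDD; nothing is computed yet.
    val wrapped = sc.parallelize(Seq(1, 2, 3))
      .map { n => println(s"computing $n"); n * 10 }
      .filter(_ > 10)

    println("no 'computing ...' lines have been printed so far")

    // The action triggers the whole chain of wrapped operations.
    val result = wrapped.collect()
    println(result.mkString(", ")) // 20, 30

    sc.stop()
  }
}
```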