Spark introduction and spark deployment, principle and development environment construction

Spark introduction and spark deployment, principle and development environment construction

Introduction to spark

Spark is a fast, universal and scalable big data analysis and calculation engine based on memory.

It is a general memory parallel computing framework developed by the AMP Laboratory (Algorithms, Machines, and People Lab) at the University of California, Berkeley

Spark is supported by many big data companies, including Hortonworks, IBM, Intel, Cloudera, MapR, pivot, Baidu, Alibaba, Tencent, JD, Ctrip and Youku Tudou. At present, baidu spark has been applied to big search, direct number, baidu big data and other businesses; Ali uses GraphX to build a large-scale graph calculation and graph mining system, and realizes the recommendation algorithms of many production systems; Tencent spark cluster has reached the scale of 8000 units, which is the largest known spark cluster in the world.

Version history

hadoop vs. version history

spark comparison version history

spark vs. MapReduce framework

RDD: Abstract elastic distributed datasets

spark built-in module

Spark Core: it realizes the basic functions of spark, including task scheduling, memory management, error recovery, interaction with storage system and other modules. Spark Core also contains API definitions for Resilient Distributed DataSet (RDD).

Spark SQL: a package used by spark to manipulate structured data. Through spark SQL, we can use SQL or HQL of Apache Hive version to query data. Spark SQL supports a variety of data sources, such as Hive table, Parquet and JSON.

Spark Streaming: a component provided by spark for streaming real-time data. The API used to operate data flow is provided and highly corresponds to the RDD API in Spark Core.

Common features of the machine learning library: mllib. It includes classification, regression, clustering, collaborative filtering, etc. it also provides additional support functions such as model evaluation and data import.

Spark GraphX: a component mainly used for graph parallel computing and graph mining system.
Cluster manager: Spark is designed to efficiently scale computing from one computing node to thousands of computing nodes. In order to meet such requirements and obtain maximum flexibility, Spark supports running on various cluster managers, including Hadoop YARN, Apache Mesos, and a simple scheduler built in Spark, called independent scheduler.

spark features

Fast running speed:

Compared with Hadoop MapReduce, Spark's memory based operation is more than 100 times faster, and the hard disk based operation is also more than 10 times faster. Spark implements an efficient DAG execution engine that can efficiently process data streams based on memory. The intermediate result of the calculation is in memory

Good usability:

Spark supports API s of Java, Python and Scala. It also supports more than 80 advanced algorithms, enabling users to quickly build different applications. Moreover, spark supports interactive Python and Scala shells. It is very convenient to use spark clusters in these shells to verify the solution to the problem

Strong versatility:

Spark provides a unified solution. Spark can be used for interactive query (Spark SQL), real-time streaming (Spark Streaming), machine learning (Spark MLlib) and graph calculation (GraphX). These different types of processing can be used seamlessly in the same application. It reduces the human cost of development and maintenance and the material cost of deploying the platform

High compatibility:

Spark can be easily integrated with other open source products. For example, spark can use Hadoop's YARN and Apache Mesos as its resource management and scheduler, and can process all Hadoop supported data, including HDFS, HBase, etc. This is particularly important for users who have deployed Hadoop clusters, because they can use the powerful processing power of spark without any data migration

spark deployment

Spark operation mode

Spark cluster deployment is generally divided into two modes: stand-alone mode and cluster mode
Most distributed frameworks support stand-alone mode, which is convenient for developers to debug the running environment of the framework. However, in a production environment, stand-alone mode is not used. Therefore, Spark clusters are deployed directly in the cluster mode.
The deployment modes currently supported by Spark are listed in detail below.

Local mode

Deploying a single spark service locally is more suitable for simply understanding the spark directory structure, getting familiar with configuration files, and simply running demo examples and other debugging scenarios.

Standalone mode

Spark's own task scheduling mode allows multiple spark machines to schedule tasks internally, but only spark's own task scheduling

YARN mode

Spark uses the YARN component of Hadoop for resource and task scheduling. In a real sense, spark cooperates with external docking.

Mesos mode

Spark uses the Mesos platform to schedule resources and tasks. Spark client directly connects to Mesos; There is no need to build an additional spark cluster.

(Mesos is a cluster management platform. It can be understood as the kernel of a distributed system, which is responsible for the allocation of cluster resources. The resources here refer to CPU resources, memory resources, storage resources, network resources, etc. in Mesos, Spark, Storm, Hadoop, Marathon and other frameworks can be run.)

spark official website

Introduction to Local mode deployment

Download the installation package: Official Website - > Download - > release Archives - > spark-2.1.1 - > spark-2.1.1-bin-hadoop 2 7.tgz Download

Unzip the Spark installation package

wangting@ops01:/opt/software >tar -xf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/

Directory rename

wangting@ops01:/opt/software >cd /opt/module/
wangting@ops01:/opt/module >mv spark-2.1.1-bin-hadoop2.7 spark-local

directory structure

wangting@ops01:/opt/module >cd spark-local/
wangting@ops01:/opt/module/spark-local >
wangting@ops01:/opt/module/spark-local >ll
total 104
drwxr-xr-x 2 wangting wangting  4096 Apr 26  2017 bin
drwxr-xr-x 2 wangting wangting  4096 Apr 26  2017 conf
drwxr-xr-x 5 wangting wangting  4096 Apr 26  2017 data
drwxr-xr-x 4 wangting wangting  4096 Apr 26  2017 examples
drwxr-xr-x 2 wangting wangting 12288 Apr 26  2017 jars
-rw-r--r-- 1 wangting wangting 17811 Apr 26  2017 LICENSE
drwxr-xr-x 2 wangting wangting  4096 Apr 26  2017 licenses
-rw-r--r-- 1 wangting wangting 24645 Apr 26  2017 NOTICE
drwxr-xr-x 8 wangting wangting  4096 Apr 26  2017 python
drwxr-xr-x 3 wangting wangting  4096 Apr 26  2017 R
-rw-r--r-- 1 wangting wangting  3817 Apr 26  2017 README.md
-rw-r--r-- 1 wangting wangting   128 Apr 26  2017 RELEASE
drwxr-xr-x 2 wangting wangting  4096 Apr 26  2017 sbin
drwxr-xr-x 2 wangting wangting  4096 Apr 26  2017 yarn

The official demo example calculates Pi

wangting@ops01:/opt/module/spark-local >bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[4] /opt/module/spark-local/examples/jars/spark-examples_2.11-2.1.1.jar 20

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/22 10:52:01 INFO SparkContext: Running Spark version 2.1.1
21/07/22 10:52:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/22 10:52:02 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-cee9f744-b8dd-4c75-83be-3884f3b4425b
21/07/22 10:52:02 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
21/07/22 10:52:02 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/22 10:52:02 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/22 10:52:02 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://11.8.37.50:4040
21/07/22 10:52:02 INFO SparkContext: Added JAR file:/opt/module/spark-local/examples/jars/spark-examples_2.11-2.1.1.jar at spark://11.8.37.50:46388/jars/spark-examples_2.11-2.1.1.jar with timestamp 1626922322910
21/07/22 10:52:02 INFO Executor: Starting executor ID driver on host localhost
21/07/22 10:52:04 INFO Executor: Fetching spark://11.8.37.50:46388/jars/spark-examples_2.11-2.1.1.jar with timestamp 1626922322910
21/07/22 10:52:04 INFO TransportClientFactory: Successfully created connection to /11.8.37.50:46388 after 28 ms (0 ms spent in bootstraps)
21/07/22 10:52:04 INFO Utils: Fetching spark://11.8.37.50:46388/jars/spark-examples_2.11-2.1.1.jar to /tmp/spark-86795505-2fa5-4e6e-9331-f26233e462b2/userFiles-174c50ee-3aff-4523-a89b-17910dcd467e/fetchFileTemp632487868763480019.tmp
21/07/22 10:52:04 INFO Executor: Adding file:/tmp/spark-86795505-2fa5-4e6e-9331-f26233e462b2/userFiles-174c50ee-3aff-4523-a89b-17910dcd467e/spark-examples_2.11-2.1.1.jar to class loader
21/07/22 10:52:05 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.386012 s
Pi is roughly 3.140143570071785				# < < < < < < result output
21/07/22 10:52:05 INFO SparkUI: Stopped Spark web UI at http://11.8.37.50:4040
21/07/22 10:52:05 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/22 10:52:05 INFO MemoryStore: MemoryStore cleared
21/07/22 10:52:05 INFO BlockManager: BlockManager stopped
21/07/22 10:52:05 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/22 10:52:05 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/22 10:52:05 INFO SparkContext: Successfully stopped SparkContext
21/07/22 10:52:05 INFO ShutdownHookManager: Shutdown hook called
21/07/22 10:52:05 INFO ShutdownHookManager: Deleting directory /tmp/spark-86795505-2fa5-4e6e-9331-f26233e462b2

bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[4] /opt/module/spark-local/examples/jars/spark-examples_2.11-2.1.1.jar 20

– class: indicates the main class of the jar package to execute
–master local[2]
(1) local: if the number of threads is not specified, all calculations run in one thread without any parallel calculation
(2) local[K]: Specifies that K cores are used to run calculations. For example, local[4] runs 4 cores to execute calculations
(3) local [*]: automatically set the number of threads according to the maximum number of CPU cores. For example, the CPU has 4 cores, and Spark helps you automatically set 4 threads for computing
spark-examples_2.11-2.1.1.jar: the jar package to run

20: Input parameters to run the program (the number of times to calculate pi. The more times to calculate pi, the higher the accuracy. Here is only an application example to define parameters)

Official wordcount example

wordcount will realize the total number of words in multiple files and count the word frequency

Create experiment directory and files

wangting@ops01:/opt/module/spark-local >mkdir input
wangting@ops01:/opt/module/spark-local >cd input/
wangting@ops01:/opt/module/spark-local/input >echo "hello spark" >> 1.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello scala" >> 1.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello flower" >> 1.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello wangt" >> 1.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello hello" >> 2.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello niubi" >> 2.txt
wangting@ops01:/opt/module/spark-local/input >echo "wang wang" >> 2.txt
wangting@ops01:/opt/module/spark-local/input >echo "wangt ting" >> 2.txt
wangting@ops01:/opt/module/spark-local/input >cat 1.txt 
hello spark
hello scala
hello flower
hello wangt
wangting@ops01:/opt/module/spark-local/input >cat 2.txt 
hello hello
hello niubi
wang wang
wangt ting

Enter the spark shell command line

wangting@ops01:/opt/module/spark-local/input >cd ..
wangting@ops01:/opt/module/spark-local >bin/spark-shell 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/07/22 11:01:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/22 11:01:09 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
21/07/22 11:01:09 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
21/07/22 11:01:10 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://11.8.37.50:4040
Spark context available as 'sc' (master = local[*], app id = local-1626922864098).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 

[note]:

  1. Spark submit and spark shell are just two ways of task execution
  2. Spark submit is to upload jar s to the cluster and execute spark tasks;
  3. Spark shell, which is equivalent to a command-line tool, is also an Application;
  4. When the spark shell is opened, a SparkSubmit process will be started. The port is 4040, which is the port number of the weiUI of the application. If the command line remains connected, the process port will survive. If the command line exits, the process port will also be closed.

Execute wordcount task

scala> sc.textFile("/opt/module/spark-local/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

res0: Array[(String, Int)] = Array((ting,1), (scala,1), (hello,7), (flower,1), (spark,1), (niubi,1), (wangt,2), (wang,2))

[description]: (tab can be completed during command line operation)
def textFile(path: String,minPartitions: Int): org.apache.spark.rdd.RDD[String]
Textfile() - > read local file input folder data

def flatMap[U](f: String => TraversableOnce[U])(implicit evidence$4: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
Flatmap() - > flatten operation. Map a row of data into words according to the space separator

def map[U](f: String => U)(implicit evidence$3: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
Map() - > operate on each element to map words into tuples

def reduceByKey(func: (Int, Int) => Int): org.apache.spark.rdd.RDD[(String, Int)]
Reducebykey() - > aggregate and add values by key

def collect[U](f: PartialFunction[(String, Int),U](implicit evidence$29: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U] def collect():Array[(String,Int)]
Collect() - > collect data to the Driver side for display

Page view

Click the corresponding job to see more detailed information

Cluster role introduction

Master & Worker

Driver & Executor

[note]:

  1. Master and Worker are the daemons of Spark, that is, the processes necessary for Spark to run normally in a specific mode.

  2. Driver and Executor are temporary programs that will only be started when specific tasks are submitted to Spark cluster

Introduction to deployment and use of Standalone mode

Standalone mode is Spark's own resource mobilization engine. It builds a Spark cluster composed of Master + Slave, and Spark runs in the cluster.

This should be distinguished from the Standalone in Hadoop. Here, Standalone means that Spark is used only to build a cluster without the help of other frameworks. It's relative to Yarn and Mesos

Machine planning (3 sets):

ops01 11.8.37.50 master|worker

ops02 11.8.36.63 worker

ops03 11.8.36.76 worker

ops04 11.8.36.86 worker

[note]: configure the information in the / etc/hosts host resolution file

wangting@ops01:/opt/module >cat /etc/hosts
127.0.0.1 ydt-cisp-ops01
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

11.16.0.176 rancher.mydomain.com
11.8.38.123 www.tongtongcf.com

11.8.37.50 ops01
11.8.36.63 ops02
11.8.36.76 ops03
11.8.38.86 ops04

11.8.38.82 jpserver ydt-dmcp-jpserver
wangting@ops01:/opt/module >

Unzip the installation package

wangting@ops01:/opt/software >tar -xf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/

Directory rename

wangting@ops01:/opt/software >mv /opt/module/spark-2.1.1-bin-hadoop2.7 /opt/module/spark-standalone

Profile directory

wangting@ops01:/opt/software >cd /opt/module/spark-standalone/conf/
wangting@ops01:/opt/module/spark-standalone/conf >
wangting@ops01:/opt/module/spark-standalone/conf >ll
total 32
-rw-r--r-- 1 wangting wangting  987 Apr 26  2017 docker.properties.template
-rw-r--r-- 1 wangting wangting 1105 Apr 26  2017 fairscheduler.xml.template
-rw-r--r-- 1 wangting wangting 2025 Apr 26  2017 log4j.properties.template
-rw-r--r-- 1 wangting wangting 7313 Apr 26  2017 metrics.properties.template
-rw-r--r-- 1 wangting wangting  865 Apr 26  2017 slaves.template
-rw-r--r-- 1 wangting wangting 1292 Apr 26  2017 spark-defaults.conf.template
-rwxr-xr-x 1 wangting wangting 3960 Apr 26  2017 spark-env.sh.template

Modify the slave configuration definition cluster

wangting@ops01:/opt/module/spark-standalone/conf >mv slaves.template slaves

wangting@ops01:/opt/module/spark-standalone/conf >vim slaves
# limitations under the License.
#

# A Spark Worker will be started on each of the machines listed below.
ops01
ops02
ops03
ops04

Modify spark env SH file, add master node

wangting@ops01:/opt/module/spark-standalone/conf >mv spark-env.sh.template spark-env.sh
wangting@ops01:/opt/module/spark-standalone/conf >vim spark-env.sh

SPARK_MASTER_HOST=ops01
SPARK_MASTER_PORT=7077

Distribute the spark standalone directory to each node

wangting@ops01:/opt/module >scp -r spark-standalone ops02:/opt/module/
wangting@ops01:/opt/module >scp -r spark-standalone ops03:/opt/module/
wangting@ops01:/opt/module >scp -r spark-standalone ops04:/opt/module/

Check port 8080 and spark process

# Starting spark shell will open port 8080 - > front end page at the same time; So let's see if it's occupied
wangting@ops01:/home/wangting >sudo netstat -tnlpu|grep 8080
wangting@ops01:/home/wangting >
wangting@ops01:/home/wangting >jps -l | grep spark
wangting@ops01:/home/wangting >

Start spark standalone cluster

wangting@ops01:/home/wangting >cd /opt/module/spark-standalone/
wangting@ops01:/opt/module/spark-standalone >sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.master.Master-1-ops01.out
ops04: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops04.out
ops03: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops03.out
ops01: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops01.out
ops02: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops02.out
ops04: failed to launch: nice -n 0 /opt/module/spark-standalone/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ops01:7077
ops04:   JAVA_HOME is not set
ops04: full log in /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops04.out
wangting@ops01:/opt/module/spark-standalone >
wangting@ops01:/opt/module/spark-standalone >sudo netstat -tnlpu|grep 8080
tcp6       0      0 :::8080                 :::*                    LISTEN      57689/java          
wangting@ops01:/opt/module/spark-standalone >jps -l | grep spark
57809 org.apache.spark.deploy.worker.Worker
57689 org.apache.spark.deploy.master.Master
wangting@ops01:/opt/module/spark-standalone >

Processing JAVA_HOME is not set

Although the service is started successfully, when starting the cluster, the prompt on ops04 is: ops04: JAVA_HOME is not set

Switch to ops04 server

wangting@ops04:/opt/module/spark-standalone >echo $JAVA_HOME
/usr/java8_64/jdk1.8.0_101
wangting@ops04:/opt/module/spark-standalone >vim sbin/spark-config.sh 
export JAVA_HOME=/usr/java8_64/jdk1.8.0_101

Switch back to master: ops01 restart the service

wangting@ops01:/opt/module/spark-standalone >sbin/stop-all.sh 
ops01: stopping org.apache.spark.deploy.worker.Worker
ops03: stopping org.apache.spark.deploy.worker.Worker
ops04: no org.apache.spark.deploy.worker.Worker to stop
ops02: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master
wangting@ops01:/opt/module/spark-standalone >
wangting@ops01:/opt/module/spark-standalone >sbin/start-all.sh 
starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.master.Master-1-ops01.out
ops01: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops01.out
ops03: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops03.out
ops04: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops04.out
ops02: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops02.out
wangting@ops01:/opt/module/spark-standalone >

The previous prompt has been handled

Browser view interface

The official demo example calculates Pi

wangting@ops01:/opt/module/spark-standalone >bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://ops01:7077 /opt/module/spark-standalone/examples/jars/spark-examples_2.11-2.1.1.jar 20
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/23 15:13:39 INFO SparkContext: Running Spark version 2.1.1
21/07/23 15:13:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/23 15:13:39 INFO SecurityManager: Changing view acls to: wangting
21/07/23 15:13:39 INFO SecurityManager: Changing modify acls to: wangting
21/07/23 15:13:39 INFO SecurityManager: Changing view acls groups to: 
21/07/23 15:13:39 INFO SecurityManager: Changing modify acls groups to: 
21/07/23 15:13:44 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
21/07/23 15:13:44 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 2.824153 s
Pi is roughly 3.1423635711817854				# < < output results
21/07/23 15:13:44 INFO SparkUI: Stopped Spark web UI at http://11.8.37.50:4040
21/07/23 15:13:44 INFO StandaloneSchedulerBackend: Shutting down all executors
21/07/23 15:13:44 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
21/07/23 15:13:44 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/23 15:13:44 INFO MemoryStore: MemoryStore cleared
21/07/23 15:13:44 INFO BlockManager: BlockManager stopped
21/07/23 15:13:44 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/23 15:13:44 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/23 15:13:44 INFO SparkContext: Successfully stopped SparkContext
21/07/23 15:13:44 INFO ShutdownHookManager: Shutdown hook called
21/07/23 15:13:44 INFO ShutdownHookManager: Deleting directory /tmp/spark-6547bdc7-5117-4c44-8f14-4328fa38ace6

Page view task status

Specify resources to perform tasks

wangting@ops01:/opt/module/spark-standalone >bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://ops01:7077 --executor-memory 8G --total-executor-cores 4 /opt/module/spark-standalone/examples/jars/spark-examples_2.11-2.1.1.jar 20
wangting@ops01:/opt/module/spark-standalone >

Page to view new task status resource changes

Configure history service

[note]: the hdfs environment is installed by default. If you don't need to build and deploy it first, you can use the local version only for experimental testing; If you build a cluster, you already have an hdfs cluster environment by default.

Since the running status of the historical task cannot be seen on the Hadoop 102:4040 page after the spark shell is stopped, the historical server is configured to record the running status of the task during development

wangting@ops01:/opt/module/spark-standalone >cd conf/
# Modify profile
wangting@ops01:/opt/module/spark-standalone/conf >mv spark-defaults.conf.template spark-defaults.conf
wangting@ops01:/opt/module/spark-standalone/conf >
wangting@ops01:/opt/module/spark-standalone/conf >vim spark-defaults.conf 
#
spark.eventLog.enabled          true
spark.eventLog.dir              hdfs://ops01:8020/directory

# Add / directory directory on hdfs
wangting@ops01:/opt/module/spark-standalone/conf >hdfs dfs -ls /
2021-07-23 15:24:45,730 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 15 items
drwxr-xr-x   - wangting supergroup          0 2021-03-17 11:44 /20210317
drwxr-xr-x   - wangting supergroup          0 2021-03-19 10:51 /20210319
drwxr-xr-x   - wangting supergroup          0 2021-04-24 17:05 /flume
-rw-r--r--   3 wangting supergroup  338075860 2021-03-12 11:50 /hadoop-3.1.3.tar.gz
drwxr-xr-x   - wangting supergroup          0 2021-05-13 15:31 /hbase
drwxr-xr-x   - wangting supergroup          0 2021-05-26 16:56 /origin_data
drwxr-xr-x   - wangting supergroup          0 2021-06-10 10:31 /spark-history
drwxr-xr-x   - wangting supergroup          0 2021-06-10 10:39 /spark-jars
drwxr-xr-x   - wangting supergroup          0 2021-06-10 11:11 /student
drwxr-xr-x   - wangting supergroup          0 2021-04-04 11:07 /test.db
drwxr-xr-x   - wangting supergroup          0 2021-03-19 11:14 /testgetmerge
drwxr-xr-x   - wangting supergroup          0 2021-04-10 16:23 /tez
drwx------   - wangting supergroup          0 2021-04-02 15:14 /tmp
drwxr-xr-x   - wangting supergroup          0 2021-04-02 15:25 /user
drwxr-xr-x   - wangting supergroup          0 2021-06-10 11:43 /warehouse
wangting@ops01:/opt/module/spark-standalone/conf >hdfs dfs -mkdir /directory
2021-07-23 15:25:14,573 INFO  [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
wangting@ops01:/opt/module/spark-standalone/conf >hdfs dfs -ls / | grep directory
drwxr-xr-x   - wangting supergroup          0 2021-07-23 15:25 /directory

# Modify spark env configuration
wangting@ops01:/opt/module/spark-standalone/conf >vim spark-env.sh 
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080 
-Dspark.history.fs.logDirectory=hdfs://ops01:8020/directory 
-Dspark.history.retainedApplications=30"

wangting@ops01:/opt/module/spark-standalone/conf >
# Distribution profile
wangting@ops01:/opt/module/spark-standalone/conf >scp spark-env.sh spark-defaults.conf ops02:/opt/module/spark-standalone/conf/
wangting@ops01:/opt/module/spark-standalone/conf >scp spark-env.sh spark-defaults.conf ops03:/opt/module/spark-standalone/conf/ 
wangting@ops01:/opt/module/spark-standalone/conf >scp spark-env.sh spark-defaults.conf ops04:/opt/module/spark-standalone/conf/

# Start history service
wangting@ops01:/opt/module/spark-standalone >sbin/start-history-server.sh 
starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.history.HistoryServer-1-ops01.out
wangting@ops01:/opt/module/spark-standalone >

# Perform the task again
wangting@ops01:/opt/module/spark-standalone >bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://ops01:7077 --executor-memory 8G --total-executor-cores 4 /opt/module/spark-standalone/examples/jars/spark-examples_2.11-2.1.1.jar 20

Access the historical task page via ip:18080

http://11.8.37.50:18080/

Because the standalone here is only used for experiments, and the high availability related operations are not considered temporarily. The high availability is omitted. Generally, the yarn mode is mainly used in the formal environment in China, and the mesos mode is used abroad

Introduction to deployment and use of Yarn mode

In the yarn mode, hadoop clusters, hdfs and yarn clusters need to be prepared in advance; In the previous article, I wrote the deployment method: hadoop introduction deployment document

serviceops01(8C32G)ops02(8C24G)ops03(8C24G)ops04(8C24G)version
HdfsNameNodeDatanodeSecondaryNameNodeDatanode3.1.3
YarnNodeManagerReSourceManager / NodeManagerNodeManagerNodeManager3.1.3
MapReduce√ JobHistoryServer3.1.3

Stop spark cluster in Standalone mode

wangting@ops01:/opt/module/spark-standalone >sbin/stop-all.sh 
ops01: stopping org.apache.spark.deploy.worker.Worker
ops04: stopping org.apache.spark.deploy.worker.Worker
ops03: stopping org.apache.spark.deploy.worker.Worker
ops02: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master
wangting@ops01:/opt/module/spark-standalone >

Unzip the spark package and rename it

wangting@ops01:/opt/module/spark-standalone >cd /opt/software/
wangting@ops01:/opt/software >tar -xf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
wangting@ops01:/opt/software >cd /opt/module/
wangting@ops01:/opt/module >mv spark-2.1.1-bin-hadoop2.7 spark-yarn
wangting@ops01:/opt/module >cd spark-yarn/

Modify the configuration spark env

Finally, add configuration item: YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop (define Hadoop installation configuration path)

wangting@ops01:/opt/module/spark-yarn >cd conf/
wangting@ops01:/opt/module/spark-yarn/conf >mv spark-env.sh.template spark-env.sh
wangting@ops01:/opt/module/spark-yarn/conf >vim spark-env.sh
YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop

Start HDFS and YARN

Confirm that HDFS and YARN cluster have been started (started under hadoop/sbin /)

In this step, each environment is different, depending on where you deploy locally and on which server the master of each component is located

wangting@ops01:/opt/module/hadoop-3.1.3 >sbin/start-dfs.sh 
Starting namenodes on [ops01]
Starting datanodes
Starting secondary namenodes [ops03]

wangting@ops02:/opt/module/hadoop-3.1.3/sbin >./start-yarn.sh 
Starting resourcemanager
Starting nodemanagers

The official demo example calculates Pi

wangting@ops01:/opt/module/spark-yarn >bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
/opt/module/spark-yarn/examples/jars/spark-examples_2.11-2.1.1.jar \
20

2021-07-26 11:41:56,166 INFO spark.SparkContext: Running Spark version 2.1.1
2021-07-26 11:41:56,606 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-07-26 11:41:56,784 INFO spark.SecurityManager: Changing view acls to: wangting
2021-07-26 11:41:56,784 INFO spark.SecurityManager: Changing modify acls to: wangting
2021-07-26 11:41:56,785 INFO spark.SecurityManager: Changing view acls groups to: 
2021-07-26 11:41:56,786 INFO spark.SecurityManager: Changing modify acls groups to: 
2021-07-26 11:41:56,786 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(wangting); groups with view permissions: Set(); users  with modify permissions: Set(wangting); groups with modify permissions: Set()
2021-07-26 11:42:21,960 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 0.933 s
2021-07-26 11:42:21,965 INFO scheduler.DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.150791 s
Pi is roughly 3.139429569714785				# < < output results
2021-07-26 11:42:21,976 INFO server.ServerConnector: Stopped Spark@61edc883{HTTP/1.1}{0.0.0.0:4040}
2021-07-26 11:42:21,977 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6063d80a{/stages/stage/kill,null,UNAVAILABLE,@Spark}
2021-07-26 11:42:21,977 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@5ae76500{/jobs/job/kill,null,UNAVAILABLE,@Spark}
2021-07-26 11:42:22,010 INFO cluster.YarnClientSchedulerBackend: Stopped
2021-07-26 11:42:22,015 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
2021-07-26 11:42:22,027 INFO memory.MemoryStore: MemoryStore cleared
2021-07-26 11:42:22,027 INFO storage.BlockManager: BlockManager stopped
2021-07-26 11:42:22,033 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
2021-07-26 11:42:22,037 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
2021-07-26 11:42:22,038 INFO spark.SparkContext: Successfully stopped SparkContext
2021-07-26 11:42:22,040 INFO util.ShutdownHookManager: Shutdown hook called
2021-07-26 11:42:22,041 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-d3cf3aec-be6f-41f0-a950-4521641e6179

View cluster yarn resourcemanager 8088 port

You can see the historical job record

Spark yarn operation process

Spark has two modes: yarn client and yarn cluster. The main difference is that the running node of the Driver program.
Yarn client: the Driver program runs on the client and is suitable for interaction and debugging. I hope to see the output of the app immediately.
The yarn cluster: Driver program runs on the APPMaster started by the resource manager and is suitable for production environments.

Client mode

wangting@ops01:/opt/module/spark-yarn >bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10

You can directly see the output results on the console

Client mode task flow

Cluster mode

wangting@ops01:/opt/module/spark-yarn >bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10

Click in the task and you can see the final Pi output in the log output

Cluster mode task flow

Centralized spark mode comparison (excluding mesos)

patternNumber of Spark installed machinesProcess to startOwner
Local mode1nothingSpark
Standalone mode3Master and WorkerSpark
Yarn mode1Rely on existing Yan and HDFSHadoop

Spark port grooming

1) Spark history server port number: 18080 (similar to Hadoop history server port number: 19888)

2) Spark Master Web port number: 8080 (similar to the NameNode Web port number of Hadoop: 9870 (50070))

3) Internal communication service port number of Spark Master: 7077 (similar to port 8020 (9000) of Hadoop)

4) Spark view current spark shell running task port number: 4040

5) Hadoop YARN task running status viewing port number: 8088

Local development environment + wordcount case

First install the idea code management tool, install the java environment, scala environment, idea configuration maven environment, etc. you can baidu by yourself.

Spark Shell is only used more when testing and verifying our programs. In the production environment, programs are usually compiled in the IDE, then printed into Jar packages, and then submitted to the cluster. The most commonly used is to create a maven project and use Maven to manage the dependency of Jar packages.

Programming

1) Create a Maven project

2) Prepare some wordcount materials

3) Import project dependencies

pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wangting</groupId>
    <artifactId>spark_wt_test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>WordCount</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

[note]:

  1. The new project is maven project, and the project name can be customized
  2. Create a new input directory under the project directory and a new 1. 0 directory under the input directory txt ; 2.txt is used as the calculation data source

​ 1.txt

hello niubi
nihao niubiplus
scala niubi
spark niubi scala spark

​ 2.txt

hello wangting
nihao wang
scala ting
spark wangting scala spark
  1. In POM After adding and configuring in XML, click maven update component on the right. At this time, there will be a download dependency process and wait patiently for the download to complete
  2. Create a scala directory in src/main, right-click mark directory as to change it to root, and the color is the same as java
  3. Create a new package in scala: com wangting. spark ; This can be customized
  4. On COM wangting. Create a new Scala class under spark package; The name is WordCount and the type is Object
package com.wangting.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {

    //Create a SparkConf profile
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    //Create a SparkContext object
    val sc: SparkContext = new SparkContext(conf)

    //sc.textFile("").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
    //Read external files
    val textRDD: RDD[String] = sc.textFile("E:\\spark_wt_test\\input\\1.txt")

    //Cut and flatten the read content
    val flatMapRDD: RDD[String] = textRDD.flatMap(_.split(" "))

    //Structure conversion of the content in the data set - counting
    val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1))

    //Summarize the number of occurrences of the same word
    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)

    //Collect the executed structure
    val res: Array[(String, Int)] = reduceRDD.collect()

    res.foreach(println)

    /* //Create a SparkConf profile
     val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
     //Create a SparkContext object
     val sc: SparkContext = new SparkContext(conf)
     //One line of code
     sc.textFile(args(0)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile(args(1))
 */
    //Release resources
    sc.stop()
  }
}

xml add package configuration

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <archive>
            <manifest>
                <mainClass>com.wangting.spark.WordCount</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Note the location of plugin configuration build - plugins - plugin

Process combing

[note]:

The above is just a review of the development process. Each step must be different in the actual scene. Here is just to familiarize yourself with the steps of the development process

  1. Prepare idea and configure maven environment

  2. Create a maven project

  3. In the project, you can create an input directory and write some materials instead of hdfs to get data to test the code

  4. Create a directory Scala (sources root) of the same type as java in src/main

  5. Note that the dependency configuration in the pom file is increased. After the addition, you can find maven near the upper right. Click the button to find the first similar update to download the dependency locally

  6. You can put the code maintenance management under the scala package

  7. Finally, you can use package to package in the Lifecycle option in maven

  8. After packaging, there will be a target directory under the project directory; The package name is WordCount defined in pom. If there is no problem with the process, there will be a WordCount Jar package

  9. Finally, transfer the jar package to the server to run the jar package, and the whole development process is completed

Associated source code

1. Download the zip package of spark-2.1.1 source code from the official website and unzip it locally

When the source code is not associated to the source code block, click the option 2. Download method

3. Select choose resources, and select the path of the extracted spark source directory in the path selection

4. After associating the corresponding path, click ok to complete the import

Local execution part version error reporting processing

Due to the version problem, if the local operating system is Windows and Hadoop related things are used in the program, such as writing files to HDFS, the following exceptions will be encountered

21/07/29 10:07:41 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
	at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
	at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
	at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$29.apply(SparkContext.scala:1013)
	at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$29.apply(SparkContext.scala:1013)
	at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:179)
	at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:179)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:179)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:198)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

[note]: you only need to configure such a parameter. It doesn't matter whether there is hadoop deployment in the actual path. Just ensure that this parameter is available and the process can go through, because the local static file is called during the actual code test
After processing the error of adding environmental parameters:

Keywords: Scala Operation & Maintenance Big Data Hadoop Spark

Added by benreisner on Mon, 03 Jan 2022 22:14:19 +0200