Passenger express logistics big data project: initialize the Spark streaming program

Contents

Initialize Spark streaming program

1, SparkSQL parameter tuning settings

1. Set session time zone

2. Set the maximum number of bytes a single partition can hold when reading a file

3. Set the threshold for merging small files

4. Set the number of partitions to use when shuffling data for joins or aggregations

5. Set the maximum size in bytes that can be broadcast to all worker nodes when executing a join operation

2, Test whether the data can be consumed successfully

Initialize Spark streaming program


Implementation steps:

  • Create an App singleton object in the realtime directory of the etl module and initialize the Spark running environment
  • Create the main method
  • Write the code
    • Initialize the Spark environment parameters
    • Consume the OGG data from Kafka
    • Consume the canal data from Kafka
    • Print the Kafka data

Reference code:

package cn.it.logistics.etl.realtime

import cn.it.logistics.common.Configuration
import org.apache.commons.lang.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Test consuming Kafka data
 * 1)Logistics related data
 * 2)Data of customer relationship management system
 */
object App {
  /**
   * Entry function
   *
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * Implementation steps:
     * 1)Initialize the running environment of spark
     * 2)Judge the current operating environment (local/linux operating environment)
     * 3)Create a sparkSession object
     * 4)Initialize the connection parameters of logistics topic data
     * 5)Initialize the connection parameters of topic data in the customer relationship system
     * 6)Consume the topic data of Oracle -> OGG -> Kafka
     * 7)Consume the topic data of MySQL -> canal -> Kafka
     * 8)Start operation and wait for stop
     */
    //1) Initialize the running environment of spark
    val conf: SparkConf = new SparkConf()
      //Set the name of the app
      .set("spark.app.name", this.getClass.getSimpleName)
      //Set time zone
      .set("spark.sql.session.timeZone", "Asia/Shanghai")
      //Set the maximum number of bytes a single partition can hold when reading files. The default is 128 MB, the same as the HDFS block size
      .set("spark.sql.files.maxPartitionBytes", "134217728")
      //Set the threshold for merging small files to avoid each small file occupying one partition
      .set("spark.sql.files.openCostInBytes", "134217728")
      //Set the number of partitions used when shuffling data for joins or aggregations. The default is 200
      .set("spark.sql.shuffle.partitions", "600")
      //Set the maximum size in bytes of a table that can be broadcast to the worker nodes during a join, avoiding a shuffle
      .set("spark.sql.autoBroadcastJoinThreshold", "67108864")

    //2) Judge the current operating environment (local/linux operating environment)
    if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
      //Local environment: set HADOOP_HOME
      System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
      //Set the running environment and checkpoint path
      conf.set("spark.master", "local[*]").set("spark.sql.streaming.checkpointLocation", Configuration.sparkAppWinCheckpointDir)
    } else {
      //production environment 
      conf.set("spark.master", "yarn").set("spark.sql.streaming.checkpointLocation", Configuration.sparkAppDfsCheckpointDir)
    }

    //3) Create a sparkSession object
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //4) Initialize the connection parameters of logistics topic data
    val logisticsKafkaParams: Map[String, String] = Map[String, String](
      "kafka.bootstrap.servers" -> Configuration.kafkaAddress,
      "subscribe" -> Configuration.kafkaLogisticsTopic,
      "group.id" -> "logistics",
      //Do not fail the query when data may be lost (e.g., the topic is deleted or the offset is out of range)
      "failOnDataLoss" -> "false"
    )

    //5) Initialize the connection parameters of topic data in the customer relationship system
    val crmKafkaParams: Map[String, String] = Map[String, String](
      "kafka.bootstrap.servers" -> Configuration.kafkaAddress,
      "subscribe" -> Configuration.kafkaCrmTopic,
      "group.id" -> "logistics",
      //Do not fail the query when data may be lost (e.g., the topic is deleted or the offset is out of range)
      "failOnDataLoss" -> "false"
    )

    //Implicit conversions (needed for .as[String])
    import sparkSession.implicits._

    //6) Consume the topic data of Oracle -> OGG -> Kafka
    val logisticsDF: DataFrame = sparkSession.readStream
      .format("kafka")
      .options(logisticsKafkaParams)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .toDF()

    //7) Consume the topic data of MySQL -> canal -> Kafka
    val crmDF: DataFrame = sparkSession.readStream
      .format("kafka")
      .options(crmKafkaParams)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .toDF()

    //Output the data to the console
    logisticsDF.writeStream.outputMode(OutputMode.Update()).format("console").queryName("logistics").start()
    crmDF.writeStream.outputMode(OutputMode.Update()).format("console").queryName("crm").start()

    //8) Start operation and wait for stop
    val stream = sparkSession.streams
    //stream.active: gets the list of currently active streaming queries
    stream.active.foreach(query => println(s"Query ready to start: ${query.name}"))
    //Thread blocked, waiting for termination
    stream.awaitAnyTermination()
  }
}

1, SparkSQL parameter tuning settings

1. Set session time zone

Sets the ID of the session's local time zone.

.set("spark.sql.session.timeZone", "Asia/Shanghai")

The session time zone is set with the configuration spark.sql.session.timeZone. If it is not set, it defaults to the local time zone of the JVM.
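
A minimal, self-contained sketch of the effect (the object name is illustrative): changing the session time zone changes how the same instant is rendered when timestamps are shown as strings.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp

object TimeZoneDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("TimeZoneDemo")
      .config("spark.sql.session.timeZone", "Asia/Shanghai")
      .getOrCreate()

    //Rendered in the session time zone (UTC+8)
    spark.range(1).select(current_timestamp().as("now")).show(false)

    //The same instant rendered in UTC after switching the session time zone at runtime
    spark.conf.set("spark.sql.session.timeZone", "UTC")
    spark.range(1).select(current_timestamp().as("now")).show(false)

    spark.stop()
  }
}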

2. Set the maximum number of bytes a single partition can hold when reading a file

The maximum number of bytes that can be packed into a single partition when reading files. The default is 128 MB, the same as the HDFS block size.

.set("spark.sql.files.maxPartitionBytes", "134217728")


3. Set the threshold for merging small files

The estimated cost of opening a file, measured by the number of bytes that could be scanned in the same time. This parameter is used when putting multiple files into a single partition.

It is better to over-estimate this value: partitions with small files will then be faster than partitions with larger files (which are scheduled first). The default is 4 MB.

In practice, this parameter acts as the threshold used when merging small files: files below it are packed together so that each tiny file does not occupy a partition of its own.

.set("spark.sql.files.openCostInBytes", "134217728")

4. Set the number of partitions to use when shuffling data for joins or aggregations

For SparkSQL, another important parameter is the number of tasks used during a shuffle, which is adjusted with spark.sql.shuffle.partitions. It should be tuned according to the processing capacity of the Spark cluster and the amount of data to be processed; the Spark default is 200. Too many tasks leads to a lot of task start-up overhead, while too few tasks means each task processes too much data and runs too long, which easily produces stragglers.

.set("spark.sql.shuffle.partitions", "600")

5. Set the maximum size in bytes that can be broadcast to all worker nodes when executing a join operation

In broadcast join mode, a table smaller than the spark.sql.autoBroadcastJoinThreshold value (10 MB by default) is broadcast to the other compute nodes, so the join avoids the shuffle phase and is more efficient.

.set("spark.sql.autoBroadcastJoinThreshold", "67108864")

Otherwise, the following error will be reported:

Exception in thread "broadcast-exchange-0" java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes

Reason:

Analysis of the problem shows a memory overflow: there is not enough memory to build and broadcast the table. Even after repeatedly increasing the job's memory resources and doubling the memory allocated to both the executor and the driver, the error persists.

This configuration therefore sets the maximum size in bytes of a table that will be broadcast to all worker nodes when a join is executed. Setting the value to -1 disables broadcasting entirely.
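
A brief sketch of the behaviour, assuming a SparkSession named spark (the two tables are purely illustrative): a table under the threshold is broadcast automatically, the broadcast() hint forces it, and -1 disables automatic broadcasting.

import org.apache.spark.sql.functions.broadcast

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "67108864") //64 MB, as configured above
//spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")     //would disable broadcast joins

val orders = spark.range(0, 1000000).withColumnRenamed("id", "areaId") //large side
val areas  = spark.range(0, 100).withColumnRenamed("id", "areaId")     //small side

//The small side is broadcast to every executor, so the join avoids a shuffle
val joined = orders.join(broadcast(areas), Seq("areaId"))
joined.explain() //the physical plan should contain BroadcastHashJoin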

2, Test whether the data can be consumed successfully

Test steps:

  • Start Docker and start the Oracle and MySQL databases (including the OGG service and the canal-server service)
  • Start the data generator program (cn.it.logistics.generate.App under the logistics-generate project)
  • Start the App singleton object (a programmatic check of the running queries is sketched below)
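
As an optional programmatic check (a sketch using the standard StreamingQuery API, placed in the App object after the queries have started), the active queries' progress can be printed to confirm that rows are actually arriving from both Kafka topics:

//Print each active query's status; lastProgress is null until the first micro-batch completes
sparkSession.streams.active.foreach { query =>
  val inputRows = Option(query.lastProgress).map(_.numInputRows).getOrElse(0L)
  println(s"query=${query.name}, active=${query.isActive}, rows in last batch=$inputRows")
}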

