2021 Big Data Spark: Structured Streaming IoT device data analysis

Contents

Data analysis of Internet of things devices

Equipment monitoring data preparation

Create Topic

Simulated data

SQL style

DSL style

Data analysis of Internet of things devices

In the era of the Internet of Things, a large number of sensors collect and produce data in all kinds of fields every day, which makes real-time stream processing an ideal tool for analyzing IoT data.

 

This example simulates the statistical analysis of an intelligent IoT system: a program generates device data and sends it to Kafka, and Structured Streaming consumes it in real time to compute statistics. The real-time analysis of the IoT device status signal data covers:

1) devices with a signal strength greater than 30;

2) the number of devices of each device type;

3) the average signal strength of each device type;

 

Equipment monitoring data preparation

Write a program that simulates the monitoring data of IoT devices and sends it to a Kafka Topic. Only a few fields are used here for the demonstration; a real production project would contain many more.
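
The examples below assume the usual Spark, Kafka and JSON dependencies are on the classpath. A minimal sbt sketch is shown here; the version numbers are assumptions and should be adjusted to match your cluster:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.0.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.1",
  "org.apache.kafka" % "kafka-clients" % "2.4.1",
  "org.json4s" %% "json4s-jackson" % "3.6.6",
  "org.apache.commons" % "commons-lang3" % "3.9"
)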

Create Topic

Start the Kafka Broker service and create the Topic [iotTopic]. The commands are as follows:

#List topics

/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181

#Delete topic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic iotTopic



#Create topic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic iotTopic



#Simulated producer

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic iotTopic

#Simulated consumer

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic iotTopic --from-beginning

 

Simulated data

Simulate the device monitoring log data and encapsulate the field information in the case class [DeviceData]:
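
For reference, the sample class is defined as follows (it also appears inside the full generator program below):

case class DeviceData(
  device: String,     // Device identifier ID
  deviceType: String, // Device type, e.g. db, bigdata, kafka, route
  signal: Double,     // Device signal strength
  time: Long          // Time the data was sent (epoch millis)
)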

The specific code of the simulated data generator [MockIotDatas] is as follows:

package cn.itcast.structedstreaming

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

object MockIotDatas {
  def main(args: Array[String]): Unit = {
    // Kafka producer configuration
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    val deviceTypes = Array(
      "db", "bigdata", "kafka", "route", "bigdata", "db", "bigdata", "bigdata", "bigdata"
    )

    val random: Random = new Random()
    while (true) {
      val index: Int = random.nextInt(deviceTypes.length)
      val deviceId: String = s"device_${(index + 1) * 10 + random.nextInt(index + 1)}"
      val deviceType: String = deviceTypes(index)
      val deviceSignal: Int = 10 + random.nextInt(90)
      // Construct simulated device data
      val deviceData = DeviceData(deviceId, deviceType, deviceSignal, System.currentTimeMillis())
      // Convert to JSON string
      val deviceJson: String = new Json(org.json4s.DefaultFormats).write(deviceData)
      println(deviceJson)
      Thread.sleep(100 + random.nextInt(500))

      val record = new ProducerRecord[String, String]("iotTopic", deviceJson)
      producer.send(record)
    }

    // Close connection
    producer.close()
  }

  /**
   * Status data sent by an IoT device
   */
  case class DeviceData(
                         device: String, // Device identifier ID
                         deviceType: String, // Device type, e.g. db, bigdata, kafka, route
                         signal: Double, // Device signal strength
                         time: Long // Time the data was sent (epoch millis)
                       )

}

 

This is equivalent to each server in a large machine room periodically sending its monitoring data to Kafka. The services deployed on the servers include databases (db), big data clusters (bigdata), message queues (kafka) and routers (route). Sample data:

{"device":"device_50","deviceType":"bigdata","signal":91.0,"time":1590660338429}

{"device":"device_20","deviceType":"bigdata","signal":17.0,"time":1590660338790}

{"device":"device_32","deviceType":"kafka","signal":93.0,"time":1590660338908}

{"device":"device_82","deviceType":"bigdata","signal":72.0,"time":1590660339380}

{"device":"device_32","deviceType":"kafka","signal":10.0,"time":1590660339972}

{"device":"device_30","deviceType":"kafka","signal":81.0,"time":1590660340442}

{"device":"device_32","deviceType":"kafka","signal":29.0,"time":1590660340787}

{"device":"device_96","deviceType":"bigdata","signal":18.0,"time":1590660343554}

 

SQL style

According to the business requirements, consume the log data from Kafka, register the DataFrame as a temporary view, use the function get_json_object to extract the field values from the JSON strings, write SQL to perform the analysis, and print the final results to the console.
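
Before the full streaming job, a minimal batch sketch (runnable in spark-shell, which already provides spark and its implicits) shows what get_json_object extracts from a single sample record; the sample string is taken from the producer output above:

import org.apache.spark.sql.functions.get_json_object
import spark.implicits._

val sampleDF = Seq(
  """{"device":"device_50","deviceType":"bigdata","signal":91.0,"time":1590660338429}"""
).toDF("value")

// get_json_object returns string columns, so numeric fields must be cast explicitly
sampleDF.select(
  get_json_object($"value", "$.device").as("device_id"),
  get_json_object($"value", "$.deviceType").as("device_type"),
  get_json_object($"value", "$.signal").cast("double").as("signal")
).show(truncate = false)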

The code is as follows:

package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * For the status signal data of Internet of things equipment, real-time statistical analysis, based on SQL programming
 * 1),Equipment with signal strength greater than 30
 * 2),Number of various equipment types
 * 3),Average signal strength of various equipment types
 */
object IotStreamingOnlineSQL {
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession session instance object and set the attribute information
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // 2. Read data from Kafka, and the bottom layer adopts New Consumer API
    val iotStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      // Maximum number of offsets (records) consumed per trigger
      .option("maxOffsetsPerTrigger", "100000")
      .load()

    // 3. Analyze the acquired data and package it into DeviceData
    val etlStreamDF: DataFrame = iotStreamDF
      // Get the value of the value field and convert it to String type
      .selectExpr("CAST(value AS STRING)")
      // Convert data to Dataset
      .as[String] // The internal field name is value
      // Filter data
      .filter(StringUtils.isNotBlank(_))
      // Parsing JSON data: {"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("device_type"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )

    // 4. Analyze and handle according to the business
    // TODO: take data with signal > 30, group it by device type, and count the quantity and average signal strength
    // 4.1 register DataFrame as temporary view
    etlStreamDF.createOrReplaceTempView("t_iots")
    // 4.2 writing SQL to execute query
    val resultStreamDF: DataFrame = spark.sql(
      """
        |SELECT
        |  device_type,
        |  COUNT(device_type) AS count_device,
        |  ROUND(AVG(signal), 2) AS avg_signal
        |FROM t_iots
        |WHERE signal > 30 GROUP BY device_type
        |""".stripMargin)

    // 5. Start the streaming application and output the results to the console
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .foreachBatch((batchDF: DataFrame, batchId: Long) => {
        println("===========================================")
        println(s"BatchId = ${batchId}")
        println("===========================================")
        if (!batchDF.isEmpty) {
          batchDF.coalesce(1).show(20, truncate = false)
        }
      })
      .start()
    query.awaitTermination()
    query.stop()
  }
}

 

DSL style

According to the business requirements, the log data consumed from Kafka is analyzed by calling functions on the DataFrame (DSL style). The code is as follows:

package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Real time statistical analysis of IOT equipment status signal data:
 * 1),Equipment with signal strength greater than 30
 * 2),Number of various equipment types
 * 3),Average signal strength of various equipment types
 */
object IotStreamingOnlineDSL {
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession session instance object and set the attribute information
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // 2. Read data from Kafka, and the bottom layer adopts New Consumer API
    val iotStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      // Maximum number of offsets (records) consumed per trigger
      .option("maxOffsetsPerTrigger", "100000")
      .load()

    // 3. Analyze the acquired data and package it into DeviceData
    val etlStreamDF: DataFrame = iotStreamDF
      // Get the value of the value field and convert it to String type
      .selectExpr("CAST(value AS STRING)")
      // Convert data to Dataset
      .as[String] // The internal field name is value
      // Filter data
      .filter(StringUtils.isNotBlank(_))
      // Parsing JSON data: {"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("device_type"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )

    // 4. Analyze and handle according to the business
    // TODO: take data with signal > 30, group it by device type, and count the quantity and average signal strength
    val resultStreamDF: DataFrame = etlStreamDF
      // Signal strength greater than 30
      .filter($"signal" > 30)
      // Group by device type
      .groupBy($"device_type")
      // Count the quantity and the average signal strength
      .agg(
        count($"device_type").as("count_device"),
        round(avg($"signal"), 2).as("avg_signal")
      )

    // 5. Start the streaming application and output the results to the console
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
    query.stop()
  }
}
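
As a side note, get_json_object extracts one field per call. An alternative sketch (not part of the original example) parses each record once with from_json and an explicit schema, keeping the same column names so the rest of the job is unchanged; it assumes iotStreamDF and the imports from the program above:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StructType, StringType, DoubleType, LongType}

// Schema matching the JSON produced by MockIotDatas
val deviceSchema: StructType = new StructType()
  .add("device", StringType)
  .add("deviceType", StringType)
  .add("signal", DoubleType)
  .add("time", LongType)

val etlStreamDF2: DataFrame = iotStreamDF
  .selectExpr("CAST(value AS STRING) AS value")
  // Parse the whole JSON record once into a struct column
  .select(from_json($"value", deviceSchema).as("data"))
  // Flatten the struct into the same column names used above
  .select(
    $"data.device".as("device_id"),
    $"data.deviceType".as("device_type"),
    $"data.signal".as("signal"),
    $"data.time".as("time")
  )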

 
