Contents
Data analysis of Internet of Things devices
Device monitoring data preparation
Data analysis of Internet of Things devices
In the era of the Internet of Things (IoT), huge numbers of sensors collect and produce data across many fields every day, which makes IoT data an ideal target for real-time stream analysis.
This case simulates the statistical analysis of a smart IoT system: a program generates device data and sends it to Kafka, and Structured Streaming consumes the data and computes statistics in real time. The following metrics are computed from the device status signal data:
1) devices with signal strength greater than 30;
2) the number of devices of each type;
3) the average signal strength of each device type.
Device monitoring data preparation
Write a program that simulates IoT device monitoring data and sends it to a Kafka topic. Only a few fields are used here for demonstration; a real production project would carry many more.
Create Topic
Start the Kafka Broker service and create the topic [iotTopic]. The commands are as follows:
# View topic information
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181

# Delete topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic iotTopic

# Create topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 3 --topic iotTopic

# Simulated producer
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic iotTopic

# Simulated consumer
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic iotTopic --from-beginning
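Note that the --zookeeper flag used above applies to older Kafka releases. On Kafka 2.2 and later, kafka-topics.sh can talk to the brokers directly through --bootstrap-server; a sketch of the equivalent create command, assuming the broker listens on node1:9092 as above:

# Create the topic against the broker directly (Kafka 2.2+)
/export/server/kafka/bin/kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 1 --partitions 3 --topic iotTopic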
Mock data
Simulate the device monitoring log data and encapsulate the field information in a case class [DeviceData]:
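The case class is defined inside the generator object below; pulled out here for clarity, it simply mirrors the JSON fields that are sent to Kafka:

// Status record for one device, serialized to JSON and sent to Kafka
case class DeviceData(
  device: String,     // device identifier ID
  deviceType: String, // device type, e.g. db, bigdata, kafka, route
  signal: Double,     // device signal strength
  time: Long          // timestamp (milliseconds) when the data was sent
)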
The code of the simulated data generator [MockIotDatas] is as follows:
package cn.itcast.structedstreaming

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

object MockIotDatas {

  def main(args: Array[String]): Unit = {
    // Kafka producer properties for the target topic
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    val deviceTypes = Array(
      "db", "bigdata", "kafka", "route", "bigdata", "db", "bigdata", "bigdata", "bigdata"
    )

    val random: Random = new Random()
    while (true) {
      val index: Int = random.nextInt(deviceTypes.length)
      val deviceId: String = s"device_${(index + 1) * 10 + random.nextInt(index + 1)}"
      val deviceType: String = deviceTypes(index)
      val deviceSignal: Int = 10 + random.nextInt(90)
      // Build one simulated device record
      val deviceData = DeviceData(deviceId, deviceType, deviceSignal, System.currentTimeMillis())
      // Convert it to a JSON string
      val deviceJson: String = new Json(org.json4s.DefaultFormats).write(deviceData)
      println(deviceJson)
      Thread.sleep(100 + random.nextInt(500))

      // Send the record to the iotTopic topic
      val record = new ProducerRecord[String, String]("iotTopic", deviceJson)
      producer.send(record)
    }
    // Close connection
    producer.close()
  }

  /**
   * Status data sent by an IoT device
   */
  case class DeviceData(
    device: String,     // Device identifier ID
    deviceType: String, // Device type, e.g. db, bigdata, kafka, route
    signal: Double,     // Device signal strength
    time: Long          // Timestamp when the data was sent
  )

}
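To compile and run the generator and the streaming jobs below, the project needs the Kafka client, json4s, and the Spark SQL Kafka connector on the classpath. A minimal sbt sketch; the version numbers are illustrative assumptions and should be aligned with your Spark and Kafka installation:

// build.sbt (versions are assumptions; match them to your cluster)
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"            % "2.4.5",
  "org.apache.spark"   %% "spark-sql-kafka-0-10" % "2.4.5", // Kafka source for Structured Streaming
  "org.apache.kafka"    % "kafka-clients"        % "2.0.0", // KafkaProducer used by MockIotDatas
  "org.json4s"         %% "json4s-jackson"       % "3.5.3", // JSON serialization in the generator
  "org.apache.commons"  % "commons-lang3"        % "3.9"    // StringUtils used when filtering records
)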
This is equivalent to every server in a large machine room periodically sending its monitoring data to Kafka. The deployed services include the database (db), big data cluster (bigdata), message queue (kafka), and router (route). Sample data:
{"device":"device_50","deviceType":"bigdata","signal":91.0,"time":1590660338429} {"device":"device_20","deviceType":"bigdata","signal":17.0,"time":1590660338790} {"device":"device_32","deviceType":"kafka","signal":93.0,"time":1590660338908} {"device":"device_82","deviceType":"bigdata","signal":72.0,"time":1590660339380} {"device":"device_32","deviceType":"kafka","signal":10.0,"time":1590660339972} {"device":"device_30","deviceType":"kafka","signal":81.0,"time":1590660340442} {"device":"device_32","deviceType":"kafka","signal":29.0,"time":1590660340787} {"device":"device_96","deviceType":"bigdata","signal":18.0,"time":1590660343554}
SQL style
According to the business requirements, the log data consumed from Kafka is parsed with the function get_json_object to extract the field values from the JSON string, the resulting DataFrame is registered as a temporary view, and SQL is written to perform the analysis and print the final results to the console.
The code is as follows:
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Real-time statistical analysis of IoT device status signal data, SQL-based:
 * 1) devices with signal strength greater than 30
 * 2) number of devices of each type
 * 3) average signal strength of each device type
 */
object IotStreamingOnlineSQL {

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance and set its properties
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // 2. Read data from Kafka; the underlying source uses the new consumer API
    val iotStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      // Maximum number of records consumed per batch
      .option("maxOffsetsPerTrigger", "100000")
      .load()

    // 3. Parse the acquired data into the DeviceData fields
    val etlStreamDF: DataFrame = iotStreamDF
      // Take the value field and cast it to String
      .selectExpr("CAST(value AS STRING)")
      // Convert to a Dataset; the single column is named value
      .as[String]
      // Filter out blank records
      .filter(StringUtils.isNotBlank(_))
      // Parse the JSON data: {"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("device_type"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )

    // 4. Analyze according to the business requirements
    // TODO: group data with signal > 30 by device type, count devices and average signal strength
    // 4.1 Register the DataFrame as a temporary view
    etlStreamDF.createOrReplaceTempView("t_iots")
    // 4.2 Write SQL and execute the query
    val resultStreamDF: DataFrame = spark.sql(
      """
        |SELECT
        |  device_type,
        |  COUNT(device_type) AS count_device,
        |  ROUND(AVG(signal), 2) AS avg_signal
        |FROM t_iots
        |WHERE signal > 30
        |GROUP BY device_type
        |""".stripMargin)

    // 5. Start the streaming query and output the results to the console
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .foreachBatch((batchDF: DataFrame, batchId: Long) => {
        println("===========================================")
        println(s"BatchId = ${batchId}")
        println("===========================================")
        if (!batchDF.isEmpty) {
          batchDF.coalesce(1).show(20, truncate = false)
        }
      })
      .start()
    query.awaitTermination()
    query.stop()
  }

}
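As an alternative to calling get_json_object once per field, the JSON string can be parsed in a single pass with from_json and an explicit schema. A minimal sketch, not part of the original job, that could replace the .select(...) step above; it assumes the same SparkSession, implicits, and the iotStreamDF defined in the job:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Explicit schema matching the DeviceData JSON
val deviceSchema: StructType = new StructType()
  .add("device", StringType)
  .add("deviceType", StringType)
  .add("signal", DoubleType)
  .add("time", LongType)

// Parse each JSON record once, then flatten the struct into columns
val parsedStreamDF: DataFrame = iotStreamDF
  .selectExpr("CAST(value AS STRING) AS value")
  .select(from_json($"value", deviceSchema).as("data"))
  .select(
    $"data.device".as("device_id"),
    $"data.deviceType".as("device_type"),
    $"data.signal".as("signal"),
    $"data.time".as("time")
  )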
DSL style
According to the business requirements, the log data consumed from Kafka is analyzed by calling DataFrame (DSL) functions directly. The code is as follows:
package cn.itcast.structedstreaming

import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Real-time statistical analysis of IoT device status signal data:
 * 1) devices with signal strength greater than 30
 * 2) number of devices of each type
 * 3) average signal strength of each device type
 */
object IotStreamingOnlineDSL {

  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession instance and set its properties
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // 2. Read data from Kafka; the underlying source uses the new consumer API
    val iotStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092")
      .option("subscribe", "iotTopic")
      // Maximum number of records consumed per batch
      .option("maxOffsetsPerTrigger", "100000")
      .load()

    // 3. Parse the acquired data into the DeviceData fields
    val etlStreamDF: DataFrame = iotStreamDF
      // Take the value field and cast it to String
      .selectExpr("CAST(value AS STRING)")
      // Convert to a Dataset; the single column is named value
      .as[String]
      // Filter out blank records
      .filter(StringUtils.isNotBlank(_))
      // Parse the JSON data: {"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("device_type"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )

    // 4. Analyze according to the business requirements
    // TODO: group data with signal > 30 by device type, count devices and average signal strength
    val resultStreamDF: DataFrame = etlStreamDF
      // Signal strength greater than 30
      .filter($"signal" > 30)
      // Group by device type
      .groupBy($"device_type")
      // Count devices and average the signal strength
      .agg(
        count($"device_type").as("count_device"),
        round(avg($"signal"), 2).as("avg_signal")
      )

    // 5. Start the streaming query and output the results to the console
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
    query.stop()
  }

}
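Both jobs print to the console for demonstration and do not configure fault tolerance. In practice, a query that must survive restarts would add a checkpoint location. A minimal sketch as a drop-in replacement for the query definition above; the checkpoint path is a hypothetical example and should point at a reliable directory (e.g. on HDFS):

// Start the query with checkpointing so progress and state can be recovered after a restart
val query: StreamingQuery = resultStreamDF.writeStream
  .outputMode(OutputMode.Complete())
  .format("console")
  .option("numRows", "10")
  .option("truncate", "false")
  // Hypothetical checkpoint directory; replace with a path on reliable storage
  .option("checkpointLocation", "/spark/checkpoints/iot-streaming")
  .start()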