Complete experiment: calculating statistical features of partial-discharge monitoring data from spectrum data

EXPERIMENTAL DATA

Links: https://pan.baidu.com/s/1CSgRQ8OXe50ya_DwkLX8yw
Extraction Code: kexq

1. EXPERIMENTAL BACKGROUND

(1) On-line monitoring of power equipment
On-line monitoring of power equipment refers to the continuous or periodic automatic monitoring and detection of the condition of power equipment while it remains in service, without a power outage. It relies on sensor technology, wide-area communication technology, and information processing technology. On-line monitoring is an important means of realizing condition-based operation, maintenance, and management of power equipment and of raising the lean level of production and operation management. It is of positive and far-reaching significance for improving the intelligence of the power grid and achieving condition-based management of power equipment.
(2) Monitoring big data
With the advance of smart-grid construction, on-line monitoring of smart primary equipment and conventional power equipment has developed rapidly and become a trend. The volume of monitoring data keeps growing and is gradually forming big data for power equipment monitoring, which poses great technical challenges to the on-line monitoring system in data storage and processing.
Power equipment monitoring big data is characterized by large volume, many types, low value density, and a demand for fast processing. The grid company's monitoring systems currently rely heavily on centralized SAN storage and SOA-based data integration, and mainly use enterprise relational databases. Constrained by capacity, scalability, and access speed, they only store "mature" data that has already been post-processed, and the relational queries and transaction processing such databases are good at are of little use in data analysis. New big-data storage and processing technologies are urgently needed to cope with this.
(3) Typical big data computing scenarios
Batch computing: triggered on a schedule, processes data in batches, handles large data volumes, and has weak real-time performance. Typical tools include Aliyun MaxCompute and Hadoop MapReduce.
Online computing: triggered by users, highly interactive, and requires fast responses. Typical tools include Aliyun's analytic database (AnalyticDB).
Stream computing: triggered by messages, processes records one at a time, and computes in real time. Typical tools include Aliyun Stream Compute and Apache Storm.
(4) Phase-resolved partial discharge analysis
Phase-resolved partial discharge (PRPD) analysis folds the partial discharge parameters monitored over several power-frequency cycles (the discharge count n, the apparent charge q or discharge amplitude, and the discharge phase phi) into one power-frequency cycle, computes their statistical regularities, builds the discharge spectra, and derives statistical features of the discharge that are then used for pattern recognition.
PD signal analysis consists of three sub-processes: 1) extraction of the basic n-q-phi parameters: scan the PD signal and record the peak discharges and their corresponding phases; 2) spectrum construction and statistical feature calculation: divide the phase windows, count the distributions of the average discharge and of the number of discharges per window, and compute the phase-distribution spectra; 3) discharge type identification.
The skewness Sk reflects how skewed the shape of a spectrum is compared with a normal distribution.
The steepness Ku reflects how sharply the spectrum protrudes (its peakedness) compared with a normal distribution.
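For reference, a sketch of the usual PRPD definitions of these two features, consistent with the reducer code later in this article: let x_i be the phase-window index, y_i the per-window discharge count (for SKN/KUN) or discharge quantity (for SKQ/KUQ), and p_i = y_i / Σ_j y_j the normalized height of window i. Then

$$\mu = \sum_i x_i\,p_i, \qquad \sigma = \sqrt{\sum_i (x_i - \mu)^2\,p_i}$$

$$Sk = \frac{\sum_i (x_i - \mu)^3\,p_i}{\sigma^3}, \qquad Ku = \frac{\sum_i (x_i - \mu)^4\,p_i}{\sigma^4} - 3$$

A symmetric spectrum gives Sk close to 0, and a spectrum whose peakedness matches a normal distribution gives Ku close to 0.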
(5) MaxCompute
MaxCompute (formerly ODPS) is a massive data processing platform provided by Alibaba Cloud. It mainly serves the storage and computation of EB-scale batches of structured data, and has been widely used in data warehousing and BI analysis for large Internet enterprises, website log analysis, e-commerce transaction analysis, and so on. Data in MaxCompute is organized into projects, tables, and partitions.
1) Project (Project Space):
Projects are the basic organizational unit of MaxCompute, similar to a Database or Schema in traditional databases, and are the primary boundary for multi-user isolation and access control. A user can have permissions on multiple projects at the same time, and with security authorization, objects in another project, such as tables, resources, functions, and instances, can be accessed from the current project.
2) Table (Table)
A table is the data storage unit of MaxCompute. Logically it is a two-dimensional structure of rows and columns: each row represents a record and each column represents a field of a single data type. A record can contain one or more columns, and the names and types of the columns constitute the table's schema.
3) Partition (Partition)
When creating a table, you can specify a partition space, that is, designate several fields of the table as partition columns. Partitions can be thought of as directories in a file system: MaxCompute treats each distinct value of a partition column as a partition (directory), and a multi-level partition over several fields behaves like nested directories. When reading data, specifying the partition you want to access means only that partition is read, which avoids a full-table scan, improves processing efficiency, and reduces cost.
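As an illustration only (the table name and columns below are hypothetical, not part of this experiment), a minimal partition sketch in MaxCompute SQL:

-- a table partitioned by date: each distinct dt value becomes its own partition (directory)
create table if not exists sensor_log (
    sensor_id string,
    val       double
)
partitioned by (dt string);

-- naming the partition in the filter reads only that partition instead of scanning the whole table
select count(*) from sensor_log where dt = '20200501';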
4) Task (Task)
Task is the basic computing unit of MaxCompute. SQL and MapReduce functionality is carried out through tasks. Tasks include computational tasks (SQL DML statements, MapReduce) and non-computational tasks (DDL statements in SQL, which have no execution plan).
5) Instance (Instance)
In MaxCompute, some tasks are instantiated when they are executed and exist as MaxCompute instances (hereinafter referred to as instances). An instance goes through two phases: Running and Terminated.
(6) odpscmd
odpscmd is a Java client program that provides command-line access to MaxCompute. With this client you can perform a variety of tasks, including querying data and uploading and downloading data. A JRE is required to run it; please download and install JRE version 1.6 or later.
(7) MapReduce
MapReduce was originally proposed by Google as a distributed data processing model and has since received wide attention in industry and been applied in many business scenarios, such as search, web access log analysis, text statistics, massive data mining, machine learning, natural language processing, and advertising recommendation.
MapReduce processes data in two main phases: the Map phase and the Reduce phase. The Map phase executes first, followed by the Reduce phase. The processing logic of Map and Reduce is supplied by the user, but must conform to the conventions of the MapReduce framework.

2. EXPERIMENTAL STEPS

2.1 Apply for Aliyun MaxCompute

2.2 Create projects, create tables, and upload data

  1. Log in to the management console and browse the MaxCompute project
    Using the resources allocated for the experiment, log in to the Alibaba Cloud Management Console with the assigned account and password.
    In the console, select "Big Data (DataPlus) --> DataWorks" from the left navigation bar to open the DataWorks overview page. Here you can see the MaxCompute project that has been created.
  2. Install and configure odpscmd
    Prepare a local JRE environment: download and install JRE 1.6 or later.
    Download the odpscmd tool from the official Aliyun site: https://github.com/aliyun/aliyun-odps-console/releases?spm=a2c4g.11186623.2.15.ed2fa95ecaTeNe
    Unzip and configure <ODPS_CLIENT>/conf/odps_config.ini
project_name=[project_name]
access_id=******************
access_key=*********************
end_point=http://service.odps.aliyun.com/api
tunnel_endpoint=http://dt.odps.aliyun.com
log_view_host=http://logview.odps.aliyun.com
https_check=true
  3. Create tables and add partitions
    (1) Create the discharge spectrum table DW_PT
    In odpscmd, execute the following SQL statement to build the table.
create table if not exists DW_PT(
       PID     string,   -- spectrum ID
       WID     bigint,   -- window ID
       TotalQ  bigint,   -- total discharge
       AvgQ    double,   -- average discharge
       Nums    bigint,   -- number of discharges
       MaxV    bigint)   -- peak discharge
partitioned by (DeviceID string);

When "ok" appears, indicating successful table building, you can use the "ls tables;" command to view the created table.
Add partitions to the DW_PT table.

alter table DW_PT add if not exists partition(DeviceID='001');
You can use "show partitions DW_PT;" to verify that the partition was added successfully.

(2) Create the statistical feature table DW_TJ

In odpscmd, execute the following SQL statement to build the table.
CREATE TABLE IF NOT EXISTS DW_TJ (
         PID STRING,  -- spectrum ID
         SKN DOUBLE,  -- skewness of the discharge-count distribution
         SKQ DOUBLE,  -- skewness of the discharge-quantity distribution
         KUN DOUBLE,  -- steepness of the discharge-count distribution
         KUQ DOUBLE   -- steepness of the discharge-quantity distribution
)
PARTITIONED BY (
         DeviceID STRING
);
Add a partition to the DW_TJ table:
alter table DW_TJ add if not exists partition(DeviceID='001');
  4. Upload data using Tunnel
    In this experiment, the spectrum data (DW_PT) is used to calculate the statistical feature data (DW_TJ). You can upload the spectrum data provided with the experiment directly and use it for the calculation. The spectrum data file is pt.csv in the attachment.
    Run the tunnel command in odpscmd to upload the local data file pt.csv into the DW_PT table. Adjust the path in the command below to your actual path.
tunnel upload d:/Clouder/jfdata/pt.csv DW_PT/deviceid='001';
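
Optionally, a quick row count confirms that the upload landed in the expected partition (a simple check; adjust the partition value if yours differs):

select count(*) from dw_pt where deviceid='001';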

2.3 MapReduce Program Development, Local Debugging and Running

  1. Prepare the local development environment
    This experiment uses Eclipse as the development environment; please download and install it in advance.
    Find and download the ODPS for Eclipse plug-in from the official documentation, unzip it, and copy it into the plugins subdirectory of the Eclipse installation directory. Start Eclipse and check whether an ODPS category appears in the New Project wizard.
    ODPS for Eclipse plug-in download address:
    https://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/cn/odps/0.0.90/assets/download/odps-eclipse-plugin-bundle-0.16.0.zip?spm=5176.doc27981.2.3.cCapmQ&file=odps-eclipse-plugin-bundle-0.16.0.zip
    When an ODPS-type project can be created, the local development environment is ready.
  2. MapReduce program development
    Create an ODPS project in Eclipse named TJ. For Eclipse to access MaxCompute correctly, the local path of odpscmd must be configured correctly when the project is created.
    Add the Mapper class, Reducer class, MapReduce Driver class, and the helper class R in turn.

The TJMapper.java code is as follows:

import java.io.IOException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;

public class TJMapper extends MapperBase {
         private Record val;
         private Record k;

         @Override
         public void setup(TaskContext context) throws IOException {
                   k=context.createMapOutputKeyRecord();
                   val=context.createMapOutputValueRecord();
         }
         @Override
         public void map(long recordNum, Record record, TaskContext context) throws IOException {
                   String pid= record.getString("pid");                 
                   k.set(new Object[] {pid});                                    
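                   // forward the whole input record as the map output value; its columns correspond to the value schema declared in TJDriver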
                   val=record;                                  
                   context.write(k,val);
         }
         @Override
         public void cleanup(TaskContext context) throws IOException {
         }
}

The TJReducer.java code is as follows:

import java.io.IOException;
import java.util.Iterator;
import java.util.*;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;

public class TJReducer extends ReducerBase {

         private Record result;
         private long alln=0;//Total number of discharges  
         private long allq=0;//Total discharge                
         private List<R> recs;

         @Override
         public void setup(TaskContext context) throws IOException {
                   result=context.createOutputRecord();
                   recs=new ArrayList<R>();

         }


         @Override
         public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
                   //Reset the per-key accumulators: reduce() is called once per key (PID),
                   //and these fields would otherwise carry values over from the previous key
                   alln=0;
                   allq=0;
                   recs.clear();
                   float a_n=0;//Mean of the discharge-count distribution (over window IDs)
                   float a_q=0;//Mean of the discharge-quantity distribution
                   float f_n=0;//Variance (later standard deviation) of the count distribution
                   float f_q=0;//Variance (later standard deviation) of the quantity distribution

                   while (values.hasNext()) {
                            Record temp=values.next();
                            R r=new R();
                       r.nums=temp.getBigint("nums");
                            r.avgq=temp.getDouble("avgq");
                            r.totalq=temp.getBigint("totalq");
                            r.max=temp.getBigint("maxv");
                            r.wid=temp.getBigint("wid");                   
                            recs.add(r);
                            alln=alln+temp.getBigint("nums");
                            allq=allq+temp.getBigint("totalq");                 
                   }

                   //Mean calculation
                   for(int i=0;i<recs.size();i++)
                   {
                            recs.get(i).n_p=(float)recs.get(i).nums/alln;
                            recs.get(i).q_p=(float)recs.get(i).totalq/allq;
                            a_n=a_n+recs.get(i).n_p*recs.get(i).wid;
                            a_q=a_q+recs.get(i).q_p*recs.get(i).wid;                        
                   }

                   //Variance calculation
                   for(int i=0;i<recs.size();i++)
                   {
                            float a=recs.get(i).n_p*(float)Math.pow(recs.get(i).wid-a_n, 2);                   
                            f_n=f_n+a;
                            float b=recs.get(i).q_p*(float)Math.pow(recs.get(i).wid-a_q, 2); 
                            f_q=f_q+b;                                   
                   }

                   f_n=(float)Math.sqrt((double)f_n);
                   f_q=(float)Math.sqrt((double)f_q);         

                   //Numerator accumulators for the Sk formula
                   float ln=0;
                   float lq=0;


                   //Calculate sk
                   for(int i=0;i<recs.size();i++)
                   {
                            float a=recs.get(i).n_p*(float)Math.pow(recs.get(i).wid-a_n, 3);                   
                            ln=ln+a;
                            float b=recs.get(i).q_p*(float)Math.pow(recs.get(i).wid-a_q, 3); 
                            lq=lq+b;                               
                   }

                   double sk_n=ln/Math.pow(f_n, 3);
                   double sk_q=lq/Math.pow(f_q, 3);


                   //Add Result Set
                   result.set("pid", key.getString(0));
                   result.set("skn", sk_n);
                   result.set("skq", sk_q);

                   //Reset Temporary Cumulative Variables
                   ln=0;
                   lq=0;
                   //Calculate ku (steepness / excess kurtosis)
                   for(int i=0;i<recs.size();i++)
                   {
                            float a=recs.get(i).n_p*(float)Math.pow(recs.get(i).wid-a_n, 4);                   
                            ln=ln+a;
                            float b=recs.get(i).q_p*(float)Math.pow(recs.get(i).wid-a_q, 4); 
                            lq=lq+b;                               
                   }
                   double ku_n=ln/Math.pow(f_n, 4)-3;
                   double ku_q=lq/Math.pow(f_q, 4)-3;      
                   //Add Result Set          
                   result.set("kun", ku_n);
                   result.set("kuq", ku_q);           
                   context.write(result);     
         }
         @Override
         public void cleanup(TaskContext context) throws IOException {
         }
}

The TJDriver.java code is as follows:

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

public class TJDriver {

         public static void main(String[] args) throws OdpsException {

                   JobConf job = new JobConf();

                   // Map output key/value schemas: the key is the spectrum ID, the value mirrors the DW_PT columns
                   job.setMapOutputKeySchema(SchemaUtils.fromString("pid:string"));
                   job.setMapOutputValueSchema(SchemaUtils.fromString("pid:string,wid:bigint,totalq:bigint,avgq:double,nums:bigint,maxv:bigint"));

                   // Input and output tables (passed as arguments: dw_pt dw_tj), both on the deviceid=001 partition
                   InputUtils.addTable(TableInfo.builder().tableName(args[0]).partSpec("deviceid=001").build(), job);
                   OutputUtils.addTable(TableInfo.builder().tableName(args[1]).partSpec("deviceid=001").build(), job);

                   // Mapper and Reducer classes
                   job.setMapperClass(TJMapper.class);
                   job.setReducerClass(TJReducer.class);

                   RunningJob rj = JobClient.runJob(job);
                   rj.waitForCompletion();
         }
}

The code for R.java is as follows:

public class R {

         public long wid;//Window ID
         public long totalq;//Total discharge of a window
         public double avgq;//Average discharge of a window
         public long nums;//Number of discharges in a window
         public long max;//Maximum discharge of a window
         public float n_p;       //Probability of number of discharges for a window
         public float q_p;//Discharge probability of a window
         public R(){                                    
         }
}
  3. Local test
    Open TJDriver.java, right-click, and choose "Run As -> Run Configurations".
    On the ODPS Config tab, select the correct ODPS project.
    On the Arguments tab, enter the run parameters "dw_pt dw_tj" and click Run to start the local test.
    On the first run, Eclipse downloads a small amount of test data from MaxCompute. After the run, you can see the test input data and the resulting output data under Warehouse.
  4. Package and upload resources
    Once the local test results are correct, export the jar package: in Eclipse choose File -> Export, select "JAR file", and export it locally.
    Under odpscmd, execute the add resource command to upload the jar to MaxCompute.
    add jar d:/Clouder/jar/TJ.jar;
  5. Execute the program on MaxCompute
    Under odpscmd, run the jar command to launch the job:
    jar -resources TJ.jar -classpath d:\Clouder\jar\TJ.jar TJDriver dw_pt dw_tj;
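    When the job finishes, the computed features can be checked from odpscmd (assuming the output went to the deviceid='001' partition configured in TJDriver):
    select * from dw_tj where deviceid='001';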

3. HOMEWORK

3.1 Homework
The traditional MapReduce model requires that after each MapReduce job the data must land on a distributed file system (such as HDFS, or a MaxCompute table). In general, a MapReduce application consists of multiple MapReduce jobs, and each job writes its output to disk when it finishes; the next Map task then often only re-reads that data to prepare for the subsequent shuffle phase, which results in redundant IO. MaxCompute's compute scheduling can support more complex programming models: the next Reduce operation can follow a Reduce directly, without inserting a Map operation in between. Based on this, MaxCompute provides an extended MapReduce model that supports chaining any number of Reduce operations after a Map, such as Map -> Reduce -> Reduce.
The homework for this course requires merging the spectrum construction and the statistical feature calculation into one job implemented as Map -> Reduce -> Reduce, which reads the table once, keeps the intermediate results in memory, and produces the dw_tj result table directly. You can use a pipeline to connect multiple Reduce stages.

The reference code for MapReduce Driver is as follows:

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.pipeline.Pipeline;
import com.aliyun.odps.Column;
import com.aliyun.odps.mapred.Job;

public class TJDriver {
	public static void main(String[] args) throws OdpsException {
		Job job = new Job();
		
		Pipeline pipeline = Pipeline.builder()
	            .addMapper(PTMapper.class)
	            .setOutputKeySchema(
	                    new Column[] { new Column("wid", OdpsType.BIGINT) })
	            .setOutputValueSchema(
	                    new Column[] { new Column("PID", OdpsType.STRING), new Column("wid", OdpsType.BIGINT), new Column("maxv", OdpsType.BIGINT) })
	            .setOutputKeySortColumns(new String[] { "wid" })
	            .setPartitionColumns(new String[] { "wid" })
	            .setOutputGroupingColumns(new String[] { "wid" })

	            .addReducer(PTReducer.class)
	            .setOutputKeySchema(
	                    new Column[] { new Column("pid", OdpsType.STRING) })
	            .setOutputValueSchema(
	                    new Column[] { new Column("wid", OdpsType.BIGINT), new Column("totalq", OdpsType.BIGINT), new Column("avgq", OdpsType.BIGINT), new Column("nums", OdpsType.BIGINT), new Column("maxv", OdpsType.BIGINT)})
	            .addReducer(TJReducer.class).createPipeline();

		// TODO: specify map output types
		job.setPipeline(pipeline);
		job.addInput(TableInfo.builder().tableName(args[0]).build());
        job.addOutput(TableInfo.builder().tableName(args[1]).build());
        job.submit();
        job.waitForCompletion();
        System.exit(job.isSuccessful() ? 0 : 1);
		
	}
}
