I. EXPERIMENTAL DATA
Links: https://pan.baidu.com/s/1CSgRQ8OXe50ya_DwkLX8yw
Extraction Code: kexq
1.1 Experimental Background
(1) On-line monitoring of power equipment
On-line monitoring of power equipment refers to the continuous or periodic automatic monitoring and detection of equipment condition without taking the equipment out of service. The technologies involved include sensor technology, wide-area communication technology and information processing technology.
On-line monitoring of power equipment is an important means of realizing condition-based operation, maintenance and management of power equipment, and of improving the lean level of production and operation management. It is of positive and far-reaching significance for raising the intelligence level of the power grid and realizing condition-based management of power equipment.
(2) Monitoring big data
With the advancement of smart grid construction, on-line monitoring of both smart primary equipment and conventional power equipment has developed rapidly and become a trend. Monitoring data has grown increasingly large, gradually forming big data for power equipment monitoring, which poses great technical challenges to on-line monitoring systems in data storage and processing.
Big data for power equipment monitoring is characterized by large volume, many data types, low value density and demanding processing speed. The monitoring systems of power grid companies currently rely too heavily on centralized SAN storage and SOA-based data integration, and mainly use enterprise relational databases. Constrained by capacity, scalability and access speed, these databases only store "mature data" that has already been processed, while the relational queries and transaction processing they excel at are of little use in data analysis. There is an urgent need for new big data storage and processing technologies to cope with this.
(3) Typical large data computing scenarios
Batch computing: characterized by scheduled execution, batch processing, large data volume and weak real-time performance. Typical batch computing tools include Aliyun MaxCompute, Hadoop MapReduce, and so on.
Online computing: characterized by user-triggered requests, frequent interaction and fast response. Typical online computing tools include Alibaba Cloud Analytic Database (AnalyticDB), etc.
Streaming computing: characterized by message triggering, record-by-record processing and real-time computation. Typical stream computing tools include Alibaba Cloud Stream Compute, Apache Storm, and so on.
(4) Phase analysis of partial discharge
Phase-resolved partial discharge (PRPD) analysis maps the partial discharge parameters monitored over several power-frequency cycles (discharge count n, apparent discharge or discharge amplitude q, and discharge phase φ) onto a single power-frequency cycle, computes their statistical regularities to obtain the discharge spectra, and derives statistical features of the discharge, which are used for pattern recognition.
PD signal analysis mainly consists of three sub-processes: 1) Extraction of the basic n-q-φ parameters: the PD signal is scanned, and the peak discharges and their corresponding discharge phases are collected. 2) Spectrum construction and statistical feature calculation: the power-frequency cycle is divided into phase windows, the distributions of average discharge and discharge count over the windows are accumulated, and the phase distribution spectrum of the average discharge is calculated. 3) Discharge type identification.
The skewness Sk reflects how skewed the shape of the spectrum is relative to the shape of a normal distribution.
The steepness Ku reflects how sharply the spectrum shape protrudes relative to the shape of a normal distribution.
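Written out explicitly (a sketch consistent with the reducer code used later in this experiment, where w_i denotes the ID of phase window i and p_i the probability of that window derived from either the discharge count or the discharge quantity), Sk and Ku are the weighted skewness and excess kurtosis of the phase distribution:

\mu = \sum_i p_i\, w_i, \qquad \sigma = \sqrt{\sum_i p_i (w_i - \mu)^2}

Sk = \frac{\sum_i p_i (w_i - \mu)^3}{\sigma^3}, \qquad Ku = \frac{\sum_i p_i (w_i - \mu)^4}{\sigma^4} - 3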
(5) MaxCompute
MaxCompute (formerly ODPS) is a massive data processing platform provided by Alibaba Cloud. It mainly serves the storage and computation of batch structured data at the EB level. MaxCompute has been widely used in data warehousing and BI analysis for large Internet enterprises, website log analysis, transaction analysis of e-commerce sites, and so on. Data in MaxCompute is organized by project, table and partition.
1) Project (Project Space):
Projects are the basic organizational unit of MaxCompute, similar to the concept of a Database or Schema in traditional databases, and are the primary boundary for multi-user isolation and access control. A user can hold permissions on multiple projects at the same time; with security authorization, objects in another project, such as tables, resources, functions and instances, can be accessed from within the current project.
2) Table (Table)
A table is the data storage unit of MaxCompute. Logically it is a two-dimensional structure consisting of rows and columns: each row represents a record, and each column represents a field of a single data type. A record can contain one or more columns. The names and types of the columns constitute the schema of the table.
3) Partition (Partition)
When creating a table, you can specify a partition space, that is, designate several fields of the table as partition columns. Partitions can be thought of as directories in a file system: MaxCompute treats each distinct value of a partition column as a partition (directory), and the user can specify multi-level partitions, using multiple fields of the table as partition levels, much like multi-level directories. When querying data, if you specify the partitions to access, only the corresponding partitions are read, avoiding a full table scan, improving processing efficiency and reducing cost.
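As a minimal illustration (using the DW_PT table and its DeviceID partition column created later in this experiment), specifying the partition value in the query lets MaxCompute read only that partition instead of scanning the whole table:

-- Only the partition deviceid='001' is read; other partitions are skipped
select wid, avgq, nums
from dw_pt
where deviceid = '001';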
4) Task (Task)
A task is the basic computing unit of MaxCompute. SQL and MapReduce functions are all carried out through tasks. Tasks include computational tasks (SQL DML statements, MapReduce) and non-computational tasks (DDL statements in SQL, which have no execution plan).
5) Instance (Instance)
In MaxCompute, some tasks are instantiated when they are executed and exist as MaxCompute instances (hereinafter referred to as instances). An instance goes through two phases: Running and Terminated.
(6) odpscmd
odpscmd is a Java client program that provides command-line access to MaxCompute. With this client, you can complete a variety of tasks, including data query, data upload, download and so on. A JRE is required to run it; please download and install JRE 1.6 or later.
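A minimal usage sketch, assuming the client has been unzipped to <ODPS_CLIENT> and odps_config.ini has been filled in as described in the steps below: start the client from its bin directory (odpscmd on Linux/macOS, odpscmd.bat on Windows) and issue a command at the prompt, for example:

ls tables;   -- list the tables in the current project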
(7) MapReduce
MapReduce was originally proposed by Google as a distributed data processing model. It has since received extensive attention in the industry and has been widely used in a variety of business scenarios, for example search, web access log analysis, statistical text analysis, mass data mining, machine learning, natural language processing, advertising recommendation, and so on.
MapReduce processes data in two main phases: the Map phase and the Reduce phase. The Map phase is executed first, then the Reduce phase. The processing logic of Map and Reduce is customized by the user, but must conform to the MapReduce framework.
II. EXPERIMENTAL STEPS
2.1 Apply for Alibaba Cloud MaxCompute
2.2 Create projects, create tables, and upload data
- Log in to the management console and browse the MaxCompute project
Using the resources allocated for the experiment, log in to the Alibaba Cloud Management Console with the assigned account and password.
In the console, select "Big Data (Number Plus) --> DataWorks" from the left navigation bar to enter the DataWorks home page. Here you can see the MaxCompute project that has already been created.
- Install and configure odpscmd
Prepare your local JRE environment: download and install JRE 1.6 or later.
Download the odpscmd tool from the Alibaba Cloud official website: https://github.com/aliyun/aliyun-odps-console/releases?spm=a2c4g.11186623.2.15.ed2fa95ecaTeNe
Unzip it and configure <ODPS_CLIENT>/conf/odps_config.ini:
project_name=[project_name]
access_id=******************
access_key=*********************
end_point=http://service.odps.aliyun.com/api
tunnel_endpoint=http://dt.odps.aliyun.com
log_view_host=http://logview.odps.aliyun.com
https_check=true
- Create tables and add partitions
(1) Create the discharge spectrum table DW_PT
In odpscmd, execute the following SQL statement to build the table.
create table if not exists DW_PT(
    PID string,      -- 'Spectrogram ID'
    WID bigint,      -- 'Window ID'
    TotalQ bigint,   -- 'Total discharge'
    AvgQ double,     -- 'Average discharge'
    Nums bigint,     -- 'Number of discharges'
    MaxV bigint      -- 'Peak discharge'
) partitioned by (DeviceID string);
When "ok" appears, indicating successful table building, you can use the "ls tables;" command to view the created table.
Add partitions to the DW_PT table.
alter table DW_PT add if not exists partition(DeviceID='001');
You can use "show partitions DW_PT;" to verify that the partition was added successfully.
(2) Create the statistical feature table DW_TJ
In odpscmd, execute the following SQL statement to build the table.
CREATE TABLE IF NOT EXISTS DW_TJ (
    PID STRING,   -- 'Spectrum ID'
    SKN DOUBLE,   -- 'Skewness of discharge count'
    SKQ DOUBLE,   -- 'Skewness of discharge quantity'
    KUN DOUBLE,   -- 'Steepness of discharge count'
    KUQ DOUBLE    -- 'Steepness of discharge quantity'
) PARTITIONED BY (DeviceID STRING);
Then add a partition:
alter table DW_TJ add if not exists partition(DeviceID='001');
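Optionally, you can confirm the table structure and partitions in odpscmd before loading any data, for example:

desc dw_tj;
show partitions dw_tj;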
- Data upload using Tunnel
In this experiment, the spectrum data (DW_PT) is used to calculate the statistical feature data (DW_TJ). You can directly upload the spectrum data provided with the experiment and use it for the calculation. The spectrum data file is pt.csv in the attachment.
Run the tunnel command in odpscmd to upload the local data file pt.csv into the DW_PT table. Adjust the path in the command below to your actual path before executing it.
tunnel upload d:/Clouder/jfdata/pt.csv DW_PT/deviceid='001';
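After the upload completes, a quick sanity check is to count the rows that landed in the target partition (the expected count depends on the pt.csv data set provided with the experiment):

select count(*) from dw_pt where deviceid='001';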
2.3 MapReduce Program Development, Local Debugging and Running
- Local Development Environment Preparation
This experiment uses Eclipse as the development environment; please download and install it in advance.
Find and download the ODPS for Eclipse plug-in from the official documentation, unzip it, and copy it into the plugins subdirectory of the Eclipse installation directory. Start Eclipse and check whether an ODPS category appears in the New Project wizard.
ODPS for eclipse plug-in download address:
https://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/cn/odps/0.0.90/assets/download/odps-eclipse-plugin-bundle-0.16.0.zip?spm=5176.doc27981.2.3.cCapmQ&file=odps-eclipse-plugin-bundle-0.16.0.zip
When an ODPS-type project can be created, the local development environment is ready.
- MapReduce Program Development
Create an ODPS project in Eclipse named TJ. For Eclipse to access MaxCompute correctly, the local path of odpscmd must be configured correctly when the project is created.
Add the Mapper class, Reducer class, MapReduce Driver class, and the R class in turn.
The TJMapper.java code is as follows:
import java.io.IOException;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;

public class TJMapper extends MapperBase {
    private Record val;
    private Record k;

    @Override
    public void setup(TaskContext context) throws IOException {
        // Create reusable key/value records for the map output
        k = context.createMapOutputKeyRecord();
        val = context.createMapOutputValueRecord();
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context) throws IOException {
        // Group the spectrum windows by spectrogram ID (pid)
        String pid = record.getString("pid");
        k.set(new Object[] { pid });
        // Pass the whole input record through as the map output value
        val = record;
        context.write(k, val);
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}
The TJReducer.java code is as follows:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;

public class TJReducer extends ReducerBase {
    private Record result;
    private long alln = 0;   // Total number of discharges
    private long allq = 0;   // Total discharge quantity
    private List<R> recs;

    @Override
    public void setup(TaskContext context) throws IOException {
        result = context.createOutputRecord();
        recs = new ArrayList<R>();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
        // Reset the accumulators for each key (each spectrogram ID)
        alln = 0;
        allq = 0;
        recs.clear();

        float a_n = 0; // Mean of the discharge-count distribution
        float a_q = 0; // Mean of the discharge-quantity distribution
        float f_n = 0; // Standard deviation of the discharge-count distribution
        float f_q = 0; // Standard deviation of the discharge-quantity distribution

        while (values.hasNext()) {
            Record temp = values.next();
            R r = new R();
            r.nums = temp.getBigint("nums");
            r.avgq = temp.getDouble("avgq");
            r.totalq = temp.getBigint("totalq");
            r.max = temp.getBigint("maxv");
            r.wid = temp.getBigint("wid");
            recs.add(r);
            alln = alln + temp.getBigint("nums");
            allq = allq + temp.getBigint("totalq");
        }

        // Mean calculation
        for (int i = 0; i < recs.size(); i++) {
            recs.get(i).n_p = (float) recs.get(i).nums / alln;
            recs.get(i).q_p = (float) recs.get(i).totalq / allq;
            a_n = a_n + recs.get(i).n_p * recs.get(i).wid;
            a_q = a_q + recs.get(i).q_p * recs.get(i).wid;
        }

        // Variance calculation (then square root to get the standard deviation)
        for (int i = 0; i < recs.size(); i++) {
            float a = recs.get(i).n_p * (float) Math.pow(recs.get(i).wid - a_n, 2);
            f_n = f_n + a;
            float b = recs.get(i).q_p * (float) Math.pow(recs.get(i).wid - a_q, 2);
            f_q = f_q + b;
        }
        f_n = (float) Math.sqrt((double) f_n);
        f_q = (float) Math.sqrt((double) f_q);

        // Numerators of the Sk formula
        float ln = 0;
        float lq = 0;

        // Calculate Sk (skewness)
        for (int i = 0; i < recs.size(); i++) {
            float a = recs.get(i).n_p * (float) Math.pow(recs.get(i).wid - a_n, 3);
            ln = ln + a;
            float b = recs.get(i).q_p * (float) Math.pow(recs.get(i).wid - a_q, 3);
            lq = lq + b;
        }
        double sk_n = ln / Math.pow(f_n, 3);
        double sk_q = lq / Math.pow(f_q, 3);

        // Add to the result record
        result.set("pid", key.getString(0));
        result.set("skn", sk_n);
        result.set("skq", sk_q);

        // Reset temporary accumulators
        ln = 0;
        lq = 0;

        // Calculate Ku (steepness)
        for (int i = 0; i < recs.size(); i++) {
            float a = recs.get(i).n_p * (float) Math.pow(recs.get(i).wid - a_n, 4);
            ln = ln + a;
            float b = recs.get(i).q_p * (float) Math.pow(recs.get(i).wid - a_q, 4);
            lq = lq + b;
        }
        double ku_n = ln / Math.pow(f_n, 4) - 3;
        double ku_q = lq / Math.pow(f_q, 4) - 3;

        result.set("kun", ku_n);
        result.set("kuq", ku_q);
        context.write(result);
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}
The TJDriver.java code is as follows:
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

public class TJDriver {
    public static void main(String[] args) throws OdpsException {
        JobConf job = new JobConf();

        // Specify the map output types
        job.setMapOutputKeySchema(SchemaUtils.fromString("pid:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString(
                "pid:string,wid:bigint,totalq:bigint,avgq:double,nums:bigint,maxv:bigint"));

        // Specify the input and output tables (partition deviceid=001)
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).partSpec("deviceid=001").build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).partSpec("deviceid=001").build(), job);

        // Specify the mapper and reducer
        job.setMapperClass(TJMapper.class);
        job.setReducerClass(TJReducer.class);

        RunningJob rj = JobClient.runJob(job);
        rj.waitForCompletion();
    }
}
The code for R.java is as follows:
public class R {
    public long wid;      // Window ID
    public long totalq;   // Total discharge of a window
    public double avgq;   // Average discharge of a window
    public long nums;     // Number of discharges in a window
    public long max;      // Maximum discharge of a window
    public float n_p;     // Probability of discharge count for a window
    public float q_p;     // Probability of discharge quantity for a window

    public R() {
    }
}
- Local Test
Open TJDriver.java, right-click it, and select "Run As --> Run Configurations".
On the ODPS Config tab, select the correct ODPS project.
On the Arguments tab, enter the run parameters: dw_pt dw_tj, and click Run to execute the local test run.
On the first run, Eclipse downloads a small amount of test data from MaxCompute for testing. After the run, you can see the test input data and the resulting output data in the local Warehouse directory.
- Package and upload resources
Once the local test results are correct, you can export the JAR package. In Eclipse, execute File --> Export, select "JAR file", and export it to a local directory.
Under odpscmd, execute the add resource command to upload the JAR to MaxCompute.
add jar d:/Clouder/jar/TJ.jar;
- Execute the program on MaxCompute
Under odpscmd, execute the jar command to run the program:
jar -resources TJ.jar -classpath d:\Clouder\jar\TJ.jar TJDriver dw_pt dw_tj;
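When the job finishes, the computed features can be inspected directly in odpscmd, for example:

select * from dw_tj where deviceid='001' limit 10;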
III. HOMEWORK
3.1 Homework
The traditional MapReduce model requires that after each MapReduce operation the data land on a distributed file system (such as HDFS or a MaxCompute table). In general, a MapReduce application consists of multiple MapReduce jobs, and each job must write its output to disk after completion; the next Map task, in many cases, only reads the data once to prepare for the subsequent Shuffle phase, which actually results in redundant IO operations. MaxCompute's compute scheduling logic can support more complex programming models, in which the next Reduce operation can be performed directly after a Reduce without inserting a Map operation in between. Based on this, MaxCompute provides an extended MapReduce model that supports connecting any number of Reduce operations after a Map, such as Map->Reduce->Reduce.
The homework for this course requires merging the calculation of the discharge spectra and the statistical features into a single job implemented as Map-Reduce-Reduce, which reads the source table once, keeps the intermediate results in memory, and produces the dw_tj result table directly. You can use a pipeline to connect multiple Reduce stages.
The reference code for MapReduce Driver is as follows:
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.Job;
import com.aliyun.odps.pipeline.Pipeline;

public class TJDriver {
    public static void main(String[] args) throws OdpsException {
        Job job = new Job();

        Pipeline pipeline = Pipeline.builder()
                // Stage 1: map raw PD records into phase windows
                .addMapper(PTMapper.class)
                .setOutputKeySchema(
                        new Column[] { new Column("wid", OdpsType.BIGINT) })
                .setOutputValueSchema(
                        new Column[] { new Column("pid", OdpsType.STRING),
                                new Column("wid", OdpsType.BIGINT),
                                new Column("maxv", OdpsType.BIGINT) })
                .setOutputKeySortColumns(new String[] { "wid" })
                .setPartitionColumns(new String[] { "wid" })
                .setOutputGroupingColumns(new String[] { "wid" })
                // Stage 2: first reducer builds the discharge spectrum per window
                .addReducer(PTReducer.class)
                .setOutputKeySchema(
                        new Column[] { new Column("pid", OdpsType.STRING) })
                .setOutputValueSchema(
                        new Column[] { new Column("wid", OdpsType.BIGINT),
                                new Column("totalq", OdpsType.BIGINT),
                                new Column("avgq", OdpsType.DOUBLE), // average discharge is a double, matching DW_PT
                                new Column("nums", OdpsType.BIGINT),
                                new Column("maxv", OdpsType.BIGINT) })
                // Stage 3: second reducer computes the statistical features
                .addReducer(TJReducer.class)
                .createPipeline();

        job.setPipeline(pipeline);
        job.addInput(TableInfo.builder().tableName(args[0]).build());
        job.addOutput(TableInfo.builder().tableName(args[1]).build());
        job.submit();
        job.waitForCompletion();
        System.exit(job.isSuccessful() == true ? 0 : 1);
    }
}