K-Means Clustering with MapReduce (Part II)
The previous post, K-Means Clustering with MapReduce (I), introduced the basic principles of the K-means clustering algorithm. This post describes how to implement K-means clustering with MapReduce.
MapReduce solution
The MapReduce solution for K-means clustering is iterative: each iteration runs one MapReduce job, so a MapReduce job that can be invoked repeatedly has to be created.
- A control (driver) program initializes the K centroids, invokes the MapReduce job iteratively, and decides whether the iteration should continue or stop.
- Each mapper needs the data points and all cluster centroids, so the centroids must be shared by all mappers.
- Iteration stops when the centroids no longer change, or when the change falls below a threshold. (A minimal in-memory sketch of this scheme follows the list.)
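To make the iterative scheme concrete before turning to Hadoop, here is a minimal in-memory sketch of the same loop in plain Java, run on the sample data shown below. It is only an illustration (the class and variable names are made up for this sketch, and it is not part of the Hadoop implementation that follows):

public class KmeansSketch {
    public static void main(String[] args) {
        double[][] points = {
            {1, 2}, {1, 3}, {1, 4}, {2, 5}, {2, 6}, {2, 7}, {2, 8},
            {3, 100}, {3, 101}, {3, 102}, {3, 103}, {3, 104}
        };
        int k = 3;
        // Initialize the centroids with the first K points, as the Hadoop version does
        double[][] centers = {points[0].clone(), points[1].clone(), points[2].clone()};

        for (int iter = 0; iter < 10; iter++) {
            double[][] sums = new double[k][2];
            int[] counts = new int[k];
            // "Map" step: assign every point to its nearest centroid
            for (double[] p : points) {
                int nearest = 0;
                for (int c = 1; c < k; c++) {
                    if (dist(p, centers[c]) < dist(p, centers[nearest])) nearest = c;
                }
                sums[nearest][0] += p[0];
                sums[nearest][1] += p[1];
                counts[nearest]++;
            }
            // "Reduce" step: the new centroid is the mean of the points assigned to it
            for (int c = 0; c < k; c++) {
                if (counts[c] > 0) {
                    centers[c][0] = sums[c][0] / counts[c];
                    centers[c][1] = sums[c][1] / counts[c];
                }
            }
        }
        for (double[] c : centers) {
            System.out.println(c[0] + " " + c[1]);
        }
    }

    static double dist(double[] a, double[] b) {
        return Math.sqrt((a[0] - b[0]) * (a[0] - b[0]) + (a[1] - b[1]) * (a[1] - b[1]));
    }
}

Each pass of the loop corresponds to one MapReduce job: the assignment step is what the mappers do, and the averaging step is what the reducers do.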
The input data are as follows
Two-dimensional coordinates are used to simulate the algorithm's input:
1.0 2.0
1.0 3.0
1.0 4.0
2.0 5.0
2.0 6.0
2.0 7.0
2.0 8.0
3.0 100.0
3.0 101.0
3.0 102.0
3.0 103.0
3.0 104.0
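With K = 3, the initialization used later in this post (KmeansUtil.pick) simply takes the first K lines of this file, so (1.0, 2.0), (1.0, 3.0) and (1.0, 4.0) become the initial centroids.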
Mapper phase tasks
The mapper has the following tasks:
- In the setup() method, read the cluster centroids into memory from a SequenceFile.
- For each input key-value pair, iterate over the cluster centroids.
- Compute the Euclidean distance to each centroid and keep the one closest to the input point.
- Emit a key-value pair for the reducer, where the key is the centroid nearest to the input point and the value is the point itself.
Mapper phase code
package com.deng.Kmeans;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.List;

public class KmeansMapper extends Mapper<LongWritable, Text, PointWritable, PointWritable> {
    private List<PointWritable> centers = null;

    private static List<PointWritable> readCentersFromSequenceFile() throws IOException {
        List<PointWritable> points = KmeansUtil.readFromHDFS("Sequence/read");
        return points;
    }

    // Read the cluster centroids from the file system
    public void setup(Context context) throws IOException {
        this.centers = readCentersFromSequenceFile();
    }

    public void map(LongWritable key, Text value, Context context) {
        PointWritable v = new PointWritable(value.toString());
        PointWritable nearest = null;
        double nearestDistance = Double.MAX_VALUE;
        for (PointWritable center : centers) {
            double distance = KmeansUtil.calculate(center, v);
            if (nearest == null) {
                nearest = center;
                nearestDistance = distance;
            } else {
                if (nearestDistance > distance) {
                    nearest = center;
                    nearestDistance = distance;
                }
            }
        }
        try {
            context.write(nearest, v);
            System.out.println("Mapper key is " + nearest);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
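Note that setup() reads the centroids from the fixed path Sequence/read, which is where the driver writes them before each iteration. As a worked example of the map logic: with the initial centroids 1 (1.0, 2.0), 2 (1.0, 3.0) and 3 (1.0, 4.0), the input point (2.0, 5.0) lies at distances √10 ≈ 3.16, √5 ≈ 2.24 and √2 ≈ 1.41 from them respectively, so the mapper emits centroid 3 as the key and (2.0, 5.0) as the value.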
Custom class PointWritable
The custom PointWritable class is as follows:
package com.deng.Kmeans;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PointWritable implements Writable, WritableComparable<PointWritable> {
    private double x;
    private double y;
    private Integer id;

    public PointWritable() {
        this.id = 0;
    }

    public PointWritable(double x, double y) {
        this.x = x;
        this.y = y;
        this.id = 0;
    }

    public PointWritable(String s) {
        set(s);
    }

    public void set(String s) {
        String[] num = s.split(" ");
        if (num.length == 3) {
            if (!Character.isDigit(num[1].charAt(0))) {
                num[1] = num[1].substring(1);
            }
            if (!Character.isDigit(num[2].charAt(num[2].length() - 1))) {
                num[2] = num[2].substring(0, num[2].length() - 1);
            }
            this.x = Double.parseDouble(num[1]);
            this.y = Double.parseDouble(num[2]);
            this.id = Integer.parseInt(num[0]);
        } else if (num.length == 2) {
            if (!Character.isDigit(num[0].charAt(0))) {
                num[0] = num[0].substring(1);
            }
            if (!Character.isDigit(num[1].charAt(num[1].length() - 1))) {
                num[1] = num[1].substring(0, num[1].length() - 1);
            }
            this.x = Double.parseDouble(num[0]);
            this.y = Double.parseDouble(num[1]);
            this.id = 0;
        }
        System.out.println(toString());
    }

    public double getX() { return x; }

    public double getY() { return y; }

    public Integer getId() { return id; }

    public void setId(int id) { this.id = id; }

    public void add(PointWritable o) {
        System.out.println(" when add operator , o is " + o);
        this.x += o.getX();
        this.y += o.getY();
        System.out.println("the result is " + toString());
    }

    public void divide(int count) {
        this.x /= count;
        this.y /= count;
    }

    @Override
    public int compareTo(PointWritable o) {
        if (this.getId().compareTo(o.getId()) != 0) {
            return this.getId().compareTo(o.getId());
        } else return 0;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.id);
        dataOutput.writeDouble(this.x);
        dataOutput.writeDouble(this.y);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readInt();
        this.x = dataInput.readDouble();
        this.y = dataInput.readDouble();
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(getId()).append(" ").append(getX()).append(" ").append(getY());
        return sb.toString();
    }
}
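A quick way to see how the string constructor behaves is the following standalone sketch (the demo class is made up for illustration and is not part of the project):

package com.deng.Kmeans;

// Standalone sketch showing how PointWritable parses its input strings; not part of the job itself.
public class PointWritableDemo {
    public static void main(String[] args) {
        PointWritable withId = new PointWritable("2 1.0 3.0"); // "id x y" -> id=2, x=1.0, y=3.0
        PointWritable noId = new PointWritable("1.0 3.0");     // "x y"    -> id=0, x=1.0, y=3.0
        System.out.println(withId);                            // 2 1.0 3.0
        System.out.println(noId);                              // 0 1.0 3.0
    }
}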
Utility class KmeansUtil
The KmeansUtil utility class is as follows:
package com.deng.Kmeans;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class KmeansUtil {
    // Initialize the cluster centers: read the first K points directly as centroids
    public static List<PointWritable> pick(int k, String p) throws IOException {
        System.out.println("init k");
        BufferedReader br = new BufferedReader(new FileReader(p));
        List<PointWritable> point = new ArrayList<>();
        String str;
        int t = 0;
        System.out.println("k is " + k);
        while ((str = br.readLine()) != null && t < k) {
            System.out.println(str);
            point.add(new PointWritable(str));
            t++;
            System.out.println(point.get(t - 1));
        }
        System.out.println(point.size());
        return point;
    }

    // Write the cluster centers to the file system
    public static void writeToHDFS(List<PointWritable> point, String p) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(p), conf);
        Path path = new Path(p);
        int t = 0;
        NullWritable key = NullWritable.get();
        PointWritable value = new PointWritable();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
            System.out.println(point.size());
            for (int i = 0; i < point.size(); i++) {
                t++;
                point.get(i).setId(t);
                value = new PointWritable(point.get(i).toString());
                writer.append(key, value);
            }
        } catch (NullPointerException e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(writer);
        }
    }

    // Read the cluster centers from the file system
    public static List<PointWritable> readFromHDFS(String p) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(p), conf);
        Path path = new Path(p);
        List<PointWritable> points = new ArrayList<>();
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                PointWritable test = new PointWritable(value.toString());
                System.out.println("test is " + test);
                points.add(test);
            }
        } finally {
            IOUtils.closeStream(reader);
        }
        System.out.println("reader from HDFS accomplishment");
        return points;
    }

    // Euclidean distance between two points
    public static double calculate(PointWritable a, PointWritable b) {
        return Math.sqrt((a.getX() - b.getX()) * (a.getX() - b.getX())
                + (a.getY() - b.getY()) * (a.getY() - b.getY()));
    }

    // Sum of pairwise distances within a set of centroids
    public static double sumOfDistance(List<PointWritable> point) {
        double sum = 0;
        for (int i = 0; i < point.size(); i++) {
            for (int j = 0; j < point.size(); j++) {
                if (i == j) continue;
                sum += calculate(point.get(i), point.get(j));
            }
        }
        return sum;
    }

    // Measure how much the centroid set changed between two iterations
    public static double change(List<PointWritable> a, List<PointWritable> b) {
        return Math.abs(sumOfDistance(a) - sumOfDistance(b));
    }

    // Write the new cluster centers produced by the MapReduce job back to the file system for the next iteration
    public static void reWriteHDFS(String from, String to) throws IOException {
        System.out.println("now ,this operation is rewrite to HDFS, the File directory is " + from);
        List<PointWritable> points = readFromHDFS(from);
        System.out.println("when rewrite from HDFS ,the centers is " + points.toString());
        writeToHDFS(points, to);
    }
}
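calculate() is the plain Euclidean distance, for example calculate((1.0, 2.0), (2.0, 5.0)) = √(1 + 9) = √10 ≈ 3.16. change() measures how much the centroid set moved between iterations by comparing the sums of pairwise distances: for the set {(1, 2), (1, 3), (1, 4)} the (double-counted) pairwise sum is 2 × (1 + 2 + 1) = 8, and if one centroid moves to (1, 2.5) the sum becomes 2 × (0.5 + 1.5 + 1) = 6, so change() returns |8 − 6| = 2.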
Reducer phase tasks
In this phase, the reducer recomputes the mean of each cluster's points and uses it as the cluster's new centroid. When the calculation is finished, the new centroids are written to a SequenceFile.
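For example, if a cluster receives the points (1.0, 2.0), (1.0, 3.0) and (1.0, 4.0), the reducer accumulates them into (3.0, 9.0) with add() and then divide(3) yields the new centroid (1.0, 3.0).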
Reducer phase code
package com.deng.Kmeans;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class KmeansReducer extends Reducer<PointWritable, PointWritable, NullWritable, PointWritable> {
    public Integer id = 0;

    public void reduce(PointWritable key, Iterable<PointWritable> values, Context context) {
        // Sum all points assigned to this centroid, then divide by the count to get the new centroid
        PointWritable newCenter = new PointWritable();
        int count = 0;
        for (PointWritable p : values) {
            count++;
            newCenter.add(p);
        }
        System.out.println("before divide operator ,the newCenter is " + newCenter + " the count is " + count);
        newCenter.divide(count);
        newCenter.setId(++id);
        System.out.println("after divide operator , the newCenter is " + newCenter);
        try {
            context.write(NullWritable.get(), newCenter);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Driver program
The driver for K-means clustering not only submits the MapReduce job but also does the following:
- Initialize the cluster centroids.
- Keep both the current cluster centroids and the new centroids produced by the MapReduce job.
- If the difference between the current and new centroids is within a threshold, or the iteration limit has been exceeded, stop and print the current centroids; otherwise, assign the new centroids to the current centroids and write them to the file system for the next iteration.
package com.deng.Kmeans;

import com.deng.util.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class KmeansDriver {
    public static List<PointWritable> initial_centroids, new_centroids, current_centroids;

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        System.out.println("***************************************");
        initial_centroids = new ArrayList<>();
        initial_centroids = KmeansUtil.pick(3, "input/kmeans.txt");   // Initialize the cluster centroids
        System.out.println("init accomplishment");
        System.out.println(initial_centroids.toString());
        KmeansUtil.writeToHDFS(initial_centroids, "Sequence/read");   // Write the centroids to the file system
        System.out.println("write to hdfs accomplishment");
        current_centroids = initial_centroids;                        // Save the initial centroids as the current centroids
        int iterators = 0;                                            // Iteration counter
        while (true) {
            run();
            List<PointWritable> p = KmeansUtil.readFromHDFS("Sequence/Output/part-r-00000");
            System.out.println(p.toString());
            System.out.println("now the MapReduce calculate accomplishment");
            KmeansUtil.writeToHDFS(p, "Sequence/read");
            System.out.println("now iterators is " + iterators);
            if (iterators > 10) break;                                // Stop once the iteration limit is exceeded
            iterators++;
            System.out.println("now ,this operation is deleteDirs ** Sequence/read **");
            System.out.println("now ,this operation is rewrite from Sequence/Output/part-00000 to Sequence/read");
            KmeansUtil.reWriteHDFS("Sequence/Output/part-r-00000", "Sequence/read"); // Write the new centroids back for the next iteration
            new_centroids = KmeansUtil.readFromHDFS("Sequence/read");
            System.out.println("newCentorids is " + new_centroids.toString());
            if (KmeansUtil.change(new_centroids, current_centroids) < 0.0005) {      // Check whether the centroid change is within the threshold
                System.out.println(KmeansUtil.change(new_centroids, current_centroids));
                break;
            } else {
                current_centroids = new_centroids;
            }
        }
        // The centroids no longer change; print them to the console
        List<PointWritable> result = KmeansUtil.readFromHDFS("Sequence/read");
        System.out.println("the result is :");
        for (int i = 0; i < result.size(); i++) {
            System.out.println(result.get(i).getX() + " " + result.get(i).getY());
        }
    }

    public static void run() throws IOException, ClassNotFoundException, InterruptedException {
        FileUtil.deleteDirs("Sequence/Output");
        Configuration conf = new Configuration();
        String[] otherArgs = new String[]{"input/kmeans.txt", "Sequence/Output"};
        Job job = new Job(conf, "Kmeans");
        job.setJarByClass(KmeansDriver.class);
        job.setMapperClass(KmeansMapper.class);
        job.setReducerClass(KmeansReducer.class);
        job.setMapOutputKeyClass(PointWritable.class);
        job.setMapOutputValueClass(PointWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(PointWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        int code = job.waitForCompletion(true) ? 0 : 1;
    }
}
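FileUtil.deleteDirs (from com.deng.util) is a small helper that is not shown in this post; it just clears the job's output directory before each run. A minimal sketch of what such a helper might look like, using the java.io API (this is an assumption for illustration, not the original class):

package com.deng.util;

import java.io.File;

// Hypothetical sketch of the deleteDirs helper used by the driver; the original class is not shown in this post.
public class FileUtil {
    public static void deleteDirs(String path) {
        File dir = new File(path);
        if (!dir.exists()) return;
        File[] children = dir.listFiles();
        if (children != null) {
            for (File child : children) {
                if (child.isDirectory()) {
                    deleteDirs(child.getPath());   // recurse into subdirectories
                } else {
                    child.delete();
                }
            }
        }
        dir.delete();                              // remove the now-empty directory
    }
}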
As for why there are so many System.out.println() calls: I wanted the program to show its progress in real time, the way large software does.
Problems encountered
The algorithm itself was fine; the problem was that the condition checking the number of iterations was written backwards, and it took two weeks of debugging to find the error.