Intelligent Mail Marketing Using Markov Model in MapReduce (II)
In this post, we use the MapReduce computing framework to generate the following output for each customer-id:
customerID (Date_1, Amount_1); (Date_2, Amount_2); ... ; (Date_N, Amount_N)
where:
Date_1 ≤ Date_2 ≤ ... ≤ Date_N
This MapReduce job sorts each customer's transactions by date in ascending order using the secondary sort technique of MapReduce, which delegates the sorting to the framework's shuffle phase rather than buffering all of a customer's transactions in reducer memory. The implementation is as follows.
The input data can be generated by the program itself or taken from the dataset provided with the book Data Algorithms.
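To make the flow concrete, here are a few hypothetical input records in the four comma-separated fields the mapper expects; the layout of customer-id, transaction-id, purchase-date, amount follows the Data Algorithms dataset, and the second field is ignored by this job:

C001,T001,2013-05-01,120
C001,T002,2013-01-15,200
C002,T003,2013-03-02,310

For customer C001 the job would then emit one line with the transactions in ascending date order (the reducer flattens the (date, amount) pairs into CSV):

C001,2013-01-15,200,2013-05-01,120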
Custom type CompositeKey
This custom type represents a (customer-id, purchase-date) pair: the natural key (customer-id) combined with the value to be sorted on (purchase-date).
The CompositeKey code is as follows
package com.deng.MarkovState;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite key of (customer-id, purchase-date timestamp): the natural key
 * plus the field we want the framework to sort on.
 */
public class CompositeKey implements WritableComparable<CompositeKey> {
    private String customerID;
    private long timestamp;

    public CompositeKey(String customerID, long timestamp) {
        set(customerID, timestamp);
    }

    public CompositeKey() {
    }

    public void set(String customerID, long timestamp) {
        this.customerID = customerID;
        this.timestamp = timestamp;
    }

    public String getCustomerID() {
        return customerID;
    }

    public long getTimestamp() {
        return timestamp;
    }

    // Custom comparison: first by customer-id, then by timestamp ascending
    @Override
    public int compareTo(CompositeKey o) {
        if (this.customerID.compareTo(o.customerID) != 0) {
            return this.customerID.compareTo(o.customerID);
        } else if (this.timestamp != o.timestamp) {
            return timestamp < o.timestamp ? -1 : 1;
        } else {
            return 0;
        }
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.customerID);
        dataOutput.writeLong(this.timestamp);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.customerID = dataInput.readUTF();
        this.timestamp = dataInput.readLong();
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(getCustomerID()).append(",").append(getTimestamp());
        return sb.toString();
    }
}
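As a quick sanity check, the following sketch (not part of the original job) verifies that CompositeKey survives the write/readFields round trip Hadoop performs when shuffling map output:

package com.deng.MarkovState;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// A minimal sketch: serialize a CompositeKey to bytes and restore it,
// mimicking what Hadoop does between the map and reduce phases.
public class CompositeKeyRoundTrip {
    public static void main(String[] args) throws IOException {
        CompositeKey original = new CompositeKey("C001", 1356998400000L);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        CompositeKey restored = new CompositeKey();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(restored); // prints: C001,1356998400000
    }
}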
Sort comparator CompositeKeyComparator
Sorts records by the custom CompositeKey: by customer-id first, then by purchase date.
The CompositeKeyComparator code is as follows
package com.deng.MarkovState;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CompositeKeyComparator extends WritableComparator {
    protected CompositeKeyComparator() {
        super(CompositeKey.class, true);
    }

    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKey key1 = (CompositeKey) w1;
        CompositeKey key2 = (CompositeKey) w2;
        int comparison = key1.getCustomerID().compareTo(key2.getCustomerID());
        if (comparison == 0) {
            if (key1.getTimestamp() == key2.getTimestamp()) {
                return 0;
            } else if (key1.getTimestamp() < key2.getTimestamp()) {
                return -1;
            } else {
                return 1;
            }
        } else {
            return comparison;
        }
    }
}
Grouping comparator NaturalKeyGroupingComparator
Groups records by customer-id, so that all of a customer's transactions arrive in a single reduce() call.
The NaturalKeyGroupingComparator code is as follows
package com.deng.MarkovState;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class NaturalKeyGroupingComparator extends WritableComparator {
    protected NaturalKeyGroupingComparator() {
        super(CompositeKey.class, true);
    }

    // compare only the natural key, so all records of one customer form one group
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKey key1 = (CompositeKey) w1;
        CompositeKey key2 = (CompositeKey) w2;
        return key1.getCustomerID().compareTo(key2.getCustomerID());
    }
}
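The two comparators divide the work between them: the sort comparator orders values within a customer by date, while the grouping comparator treats those keys as equal so they reach a single reduce() call. A minimal sketch (not part of the original post) illustrating the difference; since both constructors are protected, the demo has to live in the same package:

package com.deng.MarkovState;

// Demonstrates the split between sorting and grouping on the same keys.
public class ComparatorDemo {
    public static void main(String[] args) {
        CompositeKey earlier = new CompositeKey("C001", 1000L);
        CompositeKey later = new CompositeKey("C001", 2000L);

        // -1: same customer, so the earlier purchase sorts first
        System.out.println(new CompositeKeyComparator().compare(earlier, later));
        // 0: the grouping comparator ignores the timestamp entirely
        System.out.println(new NaturalKeyGroupingComparator().compare(earlier, later));
    }
}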
Mapper phase tasks
Emit <customer-id, <timestamp, amount>> key-value pairs.
The mapper code is as follows
package com.deng.MarkovState;

import com.deng.util.DateUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SecondarySortProjectionMapper extends Mapper<LongWritable, Text, CompositeKey, PairOfLongInt> {
    // Writables are reused across map() calls to avoid per-record allocation
    private final CompositeKey reduceKey = new CompositeKey();
    private final PairOfLongInt reduceValue = new PairOfLongInt();

    public void map(LongWritable key, Text value, Context context) {
        String line = value.toString();
        String[] tokens = line.split(",");
        // expect four comma-separated fields; skip malformed records
        if (tokens.length != 4) {
            return;
        }
        long date = 0;
        try {
            // tokens[2] holds the purchase date as yyyy-MM-dd
            date = DateUtil.getDateAsMilliSeconds(tokens[2]);
        } catch (Exception e) {
            e.printStackTrace();
        }
        int amount = Integer.parseInt(tokens[3]);
        // composite key: (customer-id, timestamp); value: (timestamp, amount)
        reduceKey.set(tokens[0], date);
        reduceValue.set(date, amount);
        try {
            context.write(reduceKey, reduceValue);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The custom class PairOfLongInt is designed as follows
package com.deng.MarkovState;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PairOfLongInt implements WritableComparable<PairOfLongInt> {
    private long timestamp;
    private int amount;

    public PairOfLongInt() {
    }

    public PairOfLongInt(long timestamp, int amount) {
        set(timestamp, amount);
    }

    public void set(long timestamp, int amount) {
        this.timestamp = timestamp;
        this.amount = amount;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public int getAmount() {
        return amount;
    }

    // values are never ordered by the framework in this job, so no ordering is defined
    @Override
    public int compareTo(PairOfLongInt o) {
        return 0;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(timestamp);
        dataOutput.writeInt(amount);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.timestamp = dataInput.readLong();
        this.amount = dataInput.readInt();
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(getTimestamp()).append(",").append(getAmount());
        return sb.toString();
    }
}
DateUtil is designed as follows
package com.deng.util;

import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtil {
    static final String DATE_FORMAT = "yyyy-MM-dd";
    // note: SimpleDateFormat is not thread-safe; acceptable here because
    // each map/reduce task runs in its own JVM
    static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT);

    public static Date getDate(String dateAsString) {
        try {
            return SIMPLE_DATE_FORMAT.parse(dateAsString);
        } catch (Exception e) {
            return null;
        }
    }

    public static long getDateAsMilliSeconds(String dateAsString) throws Exception {
        Date date = getDate(dateAsString);
        return date.getTime();
    }

    public static String getDateAsString(long timestamp) {
        return SIMPLE_DATE_FORMAT.format(timestamp);
    }
}
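Since SimpleDateFormat is mutable and not thread-safe, a common alternative on Java 8+ is java.time, whose formatters are immutable. A minimal sketch (the class name DateUtil8 is hypothetical, and unlike the original it pins the zone to UTC rather than using the JVM default):

package com.deng.util;

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Thread-safe variant of DateUtil; all conversions use UTC (an assumption).
public class DateUtil8 {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    public static long getDateAsMilliSeconds(String dateAsString) {
        return LocalDate.parse(dateAsString, FMT)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
    }

    public static String getDateAsString(long timestamp) {
        return Instant.ofEpochMilli(timestamp)
                .atZone(ZoneOffset.UTC)
                .toLocalDate()
                .format(FMT);
    }
}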
Reducer phase tasks
Assemble each customer's date-sorted transactions into the following output:
customerID (Date_1, Amount_1); (Date_2, Amount_2); ... ; (Date_N, Amount_N)
The reducer code is as follows
package com.deng.MarkovState;

import com.deng.util.DateUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SecondarySortProjectionReducer extends Reducer<CompositeKey, PairOfLongInt, NullWritable, Text> {
    public void reduce(CompositeKey key, Iterable<PairOfLongInt> values, Context context) {
        StringBuilder sb = new StringBuilder();
        sb.append(key.getCustomerID());
        // thanks to the secondary sort, values arrive already ordered by timestamp,
        // so the reducer just flattens them into one CSV line
        for (PairOfLongInt pair : values) {
            sb.append(",");
            long timestamp = pair.getTimestamp();
            String date = DateUtil.getDateAsString(timestamp);
            sb.append(date);
            sb.append(",");
            sb.append(pair.getAmount());
        }
        try {
            context.write(NullWritable.get(), new Text(sb.toString()));
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The driver for the MapReduce job is as follows
package com.deng.MarkovState;

import com.deng.util.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MarkovStateDriver {
    public static void main(String[] args) throws Exception {
        // clear output directories left over from previous runs
        FileUtil.deleteDirs("output");
        FileUtil.deleteDirs("output2");
        FileUtil.deleteDirs("MarkovState");
        Configuration conf = new Configuration();
        String[] otherArgs = new String[]{"input/smart_email_training.txt", "output"};
        Job secondSortJob = new Job(conf, "Markov");
        FileInputFormat.setInputPaths(secondSortJob, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(secondSortJob, new Path(otherArgs[1]));
        secondSortJob.setJarByClass(MarkovStateDriver.class);
        secondSortJob.setMapperClass(SecondarySortProjectionMapper.class);
        secondSortJob.setReducerClass(SecondarySortProjectionReducer.class);
        secondSortJob.setMapOutputKeyClass(CompositeKey.class);
        secondSortJob.setMapOutputValueClass(PairOfLongInt.class);
        secondSortJob.setOutputKeyClass(NullWritable.class);
        secondSortJob.setOutputValueClass(Text.class);
        // sort map output keys with the custom comparator...
        secondSortJob.setSortComparatorClass(CompositeKeyComparator.class);
        // ...but group reduce input by customer-id only
        secondSortJob.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
        System.exit(secondSortJob.waitForCompletion(true) ? 0 : 1);
    }
}
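FileUtil is imported by the driver but not listed in this post. A minimal sketch, under the assumption that deleteDirs only needs to remove a local output directory recursively so the job can be re-run:

package com.deng.util;

import java.io.File;

// Hypothetical helper matching the driver's FileUtil.deleteDirs(String) call:
// deletes a local directory and everything under it, if it exists.
public class FileUtil {
    public static void deleteDirs(String path) {
        deleteRecursively(new File(path));
    }

    private static void deleteRecursively(File f) {
        if (!f.exists()) {
            return;
        }
        File[] children = f.listFiles();
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        f.delete();
    }
}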