Note:
Please note that this is just an MR training project. In practical applications, do not use MapReduce for friend recommendation or for this kind of directed-acyclic-graph pipeline logic, because MR has to write intermediate results to disk, and the disk I/O greatly reduces efficiency.
Hadoop is somewhat dated in this respect; I will introduce the Spark approach later.
Common friends
Data prototype
user:friend1,friend2,... (one line per user; the user is separated from the comma-separated friend list by a colon)
Final output
Each output line is a pair of users, with their common friends on the right.
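For concreteness, here is a tiny made-up dataset in that format (my own example, not the original post's data); the rest of the walkthrough traces it through both steps:

```
A:B,C,D
B:A,C
C:A,B,D
```

And the final output it should produce (separators simplified for readability):

```
A-B    C
A-C    B D
B-C    A
```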
Directory structure
A brief outline of the idea: there are obviously two steps here, step1 and step2, and the second step depends on the results of the first before arriving at the final data. The logical structure is clearly a directed acyclic graph (DAG).
Code
step1Mapper
```java
package findSharedFriends;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Author: Braylon
 * @Date: 2020/1/27 16:59
 * @Version: 1.0
 */
public class Step1Mapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line format: user:friend1,friend2,...
        String s = value.toString();
        String[] field = s.split(":");
        String p = field[0];
        String[] f = field[1].split(",");
        // Invert the relation: emit (friend, user) so that each friend
        // collects the list of users who have him/her in their friend list.
        for (String f0 : f) {
            context.write(new Text(f0), new Text(p));
        }
    }
}
```
Sketch:
The main idea is essentially an inverted index (not exactly the same, but close).
The map phase turns each record of the form user: friend i into
Friend 1: user A
Friend 2: user A
...
which then enters the first-stage reduce.
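Tracing the made-up sample through the step1 map, the emitted (key, value) pairs would be:

```
(B, A) (C, A) (D, A)    from A:B,C,D
(A, B) (C, B)           from B:A,C
(A, C) (B, C) (D, C)    from C:A,B,D
```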
step1Reducer
```java
package findSharedFriends;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author: Braylon
 * @Date: 2020/1/27 17:01
 * @Version: 1.0
 */
public class Step1Reducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // key = a friend; values = all users who list that friend.
        // Join the users into one comma-separated string.
        StringBuffer str = new StringBuffer();
        for (Text v : values) {
            str.append(v).append(",");
        }
        context.write(key, new Text(str.toString()));
    }
}
```
Sketch:
Here reduce merges the map output above by key,
yielding data of the form
Friend 1: user A, user B, user Z
Friend 2: user C, user B
...
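For the sample, step1's final output would be as below (key and value are tab-separated by the default TextOutputFormat; note the trailing comma left by the append loop). The value order within a line is not guaranteed, since reduce input values arrive in no particular order:

```
A    B,C,
B    A,C,
C    A,B,
D    A,C,
```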
step2Mapper
```java
package findSharedFriends;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Arrays;

/**
 * @Author: Braylon
 * @Date: 2020/1/27 17:07
 * @Version: 1.0
 */
public class Step2Mapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line format (step1 output): friend \t user1,user2,...
        String line = value.toString();
        String[] fields = line.split("\t");
        String f = fields[0];
        String[] ps = fields[1].split(",");
        // Sort so each pair always appears in the same order; otherwise
        // "A-B" and "B-A" would be two different keys and the common
        // friends would be split between them.
        Arrays.sort(ps);
        // Emit every unordered pair of users that share friend f.
        for (int i = 0; i < ps.length; i++) {
            for (int j = i + 1; j < ps.length; j++) {
                context.write(new Text(ps[i] + "-" + ps[j]), new Text(f));
            }
        }
    }
}
```
Sketch:
Here we traverse the user list for each friend i and combine the users pairwise into keys of the form user M-user N, each emitted with friend i as the value,
giving data in the format
user A-user B	friend i
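Continuing the sample (with the Arrays.sort fix above, so each pair always comes out in canonical order), the step2 map output would be:

```
B-C    A    from A    B,C,
A-C    B    from B    A,C,
A-B    C    from C    A,B,
A-C    D    from D    A,C,
```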
step2Reducer
```java
package findSharedFriends;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author: Braylon
 * @Date: 2020/1/27 17:16
 * @Version: 1.0
 */
public class Step2Reducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Concatenate all common friends of this user pair,
        // separated by the non-printing character \001.
        StringBuffer sb = new StringBuffer();
        for (Text v : values) {
            sb.append(v).append("\001");
        }
        context.write(key, new Text(sb.toString()));
    }
}
```
The step2 reduce then concatenates, for each user pair, all the friends they share, which yields the target data.
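For the sample, the final step2 output would therefore be (\001 shown literally; the reducer leaves a trailing separator):

```
A-B    C\001
A-C    B\001D\001
B-C    A\001
```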
driverClass (JobControl)
```java
package findSharedFriends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
// new-API JobControl (mapreduce.lib.jobcontrol), matching ControlledJob above
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @Author: Braylon
 * @Date: 2020/1/27 17:18
 * @Version: 1.0
 */
public class driverClass {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Hardcoded local paths for a local test run:
        // args[0] = input data, args[1] = step1 output (step2 input), args[2] = final output
        args = new String[]{"D:\\idea\\HDFS\\src\\main\\java\\findSharedFriends\\data.txt",
                "D:\\idea\\HDFS\\src\\main\\java\\findSharedFriends\\out1",
                "D:\\idea\\HDFS\\src\\main\\java\\findSharedFriends\\out2"};
        Configuration conf = new Configuration();

        Job job1 = Job.getInstance(conf);
        job1.setMapperClass(Step1Mapper.class);
        job1.setReducerClass(Step1Reducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        Job job2 = Job.getInstance(conf);
        job2.setMapperClass(Step2Mapper.class);
        job2.setReducerClass(Step2Reducer.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        // job2 reads job1's output directory
        FileInputFormat.setInputPaths(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        JobControl control = new JobControl("Braylon");
        ControlledJob cj1 = new ControlledJob(job1.getConfiguration());
        ControlledJob cj2 = new ControlledJob(job2.getConfiguration());
        // cj2 may only start after cj1 has finished successfully
        cj2.addDependingJob(cj1);
        control.addJob(cj1);
        control.addJob(cj2);

        // JobControl.run() blocks, so run it in its own thread and poll for completion
        Thread thread = new Thread(control);
        thread.start();
        while (!control.allFinished()) {
            Thread.sleep(500); // avoid busy-waiting
        }
        control.stop();
        System.out.println("done");
    }
}
```
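As an aside, because this pipeline is strictly linear, JobControl is not the only option: the same dependency can be expressed by simply running the jobs back to back with waitForCompletion. A minimal sketch, assuming job1 and job2 are configured exactly as above (note that waitForCompletion also throws ClassNotFoundException, which the enclosing method must declare):

```java
// Run the two jobs sequentially; job2 is only submitted if job1 succeeded.
if (job1.waitForCompletion(true)) {                  // true = print progress
    System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
System.exit(1);                                      // job1 failed
```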
Knowledge points:
- args
Here the args array holds three paths: the input data (a txt file), the step1 output directory (which is also the input of the second stage), and the step2 (final) output directory.
- Multi job, JobControl
cj2.addDependingJob(cj1);
The dependency declared here means job2 consumes job1's output as its input, so JobControl will not launch cj2 until cj1 has finished successfully.
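One caveat (my addition, not from the original post): polling allFinished() alone cannot distinguish success from failure, since it returns true once every job has finished either way. A slightly more defensive polling loop, using JobControl's getFailedJobList(), might look like:

```java
// Poll for completion, bailing out early if any controlled job fails.
while (!control.allFinished()) {
    if (!control.getFailedJobList().isEmpty()) {
        System.err.println("failed jobs: " + control.getFailedJobList());
        break;
    }
    Thread.sleep(500);  // avoid busy-waiting
}
control.stop();  // lets the JobControl thread exit its run() loop
```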