Big data learning: a MapReduce implementation of finding common friends, with JobControl for a directed acyclic graph of jobs

Note:
This is just an MR training project. In practical applications, do not use MapReduce for friend recommendation or for this kind of directed-acyclic-graph computation logic: MR has to write intermediate results to disk, and the disk IO greatly reduces efficiency. Hadoop MapReduce is somewhat dated here; I will introduce the Spark approach later.

Common friends

Data prototype

Each input line lists a user followed by that user's friends:

user: friend 1, friend 2...

Final output

Each output line has a pair of users on the left and their common friends on the right.
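To make the steps concrete, here is a small hypothetical sample (not from the original data) that the sketches below trace through:

A:B,C,D
B:A,C,E
E:B,C

For example, A and E both list B and C as friends, so the final output should contain the pair A-E with common friends B and C.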

Directory structure

The package findSharedFriends contains five classes: Step1Mapper, Step1Reducer, Step2Mapper, Step2Reducer, and driverClass.

A brief outline of the idea: there are clearly two steps, step1 and step2. The second step depends on the output of the first before producing the final data, so the jobs naturally form a directed acyclic graph.

Code

Step1Mapper

package findSharedFriends;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @Author: Braylon
 * @Date: 2020/1/27 16:59
 * @Version: 1.0
 */
public class Step1Mapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line has the form "user:friend1,friend2,..."
        String line = value.toString();
        String[] fields = line.split(":");
        String user = fields[0];
        String[] friends = fields[1].split(",");
        // Invert the relation: emit (friend, user) so the reducer can collect,
        // for each friend, every user who lists that friend.
        for (String friend : friends) {
            context.write(new Text(friend), new Text(user));
        }
    }
}

Sketch:
The main idea is similar to an inverted index (though not exactly the same): each line of the form user A: friend 1, friend 2, ... is inverted into
Friend 1: user A
Friend 2: user A
...
These pairs then enter the first-stage reduce.
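
Tracing the hypothetical sample above, the single line A:B,C,D produces:

B: A
C: A
D: A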

Step1Reducer

package findSharedFriends;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author: Braylon
 * @Date: 2020/1/27 17:01
 * @Version: 1.0
 */
public class Step1Reducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // key: a friend; values: all users who list that friend.
        StringBuilder str = new StringBuilder();
        for (Text v : values) {
            str.append(v).append(",");
        }
        // The trailing comma is harmless: split(",") in Step2Mapper drops
        // trailing empty strings.
        context.write(key, new Text(str.toString()));
    }
}

Sketch:
The reducer merges the pairs above into lines of the form
Friend 1: user A, user B, user Z
Friend 2: user C, user B
...
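Running step1 on the full hypothetical sample gives (the order of users within a list is not guaranteed, and the trailing comma comes from the reducer's append loop):

A	B,
B	A,E,
C	A,B,E,
D	A,
E	B,

TextOutputFormat separates key and value with a tab, which is why Step2Mapper splits each line on "\t".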

Step2Mapper

package findSharedFriends;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Arrays;

/**
 * @Author: Braylon
 * @Date: 2020/1/27 17:07
 * @Version: 1.0
 */
public class Step2Mapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Step1's output lines have the form "friend\tuserA,userB,..."
        String line = value.toString();
        String[] fields = line.split("\t");
        String friend = fields[0];
        String[] users = fields[1].split(",");
        // Sort so each pair is always emitted in a canonical order ("A-B",
        // never "B-A"); otherwise the same pair could be split across two
        // different reduce groups.
        Arrays.sort(users);
        for (int i = 0; i < users.length; i++) {
            for (int j = i + 1; j < users.length; j++) {
                context.write(new Text(users[i] + "-" + users[j]), new Text(friend));
            }
        }
    }
}

Sketch:
For each friend i, we traverse the corresponding user list and combine the users into every unordered pair (the list is sorted first so that each pair always appears in one canonical order).
This yields records of the form
User A-user B	friend i
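
From the hypothetical step1 output, the line C	A,B,E emits (after sorting):

A-B: C
A-E: C
B-E: C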

Step2Reducer

package findSharedFriends;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @Author: Braylon
 * @Date: 2020/1/27 17:16
 * @Version: 1.0
 */
public class Step2Reducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // key: a user pair "A-B"; values: every friend the two users share.
        StringBuilder sb = new StringBuilder();
        for (Text v : values) {
            // '\001' is a non-printing separator commonly used to delimit
            // fields in Hadoop text output.
            sb.append(v).append("\001");
        }
        context.write(key, new Text(sb.toString()));
    }
}

Reducing the above data gives the target output: each user pair followed by the list of their common friends.
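
On the hypothetical sample, the final output is (writing \001 for the control character; the order of friends within a line is not guaranteed):

A-B	C\001
A-E	B\001C\001
B-E	C\001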

Driver class (JobControl)

package findSharedFriends;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @Author: Braylon
 * @Date: 2020/1/27 17:18
 * @Version: 1.0
 */
public class driverClass {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Hard-coded local paths for testing: input file, step1 output directory
        // (which also serves as step2's input), and step2 output directory.
        args = new String[]{"D:\\idea\\HDFS\\src\\main\\java\\findSharedFriends\\data.txt", "D:\\idea\\HDFS\\src\\main\\java\\findSharedFriends\\out1", "D:\\idea\\HDFS\\src\\main\\java\\findSharedFriends\\out2"};
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf);

        job1.setMapperClass(Step1Mapper.class);
        job1.setReducerClass(Step1Reducer.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job1,new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        Job job2 = Job.getInstance(conf);

        job2.setMapperClass(Step2Mapper.class);
        job2.setReducerClass(Step2Reducer.class);

        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job2,new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        // Wrap each Job in a ControlledJob and declare job2's dependency on
        // job1, forming a two-node directed acyclic graph.
        JobControl control = new JobControl("Braylon");
        ControlledJob cj1 = new ControlledJob(job1.getConfiguration());
        ControlledJob cj2 = new ControlledJob(job2.getConfiguration());
        cj2.addDependingJob(cj1);
        control.addJob(cj1);
        control.addJob(cj2);
        Thread thread = new Thread(control);
        thread.start();
        // Poll until both jobs have finished; sleep so the loop does not busy-wait.
        while (!control.allFinished()) {
            Thread.sleep(500);
        }
        System.out.println("done");
        // Stop the JobControl monitoring thread now that all jobs are done.
        control.stop();
    }
}

Knowledge points:

  1. args

The args array holds three paths: the input data (a txt file), the step1 output directory (which is also the input of the second stage), and the step2 output directory.

  2. Multiple jobs with JobControl

cj2.addDependingJob(cj1);

This declares that job2 depends on job1: job2 takes job1's output as its input, so JobControl will not launch job2 until job1 has finished successfully.
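
For a simple linear chain like this one, the same effect can also be achieved without JobControl by submitting the jobs synchronously in order; JobControl pays off as the job DAG grows more branches. A minimal sketch, assuming the same job1 and job2 objects as above (waitForCompletion additionally throws ClassNotFoundException, so main would need to declare it):

        // Submit job1 and block until it finishes; abort if it fails.
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }
        // job2 reads job1's output directory, so it may start only now.
        System.exit(job2.waitForCompletion(true) ? 0 : 1);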
