Several methods of sharing global information in Hadoop

I don't understand why a purely technical article posted so long ago was suddenly flagged for containing sensitive words, taken down, and I was asked to resend it...

------

 

When writing Hadoop MapReduce programs, we sometimes need shared global data in the mappers or reducers. For example, when processing tables of integers, each Reducer may need to know the value range of every column; in some graph algorithms, each Reducer may need the connectivity information of the graph.

 

In overview, the four approaches compared below are:

1. Add the shared data to the key/value pairs themselves: universal, but inefficient.
2. Put a shared file on HDFS and access it through Hadoop's file API: general purpose, moderate efficiency (readable and writable).
3. Add the shared information to the JobConf/Configuration object and access it with the set*/get* methods: best for small amounts of information, most efficient.
4. Add the shared information to the DistributedCache: suitable for large amounts of shared information (read only).

 

1. The most basic method is to append the information to be shared to the key/value pairs themselves. This is simple to use (for example, the value is a Text that carries the normal data, then a separator, then the global data), but it seriously hurts both network and processing efficiency, and sometimes the MapReduce logic has to be redesigned around it. A minimal sketch follows.
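
The sketch below only illustrates the idea; the output key, the "|" separator and the content of globalInfo are assumptions made for this example, not taken from any particular program. The mapper appends the global data to every value it emits, and the reducer splits it back out:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ShareByValueMapper extends MapReduceBase
		implements Mapper<LongWritable, Text, IntWritable, Text> {
	private String globalInfo = "0,100"; // e.g. the value range of a column

	public void map(LongWritable key, Text value,
			OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
		// normal record first, then the separator, then the global data;
		// each reducer recovers both parts with value.toString().split("\\|")
		output.collect(new IntWritable(1), new Text(value.toString() + "|" + globalInfo));
	}
}

Every record now carries a copy of the global data across the network, which is exactly why this approach scales poorly.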

2. Put the shared file on HDFS and access it in each Mapper/Reducer through the HDFS file API. This method is general, but it involves HDFS file operations, which adds complexity and costs some efficiency.

The API for reading and writing HDFS differs a little from the standard Java file API: the InputStream/OutputStream must be created through specific Hadoop objects. Here is an example of reading information from an HDFS file.

The key points are: first obtain the current file system from the current JobConf (by default it reads the relevant settings from the Hadoop configuration files, which also works in single-node mode); then open the file with the file system's open method, which returns an FSDataInputStream, a subclass of InputStream. Do not try to open the input stream with the plain Java file API or directly from a Hadoop Path, e.g. new Scanner(p.toString()) or new Scanner(new Path(hdfs.getHomeDirectory(), p).toString()); that throws a file-not-found exception even if the file is clearly in the listed directory.

 

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

class XXX {
	private int N;
	private List<Integer> D = new ArrayList<Integer>();
	.....

	private void setConfByHDFS(Path p, JobConf conf) throws IOException {
		FileSystem hdfs = FileSystem.get(conf);    // file system for the current configuration
		Scanner s = new Scanner(hdfs.open(p));     // open the file input stream with hdfs.open
		N = s.nextInt();
		for (int i = 0; i < N; i++) {
			D.add(s.nextInt());
		}
		s.close();
	}
}
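
Writing works symmetrically: the file system's create method returns an FSDataOutputStream (a subclass of OutputStream) that the standard Java writer classes can wrap. Below is a minimal sketch, assuming it is added to class XXX above with java.io.PrintWriter imported; the method name and the output format are illustrative:

	private void writeConfToHDFS(Path p, JobConf conf) throws IOException {
		FileSystem hdfs = FileSystem.get(conf);
		PrintWriter w = new PrintWriter(hdfs.create(p)); // hdfs.create returns an FSDataOutputStream
		w.println(N);                                    // the count first, then one value per line
		for (int d : D) {
			w.println(d);
		}
		w.close();
	}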

3. Use JobConf's set* methods to write the configuration information in the driver, then read it back with the get* methods inside the Mapper/Reducer's configure method.

 

Because the information is written into the JobConf itself and no HDFS reads or writes are involved, this is the most efficient method. It is not well suited to sharing large amounts of information, though; it works best for a few global variables.

To implement it, you need to override the Mapper/Reducer's configure method.

The set* methods store a value of the given type under the given name in the JobConf, and the get* methods read the stored value back by name. For the basic types, an extra parameter specifies the default value returned when the lookup fails (the methods for class types return null on failure). setInt/getInt and setFloat/getFloat handle types such as int and float; set/get handle a single String; setStrings/getStrings handle an array of Strings.

 

class XXX {
	...
	public static class CSVReducer extends MapReduceBase implements
			Reducer<IntWritable, IntWritable, IntWritable, VectorIntWritable> {
		private int N = 0;
		private ArrayList<Integer> D = new ArrayList<Integer>();

		@Override
		public void configure(JobConf job) { // the JobConf is only accessible here
			super.configure(job);
			N = job.getInt("csvcount.conf.num", -1);  // read the shared int (default -1 on failure)
			String str = job.get("csvcount.conf.d");  // read the shared comma-separated list
			for (String s : str.split(",")) {
				D.add(Integer.parseInt(s));
			}
		}

		@Override
		public void reduce(IntWritable key, Iterator<IntWritable> values,
				OutputCollector<IntWritable, VectorIntWritable> output, Reporter reporter) throws IOException {
			int[] res = new int[D.get(key.get())];
			// System.out.println(D.get(key.get()));
			...
		}
	}

	private void setConfByConfigure(Path p, JobConf conf) throws IOException { // call this in the driver, after creating the JobConf, to write the global shared information
		FileSystem hdfs = FileSystem.get(conf);
		Scanner s = new Scanner(hdfs.open(p));
		int N = s.nextInt();
		ArrayList<Integer> D = new ArrayList<Integer>();
		for (int i = 0; i < N; i++) {
			D.add(s.nextInt());
		}
		s.close();
		conf.setInt("csvcount.conf.num", N);  // write the shared int
		conf.set("csvcount.conf.d", D.toString().replaceAll("[\\[\\] ]", ""));  // write the list as "1,2,3"
	}
}
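
The manual comma-joining and splitting above could also be done with the setStrings/getStrings pair mentioned earlier. A minimal sketch reusing the "csvcount.conf.d" key from the example (error handling omitted):

		// in the driver, after reading D from the HDFS file:
		String[] arr = new String[D.size()];
		for (int i = 0; i < D.size(); i++) {
			arr[i] = String.valueOf(D.get(i));
		}
		conf.setStrings("csvcount.conf.d", arr);   // stored internally as a comma-separated value

		// in configure(JobConf job):
		for (String s : job.getStrings("csvcount.conf.d")) {
			D.add(Integer.parseInt(s));
		}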

4. Use the DistributedCache. This is a simple mechanism Hadoop provides for sharing read-only global information. Hadoop copies every file added to the DistributedCache to the local temporary directory of the relevant nodes (remember the local-path temporary directory entries you had to fill in when configuring Hadoop?), so accessing these files is just ordinary local file I/O. Because the files are only copied from HDFS to the local disks and never copied back, writing to them is pointless and such writes cannot be shared.

To use it, call the static method DistributedCache.addCacheFile to add the URI of the shared file/directory to the job's JobConf. Before accessing the data, call the other static method DistributedCache.getLocalCacheFiles to list all the cached files of the job, and then open them with the standard Java file API.

Again, you need to override the configure method in the Mapper/Reducer.

 

public class WordCount2 extends Configured implements Tool {

	public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
		private Set<String> patternsToSkip = new HashSet<String>();
		public void configure(JobConf job) { // override configure to read the DistributedCache information from the job
			if (job.getBoolean("wordcount2.skip.patterns", false)) {
				Path[] patternsFiles = new Path[0];
				try {
					patternsFiles = DistributedCache.getLocalCacheFiles(job);//Get the array of DistributedCache files
				} catch (IOException ioe) {
					System.err.println("Caught exception while getting cached files: "
							+ StringUtils.stringifyException(ioe));
				}
				for (Path patternsFile : patternsFiles)
					parseSkipFile(patternsFile);
			}
		}

		private void parseSkipFile(Path patternsFile) {
			try {
				BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString())); // the cached copy is a local file, so the ordinary Java file API works
				String pattern = null;
				while ((pattern = fis.readLine()) != null) {
					patternsToSkip.add(pattern);
				}
				fis.close();
			} catch (IOException ioe) {}
		}

		public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {....}
	}

	public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {...}

	public int run(String[] args) throws Exception { // driver logic belongs in Tool.run, not in a static main
		JobConf conf = new JobConf(getConf(), WordCount2.class);
		conf.setJobName("wordcount2");
		...
		List<String> other_args = new ArrayList<String>();
		for (int i = 0; i < args.length; ++i) {
			if ("-skip".equals(args[i])) {
				DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);//Set DistributedCache
				conf.setBoolean("wordcount2.skip.patterns", true);
			} else {
				other_args.add(args[i]);
			}
		}

		FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
		FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

		JobClient.runJob(conf);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		// standard Tool entry point: ToolRunner parses generic options and calls run()
		System.exit(ToolRunner.run(new Configuration(), new WordCount2(), args));
	}
}
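
Note that the URI passed to addCacheFile should already point to a file on HDFS, since the framework copies the cached files from HDFS to the local nodes. If the patterns file only exists locally, it can be uploaded first; a minimal sketch for the driver, with illustrative paths:

		FileSystem fs = FileSystem.get(conf);
		Path remote = new Path("/tmp/wordcount2/patterns.txt");   // assumed HDFS location
		fs.copyFromLocalFile(new Path("patterns.txt"), remote);   // upload the local file to HDFS
		DistributedCache.addCacheFile(remote.toUri(), conf);      // then register it in the cache
		conf.setBoolean("wordcount2.skip.patterns", true);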

 

 

Originally published at http://blog.csdn.net/yanxiangtianji

Please indicate the source when reprinting.

 

 

 
