Big Data Analytics

Distributing Auxiliary Job Data

The bulk of the data that you process in a MapReduce job will probably be stored in large files spread across HDFS. You can reliably store petabytes of information in HDFS, and individual jobs can process several terabytes at a time. The HDFS access model, however, assumes that each piece of a file is read by a single mapper: the individual files stored in HDFS are very large and are typically broken into chunks that different mappers process in parallel.

Sometimes it is necessary for every mapper to read the same file; for example, a distributed spell-check application would require every mapper to read in a copy of the dictionary before processing documents. The dictionary will be small (only a few megabytes), but it needs to be widely available so that all nodes can reach it.

Hadoop provides a mechanism specifically for this purpose, called the distributed cache. The distributed cache can contain small data files needed for initialization or libraries of code that may need to be accessed on all nodes in the cluster.

To use the distributed cache to disseminate files, call the static methods of the DistributedCache class when setting up your job. Use the DistributedCache.addCacheFile() method to add the names of files that should be sent to all nodes on the system. The file names are specified as URI objects; unless qualified otherwise, they are assumed to refer to files present on HDFS at the path indicated. You can copy local files to HDFS with the FileSystem.copyFromLocalFile() method.

When you want to retrieve files from the distributed cache (e.g., when the mapper is in its configure() step and wants to load config data like the dictionary mentioned above), use the DistributedCache.getLocalCacheFiles() method to retrieve the list of paths local to the current node for the cached files. These are copies of all cached files, placed in the local file system of each worker machine. (They will be in a subdirectory of mapred.local.dir.) Each of the paths returned by getLocalCacheFiles() can be accessed via regular Java file I/O mechanisms, such as java.io.FileInputStream.

As a cautionary note: If you use the local job runner in Hadoop (i.e., what happens if you call JobClient.runJob() in a program with no hadoop-conf.xml, or an empty one, accessible), then no local data directory is created, and the getLocalCacheFiles() call will return an empty set of results. Unit test code should take this into account.
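
One way to account for this in test code is to fall back to a known local path when the cache listing comes back empty. The sketch below is only an illustration and is not part of Hadoop: the class name CacheFallback and its resolve() method are hypothetical, and it uses java.nio.file.Path as a stand-in for Hadoop's own Path class so that it runs without a cluster.

```java
import java.nio.file.Path;

public class CacheFallback {
  /**
   * Returns the cached file whose base name equals expectedName.
   * If the cache listing is null, empty, or has no match (as can happen
   * under the local job runner), returns the supplied fallback path.
   *
   * NOTE: hypothetical helper; java.nio.file.Path stands in for
   * org.apache.hadoop.fs.Path.
   */
  public static Path resolve(Path[] cachedFiles, String expectedName, Path fallback) {
    if (cachedFiles != null) {
      for (Path candidate : cachedFiles) {
        Path name = candidate.getFileName();
        if (name != null && name.toString().equals(expectedName)) {
          return candidate;
        }
      }
    }
    return fallback;
  }
}
```

A mapper could apply the same idea in its configure() step, passing the result of getLocalCacheFiles() as the listing and a path that the unit test controls as the fallback.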

Suppose that we were writing an inverted index builder. We do not want to include very common words such as "the," "a," "and," etc. These so-called stop words might all be listed in a file. All the mappers should read the stop word list when they are initialized, and then filter the index they generate against this list. We can disseminate a list of stop words to all the mappers with the following code. The first listing will put the stop-words file into the distributed cache:

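import java.io.IOException;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public static final String LOCAL_STOPWORD_LIST =
    "/home/aaron/stop_words.txt";

public static final String HDFS_STOPWORD_LIST = "/data/stop_words.txt";

void cacheStopWordList(JobConf conf) throws IOException {
  FileSystem fs = FileSystem.get(conf);
  Path hdfsPath = new Path(HDFS_STOPWORD_LIST);

  // Upload the file to HDFS (delSrc = false, overwrite = true),
  // replacing any existing copy.
  fs.copyFromLocalFile(false, true, new Path(LOCAL_STOPWORD_LIST),
      hdfsPath);

  // Register the HDFS copy so that it is sent to the nodes running tasks.
  DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
}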


This code copies the local stop_words.txt file into HDFS, and then tells the distributed cache to send the HDFS copy to all nodes in the system. The next listing actually uses the file in the mapper:

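import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;

class IndexMapperExample implements Mapper {
  // Stop words consulted during map(); stays empty if no cached file is found.
  private Set<String> stopWords = new HashSet<String>();

  public void configure(JobConf conf) {
    try {
      // Cached copies keep their base name, so match on that.
      // HDFS_STOPWORD_LIST is the constant defined in the previous listing.
      String stopwordCacheName = new Path(HDFS_STOPWORD_LIST).getName();
      Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
      if (null != cacheFiles && cacheFiles.length > 0) {
        for (Path cachePath : cacheFiles) {
          if (cachePath.getName().equals(stopwordCacheName)) {
            loadStopWords(cachePath);
            break;
          }
        }
      }
    } catch (IOException ioe) {
      System.err.println("IOException reading from distributed cache");
      System.err.println(ioe.toString());
    }
  }

  void loadStopWords(Path cachePath) throws IOException {
    // note use of regular java.io methods here - this is a local file now
    BufferedReader wordReader = new BufferedReader(
        new FileReader(cachePath.toString()));
    try {
      String line;
      this.stopWords = new HashSet<String>();
      // One stop word per line.
      while ((line = wordReader.readLine()) != null) {
        this.stopWords.add(line);
      }
    } finally {
      wordReader.close();
    }
  }

  /* actual map() method, etc go here */
}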

The code above belongs in the Mapper instance associated with the index generation process. We retrieve the list of files cached in the distributed cache. We then compare the basename of each file (using Path.getName()) with the one we expect for our stop word list. Once we find this file, we read the words, one per line, into a Set instance that we will consult during the mapping process.
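
To make the "consult during the mapping process" step concrete, here is a minimal sketch of how a map() method might filter tokens against that Set. It is plain Java with no Hadoop dependencies, and the class name StopWordFilter and method indexableTerms() are hypothetical names chosen for illustration. It also assumes that the stop word file holds lower-case words and that splitting on whitespace is an acceptable tokenizer, neither of which the text above specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;

public class StopWordFilter {
  private final Set<String> stopWords;

  public StopWordFilter(Set<String> stopWords) {
    this.stopWords = stopWords;
  }

  /**
   * Splits a line on whitespace, lower-cases each token, and returns the
   * tokens that are not in the stop word set, in their original order.
   */
  public List<String> indexableTerms(String line) {
    List<String> terms = new ArrayList<String>();
    for (String token : line.trim().split("\\s+")) {
      String word = token.toLowerCase(Locale.ROOT);
      // Skip the empty token produced by a blank line, and any stop word.
      if (!word.isEmpty() && !stopWords.contains(word)) {
        terms.add(word);
      }
    }
    return terms;
  }
}
```

Inside a real mapper, each term returned here would be emitted with the document identifier as its value, so stop words never reach the index.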

The distributed cache has additional uses too. For instance, you can use the DistributedCache.addArchiveToClassPath() method to send a .jar file to all the nodes. It will be inserted into the classpath as well, so that classes in the archive can be accessed by all the nodes.