Big Data Analytics

Input Formats

The InputFormat defines how to read data from a file into the Mapper instances. Hadoop comes with several implementations of InputFormat; some work with text files and describe different ways in which the text files can be interpreted. Others, like SequenceFileInputFormat, are purpose-built for reading particular binary file formats. These types are described in more detail in Module 4.

More powerfully, you can define your own InputFormat implementations to format the input to your programs however you want. For example, the default TextInputFormat reads lines of text files. The key it emits for each record is the byte offset of the line read (as a LongWritable), and the value is the contents of the line up to the terminating '\n' character (as a Text object). If you have multi-line records each separated by a $ character, you could write your own InputFormat that parses files into records split on this character instead.
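
For example, given an input file whose first two lines are

hello world
goodbye

TextInputFormat emits the record (0, "hello world") for the first line and (12, "goodbye") for the second, since the first line plus its terminating '\n' occupies bytes 0 through 11.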

Another important job of the InputFormat is to divide the input data sources (e.g., input files) into fragments that make up the inputs to individual map tasks. These fragments are called "splits" and are encapsulated in instances of the InputSplit interface. Most files, for example, are split on the boundaries of the underlying HDFS blocks and are represented by instances of the FileSplit class. Other files may be unsplittable, depending on the application-specific format of the data (for example, a file compressed with a non-splittable codec such as gzip must be read from the beginning). Dividing up other data sources (e.g., tables from a database) into splits would be performed in a different, application-specific fashion. It is important that the process of dividing the data into input splits be quick and cheap; in particular, it should not require reading the data itself, as it is all done by a single machine at the start of the MapReduce job.
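
For example, if an application's file format cannot be read starting from an arbitrary offset, a FileInputFormat subclass can mark its files as unsplittable by overriding the isSplitable() method. A minimal sketch (the class name here is illustrative):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TextInputFormat;

// Hypothetical format whose files must each be read by a single map task.
public class WholeFileTextInputFormat extends TextInputFormat {

  protected boolean isSplitable(FileSystem fs, Path filename) {
    // Never split: each file becomes exactly one InputSplit.
    return false;
  }
}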

The TextInputFormat divides files into splits strictly by byte offsets, then reads individual lines from each split as the record inputs to the Mapper. The RecordReader associated with TextInputFormat must therefore be robust to the fact that splits do not necessarily line up with line-ending boundaries. In fact, the RecordReader will read past the theoretical end of its split to the end of a line, so that the final record is complete. The reader associated with the next split in the file will skip ahead to the first full line in its split before it begins processing, so that no partial line is processed twice. All RecordReader implementations must use similar logic to ensure that they do not miss records that span InputSplit boundaries.

Custom File Formats

In this section we will describe how to develop a custom InputFormat that reads files of a particular format.

Rather than implement InputFormat directly, it is usually best to subclass FileInputFormat. This abstract class provides much of the basic machinery for manipulating files. To parse files in a particular way, we must override the getRecordReader() method, which returns an instance of RecordReader: an object that can read records from the input source. To motivate this discussion with concrete code, we will develop an InputFormat and RecordReader implementation that can read lists of objects and their positions from files. We assume we are reading text files where each line contains the name of an object followed by its coordinates as three comma-separated floating-point values. For instance, some sample data may look like the following:

ball, 3.5, 12.7, 9.0
car, 15, 23.76, 42.23
device, 0.0, 12.4, -67.1

We must read individual lines of the file, separate the key (Text) from the three floats, and then parse those floats into a Point3D object like the one we developed earlier.
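
For reference, we assume Point3D is a Writable with public x, y, and z fields, along the lines of the class developed earlier; a minimal sketch:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// A point in 3D space, serializable so it can be used as a MapReduce value.
public class Point3D implements Writable {
  public float x;
  public float y;
  public float z;

  public Point3D() { }

  public void write(DataOutput out) throws IOException {
    out.writeFloat(x);
    out.writeFloat(y);
    out.writeFloat(z);
  }

  public void readFields(DataInput in) throws IOException {
    x = in.readFloat();
    y = in.readFloat();
    z = in.readFloat();
  }
}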

The ObjectPositionInputFormat class itself is very straightforward. Since it will be reading from files, all we need to do is define a factory method for RecordReader implementations:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class ObjectPositionInputFormat extends
    FileInputFormat<Text, Point3D> {

  public RecordReader<Text, Point3D> getRecordReader(
      InputSplit input, JobConf job, Reporter reporter)
      throws IOException {

    // Report the split being processed, then build a reader for it.
    reporter.setStatus(input.toString());
    return new ObjPosRecordReader(job, (FileSplit) input);
  }
}

Listing 5.3: InputFormat for object-position files

Note that we declare the types of the keys and values emitted by the InputFormat in its class definition (the <Text, Point3D> type parameters above); these must match the input key and value types declared by the Mapper.
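
For example, a Mapper for this job would declare Text and Point3D as its input key and value types. A sketch, assuming an identity-style map function (the class name and map body are illustrative):

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// The input types <Text, Point3D> match those of ObjectPositionInputFormat.
public class ObjPosMapper extends MapReduceBase
    implements Mapper<Text, Point3D, Text, Point3D> {

  public void map(Text key, Point3D value,
      OutputCollector<Text, Point3D> output, Reporter reporter)
      throws IOException {
    // Pass each (objectName, position) record through unchanged.
    output.collect(key, value);
  }
}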

The RecordReader implementation is where the actual file data is read and parsed. We will implement it by making use of the LineRecordReader class; this is the RecordReader implementation used by TextInputFormat to read individual lines from files and return them unparsed. We wrap LineRecordReader with our own implementation, which converts the values to the expected types. By using LineRecordReader, we do not need to worry about what happens when a record spans an InputSplit boundary; the underlying record reader already contains the logic to handle that case.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

class ObjPosRecordReader implements RecordReader<Text, Point3D> {

  // Underlying reader that handles split boundaries for us.
  private LineRecordReader lineReader;
  private LongWritable lineKey;
  private Text lineValue;

  public ObjPosRecordReader(JobConf job, FileSplit split) throws IOException {
    lineReader = new LineRecordReader(job, split);

    lineKey = lineReader.createKey();
    lineValue = lineReader.createValue();
  }

  public boolean next(Text key, Point3D value) throws IOException {
    // Get the next line from the underlying reader.
    if (!lineReader.next(lineKey, lineValue)) {
      return false;
    }

    // Parse the lineValue, which is in the format: objName, x, y, z
    String[] pieces = lineValue.toString().split(",");
    if (pieces.length != 4) {
      throw new IOException("Invalid record received");
    }

    // Try to parse the floating-point components of the value.
    float fx, fy, fz;
    try {
      fx = Float.parseFloat(pieces[1].trim());
      fy = Float.parseFloat(pieces[2].trim());
      fz = Float.parseFloat(pieces[3].trim());
    } catch (NumberFormatException nfe) {
      throw new IOException("Error parsing floating point value in record");
    }

    // Now that we know we'll succeed, overwrite the output objects.
    key.set(pieces[0].trim()); // objName is the output key.

    value.x = fx;
    value.y = fy;
    value.z = fz;

    return true;
  }

  public Text createKey() {
    return new Text("");
  }

  public Point3D createValue() {
    return new Point3D();
  }

  public long getPos() throws IOException {
    return lineReader.getPos();
  }

  public void close() throws IOException {
    lineReader.close();
  }

  public float getProgress() throws IOException {
    return lineReader.getProgress();
  }
}

Listing 5.4: RecordReader for object-position files

You can control the InputFormat used by your MapReduce job with the JobConf.setInputFormat() method.
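
For example, in the driver code for the job described above (a sketch; the surrounding job configuration is elided):

JobConf conf = new JobConf();
conf.setInputFormat(ObjectPositionInputFormat.class);
// ... set the Mapper, Reducer, input/output paths, etc., then:
JobClient.runJob(conf);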

Exercise: Write an InputFormat and RecordReader that read strings of text separated by '$' characters instead of newlines.

Alternate Data Sources

An InputFormat describes both how to present the data to the Mapper and where the data originates. Most implementations descend from FileInputFormat, which reads from files on the local machine or in HDFS. If your data does not come from such a source, you can write an InputFormat implementation that reads from an alternate source. For example, HBase (a distributed database system built on top of Hadoop) provides a TableInputFormat that reads records from a database table. You could also imagine a system where data is streamed to each machine over the network on a particular port; an InputFormat could read data from that port and parse it into individual records for mapping.