I've been analyzing my website traffic using Hadoop and MapReduce. Our logs are written hourly to gzipped files, but since the server may be restarted while a file is being written, every now and then one of them ends up corrupted. When this happens, the default Hadoop implementation aborts the entire job. So I had to dive into the Hadoop source code and find a way to make it more lenient towards corrupted files.
The trick is to create a LineRecordReader that, instead of propagating the EOFException, catches it and reports that there are no more lines to read in the file. Since the default TextInputFormat has a hardcoded LineRecordReader, it is also necessary to extend FileInputFormat and override the createRecordReader method to return my version of the LineRecordReader.
Here's what the code looks like:
package org.bandarra.hadoop;

import org.apache.commons.compress.utils.Charsets;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import java.io.EOFException;
import java.io.IOException;

/**
 * Created by andreban on 12/9/13.
 */
public class LenientTextInputFormat extends FileInputFormat<LongWritable, Text> {

    private static class LenientLineRecordReader extends LineRecordReader {

        public LenientLineRecordReader(byte[] recordDelimiter) {
            super(recordDelimiter);
        }

        @Override
        public boolean nextKeyValue() throws IOException {
            try {
                return super.nextKeyValue();
            } catch (EOFException ex) {
                // A truncated gzipped file throws EOFException; log it and
                // report that there are no more records instead of failing the task.
                ex.printStackTrace();
                return false;
            }
        }
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        // Same setup as TextInputFormat, but returning the lenient reader.
        String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        }
        return new LenientLineRecordReader(recordDelimiterBytes);
    }
}
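To use it, the job driver just has to swap the input format. Here's a minimal sketch of that wiring, assuming a map-only job; LenientJobDriver, PassThroughMapper and the input/output paths are placeholder names, and the only line that really matters is the setInputFormatClass call.

package org.bandarra.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LenientJobDriver {

    // Placeholder mapper: just passes each log line through unchanged.
    public static class PassThroughMapper
            extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "lenient log parsing");
        job.setJarByClass(LenientJobDriver.class);

        // Use the lenient input format instead of the default TextInputFormat,
        // so a truncated gzipped log no longer fails the whole job.
        job.setInputFormatClass(LenientTextInputFormat.class);

        job.setMapperClass(PassThroughMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}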