bandarra.me

Ignoring Corrupted gzip Files in Hadoop

I've been analyzing my website traffic using Hadoop and MapReduce. Our logs are written hourly to gzipped files. Since the server may be restarted while it is writing to a file, every now and then a file ends up truncated and therefore corrupted. When this happens, the default Hadoop implementation aborts the entire job. So I had to dive into the Hadoop source code and find a way to make it more lenient towards corrupted files.

The trick is to create a LineRecordReader subclass that, instead of letting the EOFException propagate, catches it and reports that there are no more lines to read in the file. The records read before the corrupted point are still processed, and the rest of the file is skipped. Because the default TextInputFormat hardcodes its LineRecordReader, it is necessary to extend FileInputFormat and override the createRecordReader method to return my version of LineRecordReader.

Here's what the code looks like:

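package org.bandarra.hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Created by andreban on 12/9/13.
 */
public class LenientTextInputFormat extends FileInputFormat<LongWritable, Text> {

    /** A LineRecordReader that treats an unexpected end of file as the end of the input. */
    private static class LenientLineRecordReader extends LineRecordReader {
        public LenientLineRecordReader(byte[] recordDelimiter) {
            super(recordDelimiter);
        }

        @Override
        public boolean nextKeyValue() throws IOException {
            try {
                return super.nextKeyValue();
            } catch (EOFException ex) {
                // Truncated file: log the error and report that there are no more records,
                // so the task finishes normally instead of failing the job.
                ex.printStackTrace();
                return false;
            }
        }
    }

    // FileInputFormat treats every file as splittable by default. Mirror what
    // TextInputFormat does, so that a gzip file (which cannot be split) is
    // always read from the start by a single reader.
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec =
            new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        return codec == null || codec instanceof SplittableCompressionCodec;
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        // Honor a custom record delimiter, the same way TextInputFormat does.
        String delimiter =
            context.getConfiguration().get("textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
            recordDelimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
        }
        return new LenientLineRecordReader(recordDelimiterBytes);
    }
}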
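
The catch-and-stop idea can also be tried in isolation, without a Hadoop cluster. What follows is only a minimal sketch that uses the JDK's java.util.zip classes, not Hadoop's own decompressors. The class LenientGzipDemo and its methods gzipLines and readLenient are hypothetical names made up for this illustration, and cutting the compressed bytes in half is just an assumed stand-in for a server restart in the middle of a write.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class; not part of Hadoop or of the input format above.
class LenientGzipDemo {

    /** Builds an in-memory gzip "log file" with the lines "line 0" .. "line count-1". */
    static byte[] gzipLines(int count) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            for (int i = 0; i < count; i++) {
                gzip.write(("line " + i + "\n").getBytes(StandardCharsets.UTF_8));
            }
        }
        return bytes.toByteArray();
    }

    /** Returns every newline-terminated line that can be decompressed. */
    static List<String> readLenient(byte[] gzipped) throws IOException {
        List<String> lines = new ArrayList<>();
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped))) {
            int b;
            // Reading byte by byte is slow, but it hands back every byte that
            // can still be decompressed before the truncation is noticed.
            while ((b = in.read()) != -1) {
                if (b == '\n') {
                    lines.add(new String(current.toByteArray(), StandardCharsets.UTF_8));
                    current.reset();
                } else {
                    current.write(b);
                }
            }
        } catch (EOFException ex) {
            // Truncated gzip data: keep the lines read so far, drop the unfinished one.
            System.err.println("Truncated gzip stream, stopping early: " + ex.getMessage());
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        byte[] complete = gzipLines(1000);
        // Simulate an interrupted write by keeping only the first half of the bytes.
        byte[] truncated = Arrays.copyOf(complete, complete.length / 2);
        System.out.println("complete file:  " + readLenient(complete).size() + " lines");
        System.out.println("truncated file: " + readLenient(truncated).size() + " lines");
    }
}
```

On Java 11 or later this can be run directly with java LenientGzipDemo.java. The truncated input should yield only a prefix of the lines instead of an exception, which is the same behaviour the lenient record reader gives a map task. In a real job, the input format from this post would be selected on the org.apache.hadoop.mapreduce.Job with job.setInputFormatClass(LenientTextInputFormat.class).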