Andre Bandarra's Blog

Ignoring Corrupted gzip files in Hadoop

I've been analyzing my website traffic using Hadoop and MapReduce. Our logs are written hourly to gzipped files. But since the server may be restarted while it is writing to a file, every now and then a file ends up corrupted. When this happens, the default Hadoop implementation aborts the entire job. So I had to dive into the Hadoop source code and find a way to make it more lenient towards corrupted files.

The trick is to create a LineRecordReader subclass that, instead of letting the EOFException propagate, catches it and reports that there are no more lines to read in the file. The lines read before the corrupted point are still processed; whatever came after it is lost. As the default TextInputFormat has a hardcoded LineRecordReader, it is necessary to extend FileInputFormat and override the createRecordReader method to return my version of LineRecordReader.
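To see the catch-and-stop idea in isolation, here is a minimal sketch that uses only the JDK's java.util.zip classes, with no Hadoop involved. It assumes a corrupted file is simply one that was cut off mid-write, and it simulates that by chopping an in-memory gzip stream in half. The class and method names (LenientGzipDemo, gzipLines, sampleLine, readLines) are made up for illustration, and Hadoop's own gzip codec may behave differently in its details.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class: illustrates the pattern, not Hadoop's internals.
class LenientGzipDemo {

    /** Returns the i-th made-up log line. */
    static String sampleLine(int i) {
        return "2013-12-09 10:00:00 GET /page/" + i;
    }

    /** Gzips `count` sample log lines in memory, one per line. */
    static byte[] gzipLines(int count) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
            for (int i = 0; i < count; i++) {
                out.write(sampleLine(i).getBytes(StandardCharsets.UTF_8));
                out.write('\n');
            }
        }
        return buffer.toByteArray();
    }

    /**
     * Reads newline-terminated lines from gzipped bytes. In lenient mode an
     * EOFException is treated as "no more lines"; otherwise it is rethrown.
     */
    static List<String> readLines(byte[] gzipped, boolean lenient) throws IOException {
        List<String> lines = new ArrayList<>();
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped))) {
            int b;
            while ((b = in.read()) != -1) {
                if (b == '\n') {
                    lines.add(new String(current.toByteArray(), StandardCharsets.UTF_8));
                    current.reset();
                } else {
                    current.write(b);
                }
            }
        } catch (EOFException ex) {
            if (!lenient) {
                throw ex;
            }
            // Truncated stream: keep the complete lines read so far and
            // drop the unfinished one.
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        byte[] complete = gzipLines(200);
        // Simulate a server restart mid-write by cutting the file in half.
        byte[] truncated = Arrays.copyOf(complete, complete.length / 2);

        System.out.println("complete file: " + readLines(complete, false).size() + " lines");
        System.out.println("truncated, lenient: " + readLines(truncated, true).size() + " lines");
        try {
            readLines(truncated, false);
        } catch (EOFException ex) {
            System.out.println("truncated, strict: " + ex);
        }
    }
}
```

The lenient read returns only the lines that were fully decoded before the cut, which is the same trade-off the Hadoop version makes: a partial file contributes what it can instead of failing the whole job.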

Here's what the code looks like:

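package org.bandarra.hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Created by andreban on 12/9/13.
 */
public class LenientTextInputFormat extends FileInputFormat<LongWritable, Text> {

    private static class LenientLineRecordReader extends LineRecordReader {
        public LenientLineRecordReader(byte[] recordDelimiter) {
            super(recordDelimiter);
        }

        @Override
        public boolean nextKeyValue() throws IOException {
            try {
                return super.nextKeyValue();
            } catch (EOFException ex) {
                // Corrupted (truncated) file: log the error and report end of
                // input, so the records read so far are kept and the job goes on.
                ex.printStackTrace();
                return false;
            }
        }
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        // Honor a custom record delimiter, as TextInputFormat does.
        String delimiter =
                context.getConfiguration().get("textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
            recordDelimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
        }
        return new LenientLineRecordReader(recordDelimiterBytes);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // FileInputFormat treats every file as splittable by default, which is
        // wrong for gzip. Mirror the check TextInputFormat performs so that a
        // gzipped file is always read as a single split.
        CompressionCodec codec =
                new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        return codec == null || codec instanceof SplittableCompressionCodec;
    }
}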
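To use it, the job has to be told to read its input through LenientTextInputFormat instead of the default TextInputFormat. The driver below is a minimal sketch and was not part of my original job: the LenientJobExample class name, the job name and the use of args[0] and args[1] as input and output paths are placeholders, and it assumes Hadoop 2.x, where Job.getInstance is available. No mapper or reducer is set, so Hadoop's identity classes simply copy each (offset, line) pair to the output.

```java
package org.bandarra.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver: names and paths are placeholders for illustration.
public class LenientJobExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "lenient-gzip-example");
        job.setJarByClass(LenientJobExample.class);

        // Read input through the lenient format instead of TextInputFormat.
        job.setInputFormatClass(LenientTextInputFormat.class);

        // With the identity mapper and reducer, keys are byte offsets and
        // values are the log lines themselves.
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

In a real job you would also call setMapperClass and setReducerClass with your own classes; the input format line is the only change needed to get the lenient behavior.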
