package org.apache.avro.file;
import java.io.IOException;
import java.io.EOFException;
import java.io.InputStream;
import java.io.File;
import java.util.Arrays;
import org.apache.avro.InvalidAvroMagicException;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.avro.io.DatumReader;
import static org.apache.avro.file.DataFileConstants.SYNC_SIZE;
import static org.apache.avro.file.DataFileConstants.MAGIC;
public class DataFileReader<D> extends DataFileStream<D> implements FileReader<D> {
private SeekableInputStream sin;
private long blockStart;
public static <D> FileReader<D> openReader(File file, DatumReader<D> reader) throws IOException {
SeekableFileInput input = new SeekableFileInput(file);
try {
return openReader(input, reader);
} catch (final Throwable e) {
IOUtils.closeQuietly(input);
throw e;
}
}
public static <D> FileReader<D> openReader(SeekableInput in, DatumReader<D> reader) throws IOException {
if (in.length() < MAGIC.length)
throw new InvalidAvroMagicException("Not an Avro data file");
byte[] magic = new byte[MAGIC.length];
in.seek(0);
for (int c = 0; c < magic.length; c = in.read(magic, c, magic.length - c)) {
}
in.seek(0);
if (Arrays.equals(MAGIC, magic))
return new DataFileReader<>(in, reader);
if (Arrays.equals(DataFileReader12.MAGIC, magic))
return new DataFileReader12<>(in, reader);
throw new InvalidAvroMagicException("Not an Avro data file");
}
public static <D> DataFileReader<D> (SeekableInput in, DatumReader<D> reader, Header header, boolean sync)
throws IOException {
DataFileReader<D> dreader = new DataFileReader<>(in, reader, header);
if (sync)
dreader.sync(in.tell());
else
dreader.seek(in.tell());
return dreader;
}
public DataFileReader(File file, DatumReader<D> reader) throws IOException {
this(new SeekableFileInput(file), reader, true);
}
public DataFileReader(SeekableInput sin, DatumReader<D> reader) throws IOException {
this(sin, reader, false);
}
protected DataFileReader(SeekableInput sin, DatumReader<D> reader, boolean closeOnError) throws IOException {
super(reader);
try {
this.sin = new SeekableInputStream(sin);
initialize(this.sin);
blockFinished();
} catch (final Throwable e) {
if (closeOnError) {
IOUtils.closeQuietly(sin);
}
throw e;
}
}
protected (SeekableInput sin, DatumReader<D> reader, Header header) throws IOException {
super(reader);
this.sin = new SeekableInputStream(sin);
initialize(this.sin, header);
}
public void seek(long position) throws IOException {
sin.seek(position);
vin = DecoderFactory.get().binaryDecoder(this.sin, vin);
datumIn = null;
blockRemaining = 0;
blockStart = position;
}
@Override
public void sync(long position) throws IOException {
seek(position);
if ((position == 0) && (getMeta("avro.sync") != null)) {
initialize(sin);
return;
}
try {
int i = 0, b;
InputStream in = vin.inputStream();
vin.readFixed(syncBuffer);
do {
int j = 0;
for (; j < SYNC_SIZE; j++) {
if (getHeader().sync[j] != syncBuffer[(i + j) % SYNC_SIZE])
break;
}
if (j == SYNC_SIZE) {
blockStart = position + i + SYNC_SIZE;
return;
}
b = in.read();
syncBuffer[i++ % SYNC_SIZE] = (byte) b;
} while (b != -1);
} catch (EOFException e) {
}
blockStart = sin.tell();
}
@Override
protected void blockFinished() throws IOException {
blockStart = sin.tell() - vin.inputStream().available();
}
public long previousSync() {
return blockStart;
}
@Override
public boolean pastSync(long position) throws IOException {
return ((blockStart >= position + SYNC_SIZE) || (blockStart >= sin.length()));
}
@Override
public long tell() throws IOException {
return sin.tell();
}
static class SeekableInputStream extends InputStream implements SeekableInput {
private final byte[] oneByte = new byte[1];
private SeekableInput in;
SeekableInputStream(SeekableInput in) throws IOException {
this.in = in;
}
@Override
public void seek(long p) throws IOException {
if (p < 0)
throw new IOException("Illegal seek: " + p);
in.seek(p);
}
@Override
public long tell() throws IOException {
return in.tell();
}
@Override
public long length() throws IOException {
return in.length();
}
@Override
public int read(byte[] b) throws IOException {
return in.read(b, 0, b.length);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return in.read(b, off, len);
}
@Override
public int read() throws IOException {
int n = read(oneByte, 0, 1);
if (n == 1) {
return oneByte[0] & 0xff;
} else {
return n;
}
}
@Override
public long skip(long skip) throws IOException {
long position = in.tell();
long length = in.length();
long remaining = length - position;
if (remaining > skip) {
in.seek(skip);
return in.tell() - position;
} else {
in.seek(remaining);
return in.tell() - position;
}
}
@Override
public void close() throws IOException {
in.close();
super.close();
}
@Override
public int available() throws IOException {
long remaining = (in.length() - in.tell());
return (remaining > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) remaining;
}
}
}