package io.github.vmzakharov.ecdataframe.dataset;
import io.github.vmzakharov.ecdataframe.dsl.value.ValueType;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.eclipse.collections.impl.utility.ListIterate;
import java.io.File;
import java.io.IOException;
public class AvroDataSet
extends DataSetAbstract
{
private final Schema schema;
private final String dataFileName;
private GenericRecord currentRecord;
private DataFileReader<GenericRecord> dataFileReader;
public AvroDataSet(String schemaDefinitionFileName, String newName, String newDataFileName)
{
super(newName);
this.schema = this.loadSchema(schemaDefinitionFileName);
this.dataFileName = newDataFileName;
}
private Schema loadSchema(String schemaDefinitionFileName)
{
try
{
return new Schema.Parser().parse(new File(schemaDefinitionFileName));
}
catch (IOException e)
{
throw new RuntimeException("Failed to load a schema definition from " + schemaDefinitionFileName + "'", e);
}
}
public GenericRecord createRecord()
{
return new GenericData.Record(this.schema);
}
public GenericRecord createRecordForField(String fieldName)
{
return new GenericData.Record(this.findFieldSchema(fieldName));
}
private Schema findFieldSchema(String fieldName)
{
Schema fieldSchema = schema.getField(fieldName).schema();
if (fieldSchema.isUnion())
{
fieldSchema = ListIterate.detect(fieldSchema.getTypes(), schema -> schema.getType() != Schema.Type.NULL);
}
return fieldSchema;
}
public void write(GenericRecord... records)
{
File file = new File(this.dataFileName);
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);)
{
dataFileWriter.create(this.schema, file);
for (int i = 0; i < records.length; i++)
{
dataFileWriter.append(records[i]);
}
}
catch (IOException e)
{
throw new RuntimeException("Failed to persist data into '" + this.dataFileName + "'", e);
}
}
@Override
public void openFileForReading()
{
File file = new File(this.dataFileName);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(this.schema);
try
{
this.dataFileReader = new DataFileReader<>(file, datumReader);
}
catch (IOException e)
{
throw new RuntimeException("Unable to create a reader for file '" + this.dataFileName + "'", e);
}
}
public Object getValue(String propertyChainString)
{
return this.currentRecord.get(propertyChainString);
}
@Override
public Object next()
{
this.currentRecord = this.dataFileReader.next();
return this.currentRecord;
}
@Override
public boolean hasNext()
{
return this.dataFileReader.hasNext();
}
@Override
public void close()
{
try
{
this.dataFileReader.close();
}
catch (IOException e)
{
throw new RuntimeException("Unable to close data file reader for '" + dataFileName + "'");
}
}
public ValueType getFieldType(String fieldName)
{
Schema fieldSchema = this.findFieldSchema(fieldName);
switch (fieldSchema.getType())
{
case INT:
return ValueType.LONG;
case DOUBLE:
case FLOAT:
case FIXED:
return ValueType.DOUBLE;
case STRING:
return ValueType.STRING;
case BOOLEAN:
return ValueType.BOOLEAN;
default:
return ValueType.VOID;
}
}
}