package com.fasterxml.jackson.dataformat.avro.deser;
import java.util.*;
import org.apache.avro.Schema;
import com.fasterxml.jackson.dataformat.avro.deser.ScalarDecoder.*;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaHelper;
public abstract class AvroReaderFactory
{
protected final static ScalarDecoder READER_BOOLEAN = new BooleanDecoder();
protected final static ScalarDecoder READER_BYTES = new BytesDecoder();
protected final static ScalarDecoder READER_DOUBLE = new DoubleReader();
protected final static ScalarDecoder READER_FLOAT = new FloatReader();
protected final static ScalarDecoder READER_INT = new IntReader();
protected final static ScalarDecoder READER_LONG = new LongReader();
protected final static ScalarDecoder READER_NULL = new NullReader();
protected final static ScalarDecoder READER_STRING = new StringReader();
protected final TreeMap<String, AvroStructureReader> _knownReaders
= new TreeMap<String, AvroStructureReader>();
public static AvroStructureReader createFor(Schema schema) {
return new NonResolving().createReader(schema);
}
public static AvroStructureReader createFor(Schema writerSchema,
Schema readerSchema) {
return new Resolving().createReader(writerSchema, readerSchema);
}
public ScalarDecoder createScalarValueDecoder(Schema type)
{
switch (type.getType()) {
case BOOLEAN:
return READER_BOOLEAN;
case BYTES:
return READER_BYTES;
case DOUBLE:
return READER_DOUBLE;
case ENUM:
return new EnumDecoder(AvroSchemaHelper.getFullName(type), type.getEnumSymbols());
case FIXED:
return new FixedDecoder(type.getFixedSize(), AvroSchemaHelper.getFullName(type));
case FLOAT:
return READER_FLOAT;
case INT:
{
final String typeId = AvroSchemaHelper.getTypeId(type);
return (typeId != null) ? new IntReader(typeId) : READER_INT;
}
case LONG:
return READER_LONG;
case NULL:
return READER_NULL;
case STRING:
{
final String typeId = AvroSchemaHelper.getTypeId(type);
return (typeId != null) ? new StringReader(typeId) : READER_STRING;
}
case UNION:
List<Schema> types = type.getTypes();
{
ScalarDecoder[] readers = new ScalarDecoder[types.size()];
int i = 0;
for (Schema schema : types) {
ScalarDecoder reader = createScalarValueDecoder(schema);
if (reader == null) {
return null;
}
readers[i++] = reader;
}
return new ScalarUnionDecoder(readers);
}
case ARRAY:
case MAP:
case RECORD:
return null;
}
throw new IllegalStateException("Unrecognized Avro Schema type: "+type.getType());
}
public AvroStructureReader createReader(Schema schema)
{
AvroStructureReader reader = _knownReaders.get(AvroSchemaHelper.getFullName(schema));
if (reader != null) {
return reader;
}
switch (schema.getType()) {
case ARRAY:
return createArrayReader(schema);
case MAP:
return createMapReader(schema);
case RECORD:
return createRecordReader(schema);
case UNION:
return createUnionReader(schema);
default:
return new ScalarDecoderWrapper(createScalarValueDecoder(schema));
}
}
protected AvroStructureReader createArrayReader(Schema schema)
{
Schema elementType = schema.getElementType();
ScalarDecoder scalar = createScalarValueDecoder(elementType);
String typeId = AvroSchemaHelper.getTypeId(schema);
String elementTypeId = schema.getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_ELEMENT_CLASS);
if (elementTypeId == null) {
elementTypeId = AvroSchemaHelper.getTypeId(elementType);
}
if (scalar != null) {
if (EnumSet.class.getName().equals(typeId)) {
typeId += "<" + elementTypeId + ">";
}
return ArrayReader.construct(scalar, typeId, elementTypeId);
}
return ArrayReader.construct(createReader(elementType), typeId, elementTypeId);
}
protected AvroStructureReader createMapReader(Schema schema)
{
Schema elementType = schema.getValueType();
ScalarDecoder dec = createScalarValueDecoder(elementType);
String typeId = AvroSchemaHelper.getTypeId(schema);
String keyTypeId = schema.getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_KEY_CLASS);
if (EnumMap.class.getName().equals(typeId)) {
typeId += "<" + keyTypeId + "," + Object.class.getName() + ">";
}
if (dec != null) {
String valueTypeId = AvroSchemaHelper.getTypeId(elementType);
return MapReader.construct(dec, typeId, keyTypeId, valueTypeId);
}
return MapReader.construct(createReader(elementType), typeId, keyTypeId);
}
protected AvroStructureReader createRecordReader(Schema schema)
{
final List<Schema.Field> fields = schema.getFields();
AvroFieldReader[] fieldReaders = new AvroFieldReader[fields.size()];
RecordReader reader = new RecordReader.Std(fieldReaders, AvroSchemaHelper.getTypeId(schema));
_knownReaders.put(AvroSchemaHelper.getFullName(schema), reader);
int i = 0;
for (Schema.Field field : fields) {
fieldReaders[i++] = createFieldReader(field);
}
return reader;
}
protected AvroStructureReader createUnionReader(Schema schema)
{
final List<Schema> types = schema.getTypes();
AvroStructureReader[] typeReaders = new AvroStructureReader[types.size()];
int i = 0;
for (Schema type : types) {
typeReaders[i++] = createReader(type);
}
return new UnionReader(typeReaders);
}
protected AvroFieldReader createFieldReader(Schema.Field field) {
final String name = field.name();
final Schema type = field.schema();
ScalarDecoder scalar = createScalarValueDecoder(type);
if (scalar != null) {
return scalar.asFieldReader(name, false);
}
return AvroFieldReader.construct(name, createReader(type));
}
private static class NonResolving extends AvroReaderFactory
{
protected NonResolving() { }
}
private static class Resolving extends AvroReaderFactory
{
protected Resolving() { }
public AvroStructureReader createReader(Schema writerSchema, Schema readerSchema)
{
AvroStructureReader reader = _knownReaders.get(AvroSchemaHelper.getFullName(readerSchema));
if (reader != null) {
return reader;
}
switch (writerSchema.getType()) {
case ARRAY:
return createArrayReader(writerSchema, readerSchema);
case MAP:
return createMapReader(writerSchema, readerSchema);
case RECORD:
return createRecordReader(writerSchema, readerSchema);
case UNION:
return createUnionReader(writerSchema, readerSchema);
default:
return new ScalarDecoderWrapper(createScalarValueDecoder(writerSchema));
}
}
protected AvroStructureReader createArrayReader(Schema writerSchema, Schema readerSchema)
{
readerSchema = _verifyMatchingStructure(readerSchema, writerSchema);
Schema writerElementType = writerSchema.getElementType();
ScalarDecoder scalar = createScalarValueDecoder(writerElementType);
String typeId = AvroSchemaHelper.getTypeId(readerSchema);
String elementTypeId = readerSchema.getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_ELEMENT_CLASS);
if (scalar != null) {
return ArrayReader.construct(scalar, typeId, elementTypeId);
}
return ArrayReader.construct(createReader(writerElementType, readerSchema.getElementType()), typeId, elementTypeId);
}
protected AvroStructureReader createMapReader(Schema writerSchema, Schema readerSchema)
{
readerSchema = _verifyMatchingStructure(readerSchema, writerSchema);
Schema writerElementType = writerSchema.getValueType();
ScalarDecoder dec = createScalarValueDecoder(writerElementType);
String typeId = AvroSchemaHelper.getTypeId(readerSchema);
String keyTypeId = readerSchema.getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_KEY_CLASS);
if (dec != null) {
String valueTypeId = readerSchema.getValueType().getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_CLASS);
return MapReader.construct(dec, typeId, keyTypeId, valueTypeId);
}
return MapReader.construct(createReader(writerElementType, readerSchema.getValueType()), typeId, keyTypeId);
}
protected AvroStructureReader createRecordReader(Schema writerSchema, Schema readerSchema)
{
readerSchema = _verifyMatchingStructure(readerSchema, writerSchema);
final List<Schema.Field> writerFields = writerSchema.getFields();
Map<String,Schema.Field> readerFields = new HashMap<String,Schema.Field>();
List<Schema.Field> defaultFields = new ArrayList<Schema.Field>();
{
Set<String> writerNames = new HashSet<String>();
for (Schema.Field f : writerFields) {
writerNames.add(f.name());
}
for (Schema.Field f : readerSchema.getFields()) {
String name = f.name();
if (writerNames.contains(name)) {
readerFields.put(name, f);
} else {
defaultFields.add(f);
}
}
}
AvroFieldReader[] fieldReaders = new AvroFieldReader[writerFields.size()
+ defaultFields.size()];
RecordReader reader = new RecordReader.Resolving(fieldReaders, AvroSchemaHelper.getTypeId(readerSchema));
_knownReaders.put(AvroSchemaHelper.getFullName(readerSchema), reader);
int i = 0;
for (Schema.Field writerField : writerFields) {
Schema.Field readerField = readerFields.get(writerField.name());
fieldReaders[i++] = (readerField == null)
? createFieldSkipper(writerField.name(),
writerField.schema())
: createFieldReader(readerField.name(),
writerField.schema(), readerField.schema());
}
if (!defaultFields.isEmpty()) {
for (Schema.Field defaultField : defaultFields) {
AvroFieldReader fr = AvroFieldDefaulters.createDefaulter(defaultField.name(),
AvroSchemaHelper.objectToJsonNode(defaultField.defaultVal())
);
if (fr == null) {
throw new IllegalArgumentException("Unsupported default type: "+defaultField.schema().getType());
}
fieldReaders[i++] = fr;
}
}
return reader;
}
protected AvroStructureReader createUnionReader(Schema writerSchema, Schema readerSchema)
{
final List<Schema> types = writerSchema.getTypes();
AvroStructureReader[] typeReaders = new AvroStructureReader[types.size()];
int i = 0;
for (Schema type : types) {
typeReaders[i++] = createReader(type);
}
return new UnionReader(typeReaders);
}
protected AvroFieldReader createFieldReader(String name,
Schema writerSchema, Schema readerSchema)
{
ScalarDecoder scalar = createScalarValueDecoder(writerSchema);
if (scalar != null) {
return scalar.asFieldReader(name, false);
}
return AvroFieldReader.construct(name,
createReader(writerSchema, readerSchema));
}
protected AvroFieldReader createFieldSkipper(String name,
Schema writerSchema)
{
ScalarDecoder scalar = createScalarValueDecoder(writerSchema);
if (scalar != null) {
return scalar.asFieldReader(name, true);
}
return AvroFieldReader.constructSkipper(name,
createReader(writerSchema));
}
private Schema _verifyMatchingStructure(Schema readerSchema, Schema writerSchema)
{
final Schema.Type expectedType = writerSchema.getType();
Schema.Type actualType = readerSchema.getType();
if (actualType == expectedType) {
return readerSchema;
}
if (actualType == Schema.Type.UNION) {
for (Schema sch : readerSchema.getTypes()) {
if (sch.getType() == expectedType) {
return sch;
}
}
throw new IllegalStateException(String.format(
"Mismatch between types: expected %s (name '%s'), encountered %s of %d types without match",
expectedType, writerSchema.getName(), actualType, readerSchema.getTypes().size()));
}
throw new IllegalStateException(String.format(
"Mismatch between types: expected %s (name '%s'), encountered %s",
expectedType, writerSchema.getName(), actualType));
}
}
}