package com.fasterxml.jackson.dataformat.avro.ser;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.UnresolvedUnionException;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.reflect.ReflectData;
import com.fasterxml.jackson.core.JsonStreamContext;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.dataformat.avro.AvroGenerator;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaHelper;
public abstract class AvroWriteContext
extends JsonStreamContext
{
// Class literals compared by identity against a datum's runtime class when
// deciding which branch of a union schema applies (see resolveUnionIndex/resolveUnionType)
private final static Class<?> CLS_STRING = String.class;
private final static Class<?> CLS_BIG_DECIMAL = BigDecimal.class;
private final static Class<?> CLS_GENERIC_RECORD = GenericData.Record.class;
private final static Class<?> CLS_GENERIC_ARRAY = GenericData.Array.class;
/** Enclosing context as given at construction; returned by {@link #getParent()}. */
protected final AvroWriteContext _parent;
/** Generator given at construction; handed on to child contexts created by this context. */
protected final AvroGenerator _generator;
/** Avro schema given at construction. */
protected final Schema _schema;
/** Value exposed through {@link #getCurrentValue()} and {@link #setCurrentValue(Object)}. */
protected Object _currentValue;
/**
 * Base constructor: records the context type code together with the
 * parent context, owning generator, schema and initial current value.
 *
 * @param type context type code, stored in the inherited {@code _type} field
 * @param parent enclosing context
 * @param generator generator this context belongs to
 * @param schema Avro schema associated with this context
 * @param currValue initial value for {@link #getCurrentValue()}
 */
protected AvroWriteContext(int type, AvroWriteContext parent,
        AvroGenerator generator, Schema schema, Object currValue)
{
    // The no-argument superclass constructor is invoked implicitly.
    _schema = schema;
    _generator = generator;
    _parent = parent;
    _currentValue = currValue;
    _type = type;
}
/**
 * Factory method for the top-level context: constructs a new {@code RootContext}
 * from the given generator, schema and encoder.
 *
 * @param generator generator the context is created for
 * @param schema Avro schema passed to the root context
 * @param encoder binary encoder passed to the root context
 * @return newly constructed root context
 */
public static AvroWriteContext createRootContext(AvroGenerator generator, Schema schema,
BinaryEncoder encoder) {
return new RootContext(generator, schema, encoder);
}
/**
 * Accessor for the shared {@code NullContext} singleton, a placeholder context
 * whose write and child-context methods all fail with {@link IllegalStateException}
 * because no schema has been specified.
 *
 * @return the shared {@code NullContext} instance
 */
public static AvroWriteContext nullContext() {
return NullContext.instance;
}
/**
 * Creates the context to use for an array nested within this context.
 *
 * @param currValue value to associate with the child context
 *    (NOTE(review): presumably may be null when no value is known; confirm with implementations)
 * @return child context for the nested array
 * @throws JsonMappingException if the child context can not be created
 */
public abstract AvroWriteContext createChildArrayContext(Object currValue) throws JsonMappingException;
/**
 * Creates the context to use for an object (record or map) nested within this context.
 *
 * @param currValue value to associate with the child context
 *    (NOTE(review): presumably may be null when no value is known; confirm with implementations)
 * @return child context for the nested object
 * @throws JsonMappingException if the child context can not be created
 */
public abstract AvroWriteContext createChildObjectContext(Object currValue) throws JsonMappingException;
/**
 * Base implementation, which always fails: contexts that support
 * completion are expected to override this method.
 *
 * @throws IOException declared for overriding implementations
 * @throws IllegalStateException always, naming the concrete context class
 */
public void complete() throws IOException {
    final String implName = getClass().getName();
    throw new IllegalStateException("Can not be called on "+implName);
}
/**
 * @return value currently associated with this context: the one given at
 *    construction or last passed to {@link #setCurrentValue(Object)}
 */
@Override
public Object getCurrentValue() {
return _currentValue;
}
/**
 * Replaces the value associated with this context.
 *
 * @param v new current value; stored as-is
 */
@Override
public void setCurrentValue(Object v) {
_currentValue = v;
}
/**
 * @return parent context given at construction ({@code null} for the
 *    {@code NullContext} singleton, which passes none)
 */
@Override
public final AvroWriteContext getParent() { return _parent; }
/**
 * Base implementation has no notion of a current field name and always
 * returns {@code null}.
 */
@Override
public String getCurrentName() { return null; }
/**
 * Base implementation ignores the name and returns {@code false}.
 * NOTE(review): the return value presumably signals whether a field name
 * is acceptable in this context; confirm with overriding implementations.
 *
 * @param name field name to write
 * @return {@code false} in this base implementation
 * @throws IOException declared for overriding implementations
 */
public boolean writeFieldName(String name) throws IOException {
return false;
}
/**
 * Writes given value in this context.
 *
 * @param value value to write
 * @throws IOException if writing fails
 */
public abstract void writeValue(Object value) throws IOException;
/**
 * Writes given String value in this context.
 *
 * @param value String to write
 * @throws IOException if writing fails
 */
public abstract void writeString(String value) throws IOException;
/**
 * Writes a null value in this context.
 *
 * @throws IOException if writing fails
 */
public abstract void writeNull() throws IOException;
/**
 * @return underlying raw value of this context ({@code null} for the
 *    {@code NullContext} singleton)
 */
public abstract Object rawValue();
/**
 * Base implementation always returns {@code true}.
 * NOTE(review): presumably overridden by contexts that can not be closed yet; confirm.
 */
public boolean canClose() { return true; }
/**
 * Appends a description of this context to given builder; used by
 * {@link #toString()} to build its result.
 *
 * @param sb builder to append the description to
 */
protected abstract void appendDesc(StringBuilder sb);
/**
 * Builds the textual description of this context by delegating to
 * {@link #appendDesc(StringBuilder)}.
 *
 * @return description produced by {@code appendDesc}
 */
@Override
public final String toString()
{
    final StringBuilder description = new StringBuilder(64);
    appendDesc(description);
    return description.toString();
}
/**
 * Creates an empty {@link GenericRecord} for given schema, using the value
 * about to be written to choose the branch when the schema is a union.
 *
 * @param schema record schema, or a union containing the record schema
 * @param currValue value used for resolving a union; when {@code null}, or when
 *    resolution fails, the single Record-or-Map branch of the union is used
 * @return newly created, empty record
 * @throws JsonMappingException if the record can not be instantiated for the schema
 * @throws IllegalStateException if the (resolved) schema is a MAP, or a union has
 *    no unambiguous Record/Map branch to fall back to
 */
protected GenericRecord _createRecord(Schema schema, Object currValue) throws JsonMappingException
{
    Type type = schema.getType();
    if (type == Schema.Type.UNION) {
        if (currValue == null) {
            // Resolving against null would pick the NULL branch, which can never
            // back a record; fall back the same way _createObjectContext() does
            schema = _recordOrMapFromUnion(schema);
        } else {
            try {
                schema = resolveUnionSchema(schema, currValue);
            } catch (UnresolvedUnionException e) {
                schema = _recordOrMapFromUnion(schema);
            }
        }
        // Re-read the type: checks and messages below must see the resolved
        // branch, not the enclosing UNION
        type = schema.getType();
    }
    if (type == Schema.Type.MAP) {
        throw new IllegalStateException("_createRecord should never be called for elements of type MAP");
    }
    try {
        return new GenericData.Record(schema);
    } catch (RuntimeException e) {
        throw new JsonMappingException(null, "Failed to create Record type from "+type, e);
    }
}
/**
 * Creates an empty {@link GenericRecord} for given schema; if the schema is a
 * union, its single Record-or-Map branch is used.
 *
 * @param schema record schema, or a union containing exactly one Record/Map branch
 * @return newly created, empty record
 * @throws JsonMappingException if the record can not be instantiated for the schema
 * @throws IllegalStateException if the (resolved) schema is a MAP, or a union has
 *    no unambiguous Record/Map branch
 */
protected GenericRecord _createRecord(Schema schema) throws JsonMappingException
{
    Type type = schema.getType();
    if (type == Schema.Type.UNION) {
        schema = _recordOrMapFromUnion(schema);
        // Re-read the type: the branch chosen from the union may itself be a MAP,
        // and the checks and messages below must refer to that branch
        type = schema.getType();
    }
    if (type == Schema.Type.MAP) {
        throw new IllegalStateException("_createRecord should never be called for elements of type MAP");
    }
    try {
        return new GenericData.Record(schema);
    } catch (RuntimeException e) {
        throw new JsonMappingException(null, "Failed to create Record type from "+type, e);
    }
}
/**
 * Creates an empty {@link GenericArray} (initial capacity 8) for given schema;
 * if the schema is a union, its ARRAY branch is used.
 *
 * @param schema array schema, or a union containing an array schema
 * @return newly created, empty array
 * @throws IllegalStateException if a union schema has no ARRAY branch
 */
protected GenericArray<Object> _createArray(Schema schema)
{
    if (schema.getType() == Schema.Type.UNION) {
        // getIndexNamed() returns a boxed Integer that is null when the union has
        // no branch with that name; it must not be unboxed before the check
        final Integer arraySchemaIndex = schema.getIndexNamed(Type.ARRAY.getName());
        if (arraySchemaIndex == null || arraySchemaIndex.intValue() < 0) {
            throw new IllegalStateException("No Array type found in union type: "+schema);
        }
        schema = schema.getTypes().get(arraySchemaIndex.intValue());
    }
    return new GenericData.Array<Object>(8, schema);
}
/**
 * Creates the child context for an object value: a {@code MapWriteContext} when
 * the effective schema is a MAP, otherwise an {@code ObjectWriteContext} backed
 * by a freshly created record.
 * <p>
 * A union schema is first narrowed to one branch: by resolving against
 * {@code currValue} when it is non-null, and by picking the single
 * Record-or-Map branch when it is null or resolution fails.
 *
 * @param schema schema of the object, possibly a union
 * @param currValue value to associate with the new context; may be null
 * @return new child context
 * @throws JsonMappingException if the backing record can not be created
 */
protected AvroWriteContext _createObjectContext(Schema schema, Object currValue)
    throws JsonMappingException
{
    Schema effective = schema;
    if (schema.getType() == Schema.Type.UNION) {
        if (currValue != null) {
            try {
                effective = resolveUnionSchema(schema, currValue);
            } catch (UnresolvedUnionException e) {
                effective = _recordOrMapFromUnion(schema);
            }
        } else {
            effective = _recordOrMapFromUnion(schema);
        }
    }
    if (effective.getType() == Schema.Type.MAP) {
        return new MapWriteContext(this, _generator, effective, currValue);
    }
    final GenericRecord record = _createRecord(effective);
    return new ObjectWriteContext(this, _generator, record, currValue);
}
/**
 * Picks the one branch of a union that is either a RECORD or a MAP.
 *
 * @param unionSchema union schema to inspect
 * @return the only Record/Map branch of the union
 * @throws IllegalStateException if the union has no such branch, or more than one
 */
protected Schema _recordOrMapFromUnion(Schema unionSchema)
{
    Schema found = null;
    for (Schema candidate : unionSchema.getTypes()) {
        final Schema.Type kind = candidate.getType();
        if (kind != Schema.Type.RECORD && kind != Schema.Type.MAP) {
            continue;
        }
        if (found != null) {
            throw new IllegalStateException("Multiple Record and/or Map types, can not figure out which to use for: "
                    +unionSchema);
        }
        found = candidate;
    }
    if (found != null) {
        return found;
    }
    throw new IllegalStateException("No Record or Map type found in union type: "+unionSchema);
}
/**
 * Determines the index of the union branch to use for writing given value.
 * <p>
 * Checks are applied in this order: a null datum maps to the NULL branch;
 * a {@code String} goes through String-specific matching; a two-branch union
 * with one NULL branch yields the other branch; then {@code BigDecimal},
 * generic record, generic array and {@code Map} values get type-specific
 * matching; then branches are compared by type id. If nothing matched,
 * resolution is delegated to {@link ReflectData}.
 *
 * @param unionSchema union schema whose branches are considered
 * @param datum value to be written; may be null
 * @return index of the selected branch within {@code unionSchema.getTypes()}
 */
public static int resolveUnionIndex(Schema unionSchema, Object datum) {
    final List<Schema> branches = unionSchema.getTypes();
    if (datum != null) {
        final Class<?> valueClass = datum.getClass();
        if (valueClass == CLS_STRING) {
            return _resolveStringIndex(unionSchema, branches, (String) datum);
        }
        // Shortcut for the common "nullable X" union: any non-null value is X
        final int nonNullIndex = _findNotNullIndex(branches);
        if (nonNullIndex >= 0) {
            return nonNullIndex;
        }
        if (valueClass == CLS_BIG_DECIMAL) {
            return _resolveBigDecimalIndex(unionSchema, branches, (BigDecimal) datum);
        }
        if (valueClass == CLS_GENERIC_RECORD) {
            return _resolveRecordIndex(unionSchema, branches, (GenericData.Record) datum);
        }
        if (valueClass == CLS_GENERIC_ARRAY) {
            return _resolveArrayIndex(unionSchema, branches, (GenericData.Array<?>) datum);
        }
        if (datum instanceof Map<?,?>) {
            return _resolveMapIndex(unionSchema, branches, datum);
        }
        final String wantedId = AvroSchemaHelper.getTypeId(valueClass);
        int position = 0;
        for (Schema branch : branches) {
            if (wantedId.equals(AvroSchemaHelper.getTypeId(branch))) {
                return position;
            }
            ++position;
        }
    } else {
        int position = 0;
        for (Schema branch : branches) {
            if (branch.getType() == Type.NULL) {
                return position;
            }
            ++position;
        }
    }
    // Nothing matched above: let Avro's reflection-based resolution decide (or fail)
    return ReflectData.get().resolveUnion(unionSchema, datum);
}
/**
 * Determines the union branch (as a schema) to use for writing given value.
 * Applies the same sequence of checks as {@link #resolveUnionIndex}, but
 * returns the branch schema instead of its index.
 *
 * @param unionSchema union schema whose branches are considered
 * @param datum value to be written; may be null
 * @return schema of the selected branch
 */
public static Schema resolveUnionType(Schema unionSchema, Object datum) {
    final List<Schema> branches = unionSchema.getTypes();
    if (datum != null) {
        final Class<?> valueClass = datum.getClass();
        if (valueClass == CLS_STRING) {
            final int stringIndex = _resolveStringIndex(unionSchema, branches, (String) datum);
            return branches.get(stringIndex);
        }
        // Shortcut for the common "nullable X" union: any non-null value is X
        final Schema nonNull = _findNotNull(branches);
        if (nonNull != null) {
            return nonNull;
        }
        if (valueClass == CLS_BIG_DECIMAL) {
            final int decimalIndex = _resolveBigDecimalIndex(unionSchema, branches, (BigDecimal) datum);
            return branches.get(decimalIndex);
        }
        if (valueClass == CLS_GENERIC_RECORD) {
            final int recordIndex = _resolveRecordIndex(unionSchema, branches, (GenericData.Record) datum);
            return branches.get(recordIndex);
        }
        if (valueClass == CLS_GENERIC_ARRAY) {
            final int arrayIndex = _resolveArrayIndex(unionSchema, branches, (GenericData.Array<?>) datum);
            return branches.get(arrayIndex);
        }
        if (datum instanceof Map<?,?>) {
            final int mapIndex = _resolveMapIndex(unionSchema, branches, datum);
            return branches.get(mapIndex);
        }
        final String wantedId = AvroSchemaHelper.getTypeId(valueClass);
        for (Schema branch : branches) {
            if (wantedId.equals(AvroSchemaHelper.getTypeId(branch))) {
                return branch;
            }
        }
    } else {
        for (Schema branch : branches) {
            if (branch.getType() == Type.NULL) {
                return branch;
            }
        }
    }
    // Nothing matched above: let Avro's reflection-based resolution decide (or fail)
    return branches.get(ReflectData.get().resolveUnion(unionSchema, datum));
}
/**
 * Finds the union branch to use for a String value. The first branch that
 * satisfies any of the following wins:
 * <ul>
 *  <li>a STRING or ENUM branch</li>
 *  <li>an INT branch tagged with the {@code Character} type id, for single-character values</li>
 *  <li>an ARRAY branch whose INT elements are tagged with the {@code Character} type id</li>
 * </ul>
 * If no branch qualifies, resolution is delegated to {@link ReflectData}.
 *
 * @param unionSchema union schema being resolved
 * @param types branches of {@code unionSchema}
 * @param value String value to be written
 * @return index of the selected branch
 */
private static int _resolveStringIndex(Schema unionSchema, List<Schema> types,
        String value)
{
    final int count = types.size();
    for (int index = 0; index < count; ++index) {
        final Schema branch = types.get(index);
        final Schema.Type kind = branch.getType();

        if (kind == Type.STRING || kind == Type.ENUM) {
            return index;
        }
        if (kind == Type.INT) {
            if (value.length() == 1
                    && AvroSchemaHelper.getTypeId(Character.class).equals(branch.getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_CLASS))) {
                return index;
            }
        } else if (kind == Type.ARRAY) {
            if (branch.getElementType().getType() == Type.INT
                    && AvroSchemaHelper.getTypeId(Character.class).equals(branch.getElementType().getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_CLASS))) {
                return index;
            }
        }
    }
    return ReflectData.get().resolveUnion(unionSchema, value);
}
/**
 * For a two-branch union where one branch is NULL, returns the other branch.
 *
 * @param types branches of a union
 * @return the non-NULL partner branch, or {@code null} if the list does not
 *    have exactly two entries or neither entry is of type NULL
 */
private static Schema _findNotNull(List<Schema> types)
{
    if (types.size() != 2) {
        return null;
    }
    final Schema first = types.get(0);
    final Schema second = types.get(1);
    if (first.getType() == Type.NULL) {
        return second;
    }
    return (second.getType() == Type.NULL) ? first : null;
}
/**
 * For a two-branch union where one branch is NULL, returns the index of the
 * other branch.
 *
 * @param types branches of a union
 * @return 0 or 1 for the non-NULL partner branch, or -1 if the list does not
 *    have exactly two entries or neither entry is of type NULL
 */
private static int _findNotNullIndex(List<Schema> types)
{
    if (types.size() != 2) {
        return -1;
    }
    if (types.get(0).getType() == Type.NULL) {
        return 1;
    }
    return (types.get(1).getType() == Type.NULL) ? 0 : -1;
}
/**
 * Finds the union branch to use for a {@link BigDecimal} value: the first
 * DOUBLE branch if there is one, otherwise whatever {@link ReflectData}
 * resolves to.
 *
 * @param unionSchema union schema being resolved
 * @param types branches of {@code unionSchema}
 * @param value value to be written
 * @return index of the selected branch
 */
private static int _resolveBigDecimalIndex(Schema unionSchema, List<Schema> types,
        BigDecimal value) {
    for (int i = 0, size = types.size(); i < size; ++i) {
        if (types.get(i).getType() == Type.DOUBLE) {
            return i;
        }
    }
    // NOTE(review): only DOUBLE is recognized here. A lower-priority secondary
    // candidate (such as STRING) may have been intended; confirm that the datum
    // writer can encode BigDecimal for that type before adding one.
    return ReflectData.get().resolveUnion(unionSchema, value);
}
/**
 * Finds the union branch to use for a {@code Map} value: the first MAP branch
 * if there is one, otherwise whatever {@link ReflectData} resolves to.
 *
 * @param unionSchema union schema being resolved
 * @param types branches of {@code unionSchema}
 * @param value value to be written
 * @return index of the selected branch
 */
private static int _resolveMapIndex(Schema unionSchema, List<Schema> types,
        Object value)
{
    int position = 0;
    for (Schema branch : types) {
        if (branch.getType() == Type.MAP) {
            return position;
        }
        ++position;
    }
    return ReflectData.get().resolveUnion(unionSchema, value);
}
/**
 * Finds the union branch to use for a generic record: the first RECORD branch
 * whose full name equals that of the record's own schema, otherwise whatever
 * {@link ReflectData} resolves to.
 *
 * @param unionSchema union schema being resolved
 * @param types branches of {@code unionSchema}
 * @param value record to be written
 * @return index of the selected branch
 */
private static int _resolveRecordIndex(Schema unionSchema, List<Schema> types,
        GenericData.Record value)
{
    final String wantedName = value.getSchema().getFullName();
    int position = 0;
    for (Schema branch : types) {
        if (branch.getType() == Type.RECORD && wantedName.equals(branch.getFullName())) {
            return position;
        }
        ++position;
    }
    return ReflectData.get().resolveUnion(unionSchema, value);
}
/**
 * Finds the union branch to use for a generic array: the first ARRAY branch
 * if there is one (element types are not compared), otherwise whatever
 * {@link ReflectData} resolves to.
 *
 * @param unionSchema union schema being resolved
 * @param types branches of {@code unionSchema}
 * @param value array to be written
 * @return index of the selected branch
 */
private static int _resolveArrayIndex(Schema unionSchema, List<Schema> types,
        GenericData.Array<?> value)
{
    int position = 0;
    for (Schema branch : types) {
        if (branch.getType() == Type.ARRAY) {
            return position;
        }
        ++position;
    }
    return ReflectData.get().resolveUnion(unionSchema, value);
}
/**
 * Alias for {@link #resolveUnionType(Schema, Object)}: simply delegates to it.
 *
 * @param unionSchema union schema whose branches are considered
 * @param datum value to be written
 * @return schema of the selected branch
 */
public static Schema resolveUnionSchema(Schema unionSchema, Object datum) {
return resolveUnionType(unionSchema, datum);
}
/**
 * Placeholder context used when no schema has been specified: it holds no
 * parent, generator, schema or value, and every attempt to write through it
 * (or to create a child context) fails with {@link IllegalStateException}.
 */
private final static class NullContext
    extends AvroWriteContext
{
    /** Shared singleton instance. */
    public final static NullContext instance = new NullContext();

    private NullContext() {
        super(TYPE_ROOT, null, null, null, null);
    }

    /** Single failure point for all unsupported operations; always throws. */
    protected void _reportError() {
        throw new IllegalStateException("Can not write Avro output without specifying Schema");
    }

    @Override
    public void appendDesc(StringBuilder sb) {
        sb.append("?");
    }

    @Override
    public Object rawValue() {
        return null;
    }

    @Override
    public void writeNull() {
        _reportError();
    }

    @Override
    public void writeString(String value) {
        _reportError();
    }

    @Override
    public void writeValue(Object value) {
        _reportError();
    }

    @Override
    public final AvroWriteContext createChildObjectContext(Object currValue) {
        _reportError();
        // never reached: _reportError() always throws
        return null;
    }

    @Override
    public final AvroWriteContext createChildArrayContext(Object currValue) {
        _reportError();
        // never reached: _reportError() always throws
        return null;
    }
}
}