/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.avro.generic;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.util.Utf8;
import org.apache.avro.util.WeakIdentityHashMap;

DatumReader for generic Java objects.
/** {@link DatumReader} for generic Java objects. */
public class GenericDatumReader<D> implements DatumReader<D> { private final GenericData data; private Schema actual; private Schema expected; private DatumReader<D> fastDatumReader = null; private ResolvingDecoder creatorResolver = null; private final Thread creator; public GenericDatumReader() { this(null, null, GenericData.get()); }
Construct where the writer's and reader's schemas are the same.
/** Construct where the writer's and reader's schemas are the same. */
public GenericDatumReader(Schema schema) { this(schema, schema, GenericData.get()); }
Construct given writer's and reader's schema.
/** Construct given writer's and reader's schema. */
public GenericDatumReader(Schema writer, Schema reader) { this(writer, reader, GenericData.get()); } public GenericDatumReader(Schema writer, Schema reader, GenericData data) { this(data); this.actual = writer; this.expected = reader; } protected GenericDatumReader(GenericData data) { this.data = data; this.creator = Thread.currentThread(); }
Return the GenericData implementation.
/** Return the {@link GenericData} implementation. */
public GenericData getData() { return data; }
Return the writer's schema.
/** Return the writer's schema. */
public Schema getSchema() { return actual; } @Override public void setSchema(Schema writer) { this.actual = writer; if (expected == null) { expected = actual; } creatorResolver = null; fastDatumReader = null; }
Get the reader's schema.
/** Get the reader's schema. */
public Schema getExpected() { return expected; }
Set the reader's schema.
/** Set the reader's schema. */
public void setExpected(Schema reader) { this.expected = reader; creatorResolver = null; } private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> RESOLVER_CACHE = ThreadLocal .withInitial(WeakIdentityHashMap::new);
Gets a resolving decoder for use by this GenericDatumReader. Unstable API. Currently uses a thread local cache to prevent constructing the resolvers too often, because that is very expensive.
/** * Gets a resolving decoder for use by this GenericDatumReader. Unstable API. * Currently uses a thread local cache to prevent constructing the resolvers too * often, because that is very expensive. */
protected final ResolvingDecoder getResolver(Schema actual, Schema expected) throws IOException { Thread currThread = Thread.currentThread(); ResolvingDecoder resolver; if (currThread == creator && creatorResolver != null) { return creatorResolver; } Map<Schema, ResolvingDecoder> cache = RESOLVER_CACHE.get().get(actual); if (cache == null) { cache = new WeakIdentityHashMap<>(); RESOLVER_CACHE.get().put(actual, cache); } resolver = cache.get(expected); if (resolver == null) { resolver = DecoderFactory.get().resolvingDecoder(Schema.applyAliases(actual, expected), expected, null); cache.put(expected, resolver); } if (currThread == creator) { creatorResolver = resolver; } return resolver; } @Override @SuppressWarnings("unchecked") public D read(D reuse, Decoder in) throws IOException { if (data.isFastReaderEnabled()) { if (this.fastDatumReader == null) { this.fastDatumReader = data.getFastReaderBuilder().createDatumReader(actual, expected); } return fastDatumReader.read(reuse, in); } ResolvingDecoder resolver = getResolver(actual, expected); resolver.configure(in); D result = (D) read(reuse, expected, resolver); resolver.drain(); return result; }
Called to read data.
/** Called to read data. */
protected Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException { Object datum = readWithoutConversion(old, expected, in); LogicalType logicalType = expected.getLogicalType(); if (logicalType != null) { Conversion<?> conversion = getData().getConversionFor(logicalType); if (conversion != null) { return convert(datum, expected, logicalType, conversion); } } return datum; } protected Object readWithConversion(Object old, Schema expected, LogicalType logicalType, Conversion<?> conversion, ResolvingDecoder in) throws IOException { return convert(readWithoutConversion(old, expected, in), expected, logicalType, conversion); } protected Object readWithoutConversion(Object old, Schema expected, ResolvingDecoder in) throws IOException { switch (expected.getType()) { case RECORD: return readRecord(old, expected, in); case ENUM: return readEnum(expected, in); case ARRAY: return readArray(old, expected, in); case MAP: return readMap(old, expected, in); case UNION: return read(old, expected.getTypes().get(in.readIndex()), in); case FIXED: return readFixed(old, expected, in); case STRING: return readString(old, expected, in); case BYTES: return readBytes(old, expected, in); case INT: return readInt(old, expected, in); case LONG: return in.readLong(); case FLOAT: return in.readFloat(); case DOUBLE: return in.readDouble(); case BOOLEAN: return in.readBoolean(); case NULL: in.readNull(); return null; default: throw new AvroRuntimeException("Unknown type: " + expected); } }
Convert a underlying representation of a logical type (such as a ByteBuffer) to a higher level object (such as a BigDecimal).
Throws:
  • IllegalArgumentException – if a null schema or logicalType is passed in while datum and conversion are not null. Please be noticed that the exception type has changed. With version 1.8.0 and earlier, in above circumstance, the exception thrown out depends on the implementation of conversion (most likely a NullPointerException). Now, an IllegalArgumentException will be thrown out instead.
/** * Convert a underlying representation of a logical type (such as a ByteBuffer) * to a higher level object (such as a BigDecimal). * * @throws IllegalArgumentException if a null schema or logicalType is passed in * while datum and conversion are not null. * Please be noticed that the exception type * has changed. With version 1.8.0 and earlier, * in above circumstance, the exception thrown * out depends on the implementation of * conversion (most likely a * NullPointerException). Now, an * IllegalArgumentException will be thrown out * instead. */
protected Object convert(Object datum, Schema schema, LogicalType type, Conversion<?> conversion) { return Conversions.convertToLogicalType(datum, schema, type, conversion); }
Called to read a record instance. May be overridden for alternate record representations.
/** * Called to read a record instance. May be overridden for alternate record * representations. */
protected Object readRecord(Object old, Schema expected, ResolvingDecoder in) throws IOException { final Object record = data.newRecord(old, expected); final Object state = data.getRecordState(record, expected); for (Field field : in.readFieldOrder()) { int pos = field.pos(); String name = field.name(); Object oldDatum = null; if (old != null) { oldDatum = data.getField(record, name, pos, state); } readField(record, field, oldDatum, in, state); } return record; }
Called to read a single field of a record. May be overridden for more efficient or alternate implementations.
/** * Called to read a single field of a record. May be overridden for more * efficient or alternate implementations. */
protected void readField(Object record, Field field, Object oldDatum, ResolvingDecoder in, Object state) throws IOException { data.setField(record, field.name(), field.pos(), read(oldDatum, field.schema(), in), state); }
Called to read an enum value. May be overridden for alternate enum representations. By default, returns a GenericEnumSymbol.
/** * Called to read an enum value. May be overridden for alternate enum * representations. By default, returns a GenericEnumSymbol. */
protected Object readEnum(Schema expected, Decoder in) throws IOException { return createEnum(expected.getEnumSymbols().get(in.readEnum()), expected); }
Called to create an enum value. May be overridden for alternate enum representations. By default, returns a GenericEnumSymbol.
/** * Called to create an enum value. May be overridden for alternate enum * representations. By default, returns a GenericEnumSymbol. */
protected Object createEnum(String symbol, Schema schema) { return data.createEnum(symbol, schema); }
Called to read an array instance. May be overridden for alternate array representations.
/** * Called to read an array instance. May be overridden for alternate array * representations. */
protected Object readArray(Object old, Schema expected, ResolvingDecoder in) throws IOException { Schema expectedType = expected.getElementType(); long l = in.readArrayStart(); long base = 0; if (l > 0) { LogicalType logicalType = expectedType.getLogicalType(); Conversion<?> conversion = getData().getConversionFor(logicalType); Object array = newArray(old, (int) l, expected); do { if (logicalType != null && conversion != null) { for (long i = 0; i < l; i++) { addToArray(array, base + i, readWithConversion(peekArray(array), expectedType, logicalType, conversion, in)); } } else { for (long i = 0; i < l; i++) { addToArray(array, base + i, readWithoutConversion(peekArray(array), expectedType, in)); } } base += l; } while ((l = in.arrayNext()) > 0); return pruneArray(array); } else { return pruneArray(newArray(old, 0, expected)); } } private Object pruneArray(Object object) { if (object instanceof GenericArray<?>) { ((GenericArray<?>) object).prune(); } return object; }
Called by the default implementation of readArray to retrieve a value from a reused instance. The default implementation is for GenericArray.
/** * Called by the default implementation of {@link #readArray} to retrieve a * value from a reused instance. The default implementation is for * {@link GenericArray}. */
@SuppressWarnings("unchecked") protected Object peekArray(Object array) { return (array instanceof GenericArray) ? ((GenericArray) array).peek() : null; }
Called by the default implementation of readArray to add a value. The default implementation is for Collection.
/** * Called by the default implementation of {@link #readArray} to add a value. * The default implementation is for {@link Collection}. */
@SuppressWarnings("unchecked") protected void addToArray(Object array, long pos, Object e) { ((Collection) array).add(e); }
Called to read a map instance. May be overridden for alternate map representations.
/** * Called to read a map instance. May be overridden for alternate map * representations. */
protected Object readMap(Object old, Schema expected, ResolvingDecoder in) throws IOException { Schema eValue = expected.getValueType(); long l = in.readMapStart(); LogicalType logicalType = eValue.getLogicalType(); Conversion<?> conversion = getData().getConversionFor(logicalType); Object map = newMap(old, (int) l); if (l > 0) { do { if (logicalType != null && conversion != null) { for (int i = 0; i < l; i++) { addToMap(map, readMapKey(null, expected, in), readWithConversion(null, eValue, logicalType, conversion, in)); } } else { for (int i = 0; i < l; i++) { addToMap(map, readMapKey(null, expected, in), readWithoutConversion(null, eValue, in)); } } } while ((l = in.mapNext()) > 0); } return map; }
Called by the default implementation of readMap to read a key value. The default implementation returns delegates to readString(Object, Decoder).
/** * Called by the default implementation of {@link #readMap} to read a key value. * The default implementation returns delegates to * {@link #readString(Object, org.apache.avro.io.Decoder)}. */
protected Object readMapKey(Object old, Schema expected, Decoder in) throws IOException { return readString(old, expected, in); }
Called by the default implementation of readMap to add a key/value pair. The default implementation is for Map.
/** * Called by the default implementation of {@link #readMap} to add a key/value * pair. The default implementation is for {@link Map}. */
@SuppressWarnings("unchecked") protected void addToMap(Object map, Object key, Object value) { ((Map) map).put(key, value); }
Called to read a fixed value. May be overridden for alternate fixed representations. By default, returns GenericFixed.
/** * Called to read a fixed value. May be overridden for alternate fixed * representations. By default, returns {@link GenericFixed}. */
protected Object readFixed(Object old, Schema expected, Decoder in) throws IOException { GenericFixed fixed = (GenericFixed) data.createFixed(old, expected); in.readFixed(fixed.bytes(), 0, expected.getFixedSize()); return fixed; }
Called to create an fixed value. May be overridden for alternate fixed representations. By default, returns GenericFixed.
Deprecated:As of Avro 1.6.0 this method has been moved to GenericData.createFixed(Object, Schema)
/** * Called to create an fixed value. May be overridden for alternate fixed * representations. By default, returns {@link GenericFixed}. * * @deprecated As of Avro 1.6.0 this method has been moved to * {@link GenericData#createFixed(Object, Schema)} */
@Deprecated protected Object createFixed(Object old, Schema schema) { return data.createFixed(old, schema); }
Called to create an fixed value. May be overridden for alternate fixed representations. By default, returns GenericFixed.
Deprecated:As of Avro 1.6.0 this method has been moved to GenericData.createFixed(Object, byte[], Schema)
/** * Called to create an fixed value. May be overridden for alternate fixed * representations. By default, returns {@link GenericFixed}. * * @deprecated As of Avro 1.6.0 this method has been moved to * {@link GenericData#createFixed(Object, byte[], Schema)} */
@Deprecated protected Object createFixed(Object old, byte[] bytes, Schema schema) { return data.createFixed(old, bytes, schema); }
Called to create new record instances. Subclasses may override to use a different record implementation. The returned instance must conform to the schema provided. If the old object contains fields not present in the schema, they should either be removed from the old object, or it should create a new instance that conforms to the schema. By default, this returns a Record.
Deprecated:As of Avro 1.6.0 this method has been moved to GenericData.newRecord(Object, Schema)
/** * Called to create new record instances. Subclasses may override to use a * different record implementation. The returned instance must conform to the * schema provided. If the old object contains fields not present in the schema, * they should either be removed from the old object, or it should create a new * instance that conforms to the schema. By default, this returns a * {@link GenericData.Record}. * * @deprecated As of Avro 1.6.0 this method has been moved to * {@link GenericData#newRecord(Object, Schema)} */
@Deprecated protected Object newRecord(Object old, Schema schema) { return data.newRecord(old, schema); }
Called to create new array instances. Subclasses may override to use a different array implementation. By default, this returns a Array.
/** * Called to create new array instances. Subclasses may override to use a * different array implementation. By default, this returns a * {@link GenericData.Array}. */
@SuppressWarnings("unchecked") protected Object newArray(Object old, int size, Schema schema) { return data.newArray(old, size, schema); }
Called to create new array instances. Subclasses may override to use a different map implementation. By default, this returns a HashMap.
/** * Called to create new array instances. Subclasses may override to use a * different map implementation. By default, this returns a {@link HashMap}. */
@SuppressWarnings("unchecked") protected Object newMap(Object old, int size) { return data.newMap(old, size); }
Called to read strings. Subclasses may override to use a different string representation. By default, this calls readString(Object, Decoder).
/** * Called to read strings. Subclasses may override to use a different string * representation. By default, this calls {@link #readString(Object,Decoder)}. */
protected Object readString(Object old, Schema expected, Decoder in) throws IOException { Class stringClass = getStringClass(expected); if (stringClass == String.class) { return in.readString(); } if (stringClass == CharSequence.class) { return readString(old, in); } return newInstanceFromString(stringClass, in.readString()); }
Called to read strings. Subclasses may override to use a different string representation. By default, this calls Decoder.readString(Utf8).
/** * Called to read strings. Subclasses may override to use a different string * representation. By default, this calls {@link Decoder#readString(Utf8)}. */
protected Object readString(Object old, Decoder in) throws IOException { return in.readString(old instanceof Utf8 ? (Utf8) old : null); }
Called to create a string from a default value. Subclasses may override to use a different string representation. By default, this calls Utf8(String).
/** * Called to create a string from a default value. Subclasses may override to * use a different string representation. By default, this calls * {@link Utf8#Utf8(String)}. */
protected Object createString(String value) { return new Utf8(value); }
Determines the class to used to represent a string Schema. By default uses GenericData.STRING_PROP to determine whether Utf8 or String is used. Subclasses may override for alternate representations.
/** * Determines the class to used to represent a string Schema. By default uses * {@link GenericData#STRING_PROP} to determine whether {@link Utf8} or * {@link String} is used. Subclasses may override for alternate * representations. */
protected Class findStringClass(Schema schema) { String name = schema.getProp(GenericData.STRING_PROP); if (name == null) return CharSequence.class; switch (GenericData.StringType.valueOf(name)) { case String: return String.class; default: return CharSequence.class; } } private Map<Schema, Class> stringClassCache = new IdentityHashMap<>(); private Class getStringClass(Schema s) { Class c = stringClassCache.get(s); if (c == null) { c = findStringClass(s); stringClassCache.put(s, c); } return c; } private final Map<Class, Constructor> stringCtorCache = new HashMap<>(); @SuppressWarnings("unchecked") protected Object newInstanceFromString(Class c, String s) { try { Constructor ctor = stringCtorCache.get(c); if (ctor == null) { ctor = c.getDeclaredConstructor(String.class); ctor.setAccessible(true); stringCtorCache.put(c, ctor); } return ctor.newInstance(s); } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException | InstantiationException e) { throw new AvroRuntimeException(e); } }
Called to read byte arrays. Subclasses may override to use a different byte array representation. By default, this calls Decoder.readBytes(ByteBuffer).
/** * Called to read byte arrays. Subclasses may override to use a different byte * array representation. By default, this calls * {@link Decoder#readBytes(ByteBuffer)}. */
protected Object readBytes(Object old, Schema s, Decoder in) throws IOException { return readBytes(old, in); }
Called to read byte arrays. Subclasses may override to use a different byte array representation. By default, this calls Decoder.readBytes(ByteBuffer).
/** * Called to read byte arrays. Subclasses may override to use a different byte * array representation. By default, this calls * {@link Decoder#readBytes(ByteBuffer)}. */
protected Object readBytes(Object old, Decoder in) throws IOException { return in.readBytes(old instanceof ByteBuffer ? (ByteBuffer) old : null); }
Called to read integers. Subclasses may override to use a different integer representation. By default, this calls Decoder.readInt().
/** * Called to read integers. Subclasses may override to use a different integer * representation. By default, this calls {@link Decoder#readInt()}. */
protected Object readInt(Object old, Schema expected, Decoder in) throws IOException { return in.readInt(); }
Called to create byte arrays from default values. Subclasses may override to use a different byte array representation. By default, this calls ByteBuffer.wrap(byte[]).
/** * Called to create byte arrays from default values. Subclasses may override to * use a different byte array representation. By default, this calls * {@link ByteBuffer#wrap(byte[])}. */
protected Object createBytes(byte[] value) { return ByteBuffer.wrap(value); }
Skip an instance of a schema.
/** Skip an instance of a schema. */
public static void skip(Schema schema, Decoder in) throws IOException { switch (schema.getType()) { case RECORD: for (Field field : schema.getFields()) skip(field.schema(), in); break; case ENUM: in.readEnum(); break; case ARRAY: Schema elementType = schema.getElementType(); for (long l = in.skipArray(); l > 0; l = in.skipArray()) { for (long i = 0; i < l; i++) { skip(elementType, in); } } break; case MAP: Schema value = schema.getValueType(); for (long l = in.skipMap(); l > 0; l = in.skipMap()) { for (long i = 0; i < l; i++) { in.skipString(); skip(value, in); } } break; case UNION: skip(schema.getTypes().get(in.readIndex()), in); break; case FIXED: in.skipFixed(schema.getFixedSize()); break; case STRING: in.skipString(); break; case BYTES: in.skipBytes(); break; case INT: in.readInt(); break; case LONG: in.readLong(); break; case FLOAT: in.readFloat(); break; case DOUBLE: in.readDouble(); break; case BOOLEAN: in.readBoolean(); break; case NULL: in.readNull(); break; default: throw new RuntimeException("Unknown type: " + schema); } } }