/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.avro.io;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.IntFunction;

import org.apache.avro.AvroTypeException;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.Resolver;
import org.apache.avro.Resolver.Action;
import org.apache.avro.Resolver.Container;
import org.apache.avro.Resolver.EnumAdjust;
import org.apache.avro.Resolver.Promote;
import org.apache.avro.Resolver.ReaderUnion;
import org.apache.avro.Resolver.RecordAdjust;
import org.apache.avro.Resolver.Skip;
import org.apache.avro.Resolver.WriterUnion;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.InstanceSupplier;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.FastReaderBuilder.RecordReader.Stage;
import org.apache.avro.io.parsing.ResolvingGrammarGenerator;
import org.apache.avro.reflect.ReflectionUtil;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.util.Utf8;
import org.apache.avro.util.WeakIdentityHashMap;
import org.apache.avro.util.internal.Accessor;

public class FastReaderBuilder {

  
Generic/SpecificData instance that contains basic functionalities like instantiation of objects
/** * Generic/SpecificData instance that contains basic functionalities like * instantiation of objects */
private final GenericData data;
first schema is reader schema, second is writer schema
/** first schema is reader schema, second is writer schema */
private final Map<Schema, Map<Schema, RecordReader>> readerCache = Collections .synchronizedMap(new WeakIdentityHashMap<>()); private boolean keyClassEnabled = true; private boolean classPropEnabled = true; public static FastReaderBuilder get() { return new FastReaderBuilder(GenericData.get()); } public static FastReaderBuilder getSpecific() { return new FastReaderBuilder(SpecificData.get()); } public static boolean isSupportedData(GenericData data) { return data.getClass() == GenericData.class || data.getClass() == SpecificData.class; } public FastReaderBuilder(GenericData parentData) { this.data = parentData; } public FastReaderBuilder withKeyClassEnabled(boolean enabled) { this.keyClassEnabled = enabled; return this; } public boolean isKeyClassEnabled() { return this.keyClassEnabled; } public FastReaderBuilder withClassPropEnabled(boolean enabled) { this.classPropEnabled = enabled; return this; } public boolean isClassPropEnabled() { return this.classPropEnabled; } public <D> DatumReader<D> createDatumReader(Schema schema) throws IOException { return createDatumReader(schema, schema); } @SuppressWarnings("unchecked") public <D> DatumReader<D> createDatumReader(Schema writerSchema, Schema readerSchema) throws IOException { Schema resolvedWriterSchema = Schema.applyAliases(writerSchema, readerSchema); return (DatumReader<D>) getReaderFor(readerSchema, resolvedWriterSchema); } private FieldReader getReaderFor(Schema readerSchema, Schema writerSchema) throws IOException { Action resolvedAction = Resolver.resolve(writerSchema, readerSchema, data); return getReaderFor(resolvedAction, null); } private FieldReader getReaderFor(Action action, Conversion<?> explicitConversion) throws IOException { final FieldReader baseReader = getNonConvertedReader(action); return applyConversions(action.reader, baseReader, explicitConversion); } private RecordReader createRecordReader(RecordAdjust action) throws IOException { // record readers are created in a two-step process, first registering it, then // initializing it, // to prevent endless loops on recursive types RecordReader recordReader = getRecordReaderFromCache(action.reader, action.writer); synchronized (recordReader) { // only need to initialize once if (recordReader.getInitializationStage() == Stage.NEW) { initializeRecordReader(recordReader, action); } } return recordReader; } private RecordReader initializeRecordReader(RecordReader recordReader, RecordAdjust action) throws IOException { recordReader.startInitialization(); // generate supplier for the new object instances Object testInstance = action.instanceSupplier.newInstance(null, action.reader); IntFunction<Conversion<?>> conversionSupplier = getConversionSupplier(testInstance); ExecutionStep[] readSteps = new ExecutionStep[action.fieldActions.length + action.readerOrder.length - action.firstDefault]; int i = 0; int fieldCounter = 0; // compute what to do with writer's fields for (; i < action.fieldActions.length; i++) { Action fieldAction = action.fieldActions[i]; if (fieldAction instanceof Skip) { readSteps[i] = (r, decoder) -> GenericDatumReader.skip(fieldAction.writer, decoder); } else { Field readerField = action.readerOrder[fieldCounter++]; Conversion<?> conversion = conversionSupplier.apply(readerField.pos()); FieldReader reader = getReaderFor(fieldAction, conversion); readSteps[i] = createFieldSetter(readerField, reader); } } // add defaulting if required for (; i < readSteps.length; i++) { readSteps[i] = getDefaultingStep(action.readerOrder[fieldCounter++]); } recordReader.finishInitialization(readSteps, action.reader, action.instanceSupplier); return recordReader; } private ExecutionStep createFieldSetter(Field field, FieldReader reader) { int pos = field.pos(); if (reader.canReuse()) { return (object, decoder) -> { IndexedRecord record = (IndexedRecord) object; record.put(pos, reader.read(record.get(pos), decoder)); }; } else { return (object, decoder) -> { IndexedRecord record = (IndexedRecord) object; record.put(pos, reader.read(null, decoder)); }; } } private ExecutionStep getDefaultingStep(Schema.Field field) throws IOException { Object defaultValue = data.getDefaultValue(field); if (isObjectImmutable(defaultValue)) { return createFieldSetter(field, (old, d) -> defaultValue); } else if (defaultValue instanceof Utf8) { return createFieldSetter(field, reusingReader((old, d) -> readUtf8(old, (Utf8) defaultValue))); } else if (defaultValue instanceof List && ((List<?>) defaultValue).isEmpty()) { return createFieldSetter(field, reusingReader((old, d) -> data.newArray(old, 0, field.schema()))); } else if (defaultValue instanceof Map && ((Map<?, ?>) defaultValue).isEmpty()) { return createFieldSetter(field, reusingReader((old, d) -> data.newMap(old, 0))); } else { DatumReader<Object> datumReader = createDatumReader(field.schema()); byte[] encoded = getEncodedValue(field); FieldReader fieldReader = reusingReader( (old, decoder) -> datumReader.read(old, DecoderFactory.get().binaryDecoder(encoded, null))); return createFieldSetter(field, fieldReader); } } private boolean isObjectImmutable(Object object) { return object == null || object instanceof Number || object instanceof String || object instanceof GenericEnumSymbol || object.getClass().isEnum(); } private Utf8 readUtf8(Object reuse, Utf8 newValue) { if (reuse instanceof Utf8) { Utf8 oldUtf8 = (Utf8) reuse; oldUtf8.set(newValue); return oldUtf8; } else { return new Utf8(newValue); } } private byte[] getEncodedValue(Field field) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); ResolvingGrammarGenerator.encode(encoder, field.schema(), Accessor.defaultValue(field)); encoder.flush(); return out.toByteArray(); } private IntFunction<Conversion<?>> getConversionSupplier(Object record) { if (record instanceof SpecificRecordBase) { return ((SpecificRecordBase) record)::getConversion; } else { return index -> null; } } private RecordReader getRecordReaderFromCache(Schema readerSchema, Schema writerSchema) { return readerCache.computeIfAbsent(readerSchema, k -> new WeakIdentityHashMap<>()).computeIfAbsent(writerSchema, k -> new RecordReader()); } private FieldReader applyConversions(Schema readerSchema, FieldReader reader, Conversion<?> explicitConversion) { Conversion<?> conversion = explicitConversion; if (conversion == null) { if (readerSchema.getLogicalType() == null) { return reader; } conversion = data.getConversionFor(readerSchema.getLogicalType()); if (conversion == null) { return reader; } } Conversion<?> finalConversion = conversion; return (old, decoder) -> Conversions.convertToLogicalType(reader.read(old, decoder), readerSchema, readerSchema.getLogicalType(), finalConversion); } private FieldReader getNonConvertedReader(Action action) throws IOException { switch (action.type) { case CONTAINER: switch (action.reader.getType()) { case MAP: return createMapReader(action.reader, (Container) action); case ARRAY: return createArrayReader(action.reader, (Container) action); default: throw new IllegalStateException("Error getting reader for action type " + action.getClass()); } case DO_NOTHING: return getReaderForBaseType(action.reader, action.writer); case RECORD: return createRecordReader((RecordAdjust) action); case ENUM: return createEnumReader((EnumAdjust) action); case PROMOTE: return createPromotingReader((Promote) action); case WRITER_UNION: return createUnionReader((WriterUnion) action); case READER_UNION: return getReaderFor(((ReaderUnion) action).actualAction, null); case ERROR: return (old, decoder) -> { throw new AvroTypeException(action.toString()); }; default: throw new IllegalStateException("Error getting reader for action type " + action.getClass()); } } private FieldReader getReaderForBaseType(Schema readerSchema, Schema writerSchema) throws IOException { switch (readerSchema.getType()) { case NULL: return (old, decoder) -> { decoder.readNull(); return null; }; case BOOLEAN: return (old, decoder) -> decoder.readBoolean(); case STRING: return createStringReader(readerSchema, writerSchema); case INT: return (old, decoder) -> decoder.readInt(); case LONG: return (old, decoder) -> decoder.readLong(); case FLOAT: return (old, decoder) -> decoder.readFloat(); case DOUBLE: return (old, decoder) -> decoder.readDouble(); case BYTES: return createBytesReader(); case FIXED: return createFixedReader(readerSchema, writerSchema); case RECORD: // covered by action type case UNION: // covered by action type case ENUM: // covered by action type case MAP: // covered by action type case ARRAY: // covered by action type default: throw new IllegalStateException("Error getting reader for type " + readerSchema.getFullName()); } } private FieldReader createPromotingReader(Promote promote) throws IOException { switch (promote.reader.getType()) { case BYTES: return (reuse, decoder) -> ByteBuffer.wrap(decoder.readString(null).getBytes()); case STRING: return createBytesPromotingToStringReader(promote.reader); case LONG: return (reuse, decoder) -> (long) decoder.readInt(); case FLOAT: switch (promote.writer.getType()) { case INT: return (reuse, decoder) -> (float) decoder.readInt(); case LONG: return (reuse, decoder) -> (float) decoder.readLong(); default: } break; case DOUBLE: switch (promote.writer.getType()) { case INT: return (reuse, decoder) -> (double) decoder.readInt(); case LONG: return (reuse, decoder) -> (double) decoder.readLong(); case FLOAT: return (reuse, decoder) -> (double) decoder.readFloat(); default: } break; default: } throw new IllegalStateException( "No promotion possible for type " + promote.writer.getType() + " to " + promote.reader.getType()); } private FieldReader createStringReader(Schema readerSchema, Schema writerSchema) { FieldReader stringReader = createSimpleStringReader(readerSchema); if (isClassPropEnabled()) { return getTransformingStringReader(readerSchema.getProp(SpecificData.CLASS_PROP), stringReader); } else { return stringReader; } } private FieldReader createSimpleStringReader(Schema readerSchema) { String stringProperty = readerSchema.getProp(GenericData.STRING_PROP); if (GenericData.StringType.String.name().equals(stringProperty)) { return (old, decoder) -> decoder.readString(); } else { return (old, decoder) -> decoder.readString(old instanceof Utf8 ? (Utf8) old : null); } } private FieldReader createBytesPromotingToStringReader(Schema readerSchema) { String stringProperty = readerSchema.getProp(GenericData.STRING_PROP); if (GenericData.StringType.String.name().equals(stringProperty)) { return (old, decoder) -> getStringFromByteBuffer(decoder.readBytes(null)); } else { return (old, decoder) -> getUtf8FromByteBuffer(old, decoder.readBytes(null)); } } private String getStringFromByteBuffer(ByteBuffer buffer) { return new String(buffer.array(), buffer.position(), buffer.remaining(), StandardCharsets.UTF_8); } private Utf8 getUtf8FromByteBuffer(Object old, ByteBuffer buffer) { return (old instanceof Utf8) ? ((Utf8) old).set(new Utf8(buffer.array())) : new Utf8(buffer.array()); } private FieldReader createUnionReader(WriterUnion action) throws IOException { FieldReader[] unionReaders = new FieldReader[action.actions.length]; for (int i = 0; i < action.actions.length; i++) { unionReaders[i] = getReaderFor(action.actions[i], null); } return createUnionReader(unionReaders); } private FieldReader createUnionReader(FieldReader[] unionReaders) { return reusingReader((reuse, decoder) -> { final int selection = decoder.readIndex(); return unionReaders[selection].read(null, decoder); }); } private FieldReader createMapReader(Schema readerSchema, Container action) throws IOException { FieldReader keyReader = createMapKeyReader(readerSchema); FieldReader valueReader = getReaderFor(action.elementAction, null); return new MapReader(keyReader, valueReader); } private FieldReader createMapKeyReader(Schema readerSchema) { FieldReader stringReader = createSimpleStringReader(readerSchema); if (isKeyClassEnabled()) { return getTransformingStringReader(readerSchema.getProp(SpecificData.KEY_CLASS_PROP), createSimpleStringReader(readerSchema)); } else { return stringReader; } } private FieldReader getTransformingStringReader(String valueClass, FieldReader stringReader) { if (valueClass == null) { return stringReader; } else { Function<String, ?> transformer = findClass(valueClass) .map(clazz -> ReflectionUtil.getConstructorAsFunction(String.class, clazz)).orElse(null); if (transformer != null) { return (old, decoder) -> transformer.apply((String) stringReader.read(null, decoder)); } } return stringReader; } private Optional<Class<?>> findClass(String clazz) { try { return Optional.of(data.getClassLoader().loadClass(clazz)); } catch (ReflectiveOperationException e) { return Optional.empty(); } } @SuppressWarnings("unchecked") private FieldReader createArrayReader(Schema readerSchema, Container action) throws IOException { FieldReader elementReader = getReaderFor(action.elementAction, null); return reusingReader((reuse, decoder) -> { if (reuse instanceof GenericArray) { GenericArray<Object> reuseArray = (GenericArray<Object>) reuse; long l = decoder.readArrayStart(); reuseArray.clear(); while (l > 0) { for (long i = 0; i < l; i++) { reuseArray.add(elementReader.read(reuseArray.peek(), decoder)); } l = decoder.arrayNext(); } return reuseArray; } else { long l = decoder.readArrayStart(); List<Object> array = (reuse instanceof List) ? (List<Object>) reuse : new GenericData.Array<>((int) l, readerSchema); array.clear(); while (l > 0) { for (long i = 0; i < l; i++) { array.add(elementReader.read(null, decoder)); } l = decoder.arrayNext(); } return array; } }); } private FieldReader createEnumReader(EnumAdjust action) { return reusingReader((reuse, decoder) -> { int index = decoder.readEnum(); Object resultObject = action.values[index]; if (resultObject == null) { throw new AvroTypeException("No match for " + action.writer.getEnumSymbols().get(index)); } return resultObject; }); } private FieldReader createFixedReader(Schema readerSchema, Schema writerSchema) { return reusingReader((reuse, decoder) -> { GenericFixed fixed = (GenericFixed) data.createFixed(reuse, readerSchema); decoder.readFixed(fixed.bytes(), 0, readerSchema.getFixedSize()); return fixed; }); } private FieldReader createBytesReader() { return reusingReader( (reuse, decoder) -> decoder.readBytes(reuse instanceof ByteBuffer ? (ByteBuffer) reuse : null)); } public static FieldReader reusingReader(ReusingFieldReader reader) { return reader; } public interface FieldReader extends DatumReader<Object> { @Override public Object read(Object reuse, Decoder decoder) throws IOException; public default boolean canReuse() { return false; } @Override default void setSchema(Schema schema) { throw new UnsupportedOperationException(); } } public interface ReusingFieldReader extends FieldReader { @Override public default boolean canReuse() { return true; } } public static class RecordReader implements FieldReader { public enum Stage { NEW, INITIALIZING, INITIALIZED } private ExecutionStep[] readSteps; private InstanceSupplier supplier; private Schema schema; private Stage stage = Stage.NEW; public Stage getInitializationStage() { return this.stage; } public void reset() { this.stage = Stage.NEW; } public void startInitialization() { this.stage = Stage.INITIALIZING; } public void finishInitialization(ExecutionStep[] readSteps, Schema schema, InstanceSupplier supp) { this.readSteps = readSteps; this.schema = schema; this.supplier = supp; this.stage = Stage.INITIALIZED; } @Override public boolean canReuse() { return true; } @Override public Object read(Object reuse, Decoder decoder) throws IOException { Object object = supplier.newInstance(reuse, schema); for (ExecutionStep thisStep : readSteps) { thisStep.execute(object, decoder); } return object; } } public static class MapReader implements FieldReader { private final FieldReader keyReader; private final FieldReader valueReader; public MapReader(FieldReader keyReader, FieldReader valueReader) { this.keyReader = keyReader; this.valueReader = valueReader; } @Override public Object read(Object reuse, Decoder decoder) throws IOException { long l = decoder.readMapStart(); Map<Object, Object> targetMap = new HashMap<>(); while (l > 0) { for (int i = 0; i < l; i++) { Object key = keyReader.read(null, decoder); Object value = valueReader.read(null, decoder); targetMap.put(key, value); } l = decoder.mapNext(); } return targetMap; } } public interface ExecutionStep { public void execute(Object record, Decoder decoder) throws IOException; } }