/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.avro.io;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Stack;

import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.io.parsing.JsonGrammarGenerator;
import org.apache.avro.io.parsing.Parser;
import org.apache.avro.io.parsing.Symbol;
import org.apache.avro.util.Utf8;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.util.TokenBuffer;

A Decoder for Avro's JSON data encoding.

Construct using DecoderFactory.

JsonDecoder is not thread-safe.
/**
 * A {@link Decoder} for Avro's JSON data encoding.
 * <p>
 * Construct using {@link DecoderFactory}.
 * <p>
 * JsonDecoder is not thread-safe.
 */
public class JsonDecoder extends ParsingDecoder implements Parser.ActionHandler {
  /**
   * Jackson parser currently supplying tokens. It is swapped for a replay
   * parser while a buffered (out-of-order) record field is being read and
   * restored when that field ends.
   */
  private JsonParser in;

  /** Shared factory used to create the underlying Jackson parsers. */
  private static final JsonFactory jsonFactory = new JsonFactory();

  /**
   * Reorder buffers of the enclosing contexts, pushed on record start and popped
   * on record end. Entries may be {@code null}.
   */
  Stack<ReorderBuffer> reorderBuffers = new Stack<>();

  /**
   * Reorder buffer of the record currently being read, or {@code null} if no
   * field of that record has been buffered so far.
   */
  ReorderBuffer currentReorderBuffer;

  /** Holds record fields that appeared in the JSON input before they were needed. */
  private static class ReorderBuffer {
    /** Buffered token streams of skipped-over fields, keyed by JSON field name. */
    public final Map<String, TokenBuffer> savedFields = new HashMap<>();

    /** The real input parser, remembered while a buffered field is replayed. */
    public JsonParser origParser;
  }

  /**
   * Creates a decoder for an already generated grammar, reading from a stream.
   *
   * @param root root symbol of the JSON grammar
   * @param in   the stream to read from; must not be {@code null}
   * @throws IOException if the input cannot be configured
   */
  private JsonDecoder(Symbol root, InputStream in) throws IOException {
    super(root);
    configure(in);
  }

  /**
   * Creates a decoder for an already generated grammar, reading from a string.
   *
   * @param root root symbol of the JSON grammar
   * @param in   the JSON text to read; must not be {@code null}
   * @throws IOException if the input cannot be configured
   */
  private JsonDecoder(Symbol root, String in) throws IOException {
    super(root);
    configure(in);
  }

  /**
   * Creates a decoder for {@code schema} that reads JSON from a stream.
   *
   * @param schema the schema describing the data; must not be {@code null}
   * @param in     the stream to read from; must not be {@code null}
   * @throws IOException if the input cannot be configured
   */
  JsonDecoder(Schema schema, InputStream in) throws IOException {
    this(getSymbol(schema), in);
  }

  /**
   * Creates a decoder for {@code schema} that reads JSON from a string.
   *
   * @param schema the schema describing the data; must not be {@code null}
   * @param in     the JSON text to read; must not be {@code null}
   * @throws IOException if the input cannot be configured
   */
  JsonDecoder(Schema schema, String in) throws IOException {
    this(getSymbol(schema), in);
  }

  /**
   * Generates the JSON parsing grammar for a schema.
   *
   * @param schema the schema to generate a grammar for
   * @return the root symbol produced by {@link JsonGrammarGenerator}
   * @throws NullPointerException if {@code schema} is {@code null}
   */
  private static Symbol getSymbol(Schema schema) {
    Objects.requireNonNull(schema, "Schema cannot be null");
    return new JsonGrammarGenerator().generate(schema);
  }
Reconfigures this JsonDecoder to use the InputStream provided.

If the InputStream provided is null, a NullPointerException is thrown.

Otherwise, this JsonDecoder will reset its state and then reconfigure its input.
Params:
  • in – The InputStream to read from. Cannot be null.
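Throws:
  • IOException – if the JSON parser cannot be created or its first token cannot be read
  • NullPointerException – if in is null
Returns: this JsonDecoder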
/**
 * Points this JsonDecoder at a new {@link InputStream} and resets its state.
 * <p>
 * The grammar parser is reset, all reorder buffers are discarded, and the
 * first JSON token of the new input is read immediately.
 *
 * @param in the InputStream to read from; must not be {@code null}
 * @return this JsonDecoder
 * @throws IOException          if the JSON parser cannot be created or its
 *                              first token cannot be read
 * @throws NullPointerException if {@code in} is {@code null}
 */
public JsonDecoder configure(InputStream in) throws IOException {
  Objects.requireNonNull(in, "InputStream cannot be null");

  // Forget everything that belonged to the previous input.
  parser.reset();
  currentReorderBuffer = null;
  reorderBuffers.clear();

  // Install the new parser before priming it, so the field already refers to
  // it even if reading the first token fails.
  final JsonParser fresh = jsonFactory.createParser(in);
  this.in = fresh;
  fresh.nextToken();
  return this;
}
Reconfigures this JsonDecoder to use the String provided for input.

If the String provided is null, a NullPointerException is thrown.

Otherwise, this JsonDecoder will reset its state and then reconfigure its input.
Params:
  • in – The String to read from. Cannot be null.
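Throws:
  • IOException – if the JSON parser cannot be created or its first token cannot be read
  • NullPointerException – if in is null
Returns: this JsonDecoder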
/** * Reconfigures this JsonDecoder to use the String provided for input. * <p/> * If the String provided is null, a NullPointerException is thrown. * <p/> * Otherwise, this JsonDecoder will reset its state and then reconfigure its * input. * * @param in The String to read from. Cannot be null. * @throws IOException * @throws NullPointerException if {@code in} is {@code null} * @return this JsonDecoder */
public JsonDecoder configure(String in) throws IOException { Objects.requireNonNull(in, "String to read from cannot be null"); parser.reset(); reorderBuffers.clear(); currentReorderBuffer = null; this.in = new JsonFactory().createParser(in); this.in.nextToken(); return this; } private void advance(Symbol symbol) throws IOException { this.parser.processTrailingImplicitActions(); if (in.getCurrentToken() == null && this.parser.depth() == 1) throw new EOFException(); parser.advance(symbol); } @Override public void readNull() throws IOException { advance(Symbol.NULL); if (in.getCurrentToken() == JsonToken.VALUE_NULL) { in.nextToken(); } else { throw error("null"); } } @Override public boolean readBoolean() throws IOException { advance(Symbol.BOOLEAN); JsonToken t = in.getCurrentToken(); if (t == JsonToken.VALUE_TRUE || t == JsonToken.VALUE_FALSE) { in.nextToken(); return t == JsonToken.VALUE_TRUE; } else { throw error("boolean"); } } @Override public int readInt() throws IOException { advance(Symbol.INT); if (in.getCurrentToken().isNumeric()) { int result = in.getIntValue(); in.nextToken(); return result; } else { throw error("int"); } } @Override public long readLong() throws IOException { advance(Symbol.LONG); if (in.getCurrentToken().isNumeric()) { long result = in.getLongValue(); in.nextToken(); return result; } else { throw error("long"); } } @Override public float readFloat() throws IOException { advance(Symbol.FLOAT); if (in.getCurrentToken().isNumeric()) { float result = in.getFloatValue(); in.nextToken(); return result; } else { throw error("float"); } } @Override public double readDouble() throws IOException { advance(Symbol.DOUBLE); if (in.getCurrentToken().isNumeric()) { double result = in.getDoubleValue(); in.nextToken(); return result; } else { throw error("double"); } } @Override public Utf8 readString(Utf8 old) throws IOException { return new Utf8(readString()); } @Override public String readString() throws IOException { advance(Symbol.STRING); if 
(parser.topSymbol() == Symbol.MAP_KEY_MARKER) { parser.advance(Symbol.MAP_KEY_MARKER); if (in.getCurrentToken() != JsonToken.FIELD_NAME) { throw error("map-key"); } } else { if (in.getCurrentToken() != JsonToken.VALUE_STRING) { throw error("string"); } } String result = in.getText(); in.nextToken(); return result; } @Override public void skipString() throws IOException { advance(Symbol.STRING); if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) { parser.advance(Symbol.MAP_KEY_MARKER); if (in.getCurrentToken() != JsonToken.FIELD_NAME) { throw error("map-key"); } } else { if (in.getCurrentToken() != JsonToken.VALUE_STRING) { throw error("string"); } } in.nextToken(); } @Override public ByteBuffer readBytes(ByteBuffer old) throws IOException { advance(Symbol.BYTES); if (in.getCurrentToken() == JsonToken.VALUE_STRING) { byte[] result = readByteArray(); in.nextToken(); return ByteBuffer.wrap(result); } else { throw error("bytes"); } } private byte[] readByteArray() throws IOException { byte[] result = in.getText().getBytes(StandardCharsets.ISO_8859_1); return result; } @Override public void skipBytes() throws IOException { advance(Symbol.BYTES); if (in.getCurrentToken() == JsonToken.VALUE_STRING) { in.nextToken(); } else { throw error("bytes"); } } private void checkFixed(int size) throws IOException { advance(Symbol.FIXED); Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); if (size != top.size) { throw new AvroTypeException( "Incorrect length for fixed binary: expected " + top.size + " but received " + size + " bytes."); } } @Override public void readFixed(byte[] bytes, int start, int len) throws IOException { checkFixed(len); if (in.getCurrentToken() == JsonToken.VALUE_STRING) { byte[] result = readByteArray(); in.nextToken(); if (result.length != len) { throw new AvroTypeException("Expected fixed length " + len + ", but got" + result.length); } System.arraycopy(result, 0, bytes, start, len); } else { throw error("fixed"); } } @Override public void 
skipFixed(int length) throws IOException { checkFixed(length); doSkipFixed(length); } private void doSkipFixed(int length) throws IOException { if (in.getCurrentToken() == JsonToken.VALUE_STRING) { byte[] result = readByteArray(); in.nextToken(); if (result.length != length) { throw new AvroTypeException("Expected fixed length " + length + ", but got" + result.length); } } else { throw error("fixed"); } } @Override protected void skipFixed() throws IOException { advance(Symbol.FIXED); Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); doSkipFixed(top.size); } @Override public int readEnum() throws IOException { advance(Symbol.ENUM); Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol(); if (in.getCurrentToken() == JsonToken.VALUE_STRING) { in.getText(); int n = top.findLabel(in.getText()); if (n >= 0) { in.nextToken(); return n; } throw new AvroTypeException("Unknown symbol in enum " + in.getText()); } else { throw error("fixed"); } } @Override public long readArrayStart() throws IOException { advance(Symbol.ARRAY_START); if (in.getCurrentToken() == JsonToken.START_ARRAY) { in.nextToken(); return doArrayNext(); } else { throw error("array-start"); } } @Override public long arrayNext() throws IOException { advance(Symbol.ITEM_END); return doArrayNext(); } private long doArrayNext() throws IOException { if (in.getCurrentToken() == JsonToken.END_ARRAY) { parser.advance(Symbol.ARRAY_END); in.nextToken(); return 0; } else { return 1; } } @Override public long skipArray() throws IOException { advance(Symbol.ARRAY_START); if (in.getCurrentToken() == JsonToken.START_ARRAY) { in.skipChildren(); in.nextToken(); advance(Symbol.ARRAY_END); } else { throw error("array-start"); } return 0; } @Override public long readMapStart() throws IOException { advance(Symbol.MAP_START); if (in.getCurrentToken() == JsonToken.START_OBJECT) { in.nextToken(); return doMapNext(); } else { throw error("map-start"); } } @Override public long mapNext() 
throws IOException { advance(Symbol.ITEM_END); return doMapNext(); } private long doMapNext() throws IOException { if (in.getCurrentToken() == JsonToken.END_OBJECT) { in.nextToken(); advance(Symbol.MAP_END); return 0; } else { return 1; } } @Override public long skipMap() throws IOException { advance(Symbol.MAP_START); if (in.getCurrentToken() == JsonToken.START_OBJECT) { in.skipChildren(); in.nextToken(); advance(Symbol.MAP_END); } else { throw error("map-start"); } return 0; } @Override public int readIndex() throws IOException { advance(Symbol.UNION); Symbol.Alternative a = (Symbol.Alternative) parser.popSymbol(); String label; if (in.getCurrentToken() == JsonToken.VALUE_NULL) { label = "null"; } else if (in.getCurrentToken() == JsonToken.START_OBJECT && in.nextToken() == JsonToken.FIELD_NAME) { label = in.getText(); in.nextToken(); parser.pushSymbol(Symbol.UNION_END); } else { throw error("start-union"); } int n = a.findLabel(label); if (n < 0) throw new AvroTypeException("Unknown union branch " + label); parser.pushSymbol(a.getSymbol(n)); return n; } @Override public Symbol doAction(Symbol input, Symbol top) throws IOException { if (top instanceof Symbol.FieldAdjustAction) { Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top; String name = fa.fname; if (currentReorderBuffer != null) { try (TokenBuffer tokenBuffer = currentReorderBuffer.savedFields.get(name)) { if (tokenBuffer != null) { currentReorderBuffer.savedFields.remove(name); currentReorderBuffer.origParser = in; in = tokenBuffer.asParser(); in.nextToken(); return null; } } } if (in.getCurrentToken() == JsonToken.FIELD_NAME) { do { String fn = in.getText(); in.nextToken(); if (name.equals(fn) || fa.aliases.contains(fn)) { return null; } else { if (currentReorderBuffer == null) { currentReorderBuffer = new ReorderBuffer(); } try (TokenBuffer tokenBuffer = new TokenBuffer(in)) { // Moves the parser to the end of the current event e.g. 
END_OBJECT tokenBuffer.copyCurrentStructure(in); currentReorderBuffer.savedFields.put(fn, tokenBuffer); } in.nextToken(); } } while (in.getCurrentToken() == JsonToken.FIELD_NAME); throw new AvroTypeException("Expected field name not found: " + fa.fname); } } else if (top == Symbol.FIELD_END) { if (currentReorderBuffer != null && currentReorderBuffer.origParser != null) { in = currentReorderBuffer.origParser; currentReorderBuffer.origParser = null; } } else if (top == Symbol.RECORD_START) { if (in.getCurrentToken() == JsonToken.START_OBJECT) { in.nextToken(); reorderBuffers.push(currentReorderBuffer); currentReorderBuffer = null; } else { throw error("record-start"); } } else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) { // AVRO-2034 advance to the end of our object while (in.getCurrentToken() != JsonToken.END_OBJECT) { in.nextToken(); } if (top == Symbol.RECORD_END) { if (currentReorderBuffer != null && !currentReorderBuffer.savedFields.isEmpty()) { throw error("Unknown fields: " + currentReorderBuffer.savedFields.keySet()); } currentReorderBuffer = reorderBuffers.pop(); } // AVRO-2034 advance beyond the end object for the next record. in.nextToken(); } else { throw new AvroTypeException("Unknown action symbol " + top); } return null; } private AvroTypeException error(String type) { return new AvroTypeException("Expected " + type + ". Got " + in.getCurrentToken()); } }