/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.avro.message;

import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.generic.GenericData;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

A MessageDecoder that reads a binary-encoded datum. This checks for the datum header and decodes the payload with the schema that corresponds to the 8-byte schema fingerprint.

Instances can decode message payloads for known schemas, which are schemas added using addSchema(Schema), schemas resolved by the SchemaStore passed to the constructor, or the expected schema passed to the constructor. Messages encoded using an unknown schema will cause instances to throw a MissingSchemaException.

It is safe to continue using instances of this class after BaseDecoder.decode throws BadHeaderException or MissingSchemaException.

This class is thread-safe.

/**
 * A {@link MessageDecoder} for single binary-encoded datums that carry a
 * header. The header is verified and the payload is then decoded with the
 * schema identified by the 8-byte fingerprint stored in that header.
 * <p>
 * A payload can be decoded when its write {@link Schema schema} is known:
 * registered through {@link #addSchema(Schema)}, found by the
 * {@link SchemaStore} given to the constructor, or equal to the expected schema
 * given to the constructor. A message written with any other schema results in
 * a {@link MissingSchemaException}.
 * <p>
 * An instance remains usable after {@link #decode} has thrown
 * {@link BadHeaderException} or {@link MissingSchemaException}.
 * <p>
 * This class is thread-safe.
 */
public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {

  /**
   * Per-thread 10-byte scratch array that {@code decode} fills from the start
   * of each message: 2 marker bytes followed by the 8-byte fingerprint.
   */
  private static final ThreadLocal<byte[]> HEADER_BUFFER = ThreadLocal.withInitial(() -> new byte[10]);

  /**
   * Per-thread little-endian view that wraps the same thread's
   * {@link #HEADER_BUFFER} array, so absolute reads on it see whatever was last
   * read into that array.
   */
  private static final ThreadLocal<ByteBuffer> FP_BUFFER = ThreadLocal
      .withInitial(() -> ByteBuffer.wrap(HEADER_BUFFER.get()).order(ByteOrder.LITTLE_ENDIAN));

  /** Data model handed to every {@link RawMessageDecoder} this class creates. */
  private final GenericData model;

  /** Expected schema; when {@code null} each write schema doubles as read schema. */
  private final Schema readSchema;

  /** Optional lookup for fingerprints that are not registered yet; may be {@code null}. */
  private final SchemaStore resolver;

  /** Registered decoders, keyed by the 64-bit parsing fingerprint of their write schema. */
  private final Map<Long, RawMessageDecoder<D>> codecByFingerprint = new ConcurrentHashMap<>();
Creates a new BinaryMessageDecoder that uses the given data model to construct datum instances described by the schema.

The readSchema is used as the expected schema (read schema). Datum instances created by this class will be described by the expected schema.

If readSchema is null, the write schema of an incoming buffer is used as read schema for that datum instance.

The schema used to decode incoming buffers is determined by the schema fingerprint encoded in the message header. This class can decode messages that were encoded using the readSchema (if any) and other schemas that are added using addSchema(Schema).

Params:
  • model – the data model for datum instances
  • readSchema – the Schema used to construct datum instances
/**
 * Creates a new {@link BinaryMessageDecoder} that builds datum instances with
 * the given {@link GenericData data model}, without a {@link SchemaStore}.
 * <p>
 * The {@code readSchema} is used as the expected schema (read schema), and
 * datum instances produced by this decoder are described by it.
 * <p>
 * If {@code readSchema} is {@code null}, the write schema of each incoming
 * buffer is used as the read schema for that datum instance.
 * <p>
 * The write schema of an incoming buffer is selected by the schema fingerprint
 * in the message header. This decoder can read messages encoded with the
 * {@code readSchema} (if any) and with schemas registered through
 * {@link #addSchema(Schema)}.
 *
 * @param model      the {@link GenericData data model} for datum instances
 * @param readSchema the {@link Schema} used to construct datum instances, or
 *                   {@code null}
 */
public BinaryMessageDecoder(GenericData model, Schema readSchema) {
  // Delegate with no resolver: only explicitly registered schemas are known.
  this(model, readSchema, null);
}
Creates a new BinaryMessageDecoder that uses the given data model to construct datum instances described by the schema.

The readSchema is used as the expected schema (read schema). Datum instances created by this class will be described by the expected schema.

If readSchema is null, the write schema of an incoming buffer is used as read schema for that datum instance.

The schema used to decode incoming buffers is determined by the schema fingerprint encoded in the message header. This class can decode messages that were encoded using the readSchema (if any), other schemas that are added using addSchema(Schema), or schemas returned by the resolver.

Params:
  • model – the data model for datum instances
  • readSchema – the Schema used to construct datum instances
  • resolver – a SchemaStore used to find schemas by fingerprint
/**
 * Creates a new {@link BinaryMessageDecoder} that builds datum instances with
 * the given {@link GenericData data model}.
 * <p>
 * The {@code readSchema} is used as the expected schema (read schema), and
 * datum instances produced by this decoder are described by it. A non-null
 * {@code readSchema} is also registered as a known write schema.
 * <p>
 * If {@code readSchema} is {@code null}, the write schema of each incoming
 * buffer is used as the read schema for that datum instance.
 * <p>
 * The write schema of an incoming buffer is selected by the schema fingerprint
 * in the message header. This decoder can read messages encoded with the
 * {@code readSchema} (if any), with schemas registered through
 * {@link #addSchema(Schema)}, and with schemas returned by the
 * {@code resolver}.
 *
 * @param model      the {@link GenericData data model} for datum instances
 * @param readSchema the {@link Schema} used to construct datum instances, or
 *                   {@code null}
 * @param resolver   a {@link SchemaStore} used to find schemas by fingerprint,
 *                   or {@code null}
 */
public BinaryMessageDecoder(GenericData model, Schema readSchema, SchemaStore resolver) {
  this.model = model;
  this.readSchema = readSchema;
  this.resolver = resolver;

  if (readSchema == null) {
    // Nothing to pre-register; write schemas arrive via addSchema or the resolver.
    return;
  }
  addSchema(readSchema);
}
Adds a Schema that can be used to decode buffers.
Params:
  • writeSchema – a Schema to use when decoding buffers
/**
 * Adds a {@link Schema} that can be used to decode buffers. The schema is
 * registered under its 64-bit parsing fingerprint, replacing any decoder
 * already registered for that fingerprint.
 *
 * @param writeSchema a {@link Schema} to use when decoding buffers
 */
public void addSchema(Schema writeSchema) {
  long fp = SchemaNormalization.parsingFingerprint64(writeSchema);
  // Without an expected schema, data is read back with the schema it was written with.
  final Schema actualReadSchema = this.readSchema != null ? this.readSchema : writeSchema;
  codecByFingerprint.put(fp, new RawMessageDecoder<>(model, writeSchema, actualReadSchema));
}

/**
 * Finds the decoder registered for a write-schema fingerprint, consulting the
 * {@link SchemaStore resolver} (when one was supplied) if none is registered.
 *
 * @param fp the 64-bit fingerprint read from a message header
 * @return the decoder for {@code fp}, never {@code null}
 * @throws MissingSchemaException if no schema is registered for {@code fp} and
 *                                the resolver is absent, returns {@code null},
 *                                or returns a schema that does not register
 *                                under {@code fp}
 */
private RawMessageDecoder<D> getDecoder(long fp) {
  RawMessageDecoder<D> decoder = codecByFingerprint.get(fp);
  if (decoder != null) {
    return decoder;
  }

  if (resolver != null) {
    Schema writeSchema = resolver.findByFingerprint(fp);
    if (writeSchema != null) {
      addSchema(writeSchema);
      // addSchema files the decoder under the fingerprint it computes itself. If
      // that differs from fp the lookup is still empty; report it as a missing
      // schema instead of returning null and failing later with an NPE.
      RawMessageDecoder<D> resolved = codecByFingerprint.get(fp);
      if (resolved != null) {
        return resolved;
      }
    }
  }

  throw new MissingSchemaException("Cannot resolve schema for fingerprint: " + fp);
}

/**
 * Reads the 10-byte message header from {@code stream}, checks its two marker
 * bytes, and decodes the remaining payload with the decoder selected by the
 * fingerprint in the header.
 *
 * @param stream the stream positioned at the start of a message
 * @param reuse  passed through unchanged to the schema-specific decoder
 * @return the datum returned by the schema-specific decoder
 * @throws BadHeaderException     if the stream ends before 10 bytes are read or
 *                                the first two bytes are not
 *                                {@code BinaryMessageEncoder.V1_HEADER}
 * @throws MissingSchemaException if no decoder can be found for the
 *                                fingerprint
 * @throws IOException            if reading the header fails
 */
@Override
public D decode(InputStream stream, D reuse) throws IOException {
  byte[] header = HEADER_BUFFER.get();
  try {
    if (!readFully(stream, header)) {
      throw new BadHeaderException("Not enough header bytes");
    }
  } catch (IOException e) {
    throw new IOException("Failed to read header and fingerprint bytes", e);
  }

  if (BinaryMessageEncoder.V1_HEADER[0] != header[0] || BinaryMessageEncoder.V1_HEADER[1] != header[1]) {
    throw new BadHeaderException(String.format("Unrecognized header bytes: 0x%02X 0x%02X", header[0], header[1]));
  }

  // FP_BUFFER wraps this thread's header array, so the 8 bytes just read into
  // header[2..9] are visible here as a little-endian long.
  long fingerprint = FP_BUFFER.get().getLong(2);
  RawMessageDecoder<D> decoder = getDecoder(fingerprint);
  return decoder.decode(stream, reuse);
}
Reads a buffer from a stream, making multiple read calls if necessary.
Params:
  • stream – an InputStream to read from
  • bytes – a buffer
Throws:
  • IOException – if reading from the stream fails
Returns: true if the buffer is complete, false otherwise (stream ended)
/**
 * Fills {@code bytes} from {@code stream}, issuing as many read calls as
 * needed. Reading stops as soon as the array is full or a read call reports
 * that nothing was read.
 *
 * @param stream an InputStream to read from
 * @param bytes  the buffer to fill
 * @return true if the buffer was filled completely, false otherwise (stream
 *         ended)
 * @throws IOException if a read call on the stream fails
 */
private boolean readFully(InputStream stream, byte[] bytes) throws IOException {
  int filled = 0;
  while (filled < bytes.length) {
    int count = stream.read(bytes, filled, bytes.length - filled);
    if (count <= 0) {
      // No progress (typically -1 for end of stream): give up with a partial buffer.
      break;
    }
    filled += count;
  }
  return filled == bytes.length;
}

}