package org.apache.avro;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LogicalTypes {
private static final Logger LOG = LoggerFactory.getLogger(LogicalTypes.class);
public interface LogicalTypeFactory {
LogicalType fromSchema(Schema schema);
}
private static final Map<String, LogicalTypeFactory> REGISTERED_TYPES = new ConcurrentHashMap<>();
public static void register(String logicalTypeName, LogicalTypeFactory factory) {
if (logicalTypeName == null) {
throw new NullPointerException("Invalid logical type name: null");
}
if (factory == null) {
throw new NullPointerException("Invalid logical type factory: null");
}
REGISTERED_TYPES.put(logicalTypeName, factory);
}
public static LogicalType fromSchema(Schema schema) {
return fromSchemaImpl(schema, true);
}
public static LogicalType fromSchemaIgnoreInvalid(Schema schema) {
return fromSchemaImpl(schema, false);
}
private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) {
final LogicalType logicalType;
final String typeName = schema.getProp(LogicalType.LOGICAL_TYPE_PROP);
if (typeName == null) {
return null;
}
try {
switch (typeName) {
case TIMESTAMP_MILLIS:
logicalType = TIMESTAMP_MILLIS_TYPE;
break;
case DECIMAL:
logicalType = new Decimal(schema);
break;
case UUID:
logicalType = UUID_TYPE;
break;
case DATE:
logicalType = DATE_TYPE;
break;
case TIMESTAMP_MICROS:
logicalType = TIMESTAMP_MICROS_TYPE;
break;
case TIME_MILLIS:
logicalType = TIME_MILLIS_TYPE;
break;
case TIME_MICROS:
logicalType = TIME_MICROS_TYPE;
break;
default:
final LogicalTypeFactory typeFactory = REGISTERED_TYPES.get(typeName);
if (typeFactory != null) {
logicalType = REGISTERED_TYPES.get(typeName).fromSchema(schema);
} else {
logicalType = null;
}
break;
}
if (logicalType != null) {
logicalType.validate(schema);
}
} catch (RuntimeException e) {
LOG.debug("Invalid logical type found", e);
if (throwErrors) {
throw e;
}
LOG.warn("Ignoring invalid logical type for name: {}", typeName);
return null;
}
return logicalType;
}
private static final String DECIMAL = "decimal";
private static final String UUID = "uuid";
private static final String DATE = "date";
private static final String TIME_MILLIS = "time-millis";
private static final String TIME_MICROS = "time-micros";
private static final String TIMESTAMP_MILLIS = "timestamp-millis";
private static final String TIMESTAMP_MICROS = "timestamp-micros";
public static Decimal decimal(int precision) {
return decimal(precision, 0);
}
public static Decimal decimal(int precision, int scale) {
return new Decimal(precision, scale);
}
private static final LogicalType UUID_TYPE = new LogicalType("uuid");
public static LogicalType uuid() {
return UUID_TYPE;
}
private static final Date DATE_TYPE = new Date();
public static Date date() {
return DATE_TYPE;
}
private static final TimeMillis TIME_MILLIS_TYPE = new TimeMillis();
public static TimeMillis timeMillis() {
return TIME_MILLIS_TYPE;
}
private static final TimeMicros TIME_MICROS_TYPE = new TimeMicros();
public static TimeMicros timeMicros() {
return TIME_MICROS_TYPE;
}
private static final TimestampMillis TIMESTAMP_MILLIS_TYPE = new TimestampMillis();
public static TimestampMillis timestampMillis() {
return TIMESTAMP_MILLIS_TYPE;
}
private static final TimestampMicros TIMESTAMP_MICROS_TYPE = new TimestampMicros();
public static TimestampMicros timestampMicros() {
return TIMESTAMP_MICROS_TYPE;
}
public static class Decimal extends LogicalType {
private static final String PRECISION_PROP = "precision";
private static final String SCALE_PROP = "scale";
private final int precision;
private final int scale;
private Decimal(int precision, int scale) {
super(DECIMAL);
this.precision = precision;
this.scale = scale;
}
private Decimal(Schema schema) {
super("decimal");
if (!hasProperty(schema, PRECISION_PROP)) {
throw new IllegalArgumentException("Invalid decimal: missing precision");
}
this.precision = getInt(schema, PRECISION_PROP);
if (hasProperty(schema, SCALE_PROP)) {
this.scale = getInt(schema, SCALE_PROP);
} else {
this.scale = 0;
}
}
@Override
public Schema addToSchema(Schema schema) {
super.addToSchema(schema);
schema.addProp(PRECISION_PROP, precision);
schema.addProp(SCALE_PROP, scale);
return schema;
}
public int getPrecision() {
return precision;
}
public int getScale() {
return scale;
}
@Override
public void validate(Schema schema) {
super.validate(schema);
if (schema.getType() != Schema.Type.FIXED && schema.getType() != Schema.Type.BYTES) {
throw new IllegalArgumentException("Logical type decimal must be backed by fixed or bytes");
}
if (precision <= 0) {
throw new IllegalArgumentException("Invalid decimal precision: " + precision + " (must be positive)");
} else if (precision > maxPrecision(schema)) {
throw new IllegalArgumentException("fixed(" + schema.getFixedSize() + ") cannot store " + precision
+ " digits (max " + maxPrecision(schema) + ")");
}
if (scale < 0) {
throw new IllegalArgumentException("Invalid decimal scale: " + scale + " (must be positive)");
} else if (scale > precision) {
throw new IllegalArgumentException(
"Invalid decimal scale: " + scale + " (greater than precision: " + precision + ")");
}
}
private long maxPrecision(Schema schema) {
if (schema.getType() == Schema.Type.BYTES) {
return Integer.MAX_VALUE;
} else if (schema.getType() == Schema.Type.FIXED) {
int size = schema.getFixedSize();
return Math.round(
Math.floor(Math.log10(
Math.pow(2, 8 * size - 1) - 1)
));
} else {
return 0;
}
}
private boolean hasProperty(Schema schema, String name) {
return (schema.getObjectProp(name) != null);
}
private int getInt(Schema schema, String name) {
Object obj = schema.getObjectProp(name);
if (obj instanceof Integer) {
return (Integer) obj;
}
throw new IllegalArgumentException(
"Expected int " + name + ": " + (obj == null ? "null" : obj + ":" + obj.getClass().getSimpleName()));
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Decimal decimal = (Decimal) o;
if (precision != decimal.precision)
return false;
return scale == decimal.scale;
}
@Override
public int hashCode() {
int result = precision;
result = 31 * result + scale;
return result;
}
}
public static class Date extends LogicalType {
private Date() {
super(DATE);
}
@Override
public void validate(Schema schema) {
super.validate(schema);
if (schema.getType() != Schema.Type.INT) {
throw new IllegalArgumentException("Date can only be used with an underlying int type");
}
}
}
public static class TimeMillis extends LogicalType {
private TimeMillis() {
super(TIME_MILLIS);
}
@Override
public void validate(Schema schema) {
super.validate(schema);
if (schema.getType() != Schema.Type.INT) {
throw new IllegalArgumentException("Time (millis) can only be used with an underlying int type");
}
}
}
public static class TimeMicros extends LogicalType {
private TimeMicros() {
super(TIME_MICROS);
}
@Override
public void validate(Schema schema) {
super.validate(schema);
if (schema.getType() != Schema.Type.LONG) {
throw new IllegalArgumentException("Time (micros) can only be used with an underlying long type");
}
}
}
public static class TimestampMillis extends LogicalType {
private TimestampMillis() {
super(TIMESTAMP_MILLIS);
}
@Override
public void validate(Schema schema) {
super.validate(schema);
if (schema.getType() != Schema.Type.LONG) {
throw new IllegalArgumentException("Timestamp (millis) can only be used with an underlying long type");
}
}
}
public static class TimestampMicros extends LogicalType {
private TimestampMicros() {
super(TIMESTAMP_MICROS);
}
@Override
public void validate(Schema schema) {
super.validate(schema);
if (schema.getType() != Schema.Type.LONG) {
throw new IllegalArgumentException("Timestamp (micros) can only be used with an underlying long type");
}
}
}
}