/*
 * Copyright (c) 2015, 2019 Oracle and/or its affiliates. All rights reserved.
 * Copyright 2010, 2013 Coda Hale and Yammer, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.glassfish.jersey.server.internal.monitoring.core;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.glassfish.jersey.server.internal.monitoring.core.ReservoirConstants.COLLISION_BUFFER;
import static org.glassfish.jersey.server.internal.monitoring.core.ReservoirConstants.COLLISION_BUFFER_POWER;

An abstract TimeReservoir implementation backed by a sliding window that stores only the measurements made in the last N seconds (or other startTime unit) and allows an update with data that happened in past (which is what makes it different from Dropwizard's Metrics SlidingTimeWindowReservoir.

The snapshot this reservoir returns has limitations as mentioned in TimeReservoir.

This reservoir is capable to store up to 2^ReservoirConstants.COLLISION_BUFFER_POWER, that is 256, in a granularity of nanoseconds. In other words, up to 256 values that occurred at the same nanosecond can be stored in this reservoir. For particular nanosecond, if the collision buffer exceeds, newly added values are thrown away.
Author:Stepan Vavra
Type parameters:
  • <V> – The type of values to store in this sliding window reservoir
See Also:
/** * An abstract {@link TimeReservoir} implementation backed by a sliding window that stores only the measurements made in the last * {@code N} seconds (or other startTime unit) and allows an update with data that happened in past (which is what makes it * different from Dropwizard's Metrics SlidingTimeWindowReservoir. * <p/> * The snapshot this reservoir returns has limitations as mentioned in {@link TimeReservoir}. * <p/> * This reservoir is capable to store up to 2^{@link ReservoirConstants#COLLISION_BUFFER_POWER}, that is 256, in a granularity of * nanoseconds. In other words, up to 256 values that occurred at the same nanosecond can be stored in this reservoir. For * particular nanosecond, if the collision buffer exceeds, newly added values are thrown away. * * @param <V> The type of values to store in this sliding window reservoir * @author Stepan Vavra * @see <pre><a href="https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics * /SlidingTimeWindowReservoir.java">Dropwizard's * Metrics SlidingTimeWindowReservoir</a></pre> */
public abstract class AbstractSlidingWindowTimeReservoir<V> implements TimeReservoir<V> { private final ConcurrentNavigableMap<Long, V> measurements; private final long window; private final AtomicLong greatestTick; private final AtomicLong updateCount; private final AtomicLong startTick; private final AtomicInteger trimOff; private final SlidingWindowTrimmer<V> trimmer; private final long interval; private final TimeUnit intervalUnit;
Creates a new SlidingWindowTimeReservoir with the start time and window of startTime.
Params:
  • window – The window of startTime
  • windowUnit – The unit of window
  • startTime – The start time from which this reservoir calculates measurements
  • startTimeUnit – The start time unit
/** * Creates a new {@link SlidingWindowTimeReservoir} with the start time and window of startTime. * * @param window The window of startTime * @param windowUnit The unit of {@code window} * @param startTime The start time from which this reservoir calculates measurements * @param startTimeUnit The start time unit */
public AbstractSlidingWindowTimeReservoir(final long window, final TimeUnit windowUnit, final long startTime, final TimeUnit startTimeUnit) { this(window, windowUnit, startTime, startTimeUnit, null); }
Creates a new base sliding time window reservoir with the start time and a specified time window.
Params:
  • window – The window of startTime.
  • windowUnit – The unit of window.
  • startTime – The start time from which this reservoir calculates measurements.
  • startTimeUnit – The start time unit.
  • trimmer – The trimmer to use for trimming, if null, default trimmer is used.
/** * Creates a new base sliding time window reservoir with the start time and a specified time window. * * @param window The window of startTime. * @param windowUnit The unit of {@code window}. * @param startTime The start time from which this reservoir calculates measurements. * @param startTimeUnit The start time unit. * @param trimmer The trimmer to use for trimming, if {@code null}, default trimmer is used. */
@SuppressWarnings("unchecked") public AbstractSlidingWindowTimeReservoir(final long window, final TimeUnit windowUnit, final long startTime, final TimeUnit startTimeUnit, final SlidingWindowTrimmer<V> trimmer) { this.trimmer = trimmer != null ? trimmer : (SlidingWindowTrimmer<V>) DefaultSlidingWindowTrimmerHolder.INSTANCE; this.measurements = new ConcurrentSkipListMap<>(); this.interval = window; this.intervalUnit = windowUnit; this.window = windowUnit.toNanos(window) << COLLISION_BUFFER_POWER; this.startTick = new AtomicLong(tick(startTime, startTimeUnit)); this.greatestTick = new AtomicLong(startTick.get()); this.updateCount = new AtomicLong(0); this.trimOff = new AtomicInteger(0); this.trimmer.setTimeReservoir(this); } @Override public int size(long time, TimeUnit timeUnit) { conditionallyUpdateGreatestTick(tick(time, timeUnit)); trim(); return measurements.size(); } @Override public void update(V value, long time, TimeUnit timeUnit) { if (updateCount.incrementAndGet() % ReservoirConstants.TRIM_THRESHOLD == 0) { trim(); } long tick = tick(time, timeUnit); for (int i = 0; i < COLLISION_BUFFER; ++i) { if (measurements.putIfAbsent(tick, value) == null) { conditionallyUpdateGreatestTick(tick); return; } // increase the tick, there should be up to COLLISION_BUFFER empty slots // where to put the value for given 'time' // if empty slot is not found, throw it away as we're getting inaccurate statistics anyway tick++; } } @Override public long interval(final TimeUnit timeUnit) { return timeUnit.convert(interval, intervalUnit); } private long conditionallyUpdateGreatestTick(final long tick) { while (true) { final long currentGreatestTick = greatestTick.get(); if (tick <= currentGreatestTick) { // the tick is too small, return the greatest one return currentGreatestTick; } if (greatestTick.compareAndSet(currentGreatestTick, tick)) { // successfully updated greatestTick with the tick return tick; } } }
Updates the startTick in case that the sliding window was created AFTER the time of a value that updated this window.
Params:
  • firstEntry – The first entry of the windowed measurments
/** * Updates the startTick in case that the sliding window was created AFTER the time of a value that updated this window. * * @param firstEntry The first entry of the windowed measurments */
private void conditionallyUpdateStartTick(final Map.Entry<Long, V> firstEntry) { final Long firstEntryKey = firstEntry != null ? firstEntry.getKey() : null; if (firstEntryKey != null && firstEntryKey < startTick.get()) { while (true) { final long expectedStartTick = startTick.get(); if (startTick.compareAndSet(expectedStartTick, firstEntryKey)) { return; } } } }
Subclasses are required to instantiate UniformTimeSnapshot on their own.
Params:
  • values – The values to create the snapshot from
  • timeInterval – The time interval this snapshot conforms to
  • timeIntervalUnit – The interval unit of the time interval
  • time – The time of the request of the snapshot
  • timeUnit – The unit of the time of the snapshot request
Returns:The snapshot
/** * Subclasses are required to instantiate {@link UniformTimeSnapshot} on their own. * * @param values The values to create the snapshot from * @param timeInterval The time interval this snapshot conforms to * @param timeIntervalUnit The interval unit of the time interval * @param time The time of the request of the snapshot * @param timeUnit The unit of the time of the snapshot request * @return The snapshot */
protected abstract UniformTimeSnapshot snapshot(final Collection<V> values, final long timeInterval, final TimeUnit timeIntervalUnit, final long time, final TimeUnit timeUnit); @Override public UniformTimeSnapshot getSnapshot(long time, TimeUnit timeUnit) { trimOff.incrementAndGet(); final long baselineTick = conditionallyUpdateGreatestTick(tick(time, timeUnit)); try { // now, with the 'baselineTick' we can be sure that no trim will be performed // we just cannot guarantee that 'time' will correspond with the 'baselineTick' which is what the API warns about final ConcurrentNavigableMap<Long, V> windowMap = measurements .subMap((roundTick(baselineTick)) - window, true, baselineTick, true); // if the first update came with value lower that the 'startTick' we need to extend the window size so that the // calculation depending on the actual measured interval is not unnecessary boosted conditionallyUpdateStartTick(windowMap.firstEntry()); // calculate the actual measured interval final long measuredTickInterval = Math.min(baselineTick - startTick.get(), window); return snapshot(windowMap.values(), measuredTickInterval >> COLLISION_BUFFER_POWER, TimeUnit.NANOSECONDS, time, timeUnit); } finally { trimOff.decrementAndGet(); trim(baselineTick); } } private long tick(long time, TimeUnit timeUnit) { return timeUnit.toNanos(time) << COLLISION_BUFFER_POWER; } private void trim() { trim(greatestTick.get()); } private void trim(final long baselineTick) { if (trimEnabled()) { final long key = roundTick(baselineTick) - window; trimmer.trim(measurements, key); } } private boolean trimEnabled() { return trimOff.get() == 0; }
The purpose of this method is to deal with the fact that data for the same nanosecond can be distributed in an interval [0,256). By rounding the tick, we get the tick to which all the other ticks from the same interval belong.
Params:
  • tick – The tick
Returns:The rounded tick
/** * The purpose of this method is to deal with the fact that data for the same nanosecond can be distributed in an interval * [0,256). By rounding the tick, we get the tick to which all the other ticks from the same interval belong. * * @param tick The tick * @return The rounded tick */
private long roundTick(final long tick) { // tick / COLLISION_BUFFER * COLLISION_BUFFER return tick >> COLLISION_BUFFER_POWER << COLLISION_BUFFER_POWER; }
The holder of the lazy loaded instance of the default trimmer.
/** * The holder of the lazy loaded instance of the default trimmer. */
private static final class DefaultSlidingWindowTrimmerHolder {
The default instance of sliding window trimmer.
/** * The default instance of sliding window trimmer. */
static final SlidingWindowTrimmer<Object> INSTANCE = new SlidingWindowTrimmer<Object>() { @Override public void trim(final ConcurrentNavigableMap<Long, Object> map, final long key) { map.headMap(key).clear(); } @Override public void setTimeReservoir(final TimeReservoir<Object> reservoir) { // not used } }; } }