/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;
import reactor.util.annotation.Nullable;
A support class that offers factory methods for implementations of the specialized Disposable
sub-interfaces (Disposable.Composite
, Disposable.Swap
). Author: Simon Baslé, Stephane Maldini
/**
* A support class that offers factory methods for implementations of the specialized
* {@link Disposable} sub-interfaces ({@link Disposable.Composite Disposable.Composite},
* {@link Disposable.Swap Disposable.Swap}).
*
* @author Simon Baslé
* @author Stephane Maldini
*/
public final class Disposables {
private Disposables() { }
Create a new empty Composite
with atomic guarantees on all mutative operations. Returns: an empty atomic Composite
/**
* Create a new empty {@link Disposable.Composite} with atomic guarantees on all mutative
* operations.
*
* @return an empty atomic {@link Disposable.Composite}
*/
public static Disposable.Composite composite() {
return new ListCompositeDisposable();
}
Create and initialize a new Composite
with atomic guarantees on all mutative operations. Returns: a pre-filled atomic Composite
/**
* Create and initialize a new {@link Disposable.Composite} with atomic guarantees on
* all mutative operations.
*
* @return a pre-filled atomic {@link Disposable.Composite}
*/
public static Disposable.Composite composite(Disposable... disposables) {
return new ListCompositeDisposable(disposables);
}
Create and initialize a new Composite
with atomic guarantees on all mutative operations. Returns: a pre-filled atomic Composite
/**
* Create and initialize a new {@link Disposable.Composite} with atomic guarantees on
* all mutative operations.
*
* @return a pre-filled atomic {@link Disposable.Composite}
*/
public static Disposable.Composite composite(
Iterable<? extends Disposable> disposables) {
return new ListCompositeDisposable(disposables);
}
Return a new Disposable
that is already disposed. Returns: a new disposed Disposable
.
/**
* Return a new {@link Disposable} that is already disposed.
*
* @return a new disposed {@link Disposable}.
*/
public static Disposable disposed() {
return new AlwaysDisposable();
}
Return a new Disposable
that can never be disposed. Calling Disposable.dispose()
is a NO-OP and Disposable.isDisposed()
always return false. Returns: a new Disposable
that can never be disposed.
/**
* Return a new {@link Disposable} that can never be disposed. Calling {@link Disposable#dispose()}
* is a NO-OP and {@link Disposable#isDisposed()} always return false.
*
* @return a new {@link Disposable} that can never be disposed.
*/
public static Disposable never() {
return new NeverDisposable();
}
Return a new simple Disposable
instance that is initially not disposed but can be by calling Disposable.dispose()
. Returns: a new Disposable
initially not yet disposed.
/**
* Return a new simple {@link Disposable} instance that is initially not disposed but
* can be by calling {@link Disposable#dispose()}.
*
* @return a new {@link Disposable} initially not yet disposed.
*/
public static Disposable single() {
return new SimpleDisposable();
}
Create a new empty Swap
with atomic guarantees on all mutative operations. Returns: an empty atomic Swap
/**
* Create a new empty {@link Disposable.Swap} with atomic guarantees on all mutative
* operations.
*
* @return an empty atomic {@link Disposable.Swap}
*/
public static Disposable.Swap swap() {
return new SwapDisposable();
}
//==== STATIC package private implementations ====
Author: David Karnok, Simon Baslé
/**
* @author David Karnok
* @author Simon Baslé
*/
static final class ListCompositeDisposable implements Disposable.Composite, Scannable {
@Nullable
List<Disposable> resources;
volatile boolean disposed;
ListCompositeDisposable() {
}
ListCompositeDisposable(Disposable... resources) {
Objects.requireNonNull(resources, "resources is null");
this.resources = new LinkedList<>();
for (Disposable d : resources) {
Objects.requireNonNull(d, "Disposable item is null");
this.resources.add(d);
}
}
ListCompositeDisposable(Iterable<? extends Disposable> resources) {
Objects.requireNonNull(resources, "resources is null");
this.resources = new LinkedList<>();
for (Disposable d : resources) {
Objects.requireNonNull(d, "Disposable item is null");
this.resources.add(d);
}
}
@Override
public void dispose() {
if (disposed) {
return;
}
List<Disposable> set;
synchronized (this) {
if (disposed) {
return;
}
disposed = true;
set = resources;
resources = null;
}
dispose(set);
}
@Override
public boolean isDisposed() {
return disposed;
}
@Override
public boolean add(Disposable d) {
Objects.requireNonNull(d, "d is null");
if (!disposed) {
synchronized (this) {
if (!disposed) {
List<Disposable> set = resources;
if (set == null) {
set = new LinkedList<>();
resources = set;
}
set.add(d);
return true;
}
}
}
d.dispose();
return false;
}
@Override
public boolean addAll(Collection<? extends Disposable> ds) {
Objects.requireNonNull(ds, "ds is null");
if (!disposed) {
synchronized (this) {
if (!disposed) {
List<Disposable> set = resources;
if (set == null) {
set = new LinkedList<>();
resources = set;
}
for (Disposable d : ds) {
Objects.requireNonNull(d, "d is null");
set.add(d);
}
return true;
}
}
}
for (Disposable d : ds) {
d.dispose();
}
return false;
}
@Override
public boolean remove(Disposable d) {
Objects.requireNonNull(d, "Disposable item is null");
if (disposed) {
return false;
}
synchronized (this) {
if (disposed) {
return false;
}
List<Disposable> set = resources;
if (set == null || !set.remove(d)) {
return false;
}
}
return true;
}
@Override
public int size() {
List<Disposable> r = resources;
return r == null ? 0 : r.size();
}
Stream<Disposable> asStream() {
List<Disposable> r = resources;
return r == null ? Stream.empty() : r.stream();
}
public void clear() {
if (disposed) {
return;
}
List<Disposable> set;
synchronized (this) {
if (disposed) {
return;
}
set = resources;
resources = null;
}
dispose(set);
}
void dispose(@Nullable List<Disposable> set) {
if (set == null) {
return;
}
List<Throwable> errors = null;
for (Disposable o : set) {
try {
o.dispose();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (errors == null) {
errors = new ArrayList<>();
}
errors.add(ex);
}
}
if (errors != null) {
if (errors.size() == 1) {
throw Exceptions.propagate(errors.get(0));
}
throw Exceptions.multiple(errors);
}
}
@Override
public Stream<? extends Scannable> inners() {
return this.asStream()
.filter(Scannable.class::isInstance)
.map(Scannable::from);
}
@Nullable
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.CANCELLED) {
return isDisposed();
}
return null;
}
}
Not public API. Implementation of a Swap
. Author: Simon Baslé, David Karnok
/**
* Not public API. Implementation of a {@link Swap}.
*
* @author Simon Baslé
* @author David Karnok
*/
static final class SwapDisposable implements Disposable.Swap {
volatile Disposable inner;
static final AtomicReferenceFieldUpdater<SwapDisposable, Disposable>
INNER =
AtomicReferenceFieldUpdater.newUpdater(SwapDisposable.class, Disposable.class, "inner");
@Override
public boolean update(@Nullable Disposable next) {
return Disposables.set(INNER, this, next);
}
@Override
public boolean replace(@Nullable Disposable next) {
return Disposables.replace(INNER, this, next);
}
@Override
@Nullable
public Disposable get() {
return inner;
}
@Override
public void dispose() {
Disposables.dispose(INNER, this);
}
@Override
public boolean isDisposed() {
return Disposables.isDisposed(INNER.get(this));
}
}
A very simple Disposable
that only wraps a mutable boolean for isDisposed()
. /**
* A very simple {@link Disposable} that only wraps a mutable boolean for
* {@link #isDisposed()}.
*/
static final class SimpleDisposable extends AtomicBoolean implements Disposable {
@Override
public void dispose() {
set(true);
}
@Override
public boolean isDisposed() {
return get();
}
}
Immutable disposable that is always disposed
. Calling dispose()
does nothing, and isDisposed()
always return true. /**
* Immutable disposable that is always {@link #isDisposed() disposed}. Calling
* {@link #dispose()} does nothing, and {@link #isDisposed()} always return true.
*/
static final class AlwaysDisposable implements Disposable {
@Override
public void dispose() {
//NO-OP
}
@Override
public boolean isDisposed() {
return true;
}
}
Immutable disposable that is never disposed
. Calling dispose()
does nothing, and isDisposed()
always return false. /**
* Immutable disposable that is never {@link #isDisposed() disposed}. Calling
* {@link #dispose()} does nothing, and {@link #isDisposed()} always return false.
*/
static final class NeverDisposable implements Disposable {
@Override
public void dispose() {
//NO-OP
}
@Override
public boolean isDisposed() {
return false;
}
}
//==== STATIC ATOMIC UTILS copied from Disposables ====
A singleton Disposable
that represents a disposed instance. Should not be leaked to clients. /**
* A singleton {@link Disposable} that represents a disposed instance. Should not be
* leaked to clients.
*/
//NOTE: There is a private similar DISPOSED singleton in Disposables as well
static final Disposable DISPOSED = disposed();
Atomically set the field to a Disposable
and dispose the old content. Params: - updater – the target field updater
- holder – the target instance holding the field
- newValue – the new Disposable to set
Returns: true if successful, false if the field contains the DISPOSED
instance.
/**
* Atomically set the field to a {@link Disposable} and dispose the old content.
*
* @param updater the target field updater
* @param holder the target instance holding the field
* @param newValue the new Disposable to set
* @return true if successful, false if the field contains the {@link #DISPOSED} instance.
*/
static <T> boolean set(AtomicReferenceFieldUpdater<T, Disposable> updater, T holder, @Nullable Disposable newValue) {
for (;;) {
Disposable current = updater.get(holder);
if (current == DISPOSED) {
if (newValue != null) {
newValue.dispose();
}
return false;
}
if (updater.compareAndSet(holder, current, newValue)) {
if (current != null) {
current.dispose();
}
return true;
}
}
}
Atomically replace the Disposable
in the field with the given new Disposable but do not dispose the old one. Params: - updater – the target field updater
- holder – the target instance holding the field
- newValue – the new Disposable to set, null allowed
Returns: true if the operation succeeded, false if the target field contained the common DISPOSED
instance and the given disposable is not null but is disposed.
/**
* Atomically replace the {@link Disposable} in the field with the given new Disposable
* but do not dispose the old one.
*
* @param updater the target field updater
* @param holder the target instance holding the field
* @param newValue the new Disposable to set, null allowed
* @return true if the operation succeeded, false if the target field contained
* the common {@link #DISPOSED} instance and the given disposable is not null but is disposed.
*/
static <T> boolean replace(AtomicReferenceFieldUpdater<T, Disposable> updater, T holder, @Nullable Disposable newValue) {
for (;;) {
Disposable current = updater.get(holder);
if (current == DISPOSED) {
if (newValue != null) {
newValue.dispose();
}
return false;
}
if (updater.compareAndSet(holder, current, newValue)) {
return true;
}
}
}
Atomically dispose the Disposable
in the field if not already disposed. Params: - updater – the target field updater
- holder – the target instance holding the field
Returns: true if the Disposable
held by the field was properly disposed
/**
* Atomically dispose the {@link Disposable} in the field if not already disposed.
*
* @param updater the target field updater
* @param holder the target instance holding the field
* @return true if the {@link Disposable} held by the field was properly disposed
*/
static <T> boolean dispose(AtomicReferenceFieldUpdater<T, Disposable> updater, T holder) {
Disposable current = updater.get(holder);
Disposable d = DISPOSED;
if (current != d) {
current = updater.getAndSet(holder, d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
Check if the given Disposable
is the singleton DISPOSED
. Params: - d – the disposable to check
Returns: true if d is DISPOSED
/**
* Check if the given {@link Disposable} is the singleton {@link #DISPOSED}.
*
* @param d the disposable to check
* @return true if d is {@link #DISPOSED}
*/
static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
//=====================================================
}