package com.netflix.hystrix.metric;
import com.netflix.hystrix.HystrixCollapserKey;
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class HystrixCollapserEventStream implements HystrixEventStream<HystrixCollapserEvent> {
private final HystrixCollapserKey collapserKey;
private final Subject<HystrixCollapserEvent, HystrixCollapserEvent> writeOnlyStream;
private final Observable<HystrixCollapserEvent> readOnlyStream;
private static final ConcurrentMap<String, HystrixCollapserEventStream> streams = new ConcurrentHashMap<String, HystrixCollapserEventStream>();
public static HystrixCollapserEventStream getInstance(HystrixCollapserKey collapserKey) {
HystrixCollapserEventStream initialStream = streams.get(collapserKey.name());
if (initialStream != null) {
return initialStream;
} else {
synchronized (HystrixCollapserEventStream.class) {
HystrixCollapserEventStream existingStream = streams.get(collapserKey.name());
if (existingStream == null) {
HystrixCollapserEventStream newStream = new HystrixCollapserEventStream(collapserKey);
streams.putIfAbsent(collapserKey.name(), newStream);
return newStream;
} else {
return existingStream;
}
}
}
}
HystrixCollapserEventStream(final HystrixCollapserKey collapserKey) {
this.collapserKey = collapserKey;
this.writeOnlyStream = new SerializedSubject<HystrixCollapserEvent, HystrixCollapserEvent>(PublishSubject.<HystrixCollapserEvent>create());
this.readOnlyStream = writeOnlyStream.share();
}
public static void reset() {
streams.clear();
}
public void write(HystrixCollapserEvent event) {
writeOnlyStream.onNext(event);
}
public Observable<HystrixCollapserEvent> observe() {
return readOnlyStream;
}
@Override
public String toString() {
return "HystrixCollapserEventStream(" + collapserKey.name() + ")";
}
}