package com.netflix.hystrix.metric;
import com.netflix.hystrix.HystrixCommandKey;
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class HystrixCommandCompletionStream implements HystrixEventStream<HystrixCommandCompletion> {
private final HystrixCommandKey commandKey;
private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlySubject;
private final Observable<HystrixCommandCompletion> readOnlyStream;
private static final ConcurrentMap<String, HystrixCommandCompletionStream> streams = new ConcurrentHashMap<String, HystrixCommandCompletionStream>();
public static HystrixCommandCompletionStream getInstance(HystrixCommandKey commandKey) {
HystrixCommandCompletionStream initialStream = streams.get(commandKey.name());
if (initialStream != null) {
return initialStream;
} else {
synchronized (HystrixCommandCompletionStream.class) {
HystrixCommandCompletionStream existingStream = streams.get(commandKey.name());
if (existingStream == null) {
HystrixCommandCompletionStream newStream = new HystrixCommandCompletionStream(commandKey);
streams.putIfAbsent(commandKey.name(), newStream);
return newStream;
} else {
return existingStream;
}
}
}
}
HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
this.commandKey = commandKey;
this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
this.readOnlyStream = writeOnlySubject.share();
}
public static void reset() {
streams.clear();
}
public void write(HystrixCommandCompletion event) {
writeOnlySubject.onNext(event);
}
@Override
public Observable<HystrixCommandCompletion> observe() {
return readOnlyStream;
}
@Override
public String toString() {
return "HystrixCommandCompletionStream(" + commandKey.name() + ")";
}
}