package com.microsoft.sqlserver.jdbc;
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenRequestContext;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
class ScopeTokenCache {
private final AtomicBoolean wip;
private AccessToken cache;
private final ReplayProcessor<AccessToken> emitterProcessor = ReplayProcessor.create(1);
private final FluxSink<AccessToken> sink = emitterProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
private final Function<TokenRequestContext, Mono<AccessToken>> getNew;
private TokenRequestContext request;
ScopeTokenCache(Function<TokenRequestContext, Mono<AccessToken>> getNew) {
this.wip = new AtomicBoolean(false);
this.getNew = getNew;
}
void setRequest(TokenRequestContext request) {
this.request = request;
}
Mono<AccessToken> getToken() {
if (cache != null && !cache.isExpired()) {
return Mono.just(cache);
}
return Mono.defer(() -> {
if (!wip.getAndSet(true)) {
return getNew.apply(request).doOnNext(ac -> cache = ac).doOnNext(sink::next).doOnError(sink::error)
.doOnTerminate(() -> wip.set(false));
} else {
return emitterProcessor.next();
}
});
}
}