package reactor.core.scheduler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.jetbrains.annotations.NotNull;
import reactor.util.annotation.Nullable;
class ReactorThreadFactory implements ThreadFactory,
Supplier<String>,
Thread.UncaughtExceptionHandler {
final private String name;
final private AtomicLong counterReference;
final private boolean daemon;
final private boolean rejectBlocking;
@Nullable
final private BiConsumer<Thread, Throwable> uncaughtExceptionHandler;
ReactorThreadFactory(String name,
AtomicLong counterReference,
boolean daemon,
boolean rejectBlocking,
@Nullable BiConsumer<Thread, Throwable> uncaughtExceptionHandler) {
this.name = name;
this.counterReference = counterReference;
this.daemon = daemon;
this.rejectBlocking = rejectBlocking;
this.uncaughtExceptionHandler = uncaughtExceptionHandler;
}
@Override
public final Thread newThread(@NotNull Runnable runnable) {
String newThreadName = name + "-" + counterReference.incrementAndGet();
Thread t = rejectBlocking
? new NonBlockingThread(runnable, newThreadName)
: new Thread(runnable, newThreadName);
if (daemon) {
t.setDaemon(true);
}
if (uncaughtExceptionHandler != null) {
t.setUncaughtExceptionHandler(this);
}
return t;
}
@Override
public void uncaughtException(Thread t, Throwable e) {
if (uncaughtExceptionHandler == null) {
return;
}
uncaughtExceptionHandler.accept(t,e);
}
@Override
public final String get() {
return name;
}
static final class NonBlockingThread extends Thread implements NonBlocking {
public NonBlockingThread(Runnable target, String name) {
super(target, name);
}
}
}