package org.jboss.resteasy.plugins.providers.sse.client;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
class SseEventSourceScheduler
{
private static class DaemonThreadFactory implements ThreadFactory
{
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DaemonThreadFactory(final String name)
{
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = name + "-thread-";
}
@Override
public Thread newThread(Runnable r)
{
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
t.setDaemon(true);
return t;
}
}
private final ScheduledExecutorService scheduledExecutorService;
private final boolean shutdownExecutorService;
private final Phaser phaser;
private final AtomicBoolean closed;
SseEventSourceScheduler(final ScheduledExecutorService scheduledExecutorService, final String threadName)
{
this.scheduledExecutorService = scheduledExecutorService == null
? Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory(threadName))
: scheduledExecutorService;
this.shutdownExecutorService = scheduledExecutorService == null;
this.phaser = new Phaser(1);
this.closed = new AtomicBoolean(false);
}
void schedule(final Runnable runnable, long delay, TimeUnit unit) throws RejectedExecutionException
{
if (this.closed.get())
{
return;
}
try
{
this.scheduledExecutorService.schedule(new Runnable()
{
@Override
public void run()
{
if (SseEventSourceScheduler.this.closed.get())
{
return;
}
int registrationPhase = SseEventSourceScheduler.this.phaser.register();
try
{
if (registrationPhase != 0)
{
return;
}
runnable.run();
}
finally
{
SseEventSourceScheduler.this.phaser.arriveAndDeregister();
}
}
}, delay, unit);
}
catch (RejectedExecutionException e)
{
if (this.shutdownExecutorService && this.closed.get())
{
return;
}
throw e;
}
}
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
{
if (!this.closed.get())
{
return false;
}
try
{
this.phaser.awaitAdvanceInterruptibly(0, timeout, unit);
}
catch (TimeoutException e)
{
return false;
}
return true;
}
void shutdownNow()
{
if (this.closed.compareAndSet(false, true))
{
this.phaser.arriveAndDeregister();
if (this.shutdownExecutorService)
{
this.scheduledExecutorService.shutdownNow();
}
}
}
}