package org.glassfish.grizzly;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.glassfish.grizzly.attributes.AttributeBuilder;
import org.glassfish.grizzly.memory.MemoryManager;
import org.glassfish.grizzly.monitoring.MonitoringAware;
import org.glassfish.grizzly.monitoring.MonitoringConfig;
import org.glassfish.grizzly.monitoring.DefaultMonitoringConfig;
import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
import org.glassfish.grizzly.threadpool.ThreadPoolProbe;
import org.glassfish.grizzly.utils.StateHolder;
public abstract class AbstractTransport implements Transport {
protected String name;
protected volatile boolean isBlocking;
protected volatile boolean isStandalone;
protected final StateHolder<State> state;
protected Processor processor;
protected ProcessorSelector processorSelector;
protected IOStrategy strategy;
protected MemoryManager memoryManager;
protected ExecutorService workerThreadPool;
protected ExecutorService kernelPool;
protected AttributeBuilder attributeBuilder;
protected int readBufferSize;
protected int writeBufferSize;
protected ThreadPoolConfig workerPoolConfig;
protected ThreadPoolConfig kernelPoolConfig;
protected boolean managedWorkerPool = true;
protected long writeTimeout = TimeUnit.MILLISECONDS.convert(DEFAULT_WRITE_TIMEOUT, TimeUnit.SECONDS);
protected long readTimeout = TimeUnit.MILLISECONDS.convert(DEFAULT_READ_TIMEOUT, TimeUnit.SECONDS);
protected final DefaultMonitoringConfig<TransportProbe> transportMonitoringConfig =
new DefaultMonitoringConfig<TransportProbe>(TransportProbe.class) {
@Override
public Object createManagementObject() {
return createJmxManagementObject();
}
};
protected final DefaultMonitoringConfig<ConnectionProbe> connectionMonitoringConfig =
new DefaultMonitoringConfig<ConnectionProbe>(ConnectionProbe.class);
protected final DefaultMonitoringConfig<ThreadPoolProbe> threadPoolMonitoringConfig =
new DefaultMonitoringConfig<ThreadPoolProbe>(ThreadPoolProbe.class);
public AbstractTransport(String name) {
this.name = name;
state = new StateHolder<State>(State.STOPPED);
}
@Override
public String getName() {
return name;
}
@Override
public void setName(String name) {
this.name = name;
notifyProbesConfigChanged(this);
}
@Override
public boolean isBlocking() {
return isBlocking;
}
@Override
public void configureBlocking(boolean isBlocking) {
this.isBlocking = isBlocking;
notifyProbesConfigChanged(this);
}
@Override
public boolean isStandalone() {
return isStandalone;
}
@Override
public StateHolder<State> getState() {
return state;
}
@Override
public int getReadBufferSize() {
return readBufferSize;
}
@Override
public void setReadBufferSize(int readBufferSize) {
this.readBufferSize = readBufferSize;
notifyProbesConfigChanged(this);
}
@Override
public int getWriteBufferSize() {
return writeBufferSize;
}
@Override
public void setWriteBufferSize(int writeBufferSize) {
this.writeBufferSize = writeBufferSize;
notifyProbesConfigChanged(this);
}
@Override
public boolean isStopped() {
final State currentState = state.getState();
return currentState == State.STOPPED || currentState == State.STOPPING;
}
@Override
public boolean isPaused() {
return (state.getState() == State.PAUSED);
}
@Override
public Processor obtainProcessor(IOEvent ioEvent, Connection connection) {
if (processor != null && processor.isInterested(ioEvent)) {
return processor;
} else if (processorSelector != null) {
return processorSelector.select(ioEvent, connection);
}
return null;
}
@Override
public Processor getProcessor() {
return processor;
}
@Override
public void setProcessor(Processor processor) {
this.processor = processor;
notifyProbesConfigChanged(this);
}
@Override
public ProcessorSelector getProcessorSelector() {
return processorSelector;
}
@Override
public void setProcessorSelector(ProcessorSelector selector) {
processorSelector = selector;
notifyProbesConfigChanged(this);
}
@Override
public IOStrategy getIOStrategy() {
return strategy;
}
@Override
public void setIOStrategy(IOStrategy IOStrategy) {
this.strategy = IOStrategy;
final ThreadPoolConfig strategyConfig = IOStrategy.createDefaultWorkerPoolConfig(this);
if (strategyConfig == null) {
workerPoolConfig = null;
} else {
if (workerPoolConfig == null) {
setWorkerThreadPoolConfig(strategyConfig);
}
}
notifyProbesConfigChanged(this);
}
@Override
public MemoryManager getMemoryManager() {
return memoryManager;
}
@Override
public void setMemoryManager(MemoryManager memoryManager) {
this.memoryManager = memoryManager;
notifyProbesConfigChanged(this);
}
@Override
public ExecutorService getWorkerThreadPool() {
return workerThreadPool;
}
@Override
public ExecutorService getKernelThreadPool() {
return kernelPool;
}
@Override
public void setKernelThreadPool(ExecutorService kernelPool) {
this.kernelPool = kernelPool;
}
@Override
public void setKernelThreadPoolConfig(ThreadPoolConfig kernelPoolConfig) {
if (isStopped()) {
this.kernelPoolConfig = kernelPoolConfig;
}
}
@Override
public void setWorkerThreadPoolConfig(ThreadPoolConfig workerPoolConfig) {
if (isStopped()) {
this.workerPoolConfig = workerPoolConfig;
}
}
@Override
public ThreadPoolConfig getKernelThreadPoolConfig() {
return ((isStopped()) ? kernelPoolConfig : kernelPoolConfig.copy());
}
@Override
public ThreadPoolConfig getWorkerThreadPoolConfig() {
return ((isStopped()) ? workerPoolConfig : workerPoolConfig.copy());
}
@SuppressWarnings("unchecked")
@Override
public void setWorkerThreadPool(final ExecutorService threadPool) {
managedWorkerPool = false;
if (threadPool instanceof MonitoringAware) {
if (threadPoolMonitoringConfig.hasProbes()) {
((MonitoringAware<ThreadPoolProbe>) threadPool).getMonitoringConfig()
.addProbes(threadPoolMonitoringConfig.getProbes());
}
}
setWorkerThreadPool0(threadPool);
}
protected void setWorkerThreadPool0(final ExecutorService threadPool) {
this.workerThreadPool = threadPool;
notifyProbesConfigChanged(this);
}
protected void setKernelPool0(final ExecutorService kernelPool) {
this.kernelPool = kernelPool;
}
@Override
public AttributeBuilder getAttributeBuilder() {
return attributeBuilder;
}
@Override
public void setAttributeBuilder(AttributeBuilder attributeBuilder) {
this.attributeBuilder = attributeBuilder;
notifyProbesConfigChanged(this);
}
protected abstract void closeConnection(Connection connection) throws IOException;
@Override
public MonitoringConfig<ConnectionProbe> getConnectionMonitoringConfig() {
return connectionMonitoringConfig;
}
@Override
public MonitoringConfig<TransportProbe> getMonitoringConfig() {
return transportMonitoringConfig;
}
@Override
public MonitoringConfig<ThreadPoolProbe> getThreadPoolMonitoringConfig() {
return threadPoolMonitoringConfig;
}
@Override
public long getReadTimeout(TimeUnit timeUnit) {
if (readTimeout <= 0) {
return -1;
} else {
return timeUnit.convert(readTimeout, TimeUnit.MILLISECONDS);
}
}
@Override
public void setReadTimeout(long timeout, TimeUnit timeUnit) {
if (timeout <= 0) {
readTimeout = -1;
} else {
readTimeout = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
}
}
@Override
public long getWriteTimeout(TimeUnit timeUnit) {
if (writeTimeout <= 0) {
return -1;
} else {
return timeUnit.convert(writeTimeout, TimeUnit.MILLISECONDS);
}
}
@Override
public void setWriteTimeout(long timeout, TimeUnit timeUnit) {
if (timeout <= 0) {
writeTimeout = -1;
} else {
writeTimeout = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
}
}
protected static void notifyProbesBeforeStart(final AbstractTransport transport) {
final TransportProbe[] probes =
transport.transportMonitoringConfig.getProbesUnsafe();
if (probes != null) {
for (TransportProbe probe : probes) {
probe.onBeforeStartEvent(transport);
}
}
}
protected static void notifyProbesBeforeStop(final AbstractTransport transport) {
final TransportProbe[] probes =
transport.transportMonitoringConfig.getProbesUnsafe();
if (probes != null) {
for (TransportProbe probe : probes) {
probe.onBeforeStopEvent(transport);
}
}
}
protected static void notifyProbesStop(final AbstractTransport transport) {
final TransportProbe[] probes =
transport.transportMonitoringConfig.getProbesUnsafe();
if (probes != null) {
for (TransportProbe probe : probes) {
probe.onStopEvent(transport);
}
}
}
protected static void notifyProbesBeforePause(final AbstractTransport transport) {
final TransportProbe[] probes =
transport.transportMonitoringConfig.getProbesUnsafe();
if (probes != null) {
for (TransportProbe probe : probes) {
probe.onBeforePauseEvent(transport);
}
}
}
protected static void notifyProbesPause(final AbstractTransport transport) {
final TransportProbe[] probes =
transport.transportMonitoringConfig.getProbesUnsafe();
if (probes != null) {
for (TransportProbe probe : probes) {
probe.onPauseEvent(transport);
}
}
}
protected static void notifyProbesBeforeResume(final AbstractTransport transport) {
final TransportProbe[] probes =
transport.transportMonitoringConfig.getProbesUnsafe();
if (probes != null) {
for (TransportProbe probe : probes) {
probe.onBeforeStartEvent(transport);
}
}
}
protected static void notifyProbesConfigChanged(final AbstractTransport transport) {
final TransportProbe[] probes =
transport.transportMonitoringConfig.getProbesUnsafe();
if (probes != null) {
for (TransportProbe probe : probes) {
probe.onConfigChangeEvent(transport);
}
}
}
@Deprecated
@Override
public void stop() throws IOException {
shutdownNow();
}
protected abstract Object createJmxManagementObject();
}