package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
{
private static final Logger LOG = Log.getLogger(AbstractEndPoint.class);
private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
private final long _created = System.currentTimeMillis();
private volatile Connection _connection;
private final FillInterest _fillInterest = new FillInterest()
{
@Override
protected void needsFillInterest() throws IOException
{
AbstractEndPoint.this.needsFillInterest();
}
};
private final WriteFlusher _writeFlusher = new WriteFlusher(this)
{
@Override
protected void onIncompleteFlush()
{
AbstractEndPoint.this.onIncompleteFlush();
}
};
protected AbstractEndPoint(Scheduler scheduler)
{
super(scheduler);
}
protected final void shutdownInput()
{
if (LOG.isDebugEnabled())
LOG.debug("shutdownInput {}", this);
while (true)
{
State s = _state.get();
switch (s)
{
case OPEN:
if (!_state.compareAndSet(s, State.ISHUTTING))
continue;
try
{
doShutdownInput();
}
finally
{
if (!_state.compareAndSet(State.ISHUTTING, State.ISHUT))
{
if (_state.get() == State.CLOSED)
doOnClose(null);
else
throw new IllegalStateException();
}
}
return;
case ISHUTTING:
case ISHUT:
return;
case OSHUTTING:
if (!_state.compareAndSet(s, State.CLOSED))
continue;
return;
case OSHUT:
if (!_state.compareAndSet(s, State.CLOSED))
continue;
doOnClose(null);
return;
case CLOSED:
return;
default:
throw new IllegalStateException(s.toString());
}
}
}
@Override
public final void shutdownOutput()
{
if (LOG.isDebugEnabled())
LOG.debug("shutdownOutput {}", this);
while (true)
{
State s = _state.get();
switch (s)
{
case OPEN:
if (!_state.compareAndSet(s, State.OSHUTTING))
continue;
try
{
doShutdownOutput();
}
finally
{
if (!_state.compareAndSet(State.OSHUTTING, State.OSHUT))
{
if (_state.get() == State.CLOSED)
doOnClose(null);
else
throw new IllegalStateException();
}
}
return;
case ISHUTTING:
if (!_state.compareAndSet(s, State.CLOSED))
continue;
return;
case ISHUT:
if (!_state.compareAndSet(s, State.CLOSED))
continue;
doOnClose(null);
return;
case OSHUTTING:
case OSHUT:
return;
case CLOSED:
return;
default:
throw new IllegalStateException(s.toString());
}
}
}
@Override
public final void close()
{
if (LOG.isDebugEnabled())
LOG.debug("close {}", this);
close(null);
}
public final void close(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("close({}) {}", failure, this);
while (true)
{
State s = _state.get();
switch (s)
{
case OPEN:
case ISHUT:
case OSHUT:
if (!_state.compareAndSet(s, State.CLOSED))
continue;
doOnClose(failure);
return;
case ISHUTTING:
case OSHUTTING:
if (!_state.compareAndSet(s, State.CLOSED))
continue;
return;
case CLOSED:
return;
default:
throw new IllegalStateException(s.toString());
}
}
}
protected void doShutdownInput()
{
}
protected void doShutdownOutput()
{
}
private void doOnClose(Throwable failure)
{
try
{
doClose();
}
finally
{
if (failure == null)
onClose();
else
onClose(failure);
}
}
protected void doClose()
{
}
@Override
public boolean isOutputShutdown()
{
switch (_state.get())
{
case CLOSED:
case OSHUT:
case OSHUTTING:
return true;
default:
return false;
}
}
@Override
public boolean isInputShutdown()
{
switch (_state.get())
{
case CLOSED:
case ISHUT:
case ISHUTTING:
return true;
default:
return false;
}
}
@Override
public boolean isOpen()
{
switch (_state.get())
{
case CLOSED:
return false;
default:
return true;
}
}
public void checkFlush() throws IOException
{
State s = _state.get();
switch (s)
{
case OSHUT:
case OSHUTTING:
case CLOSED:
throw new IOException(s.toString());
default:
break;
}
}
public void checkFill() throws IOException
{
State s = _state.get();
switch (s)
{
case ISHUT:
case ISHUTTING:
case CLOSED:
throw new IOException(s.toString());
default:
break;
}
}
@Override
public long getCreatedTimeStamp()
{
return _created;
}
@Override
public Connection getConnection()
{
return _connection;
}
@Override
public void setConnection(Connection connection)
{
_connection = connection;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return false;
}
protected void reset()
{
_state.set(State.OPEN);
_writeFlusher.onClose();
_fillInterest.onClose();
}
@Override
public void onOpen()
{
if (LOG.isDebugEnabled())
LOG.debug("onOpen {}", this);
if (_state.get() != State.OPEN)
throw new IllegalStateException();
}
@Override
public final void onClose()
{
onClose(null);
}
@Override
public void onClose(Throwable failure)
{
super.onClose();
if (failure == null)
{
_writeFlusher.onClose();
_fillInterest.onClose();
}
else
{
_writeFlusher.onFail(failure);
_fillInterest.onFail(failure);
}
}
@Override
public void fillInterested(Callback callback)
{
notIdle();
_fillInterest.register(callback);
}
@Override
public boolean tryFillInterested(Callback callback)
{
notIdle();
return _fillInterest.tryRegister(callback);
}
@Override
public boolean isFillInterested()
{
return _fillInterest.isInterested();
}
@Override
public void write(Callback callback, ByteBuffer... buffers) throws IllegalStateException
{
_writeFlusher.write(callback, buffers);
}
protected abstract void onIncompleteFlush();
protected abstract void needsFillInterest() throws IOException;
public FillInterest getFillInterest()
{
return _fillInterest;
}
protected WriteFlusher getWriteFlusher()
{
return _writeFlusher;
}
@Override
protected void onIdleExpired(TimeoutException timeout)
{
Connection connection = _connection;
if (connection != null && !connection.onIdleExpired())
return;
boolean outputShutdown = isOutputShutdown();
boolean inputShutdown = isInputShutdown();
boolean fillFailed = _fillInterest.onFail(timeout);
boolean writeFailed = _writeFlusher.onFail(timeout);
if (isOpen() && (outputShutdown || inputShutdown) && !(fillFailed || writeFailed))
close();
else
LOG.debug("Ignored idle endpoint {}", this);
}
@Override
public void upgrade(Connection newConnection)
{
Connection oldConnection = getConnection();
ByteBuffer prefilled = (oldConnection instanceof Connection.UpgradeFrom)
? ((Connection.UpgradeFrom)oldConnection).onUpgradeFrom()
: null;
oldConnection.onClose(null);
oldConnection.getEndPoint().setConnection(newConnection);
if (LOG.isDebugEnabled())
LOG.debug("{} upgrading from {} to {} with {}",
this, oldConnection, newConnection, BufferUtil.toDetailString(prefilled));
if (newConnection instanceof Connection.UpgradeTo)
((Connection.UpgradeTo)newConnection).onUpgradeTo(prefilled);
else if (BufferUtil.hasContent(prefilled))
throw new IllegalStateException("Cannot upgrade: " + newConnection + " does not implement " + Connection.UpgradeTo.class.getName());
newConnection.onOpen();
}
@Override
public String toString()
{
return String.format("%s->%s", toEndPointString(), toConnectionString());
}
public String toEndPointString()
{
Class<?> c = getClass();
String name = c.getSimpleName();
while (name.length() == 0 && c.getSuperclass() != null)
{
c = c.getSuperclass();
name = c.getSimpleName();
}
return String.format("%s@%h{%s<->%s,%s,fill=%s,flush=%s,to=%d/%d}",
name,
this,
getRemoteAddress(),
getLocalAddress(),
_state.get(),
_fillInterest.toStateString(),
_writeFlusher.toStateString(),
getIdleFor(),
getIdleTimeout());
}
public String toConnectionString()
{
Connection connection = getConnection();
if (connection == null)
return "<null>";
if (connection instanceof AbstractConnection)
return ((AbstractConnection)connection).toConnectionString();
return String.format("%s@%x", connection.getClass().getSimpleName(), connection.hashCode());
}
private enum State
{
OPEN, ISHUTTING, ISHUT, OSHUTTING, OSHUT, CLOSED
}
}