package org.glassfish.grizzly.nio.transport;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.grizzly.*;
import org.glassfish.grizzly.asyncqueue.AsyncQueueWriter;
import org.glassfish.grizzly.localization.LogMessages;
import org.glassfish.grizzly.nio.NIOConnection;
import org.glassfish.grizzly.nio.SelectorRunner;
import org.glassfish.grizzly.utils.Holder;
import org.glassfish.grizzly.utils.NullaryFunction;
@SuppressWarnings("unchecked")
public class TCPNIOConnection extends NIOConnection {
private static final Logger LOGGER = Grizzly.logger(TCPNIOConnection.class);
Holder<SocketAddress> localSocketAddressHolder;
Holder<SocketAddress> peerSocketAddressHolder;
private int readBufferSize = -1;
private int writeBufferSize = -1;
private AtomicReference<ConnectResultHandler> connectHandlerRef;
public TCPNIOConnection(TCPNIOTransport transport,
SelectableChannel channel) {
super(transport);
this.channel = channel;
}
@Override
protected void setSelectionKey(SelectionKey selectionKey) {
super.setSelectionKey(selectionKey);
}
@Override
protected void setSelectorRunner(SelectorRunner selectorRunner) {
super.setSelectorRunner(selectorRunner);
}
@Override
protected void preClose() {
checkConnectFailed(null);
super.preClose();
}
protected boolean notifyReady() {
return connectCloseSemaphoreUpdater.compareAndSet(this, null,
NOTIFICATION_INITIALIZED);
}
@Override
public SocketAddress getPeerAddress() {
return peerSocketAddressHolder.get();
}
@Override
public SocketAddress getLocalAddress() {
return localSocketAddressHolder.get();
}
protected void resetProperties() {
if (channel != null) {
setReadBufferSize(transport.getReadBufferSize());
setWriteBufferSize(transport.getWriteBufferSize());
final int transportMaxAsyncWriteQueueSize =
((TCPNIOTransport) transport).getAsyncQueueIO()
.getWriter().getMaxPendingBytesPerConnection();
setMaxAsyncWriteQueueSize(
transportMaxAsyncWriteQueueSize == AsyncQueueWriter.AUTO_SIZE
? getWriteBufferSize() * 4
: transportMaxAsyncWriteQueueSize);
localSocketAddressHolder = Holder.lazyHolder(
new NullaryFunction<SocketAddress>() {
@Override
public SocketAddress evaluate() {
return ((SocketChannel) channel).socket().getLocalSocketAddress();
}
});
peerSocketAddressHolder = Holder.lazyHolder(
new NullaryFunction<SocketAddress>() {
@Override
public SocketAddress evaluate() {
return ((SocketChannel) channel).socket().getRemoteSocketAddress();
}
});
}
}
@Override
public int getReadBufferSize() {
if (readBufferSize >= 0) {
return readBufferSize;
}
try {
readBufferSize = ((SocketChannel) channel).socket().getReceiveBufferSize();
} catch (IOException e) {
LOGGER.log(Level.FINE,
LogMessages.WARNING_GRIZZLY_CONNECTION_GET_READBUFFER_SIZE_EXCEPTION(),
e);
readBufferSize = 0;
}
return readBufferSize;
}
@Override
public void setReadBufferSize(final int readBufferSize) {
if (readBufferSize > 0) {
try {
final int currentReadBufferSize = ((SocketChannel) channel).socket().getReceiveBufferSize();
if (readBufferSize > currentReadBufferSize) {
((SocketChannel) channel).socket().setReceiveBufferSize(readBufferSize);
}
this.readBufferSize = readBufferSize;
} catch (IOException e) {
LOGGER.log(Level.WARNING,
LogMessages.WARNING_GRIZZLY_CONNECTION_SET_READBUFFER_SIZE_EXCEPTION(),
e);
}
}
}
@Override
public int getWriteBufferSize() {
if (writeBufferSize >= 0) {
return writeBufferSize;
}
try {
writeBufferSize = ((SocketChannel) channel).socket().getSendBufferSize();
} catch (IOException e) {
LOGGER.log(Level.FINE,
LogMessages.WARNING_GRIZZLY_CONNECTION_GET_WRITEBUFFER_SIZE_EXCEPTION(),
e);
writeBufferSize = 0;
}
return writeBufferSize;
}
@Override
public void setWriteBufferSize(int writeBufferSize) {
if (writeBufferSize > 0) {
try {
final int currentSendBufferSize = ((SocketChannel) channel).socket().getSendBufferSize();
if (writeBufferSize > currentSendBufferSize) {
((SocketChannel) channel).socket().setSendBufferSize(writeBufferSize);
}
this.writeBufferSize = writeBufferSize;
} catch (IOException e) {
LOGGER.log(Level.WARNING,
LogMessages.WARNING_GRIZZLY_CONNECTION_SET_WRITEBUFFER_SIZE_EXCEPTION(),
e);
}
}
}
protected final void setConnectResultHandler(
final ConnectResultHandler connectHandler) {
connectHandlerRef =
new AtomicReference<ConnectResultHandler>(connectHandler);
}
protected final void onConnect() throws IOException {
final AtomicReference<ConnectResultHandler> localRef = connectHandlerRef;
final ConnectResultHandler localConnectHandler;
if (localRef != null &&
(localConnectHandler = localRef.getAndSet(null)) != null) {
localConnectHandler.connected();
connectHandlerRef = null;
}
notifyProbesConnect(this);
}
protected final void checkConnectFailed(Throwable failure) {
final AtomicReference<ConnectResultHandler> localRef = connectHandlerRef;
final ConnectResultHandler localConnectHandler;
if (localRef != null &&
(localConnectHandler = localRef.getAndSet(null)) != null) {
if (failure == null) {
failure = new IOException("closed");
}
localConnectHandler.failed(failure);
connectHandlerRef = null;
}
}
@Override
protected void terminate0(
final CompletionHandler<Closeable> completionHandler,
final CloseReason closeReason) {
super.terminate0(completionHandler, closeReason);
}
protected final void onRead(Buffer data, int size) {
if (size > 0) {
notifyProbesRead(this, data, size);
}
checkEmptyRead(size);
}
@Override
protected void enableInitialOpRead() throws IOException {
super.enableInitialOpRead();
}
protected final void onWrite(Buffer data, long size) {
notifyProbesWrite(this, data, size);
}
@Override
public boolean canWrite() {
return transport.getWriter(this).canWrite(this);
}
@Deprecated
@Override
public boolean canWrite(int length) {
return transport.getWriter(this).canWrite(this);
}
@Override
public void notifyCanWrite(final WriteHandler writeHandler) {
transport.getWriter(this).notifyWritePossible(this, writeHandler);
}
@Deprecated
@Override
public void notifyCanWrite(WriteHandler handler, int length) {
transport.getWriter(this).notifyWritePossible(this, handler);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("TCPNIOConnection");
sb.append("{localSocketAddress=").append(localSocketAddressHolder);
sb.append(", peerSocketAddress=").append(peerSocketAddressHolder);
sb.append('}');
return sb.toString();
}
protected interface ConnectResultHandler {
void connected() throws IOException;
void failed(final Throwable t);
}
}