/*
* JBoss, Home of Professional Open Source
*
* Copyright 2011 Red Hat, Inc. and/or its affiliates, and individual
* contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.xnio;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.FileChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.xnio.channels.AcceptingChannel;
import org.xnio.channels.Channels;
import org.xnio.channels.ConnectedChannel;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;
import org.xnio.channels.SuspendableReadChannel;
import org.xnio.channels.SuspendableWriteChannel;
import org.xnio.channels.WritableMessageChannel;
import static org.xnio._private.Messages.listenerMsg;
import static org.xnio._private.Messages.msg;
Channel listener utility methods.
Author: David M. Lloyd
/**
* Channel listener utility methods.
*
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
*/
@SuppressWarnings("unused")
public final class ChannelListeners {
private static final ChannelListener<Channel> NULL_LISTENER = new ChannelListener<Channel>() {
public void handleEvent(final Channel channel) {
}
public String toString() {
return "Null channel listener";
}
};
private static final ChannelListener.Setter<?> NULL_SETTER = new ChannelListener.Setter<Channel>() {
public void set(final ChannelListener<? super Channel> channelListener) {
}
public String toString() {
return "Null channel listener setter";
}
};
private static ChannelListener<Channel> CLOSING_CHANNEL_LISTENER = new ChannelListener<Channel>() {
public void handleEvent(final Channel channel) {
IoUtils.safeClose(channel);
}
public String toString() {
return "Closing channel listener";
}
};
private ChannelListeners() {
}
Invoke a channel listener on a given channel, logging any errors.
Params: - channel – the channel
- channelListener – the channel listener
Type parameters: - <T> – the channel type
Returns: true
if the listener completed successfully, or false
if it failed
/**
* Invoke a channel listener on a given channel, logging any errors.
*
* @param channel the channel
* @param channelListener the channel listener
* @param <T> the channel type
* @return {@code true} if the listener completed successfully, or {@code false} if it failed
*/
public static <T extends Channel> boolean invokeChannelListener(T channel, ChannelListener<? super T> channelListener) {
if (channelListener != null) try {
listenerMsg.tracef("Invoking listener %s on channel %s", channelListener, channel);
channelListener.handleEvent(channel);
} catch (Throwable t) {
listenerMsg.listenerException(t);
return false;
}
return true;
}
Invoke a channel listener on a given channel, logging any errors, using the given executor.
Params: - executor – the executor
- channel – the channel
- channelListener – the channel listener
Type parameters: - <T> – the channel type
/**
* Invoke a channel listener on a given channel, logging any errors, using the given executor.
*
* @param executor the executor
* @param channel the channel
* @param channelListener the channel listener
* @param <T> the channel type
*/
public static <T extends Channel> void invokeChannelListener(Executor executor, T channel, ChannelListener<? super T> channelListener) {
try {
executor.execute(getChannelListenerTask(channel, channelListener));
} catch (RejectedExecutionException ree) {
invokeChannelListener(channel, channelListener);
}
}
Safely invoke a channel exception handler, logging any errors.
Params: - channel – the channel
- exceptionHandler – the exception handler
- exception – the exception to pass in
Type parameters: - <T> – the exception type
/**
* Safely invoke a channel exception handler, logging any errors.
*
* @param channel the channel
* @param exceptionHandler the exception handler
* @param exception the exception to pass in
* @param <T> the exception type
*/
public static <T extends Channel> void invokeChannelExceptionHandler(final T channel, final ChannelExceptionHandler<? super T> exceptionHandler, final IOException exception) {
try {
exceptionHandler.handleException(channel, exception);
} catch (Throwable t) {
listenerMsg.exceptionHandlerException(t);
}
}
Get a task which invokes the given channel listener on the given channel.
Params: - channel – the channel
- channelListener – the channel listener
Type parameters: - <T> – the channel type
Returns: the runnable task
/**
* Get a task which invokes the given channel listener on the given channel.
*
* @param channel the channel
* @param channelListener the channel listener
* @param <T> the channel type
* @return the runnable task
*/
public static <T extends Channel> Runnable getChannelListenerTask(final T channel, final ChannelListener<? super T> channelListener) {
return new Runnable() {
public String toString() {
return "Channel listener task for " + channel + " -> " + channelListener;
}
public void run() {
invokeChannelListener(channel, channelListener);
}
};
}
Get a task which invokes the given channel listener on the given channel via its setter.
Params: - channel – the channel
- setter – the setter for the channel listener
Type parameters: - <T> – the channel type
Returns: the runnable task
/**
* Get a task which invokes the given channel listener on the given channel via its setter.
*
* @param channel the channel
* @param setter the setter for the channel listener
* @param <T> the channel type
* @return the runnable task
*/
public static <T extends Channel> Runnable getChannelListenerTask(final T channel, final ChannelListener.SimpleSetter<T> setter) {
return new Runnable() {
public String toString() {
return "Channel listener task for " + channel + " -> " + setter;
}
public void run() {
invokeChannelListener(channel, setter.get());
}
};
}
Get a channel listener which closes the channel when notified.
Returns: the channel listener
/**
* Get a channel listener which closes the channel when notified.
*
* @return the channel listener
*/
public static ChannelListener<Channel> closingChannelListener() {
return CLOSING_CHANNEL_LISTENER;
}
Get a channel listener which closes the given resource when notified.
Params: - resource – the resource to close
Returns: the channel listener
/**
* Get a channel listener which closes the given resource when notified.
*
* @param resource the resource to close
* @return the channel listener
*/
public static ChannelListener<Channel> closingChannelListener(final Closeable resource) {
return new ChannelListener<Channel>() {
public void handleEvent(final Channel channel) {
IoUtils.safeClose(resource);
}
public String toString() {
return "Closing channel listener for " + resource;
}
};
}
Get a channel listener which closes the given resources when notified.
Params: - resources – the resources to close
Returns: the channel listener
/**
* Get a channel listener which closes the given resources when notified.
*
* @param resources the resources to close
* @return the channel listener
*/
public static ChannelListener<Channel> closingChannelListener(final Closeable... resources) {
return new ChannelListener<Channel>() {
public void handleEvent(final Channel channel) {
IoUtils.safeClose(resources);
}
public String toString() {
return "Closing channel listener for " + resources.length + " items";
}
};
}
Get a channel listener which closes the given resource when notified.
Params: - delegate – the listener to call next
- resource – the resource to close
Returns: the channel listener
/**
* Get a channel listener which closes the given resource when notified.
*
* @param delegate the listener to call next
* @param resource the resource to close
* @return the channel listener
*/
public static <T extends Channel> ChannelListener<T> closingChannelListener(final ChannelListener<T> delegate, final Closeable resource) {
return new ChannelListener<T>() {
public void handleEvent(final T channel) {
IoUtils.safeClose(resource);
delegate.handleEvent(channel);
}
public String toString() {
return "Closing channel listener for " + resource + " -> " + delegate;
}
};
}
Get a channel listener which closes the given resource when notified.
Params: - delegate – the listener to call next
- resources – the resource to close
Returns: the channel listener
/**
* Get a channel listener which closes the given resource when notified.
*
* @param delegate the listener to call next
* @param resources the resource to close
* @return the channel listener
*/
public static <T extends Channel> ChannelListener<T> closingChannelListener(final ChannelListener<T> delegate, final Closeable... resources) {
return new ChannelListener<T>() {
public void handleEvent(final T channel) {
IoUtils.safeClose(resources);
delegate.handleEvent(channel);
}
public String toString() {
return "Closing channel listener for " + resources.length + " items -> " + delegate;
}
};
}
Get a channel listener which does nothing.
Returns: the null channel listener
/**
* Get a channel listener which does nothing.
*
* @return the null channel listener
*/
public static ChannelListener<Channel> nullChannelListener() {
return NULL_LISTENER;
}
Get a channel exception handler which closes the channel upon exception.
Returns: the channel exception handler
/**
* Get a channel exception handler which closes the channel upon exception.
*
* @return the channel exception handler
*/
public static ChannelExceptionHandler<Channel> closingChannelExceptionHandler() {
return CLOSING_HANDLER;
}
Create an open listener adapter which automatically accepts connections and invokes an open listener.
Params: - openListener – the channel open listener
Type parameters: - <C> – the connected channel type
Returns: a channel accept listener
/**
* Create an open listener adapter which automatically accepts connections and invokes an open listener.
*
* @param openListener the channel open listener
* @param <C> the connected channel type
* @return a channel accept listener
*/
public static <C extends ConnectedChannel> ChannelListener<AcceptingChannel<C>> openListenerAdapter(final ChannelListener<? super C> openListener) {
if (openListener == null) {
throw msg.nullParameter("openListener");
}
return new ChannelListener<AcceptingChannel<C>>() {
public void handleEvent(final AcceptingChannel<C> channel) {
try {
final C accepted = channel.accept();
if (accepted != null) {
invokeChannelListener(accepted, openListener);
}
} catch (IOException e) {
listenerMsg.acceptFailed(channel, e);
}
}
public String toString() {
return "Accepting listener for " + openListener;
}
};
}
Get a setter based on an atomic reference field updater. Used by channel implementations to avoid having to
define an anonymous class for each listener field.
Params: - channel – the channel
- updater – the updater
Type parameters: Returns: the setter Deprecated: Not recommended as a security manager will enforce unreasonable restrictions on the updater.
/**
* Get a setter based on an atomic reference field updater. Used by channel implementations to avoid having to
* define an anonymous class for each listener field.
*
* @param channel the channel
* @param updater the updater
* @param <T> the channel type
* @param <C> the holding class
* @return the setter
* @deprecated Not recommended as a security manager will enforce unreasonable restrictions on the updater.
*/
@Deprecated
public static <T extends Channel, C> ChannelListener.Setter<T> getSetter(final C channel, final AtomicReferenceFieldUpdater<C, ChannelListener> updater) {
return new ChannelListener.Setter<T>() {
public void set(final ChannelListener<? super T> channelListener) {
updater.set(channel, channelListener);
}
public String toString() {
return "Atomic reference field updater setter for " + updater;
}
};
}
Get a setter based on an atomic reference. Used by channel implementations to avoid having to
define an anonymous class for each listener field.
Params: - atomicReference – the atomic reference
Type parameters: - <T> – the channel type
Returns: the setter
/**
* Get a setter based on an atomic reference. Used by channel implementations to avoid having to
* define an anonymous class for each listener field.
*
* @param atomicReference the atomic reference
* @param <T> the channel type
* @return the setter
*/
public static <T extends Channel> ChannelListener.Setter<T> getSetter(final AtomicReference<ChannelListener<? super T>> atomicReference) {
return new ChannelListener.Setter<T>() {
public void set(final ChannelListener<? super T> channelListener) {
atomicReference.set(channelListener);
}
public String toString() {
return "Atomic reference setter (currently=" + atomicReference.get() + ")";
}
};
}
Get a channel listener setter which delegates to the given target setter with a different channel type.
Params: - target – the target setter
- realChannel – the channel to send in to the listener
Type parameters: - <T> – the real channel type
Returns: the delegating setter
/**
* Get a channel listener setter which delegates to the given target setter with a different channel type.
*
* @param target the target setter
* @param realChannel the channel to send in to the listener
* @param <T> the real channel type
* @return the delegating setter
*/
public static <T extends Channel> ChannelListener.Setter<T> getDelegatingSetter(final ChannelListener.Setter<? extends Channel> target, final T realChannel) {
return target == null ? null : new DelegatingSetter<T>(target, realChannel);
}
Get a channel listener setter which does nothing.
Type parameters: - <T> – the channel type
Returns: a setter which does nothing
/**
* Get a channel listener setter which does nothing.
*
* @param <T> the channel type
* @return a setter which does nothing
*/
@SuppressWarnings({ "unchecked" })
public static <T extends Channel> ChannelListener.Setter<T> nullSetter() {
return (ChannelListener.Setter<T>) NULL_SETTER;
}
Get a channel listener which executes a delegate channel listener via an executor. If an exception occurs
submitting the task, the associated channel is closed.
Params: - listener – the listener to invoke
- executor – the executor with which to invoke the listener
Type parameters: - <T> – the channel type
Returns: a delegating channel listener
/**
* Get a channel listener which executes a delegate channel listener via an executor. If an exception occurs
* submitting the task, the associated channel is closed.
*
* @param listener the listener to invoke
* @param executor the executor with which to invoke the listener
* @param <T> the channel type
* @return a delegating channel listener
*/
public static <T extends Channel> ChannelListener<T> executorChannelListener(final ChannelListener<T> listener, final Executor executor) {
return new ChannelListener<T>() {
public void handleEvent(final T channel) {
try {
executor.execute(getChannelListenerTask(channel, listener));
} catch (RejectedExecutionException e) {
listenerMsg.executorSubmitFailed(e, channel);
IoUtils.safeClose(channel);
}
}
public String toString() {
return "Executor channel listener -> " + listener;
}
};
}
A flushing channel listener. Flushes the channel and then calls the delegate listener. Calls the exception
handler if an exception occurs. The delegate listener should ensure that the channel write listener is appropriately set.
The returned listener is stateless and may be reused on any number of channels concurrently or sequentially.
Params: - delegate – the delegate listener
- exceptionHandler – the exception handler
Type parameters: - <T> – the channel type
Returns: the flushing channel listener
/**
* A flushing channel listener. Flushes the channel and then calls the delegate listener. Calls the exception
* handler if an exception occurs. The delegate listener should ensure that the channel write listener is appropriately set.
* <p>
* The returned listener is stateless and may be reused on any number of channels concurrently or sequentially.
*
* @param delegate the delegate listener
* @param exceptionHandler the exception handler
* @param <T> the channel type
* @return the flushing channel listener
*/
public static <T extends SuspendableWriteChannel> ChannelListener<T> flushingChannelListener(final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
return new ChannelListener<T>() {
public void handleEvent(final T channel) {
final boolean result;
try {
result = channel.flush();
} catch (IOException e) {
channel.suspendWrites();
invokeChannelExceptionHandler(channel, exceptionHandler, e);
return;
}
if (result) {
Channels.setWriteListener(channel, delegate);
invokeChannelListener(channel, delegate);
} else {
Channels.setWriteListener(channel, this);
channel.resumeWrites();
}
}
public String toString() {
return "Flushing channel listener -> " + delegate;
}
};
}
A write shutdown channel listener. Shuts down and flushes the channel and calls the delegate listener. Calls
the exception handler if an exception occurs. When the delegate listener is called, the channel's write side will be shut down and flushed.
The delegate listener should ensure that the channel write listener is appropriately set.
Params: - delegate – the delegate listener
- exceptionHandler – the exception handler
Type parameters: - <T> – the channel type
Returns: the channel listener
/**
* A write shutdown channel listener. Shuts down and flushes the channel and calls the delegate listener. Calls
* the exception handler if an exception occurs. When the delegate listener is called, the channel's write side will be shut down and flushed.
* The delegate listener should ensure that the channel write listener is appropriately set.
*
* @param delegate the delegate listener
* @param exceptionHandler the exception handler
* @param <T> the channel type
* @return the channel listener
*/
public static <T extends SuspendableWriteChannel> ChannelListener<T> writeShutdownChannelListener(final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
final ChannelListener<T> flushingListener = flushingChannelListener(delegate, exceptionHandler);
return new ChannelListener<T>() {
public void handleEvent(final T channel) {
try {
channel.shutdownWrites();
} catch (IOException e) {
invokeChannelExceptionHandler(channel, exceptionHandler, e);
return;
}
invokeChannelListener(channel, flushingListener);
}
public String toString() {
return "Write shutdown channel listener -> " + delegate;
}
};
}
A writing channel listener. Writes the buffer to the channel and then calls the delegate listener. Calls the exception
handler if an exception occurs. The delegate listener should ensure that the channel write listener is appropriately set.
The returned listener is stateful and will not execute properly if reused.
Params: - pooled – the buffer to write
- delegate – the delegate listener
- exceptionHandler – the exception handler
Type parameters: - <T> – the channel type
Returns: the writing channel listener
/**
* A writing channel listener. Writes the buffer to the channel and then calls the delegate listener. Calls the exception
* handler if an exception occurs. The delegate listener should ensure that the channel write listener is appropriately set.
* <p>
* The returned listener is stateful and will not execute properly if reused.
*
* @param pooled the buffer to write
* @param delegate the delegate listener
* @param exceptionHandler the exception handler
* @param <T> the channel type
* @return the writing channel listener
*/
public static <T extends StreamSinkChannel> ChannelListener<T> writingChannelListener(final Pooled<ByteBuffer> pooled, final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
return new ChannelListener<T>() {
public void handleEvent(final T channel) {
final ByteBuffer buffer = pooled.getResource();
int result;
boolean ok = false;
do {
try {
result = channel.write(buffer);
ok = true;
} catch (IOException e) {
channel.suspendWrites();
pooled.free();
invokeChannelExceptionHandler(channel, exceptionHandler, e);
return;
} finally {
if (! ok) {
pooled.free();
}
}
if (result == 0) {
Channels.setWriteListener(channel, this);
channel.resumeWrites();
return;
}
} while (buffer.hasRemaining());
pooled.free();
invokeChannelListener(channel, delegate);
}
public String toString() {
return "Writing channel listener -> " + delegate;
}
};
}
A sending channel listener. Writes the buffer to the channel and then calls the delegate listener. Calls the exception
handler if an exception occurs. The delegate listener should ensure that the channel write listener is appropriately set.
The returned listener is stateful and will not execute properly if reused.
Params: - pooled – the buffer to send
- delegate – the delegate listener
- exceptionHandler – the exception handler
Type parameters: - <T> – the channel type
Returns: the sending channel listener
/**
* A sending channel listener. Writes the buffer to the channel and then calls the delegate listener. Calls the exception
* handler if an exception occurs. The delegate listener should ensure that the channel write listener is appropriately set.
* <p>
* The returned listener is stateful and will not execute properly if reused.
*
* @param pooled the buffer to send
* @param delegate the delegate listener
* @param exceptionHandler the exception handler
* @param <T> the channel type
* @return the sending channel listener
*/
public static <T extends WritableMessageChannel> ChannelListener<T> sendingChannelListener(final Pooled<ByteBuffer> pooled, final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
return new ChannelListener<T>() {
public void handleEvent(final T channel) {
final ByteBuffer buffer = pooled.getResource();
boolean free = true;
try {
if (! (free = channel.send(buffer))) {
Channels.setWriteListener(channel, this);
channel.resumeWrites();
return;
}
} catch (IOException e) {
channel.suspendWrites();
pooled.free();
invokeChannelExceptionHandler(channel, exceptionHandler, e);
return;
} finally {
if (free) pooled.free();
}
invokeChannelListener(channel, delegate);
}
public String toString() {
return "Sending channel listener -> " + delegate;
}
};
}
A file-sending channel listener. Writes the file to the channel and then calls the delegate listener. Calls the exception
handler if an exception occurs. The delegate listener should ensure that the channel write listener is appropriately set.
The returned listener is stateful and will not execute properly if reused.
Params: - source – the file to read from
- position – the position in the source file to read from
- count – the number of bytes to read
- delegate – the listener to call when the file is sent
- exceptionHandler – the exception handler to call if a problem occurs
Type parameters: - <T> – the channel type
Returns: the channel listener
/**
* A file-sending channel listener. Writes the file to the channel and then calls the delegate listener. Calls the exception
* handler if an exception occurs. The delegate listener should ensure that the channel write listener is appropriately set.
* <p>
* The returned listener is stateful and will not execute properly if reused.
*
* @param source the file to read from
* @param position the position in the source file to read from
* @param count the number of bytes to read
* @param delegate the listener to call when the file is sent
* @param exceptionHandler the exception handler to call if a problem occurs
* @param <T> the channel type
* @return the channel listener
*/
public static <T extends StreamSinkChannel> ChannelListener<T> fileSendingChannelListener(final FileChannel source, final long position, final long count, final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
if (count == 0L) {
return delegatingChannelListener(delegate);
}
return new ChannelListener<T>() {
private long p = position;
private long cnt = count;
public void handleEvent(final T channel) {
long result;
long cnt = this.cnt;
long p = this.p;
try {
do {
try {
result = channel.transferFrom(source, p, cnt);
} catch (IOException e) {
invokeChannelExceptionHandler(channel, exceptionHandler, e);
return;
}
if (result == 0L) {
Channels.setWriteListener(channel, this);
channel.resumeWrites();
return;
}
p += result;
cnt -= result;
} while (cnt > 0L);
// cnt is 0
invokeChannelListener(channel, delegate);
return;
} finally {
this.p = p;
this.cnt = cnt;
}
}
public String toString() {
return "File sending channel listener (" + source + ") -> " + delegate;
}
};
}
A file-receiving channel listener. Writes the file from the channel and then calls the delegate listener. Calls the exception
handler if an exception occurs. The delegate listener should ensure that the channel read listener is appropriately set.
The returned listener is stateful and will not execute properly if reused.
Params: - target – the file to write to
- position – the position in the target file to write to
- count – the number of bytes to write
- delegate – the listener to call when the file is sent
- exceptionHandler – the exception handler to call if a problem occurs
Type parameters: - <T> – the channel type
Returns: the channel listener
/**
* A file-receiving channel listener. Writes the file from the channel and then calls the delegate listener. Calls the exception
* handler if an exception occurs. The delegate listener should ensure that the channel read listener is appropriately set.
* <p>
* The returned listener is stateful and will not execute properly if reused.
*
* @param target the file to write to
* @param position the position in the target file to write to
* @param count the number of bytes to write
* @param delegate the listener to call when the file is sent
* @param exceptionHandler the exception handler to call if a problem occurs
* @param <T> the channel type
* @return the channel listener
*/
public static <T extends StreamSourceChannel> ChannelListener<T> fileReceivingChannelListener(final FileChannel target, final long position, final long count, final ChannelListener<? super T> delegate, final ChannelExceptionHandler<? super T> exceptionHandler) {
if (count == 0L) {
return delegatingChannelListener(delegate);
}
return new ChannelListener<T>() {
private long p = position;
private long cnt = count;
public void handleEvent(final T channel) {
long result;
long cnt = this.cnt;
long p = this.p;
try {
do {
try {
result = channel.transferTo(p, cnt, target);
} catch (IOException e) {
invokeChannelExceptionHandler(channel, exceptionHandler, e);
return;
}
if (result == 0L) {
Channels.setReadListener(channel, this);
channel.resumeReads();
return;
}
p += result;
cnt -= result;
} while (cnt > 0L);
// cnt = 0
invokeChannelListener(channel, delegate);
return;
} finally {
this.p = p;
this.cnt = cnt;
}
}
public String toString() {
return "File receiving channel listener (" + target + ") -> " + delegate;
}
};
}
A delegating channel listener which passes an event to another listener of the same or a super type.
Params: - delegate – the delegate channel listener
Type parameters: - <T> – the channel type
Returns: the listener
/**
* A delegating channel listener which passes an event to another listener of the same or a super type.
*
* @param delegate the delegate channel listener
* @param <T> the channel type
* @return the listener
*/
public static <T extends Channel> ChannelListener<T> delegatingChannelListener(final ChannelListener<? super T> delegate) {
return new ChannelListener<T>() {
public void handleEvent(final T channel) {
invokeChannelListener(channel, delegate);
}
public String toString() {
return "Delegating channel listener -> " + delegate;
}
};
}
A delegating channel listener which passes an event to the listener stored in the given setter.
Params: - channel – the channel to pass in
- setter – the channel listener setter
Type parameters: Returns: the listener
/**
* A delegating channel listener which passes an event to the listener stored in the given setter.
*
* @param channel the channel to pass in
* @param setter the channel listener setter
* @param <C> the listener channel type
* @param <T> the passed in channel type
* @return the listener
*/
public static <C extends Channel, T extends Channel> ChannelListener<C> delegatingChannelListener(final T channel, final ChannelListener.SimpleSetter<T> setter) {
return new SetterDelegatingListener<C, T>(setter, channel);
}
A write-suspending channel listener. The returned listener will suspend writes when called. Useful for chaining
writing listeners to a flush listener to this listener. The delegate listener should ensure that the channel write listener is appropriately set.
Params: - delegate – the delegate channel listener
Returns: the suspending channel listener
/**
* A write-suspending channel listener. The returned listener will suspend writes when called. Useful for chaining
* writing listeners to a flush listener to this listener. The delegate listener should ensure that the channel write listener is appropriately set.
*
* @param delegate the delegate channel listener
* @return the suspending channel listener
*/
public static <T extends SuspendableWriteChannel> ChannelListener<T> writeSuspendingChannelListener(final ChannelListener<? super T> delegate) {
return new ChannelListener<T>() {
public void handleEvent(final T channel) {
channel.suspendWrites();
invokeChannelListener(channel, delegate);
}
public String toString() {
return "Write-suspending channel listener -> " + delegate;
}
};
}
A read-suspending channel listener. The returned listener will suspend read when called.
The delegate listener should ensure that the channel read listener is appropriately set.
Params: - delegate – the delegate channel listener
Returns: the suspending channel listener
/**
* A read-suspending channel listener. The returned listener will suspend read when called.
* The delegate listener should ensure that the channel read listener is appropriately set.
*
* @param delegate the delegate channel listener
* @return the suspending channel listener
*/
public static <T extends SuspendableReadChannel> ChannelListener<T> readSuspendingChannelListener(final ChannelListener<? super T> delegate) {
return new ChannelListener<T>() {
public void handleEvent(final T channel) {
channel.suspendReads();
invokeChannelListener(channel, delegate);
}
public String toString() {
return "Read-suspending channel listener -> " + delegate;
}
};
}
static final class TransferListener<I extends StreamSourceChannel, O extends StreamSinkChannel> implements ChannelListener<Channel> {
private final Pooled<ByteBuffer> pooledBuffer;
private final I source;
private final O sink;
private final ChannelListener<? super I> sourceListener;
private final ChannelListener<? super O> sinkListener;
private final ChannelExceptionHandler<? super O> writeExceptionHandler;
private final ChannelExceptionHandler<? super I> readExceptionHandler;
private long count;
private volatile int state;
TransferListener(final long count, final Pooled<ByteBuffer> pooledBuffer, final I source, final O sink, final ChannelListener<? super I> sourceListener, final ChannelListener<? super O> sinkListener, final ChannelExceptionHandler<? super O> writeExceptionHandler, final ChannelExceptionHandler<? super I> readExceptionHandler, final int state) {
this.count = count;
this.pooledBuffer = pooledBuffer;
this.source = source;
this.sink = sink;
this.sourceListener = sourceListener;
this.sinkListener = sinkListener;
this.writeExceptionHandler = writeExceptionHandler;
this.readExceptionHandler = readExceptionHandler;
this.state = state;
}
public void handleEvent(final Channel channel) {
final ByteBuffer buffer = pooledBuffer.getResource();
int state = this.state;
// always read after and write before state
long count = this.count;
long lres;
int ires;
switch (state) {
case 0: {
// read listener
for (;;) {
try {
lres = source.transferTo(count, buffer, sink);
} catch (IOException e) {
readFailed(e);
return;
}
if (lres == 0 && !buffer.hasRemaining()) {
this.count = count;
return;
}
if (lres == -1) {
// possibly unexpected EOF
if (count == Long.MAX_VALUE) {
// it's OK; just be done
done();
return;
} else {
readFailed(new EOFException());
return;
}
}
if (count != Long.MAX_VALUE) {
count -= lres;
}
while (buffer.hasRemaining()) {
try {
ires = sink.write(buffer);
} catch (IOException e) {
writeFailed(e);
return;
}
if (count != Long.MAX_VALUE) {
count -= ires;
}
if (ires == 0) {
this.count = count;
this.state = 1;
source.suspendReads();
sink.resumeWrites();
return;
}
}
if (count == 0) {
done();
return;
}
}
}
case 1: {
// write listener
for (;;) {
while (buffer.hasRemaining()) {
try {
ires = sink.write(buffer);
} catch (IOException e) {
writeFailed(e);
return;
}
if (count != Long.MAX_VALUE) {
count -= ires;
}
if (ires == 0) {
return;
}
}
try {
lres = source.transferTo(count, buffer, sink);
} catch (IOException e) {
readFailed(e);
return;
}
if (lres == 0 && !buffer.hasRemaining()) {
this.count = count;
this.state = 0;
sink.suspendWrites();
source.resumeReads();
return;
}
if (lres == -1) {
// possibly unexpected EOF
if (count == Long.MAX_VALUE) {
// it's OK; just be done
done();
return;
} else {
readFailed(new EOFException());
return;
}
}
if (count != Long.MAX_VALUE) {
count -= lres;
}
if (count == 0) {
done();
return;
}
}
}
}
}
private void writeFailed(final IOException e) {
try {
source.suspendReads();
sink.suspendWrites();
invokeChannelExceptionHandler(sink, writeExceptionHandler, e);
} finally {
pooledBuffer.free();
}
}
private void readFailed(final IOException e) {
try {
source.suspendReads();
sink.suspendWrites();
invokeChannelExceptionHandler(source, readExceptionHandler, e);
} finally {
pooledBuffer.free();
}
}
private void done() {
try {
final ChannelListener<? super I> sourceListener = this.sourceListener;
final ChannelListener<? super O> sinkListener = this.sinkListener;
final I source = this.source;
final O sink = this.sink;
Channels.setReadListener(source, sourceListener);
if (sourceListener == null) {
source.suspendReads();
} else {
source.wakeupReads();
}
Channels.setWriteListener(sink, sinkListener);
if (sinkListener == null) {
sink.suspendWrites();
} else {
sink.wakeupWrites();
}
} finally {
pooledBuffer.free();
}
}
public String toString() {
return "Transfer channel listener (" + source + " to " + sink + ") -> (" + sourceListener + " and " + sinkListener + ")";
}
}
Initiate a low-copy transfer between two stream channels. The pool should be a direct buffer pool for best
performance. The channels will be closed when the transfer completes or if there is an error.
Params: - source – the source channel
- sink – the target channel
- pool – the pool from which the transfer buffer should be allocated
Type parameters:
/**
* Initiate a low-copy transfer between two stream channels. The pool should be a direct buffer pool for best
* performance. The channels will be closed when the transfer completes or if there is an error.
*
* @param source the source channel
* @param sink the target channel
* @param pool the pool from which the transfer buffer should be allocated
* @param <I> the source stream type
* @param <O> the sink stream type
*/
public static <I extends StreamSourceChannel, O extends StreamSinkChannel> void initiateTransfer(final I source, final O sink, Pool<ByteBuffer> pool) {
initiateTransfer(Long.MAX_VALUE, source, sink, CLOSING_CHANNEL_LISTENER, CLOSING_CHANNEL_LISTENER, CLOSING_HANDLER, CLOSING_HANDLER, pool);
}
Initiate a low-copy transfer between two stream channels. The pool should be a direct buffer pool for best
performance.
Params: - count – the number of bytes to transfer, or
Long.MAX_VALUE
to transfer all remaining bytes - source – the source channel
- sink – the target channel
- sourceListener – the source listener to set and call when the transfer is complete, or
null
to clear the listener at that time - sinkListener – the target listener to set and call when the transfer is complete, or
null
to clear the listener at that time - readExceptionHandler – the read exception handler to call if an error occurs during a read operation
- writeExceptionHandler – the write exception handler to call if an error occurs during a write operation
- pool – the pool from which the transfer buffer should be allocated
/**
* Initiate a low-copy transfer between two stream channels. The pool should be a direct buffer pool for best
* performance.
*
* @param count the number of bytes to transfer, or {@link Long#MAX_VALUE} to transfer all remaining bytes
* @param source the source channel
* @param sink the target channel
* @param sourceListener the source listener to set and call when the transfer is complete, or {@code null} to clear the listener at that time
* @param sinkListener the target listener to set and call when the transfer is complete, or {@code null} to clear the listener at that time
* @param readExceptionHandler the read exception handler to call if an error occurs during a read operation
* @param writeExceptionHandler the write exception handler to call if an error occurs during a write operation
* @param pool the pool from which the transfer buffer should be allocated
*/
public static <I extends StreamSourceChannel, O extends StreamSinkChannel> void initiateTransfer(long count, final I source, final O sink, final ChannelListener<? super I> sourceListener, final ChannelListener<? super O> sinkListener, final ChannelExceptionHandler<? super I> readExceptionHandler, final ChannelExceptionHandler<? super O> writeExceptionHandler, Pool<ByteBuffer> pool) {
if (pool == null) {
throw msg.nullParameter("pool");
}
final Pooled<ByteBuffer> allocated = pool.allocate();
boolean free = true;
try {
final ByteBuffer buffer = allocated.getResource();
long transferred;
for(;;) {
try {
transferred = source.transferTo(count, buffer, sink);
} catch (IOException e) {
invokeChannelExceptionHandler(source, readExceptionHandler, e);
return;
}
if (transferred == 0 && !buffer.hasRemaining()) {
break;
}
if (transferred == -1) {
if (count == Long.MAX_VALUE) {
Channels.setReadListener(source, sourceListener);
if (sourceListener == null) {
source.suspendReads();
} else {
source.wakeupReads();
}
Channels.setWriteListener(sink, sinkListener);
if (sinkListener == null) {
sink.suspendWrites();
} else {
sink.wakeupWrites();
}
} else {
source.suspendReads();
sink.suspendWrites();
invokeChannelExceptionHandler(source, readExceptionHandler, new EOFException());
}
return;
}
if (count != Long.MAX_VALUE) {
count -= transferred;
}
while (buffer.hasRemaining()) {
final int res;
try {
res = sink.write(buffer);
} catch (IOException e) {
invokeChannelExceptionHandler(sink, writeExceptionHandler, e);
return;
}
if (res == 0) {
// write first listener
final TransferListener<I, O> listener = new TransferListener<I, O>(count, allocated, source, sink, sourceListener, sinkListener, writeExceptionHandler, readExceptionHandler, 1);
source.suspendReads();
source.getReadSetter().set(listener);
sink.getWriteSetter().set(listener);
sink.resumeWrites();
free = false;
return;
} else if (count != Long.MAX_VALUE) {
count -= res;
}
}
if (count == 0) {
//we are done
Channels.setReadListener(source, sourceListener);
if (sourceListener == null) {
source.suspendReads();
} else {
source.wakeupReads();
}
Channels.setWriteListener(sink, sinkListener);
if (sinkListener == null) {
sink.suspendWrites();
} else {
sink.wakeupWrites();
}
return;
}
}
// read first listener
final TransferListener<I, O> listener = new TransferListener<I, O>(count, allocated, source, sink, sourceListener, sinkListener, writeExceptionHandler, readExceptionHandler, 0);
sink.suspendWrites();
sink.getWriteSetter().set(listener);
source.getReadSetter().set(listener);
source.resumeReads();
free = false;
return;
} finally {
if (free) allocated.free();
}
}
Create a channel listener which automatically drains the given number of bytes from the channel and then calls
a listener.
Params: - bytes – the number of bytes to drain, or
Long.MAX_VALUE
to drain the channel completely - finishListener – the listener to call when the drain is complete
- exceptionHandler – the handler to call if the drain fails
Type parameters: - <T> – the channel type
Returns: the channel listener
/**
* Create a channel listener which automatically drains the given number of bytes from the channel and then calls
* a listener.
*
* @param bytes the number of bytes to drain, or {@code Long.MAX_VALUE} to drain the channel completely
* @param finishListener the listener to call when the drain is complete
* @param exceptionHandler the handler to call if the drain fails
* @param <T> the channel type
* @return the channel listener
*/
public static <T extends StreamSourceChannel> ChannelListener<T> drainListener(long bytes, ChannelListener<? super T> finishListener, ChannelExceptionHandler<? super T> exceptionHandler) {
return new DrainListener<T>(finishListener, exceptionHandler, bytes);
}
private static class DelegatingSetter<T extends Channel> implements ChannelListener.Setter<T> {
private final ChannelListener.Setter<? extends Channel> setter;
private final T realChannel;
DelegatingSetter(final ChannelListener.Setter<? extends Channel> setter, final T realChannel) {
this.setter = setter;
this.realChannel = realChannel;
}
public void set(final ChannelListener<? super T> channelListener) {
setter.set(channelListener == null ? null : new DelegatingChannelListener<T>(channelListener, realChannel));
}
public String toString() {
return "Delegating setter -> " + setter;
}
}
private static class DelegatingChannelListener<T extends Channel> implements ChannelListener<Channel> {
private final ChannelListener<? super T> channelListener;
private final T realChannel;
public DelegatingChannelListener(final ChannelListener<? super T> channelListener, final T realChannel) {
this.channelListener = channelListener;
this.realChannel = realChannel;
}
public void handleEvent(final Channel channel) {
invokeChannelListener(realChannel, channelListener);
}
public String toString() {
return "Delegating channel listener -> " + channelListener;
}
}
private static class SetterDelegatingListener<C extends Channel, T extends Channel> implements ChannelListener<C> {
private final SimpleSetter<T> setter;
private final T channel;
public SetterDelegatingListener(final SimpleSetter<T> setter, final T channel) {
this.setter = setter;
this.channel = channel;
}
public void handleEvent(final C channel) {
invokeChannelListener(this.channel, setter.get());
}
public String toString() {
return "Setter delegating channel listener -> " + setter;
}
}
private static final ChannelExceptionHandler<Channel> CLOSING_HANDLER = new ChannelExceptionHandler<Channel>() {
public void handleException(final Channel channel, final IOException exception) {
IoUtils.safeClose(channel);
}
};
private static class DrainListener<T extends StreamSourceChannel> implements ChannelListener<T> {
private final ChannelListener<? super T> finishListener;
private final ChannelExceptionHandler<? super T> exceptionHandler;
private long count;
private DrainListener(final ChannelListener<? super T> finishListener, final ChannelExceptionHandler<? super T> exceptionHandler, final long count) {
this.finishListener = finishListener;
this.exceptionHandler = exceptionHandler;
this.count = count;
}
public void handleEvent(final T channel) {
try {
long count = this.count;
try {
long res;
for (;;) {
res = Channels.drain(channel, count);
if (res == -1 || res == count) {
this.count = 0L;
invokeChannelListener(channel, finishListener);
return;
} else if (res == 0) {
return;
} else if (count < Long.MAX_VALUE) {
// MAX_VALUE means drain to EOF
count -= res;
}
}
} finally {
this.count = count;
}
} catch (IOException e) {
this.count = 0L;
if (exceptionHandler != null) {
invokeChannelExceptionHandler(channel, exceptionHandler, e);
} else {
IoUtils.safeShutdownReads(channel);
}
}
}
public String toString() {
return "Draining channel listener (" + count + " bytes) -> " + finishListener;
}
}
}