/*
* Copyright (c) 1997, 2013, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package com.sun.xml.internal.ws.api.pipe;
import com.sun.istack.internal.NotNull;
import com.sun.istack.internal.Nullable;
import com.sun.xml.internal.ws.api.Cancelable;
import com.sun.xml.internal.ws.api.Component;
import com.sun.xml.internal.ws.api.ComponentRegistry;
import com.sun.xml.internal.ws.api.SOAPVersion;
import com.sun.xml.internal.ws.api.addressing.AddressingVersion;
import com.sun.xml.internal.ws.api.message.AddressingUtils;
import com.sun.xml.internal.ws.api.message.Packet;
import com.sun.xml.internal.ws.api.pipe.helper.AbstractFilterTubeImpl;
import com.sun.xml.internal.ws.api.pipe.helper.AbstractTubeImpl;
import com.sun.xml.internal.ws.api.server.Adapter;
import com.sun.xml.internal.ws.api.server.Container;
import com.sun.xml.internal.ws.api.server.ContainerResolver;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.ws.Holder;
import javax.xml.ws.WebServiceException;
User-level thread. Represents the execution of one request/response processing.
JAX-WS RI is capable of running a large number of request/response concurrently by using a relatively small number of threads. This is made possible by utilizing a Fiber
— a user-level thread that gets created for each request/response processing.
A fiber remembers where in the pipeline the processing is at, what needs to be
executed on the way out (when processing response), and other additional information
specific to the execution of a particular request/response.
Suspend/Resume
Fiber can be suspended
by a Tube
. When a fiber is suspended, it will be kept on the side until it is resumed
. This allows threads to go execute other runnable fibers, allowing efficient utilization of smaller number of threads.
Context-switch Interception
FiberContextSwitchInterceptor
allows Tube
s and Adapter
s to perform additional processing every time a thread starts running a fiber and stops running it.
Context ClassLoader
Just like thread, a fiber has a context class loader (CCL.) A fiber's CCL
becomes the thread's CCL when it's executing the fiber. The original CCL
of the thread will be restored when the thread leaves the fiber execution.
Debugging Aid
Because Fiber
doesn't keep much in the call stack, and instead use conts
to store the continuation, debugging fiber related activities could be harder.
Setting the LOGGER
for FINE would give you basic start/stop/resume/suspend level logging. Using FINER would cause more detailed logging, which includes what tubes are executed in what order and how they behaved.
When you debug the server side, consider setting serializeExecution
to true, so that execution of fibers are serialized. Debugging a server with more than one running threads is very tricky, and this switch will prevent that. This can be also enabled by setting the system property on. See the source code. Author: Kohsuke Kawaguchi, Jitendra Kotamraju
/**
* User-level thread. Represents the execution of one request/response processing.
* <p/>
* <p/>
* JAX-WS RI is capable of running a large number of request/response concurrently by
* using a relatively small number of threads. This is made possible by utilizing
* a {@link Fiber} — a user-level thread that gets created for each request/response
* processing.
* <p/>
* <p/>
* A fiber remembers where in the pipeline the processing is at, what needs to be
* executed on the way out (when processing response), and other additional information
* specific to the execution of a particular request/response.
* <p/>
* <h2>Suspend/Resume</h2>
* <p/>
* Fiber can be {@link NextAction#suspend() suspended} by a {@link Tube}.
* When a fiber is suspended, it will be kept on the side until it is
* {@link #resume(Packet) resumed}. This allows threads to go execute
* other runnable fibers, allowing efficient utilization of smaller number of
* threads.
* <p/>
* <h2>Context-switch Interception</h2>
* <p/>
* {@link FiberContextSwitchInterceptor} allows {@link Tube}s and {@link Adapter}s
* to perform additional processing every time a thread starts running a fiber
* and stops running it.
* <p/>
* <h2>Context ClassLoader</h2>
* <p/>
* Just like thread, a fiber has a context class loader (CCL.) A fiber's CCL
* becomes the thread's CCL when it's executing the fiber. The original CCL
* of the thread will be restored when the thread leaves the fiber execution.
* <p/>
* <p/>
* <h2>Debugging Aid</h2>
* <p/>
* Because {@link Fiber} doesn't keep much in the call stack, and instead use
* {@link #conts} to store the continuation, debugging fiber related activities
* could be harder.
* <p/>
* <p/>
* Setting the {@link #LOGGER} for FINE would give you basic start/stop/resume/suspend
* level logging. Using FINER would cause more detailed logging, which includes
* what tubes are executed in what order and how they behaved.
* <p/>
* <p/>
* When you debug the server side, consider setting {@link Fiber#serializeExecution}
* to true, so that execution of fibers are serialized. Debugging a server
* with more than one running threads is very tricky, and this switch will
* prevent that. This can be also enabled by setting the system property on.
* See the source code.
*
* @author Kohsuke Kawaguchi
* @author Jitendra Kotamraju
*/
public final class Fiber implements Runnable, Cancelable, ComponentRegistry {
Callback interface for notification of suspend and resume.
Since: 2.2.6 Deprecated: Use NextAction.suspend(Runnable)
/**
* Callback interface for notification of suspend and resume.
*
* @since 2.2.6
* @deprecated Use {@link NextAction#suspend(Runnable)}
*/
public interface Listener {
Fiber has been suspended. Implementations of this callback may resume the Fiber.
Params: - fiber – Fiber
/**
* Fiber has been suspended. Implementations of this callback may resume the Fiber.
* @param fiber Fiber
*/
public void fiberSuspended(Fiber fiber);
Fiber has been resumed. Behavior is undefined if implementations of this callback attempt to suspend the Fiber.
Params: - fiber – Fiber
/**
* Fiber has been resumed. Behavior is undefined if implementations of this callback attempt to suspend the Fiber.
* @param fiber Fiber
*/
public void fiberResumed(Fiber fiber);
}
private final List<Listener> _listeners = new ArrayList<Listener>();
Adds suspend/resume callback listener
Params: - listener – Listener
Since: 2.2.6 Deprecated:
/**
* Adds suspend/resume callback listener
* @param listener Listener
* @since 2.2.6
* @deprecated
*/
public void addListener(Listener listener) {
synchronized(_listeners) {
if (!_listeners.contains(listener)) {
_listeners.add(listener);
}
}
}
Removes suspend/resume callback listener
Params: - listener – Listener
Since: 2.2.6 Deprecated:
/**
* Removes suspend/resume callback listener
* @param listener Listener
* @since 2.2.6
* @deprecated
*/
public void removeListener(Listener listener) {
synchronized(_listeners) {
_listeners.remove(listener);
}
}
List<Listener> getCurrentListeners() {
synchronized(_listeners) {
return new ArrayList<Listener>(_listeners);
}
}
private void clearListeners() {
synchronized(_listeners) {
_listeners.clear();
}
}
Tube
s whose Tube.processResponse(Packet)
method needs to be invoked on the way back. /**
* {@link Tube}s whose {@link Tube#processResponse(Packet)} method needs
* to be invoked on the way back.
*/
private Tube[] conts = new Tube[16];
private int contsSize;
If this field is non-null, the next instruction to execute is to call its Tube.processRequest(Packet)
. Otherwise the instruction is to call conts
. /**
* If this field is non-null, the next instruction to execute is
* to call its {@link Tube#processRequest(Packet)}. Otherwise
* the instruction is to call {@link #conts}.
*/
private Tube next;
private Packet packet;
private Throwable/*but really it's either RuntimeException or Error*/ throwable;
public final Engine owner;
Is this thread suspended? 0=not suspended, 1=suspended.
Logically this is just a boolean, but we need to prepare for the case where the thread is resumed
before we get to the suspend()
. This happens when things happen in the following order:
- Tube decides that the fiber needs to be suspended to wait for the external event.
- Tube hooks up fiber with some external mechanism (like NIO channel selector)
- Tube returns with
NextAction.suspend()
. - "External mechanism" becomes signal state and invokes
resume(Packet)
to wake up fiber doRun
invokes suspend()
.
Using int, this will work OK because suspendedCount
becomes -1 when resume(Packet)
occurs before suspend()
.
Increment and decrement is guarded by 'this' object.
/**
* Is this thread suspended? 0=not suspended, 1=suspended.
* <p/>
* <p/>
* Logically this is just a boolean, but we need to prepare for the case
* where the thread is {@link #resume(Packet) resumed} before we get to the {@link #suspend()}.
* This happens when things happen in the following order:
* <p/>
* <ol>
* <li>Tube decides that the fiber needs to be suspended to wait for the external event.
* <li>Tube hooks up fiber with some external mechanism (like NIO channel selector)
* <li>Tube returns with {@link NextAction#suspend()}.
* <li>"External mechanism" becomes signal state and invokes {@link Fiber#resume(Packet)}
* to wake up fiber
* <li>{@link Fiber#doRun} invokes {@link Fiber#suspend()}.
* </ol>
* <p/>
* <p/>
* Using int, this will work OK because {@link #suspendedCount} becomes -1 when
* {@link #resume(Packet)} occurs before {@link #suspend()}.
* <p/>
* <p/>
* Increment and decrement is guarded by 'this' object.
*/
private volatile int suspendedCount = 0;
private volatile boolean isInsideSuspendCallbacks = false;
Is this Fiber
currently running in the synchronous mode? /**
* Is this {@link Fiber} currently running in the synchronous mode?
*/
private boolean synchronous;
private boolean interrupted;
private final int id;
Active FiberContextSwitchInterceptor
s for this fiber. /**
* Active {@link FiberContextSwitchInterceptor}s for this fiber.
*/
private List<FiberContextSwitchInterceptor> interceptors;
Fiber's context ClassLoader
. /**
* Fiber's context {@link ClassLoader}.
*/
private
@Nullable
ClassLoader contextClassLoader;
private
@Nullable
CompletionCallback completionCallback;
private boolean isDeliverThrowableInPacket = false;
public void setDeliverThrowableInPacket(boolean isDeliverThrowableInPacket) {
this.isDeliverThrowableInPacket = isDeliverThrowableInPacket;
}
The thread on which this Fiber is currently executing, if applicable.
/**
* The thread on which this Fiber is currently executing, if applicable.
*/
private Thread currentThread;
Replace uses of synchronized(this) with this lock so that we can control
unlocking for resume use cases
/**
* Replace uses of synchronized(this) with this lock so that we can control
* unlocking for resume use cases
*/
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private volatile boolean isCanceled;
Set to true if this fiber is started asynchronously, to avoid
doubly-invoking completion code.
/**
* Set to true if this fiber is started asynchronously, to avoid
* doubly-invoking completion code.
*/
private boolean started;
Set to true if this fiber is started sync but allowed to run async.
This property exists for use cases where the processing model is fundamentally async
but some requirement or feature mandates that part of the tubeline run synchronously. For
instance, WS-ReliableMessaging with non-anonymous addressing is compatible with running
asynchronously, but if in-order message delivery is used then message processing must assign
a message number before the remainder of the processing can be asynchronous.
/**
* Set to true if this fiber is started sync but allowed to run async.
* This property exists for use cases where the processing model is fundamentally async
* but some requirement or feature mandates that part of the tubeline run synchronously. For
* instance, WS-ReliableMessaging with non-anonymous addressing is compatible with running
* asynchronously, but if in-order message delivery is used then message processing must assign
* a message number before the remainder of the processing can be asynchronous.
*/
private boolean startedSync;
Callback to be invoked when a Fiber
finishes execution. /**
* Callback to be invoked when a {@link Fiber} finishes execution.
*/
public interface CompletionCallback {
Indicates that the fiber has finished its execution.
Since the JAX-WS RI runs asynchronously,
this method maybe invoked by a different thread
than any of the threads that started it or run a part of tubeline.
/**
* Indicates that the fiber has finished its execution.
* <p/>
* <p/>
* Since the JAX-WS RI runs asynchronously,
* this method maybe invoked by a different thread
* than any of the threads that started it or run a part of tubeline.
*/
void onCompletion(@NotNull Packet response);
Indicates that the fiber has finished abnormally, by throwing a given Throwable
. /**
* Indicates that the fiber has finished abnormally, by throwing a given {@link Throwable}.
*/
void onCompletion(@NotNull Throwable error);
}
Fiber(Engine engine) {
this.owner = engine;
id = iotaGen.incrementAndGet();
if (isTraceEnabled()) {
LOGGER.log(Level.FINE, "{0} created", getName());
}
// if this is run from another fiber, then we naturally inherit its context classloader,
// so this code works for fiber->fiber inheritance just fine.
contextClassLoader = Thread.currentThread().getContextClassLoader();
}
Starts the execution of this fiber asynchronously.
This method works like Thread.start()
. Params: - tubeline – The first tube of the tubeline that will act on the packet.
- request – The request packet to be passed to
startPoint.processRequest()
. - completionCallback – The callback to be invoked when the processing is finished and the
final response packet is available.
See Also:
/**
* Starts the execution of this fiber asynchronously.
* <p/>
* <p/>
* This method works like {@link Thread#start()}.
*
* @param tubeline The first tube of the tubeline that will act on the packet.
* @param request The request packet to be passed to {@code startPoint.processRequest()}.
* @param completionCallback The callback to be invoked when the processing is finished and the
* final response packet is available.
* @see #runSync(Tube, Packet)
*/
public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback) {
start(tubeline, request, completionCallback, false);
}
private void dumpFiberContext(String desc) {
if(isTraceEnabled()) {
String action = null;
String msgId = null;
if (packet != null) {
for (SOAPVersion sv: SOAPVersion.values()) {
for (AddressingVersion av: AddressingVersion.values()) {
action = packet.getMessage() != null ? AddressingUtils.getAction(packet.getMessage().getHeaders(), av, sv) : null;
msgId = packet.getMessage() != null ? AddressingUtils.getMessageID(packet.getMessage().getHeaders(), av, sv) : null;
if (action != null || msgId != null) {
break;
}
}
if (action != null || msgId != null) {
break;
}
}
}
String actionAndMsgDesc;
if (action == null && msgId == null) {
actionAndMsgDesc = "NO ACTION or MSG ID";
} else {
actionAndMsgDesc = "'" + action + "' and msgId '" + msgId + "'";
}
String tubeDesc;
if (next != null) {
tubeDesc = next.toString() + ".processRequest()";
} else {
tubeDesc = peekCont() + ".processResponse()";
}
LOGGER.log(Level.FINE, "{0} {1} with {2} and ''current'' tube {3} from thread {4} with Packet: {5}", new Object[]{getName(), desc, actionAndMsgDesc, tubeDesc, Thread.currentThread().getName(), packet != null ? packet.toShortString() : null});
}
}
Starts the execution of this fiber.
If forceSync is true, then the fiber is started for an ostensibly async invocation,
but allows for some portion of the tubeline to run sync with the calling
client instance (Port/Dispatch instance). This allows tubes that enforce
ordering to see requests in the order they were sent at the point the
client invoked them.
The forceSync parameter will be true only when the caller (e.g. AsyncInvoker or
SEIStub) knows one or more tubes need to enforce ordering and thus need
to run sync with the client. Such tubes can return
NextAction.INVOKE_ASYNC to indicate that the next tube in the tubeline
should be invoked async to the current thread.
This method works like Thread.start()
.
Params: - tubeline –
The first tube of the tubeline that will act on the packet.
- request – The request packet to be passed to
startPoint.processRequest()
. - completionCallback –
The callback to be invoked when the processing is finished and the
final response packet is available.
See Also: Since: 2.2.6
/**
* Starts the execution of this fiber.
*
* If forceSync is true, then the fiber is started for an ostensibly async invocation,
* but allows for some portion of the tubeline to run sync with the calling
* client instance (Port/Dispatch instance). This allows tubes that enforce
* ordering to see requests in the order they were sent at the point the
* client invoked them.
* <p>
* The forceSync parameter will be true only when the caller (e.g. AsyncInvoker or
* SEIStub) knows one or more tubes need to enforce ordering and thus need
* to run sync with the client. Such tubes can return
* NextAction.INVOKE_ASYNC to indicate that the next tube in the tubeline
* should be invoked async to the current thread.
*
* <p>
* This method works like {@link Thread#start()}.
*
* @param tubeline
* The first tube of the tubeline that will act on the packet.
* @param request
* The request packet to be passed to {@code startPoint.processRequest()}.
* @param completionCallback
* The callback to be invoked when the processing is finished and the
* final response packet is available.
*
* @see #start(Tube,Packet,CompletionCallback)
* @see #runSync(Tube,Packet)
* @since 2.2.6
*/
public void start(@NotNull Tube tubeline, @NotNull Packet request, @Nullable CompletionCallback completionCallback, boolean forceSync) {
next = tubeline;
this.packet = request;
this.completionCallback = completionCallback;
if (forceSync) {
this.startedSync = true;
dumpFiberContext("starting (sync)");
run();
} else {
this.started = true;
dumpFiberContext("starting (async)");
owner.addRunnable(this);
}
}
Wakes up a suspended fiber.
If a fiber was suspended without specifying the next Tube
, then the execution will be resumed in the response processing direction, by calling the Tube.processResponse(Packet)
method on the next/first Tube
in the Fiber
's processing stack with the specified resume packet as the parameter.
If a fiber was suspended with specifying the next Tube
, then the execution will be resumed in the request processing direction, by calling the next tube's Tube.processRequest(Packet)
method with the specified resume packet as the parameter.
This method is implemented in a race-free way. Another thread can invoke this method even before this fiber goes into the suspension mode. So the caller need not worry about synchronizing NextAction.suspend()
and this method. Params: - resumePacket – packet used in the resumed processing
/**
* Wakes up a suspended fiber.
* <p/>
* <p/>
* If a fiber was suspended without specifying the next {@link Tube},
* then the execution will be resumed in the response processing direction,
* by calling the {@link Tube#processResponse(Packet)} method on the next/first
* {@link Tube} in the {@link Fiber}'s processing stack with the specified resume
* packet as the parameter.
* <p/>
* <p/>
* If a fiber was suspended with specifying the next {@link Tube},
* then the execution will be resumed in the request processing direction,
* by calling the next tube's {@link Tube#processRequest(Packet)} method with the
* specified resume packet as the parameter.
* <p/>
* <p/>
* This method is implemented in a race-free way. Another thread can invoke
* this method even before this fiber goes into the suspension mode. So the caller
* need not worry about synchronizing {@link NextAction#suspend()} and this method.
*
* @param resumePacket packet used in the resumed processing
*/
public void resume(@NotNull Packet resumePacket) {
resume(resumePacket, false);
}
Similar to resume(Packet) but allowing the Fiber to be resumed
synchronously (in the current Thread). If you want to know when the
fiber completes (not when this method returns) then add/wrap a
CompletionCallback on this Fiber.
For example, an asynchronous response endpoint that supports WS-ReliableMessaging
including in-order message delivery may need to resume the Fiber synchronously
until message order is confirmed prior to returning to asynchronous processing.
Since: 2.2.6
/**
* Similar to resume(Packet) but allowing the Fiber to be resumed
* synchronously (in the current Thread). If you want to know when the
* fiber completes (not when this method returns) then add/wrap a
* CompletionCallback on this Fiber.
* For example, an asynchronous response endpoint that supports WS-ReliableMessaging
* including in-order message delivery may need to resume the Fiber synchronously
* until message order is confirmed prior to returning to asynchronous processing.
* @since 2.2.6
*/
public void resume(@NotNull Packet resumePacket,
boolean forceSync) {
resume(resumePacket, forceSync, null);
}
Similar to resume(Packet, boolean) but allowing the Fiber to be resumed
and at the same time atomically assign a new CompletionCallback to it.
Since: 2.2.6
/**
* Similar to resume(Packet, boolean) but allowing the Fiber to be resumed
* and at the same time atomically assign a new CompletionCallback to it.
* @since 2.2.6
*/
public void resume(@NotNull Packet resumePacket,
boolean forceSync,
CompletionCallback callback) {
lock.lock();
try {
if (callback != null) {
setCompletionCallback(callback);
}
if(isTraceEnabled())
LOGGER.log(Level.FINE, "{0} resuming. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount-1});
packet = resumePacket;
if( --suspendedCount == 0 ) {
if (!isInsideSuspendCallbacks) {
List<Listener> listeners = getCurrentListeners();
for (Listener listener: listeners) {
try {
listener.fiberResumed(this);
} catch (Throwable e) {
if (isTraceEnabled())
LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
}
}
if(synchronous) {
condition.signalAll();
} else if (forceSync || startedSync) {
run();
} else {
dumpFiberContext("resuming (async)");
owner.addRunnable(this);
}
}
} else {
if (isTraceEnabled()) {
LOGGER.log(Level.FINE, "{0} taking no action on resume because suspendedCount != 0: {1}", new Object[]{getName(), suspendedCount});
}
}
} finally {
lock.unlock();
}
}
Wakes up a suspended fiber and begins response processing.
Since: 2.2.6
/**
* Wakes up a suspended fiber and begins response processing.
* @since 2.2.6
*/
public void resumeAndReturn(@NotNull Packet resumePacket,
boolean forceSync) {
if(isTraceEnabled())
LOGGER.log(Level.FINE, "{0} resumed with Return Packet", getName());
next = null;
resume(resumePacket, forceSync);
}
Wakes up a suspended fiber with an exception.
The execution of the suspended fiber will be resumed in the response processing direction, by calling the Tube.processException(Throwable)
method on the next/first Tube
in the Fiber
's processing stack with the specified exception as the parameter.
This method is implemented in a race-free way. Another thread can invoke this method even before this fiber goes into the suspension mode. So the caller need not worry about synchronizing NextAction.suspend()
and this method. Params: - throwable – exception that is used in the resumed processing
/**
* Wakes up a suspended fiber with an exception.
* <p/>
* <p/>
* The execution of the suspended fiber will be resumed in the response
* processing direction, by calling the {@link Tube#processException(Throwable)} method
* on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
* the specified exception as the parameter.
* <p/>
* <p/>
* This method is implemented in a race-free way. Another thread can invoke
* this method even before this fiber goes into the suspension mode. So the caller
* need not worry about synchronizing {@link NextAction#suspend()} and this method.
*
* @param throwable exception that is used in the resumed processing
*/
public void resume(@NotNull Throwable throwable) {
resume(throwable, packet, false);
}
Wakes up a suspended fiber with an exception.
The execution of the suspended fiber will be resumed in the response processing direction, by calling the Tube.processException(Throwable)
method on the next/first Tube
in the Fiber
's processing stack with the specified exception as the parameter.
This method is implemented in a race-free way. Another thread can invoke this method even before this fiber goes into the suspension mode. So the caller need not worry about synchronizing NextAction.suspend()
and this method. Params: - throwable – exception that is used in the resumed processing
- packet – Packet that will be visible on the Fiber after the resume
Since: 2.2.8
/**
* Wakes up a suspended fiber with an exception.
* <p/>
* <p/>
* The execution of the suspended fiber will be resumed in the response
* processing direction, by calling the {@link Tube#processException(Throwable)} method
* on the next/first {@link Tube} in the {@link Fiber}'s processing stack with
* the specified exception as the parameter.
* <p/>
* <p/>
* This method is implemented in a race-free way. Another thread can invoke
* this method even before this fiber goes into the suspension mode. So the caller
* need not worry about synchronizing {@link NextAction#suspend()} and this method.
*
* @param throwable exception that is used in the resumed processing
* @param packet Packet that will be visible on the Fiber after the resume
* @since 2.2.8
*/
public void resume(@NotNull Throwable throwable, @NotNull Packet packet) {
resume(throwable, packet, false);
}
Wakes up a suspend fiber with an exception.
If forceSync is true, then the suspended fiber will resume with
synchronous processing on the current thread. This will continue
until some Tube indicates that it is safe to switch to asynchronous
processing.
Params: - error – exception that is used in the resumed processing
- forceSync – if processing begins synchronously
Since: 2.2.6
/**
* Wakes up a suspend fiber with an exception.
*
* If forceSync is true, then the suspended fiber will resume with
* synchronous processing on the current thread. This will continue
* until some Tube indicates that it is safe to switch to asynchronous
* processing.
*
* @param error exception that is used in the resumed processing
* @param forceSync if processing begins synchronously
* @since 2.2.6
*/
public void resume(@NotNull Throwable error,
boolean forceSync) {
resume(error, packet, forceSync);
}
Wakes up a suspend fiber with an exception.
If forceSync is true, then the suspended fiber will resume with
synchronous processing on the current thread. This will continue
until some Tube indicates that it is safe to switch to asynchronous
processing.
Params: - error – exception that is used in the resumed processing
- packet – Packet that will be visible on the Fiber after the resume
- forceSync – if processing begins synchronously
Since: 2.2.8
/**
* Wakes up a suspend fiber with an exception.
*
* If forceSync is true, then the suspended fiber will resume with
* synchronous processing on the current thread. This will continue
* until some Tube indicates that it is safe to switch to asynchronous
* processing.
*
* @param error exception that is used in the resumed processing
* @param packet Packet that will be visible on the Fiber after the resume
* @param forceSync if processing begins synchronously
* @since 2.2.8
*/
public void resume(@NotNull Throwable error,
@NotNull Packet packet,
boolean forceSync) {
if(isTraceEnabled())
LOGGER.log(Level.FINE, "{0} resumed with Return Throwable", getName());
next = null;
throwable = error;
resume(packet, forceSync);
}
Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback
Params: - mayInterrupt – if cancel should use
Thread.interrupt()
See Also: Since: 2.2.6
/**
* Marks this Fiber as cancelled. A cancelled Fiber will never invoke its completion callback
* @param mayInterrupt if cancel should use {@link Thread#interrupt()}
* @see java.util.concurrent.Future#cancel(boolean)
* @since 2.2.6
*/
@Override
public void cancel(boolean mayInterrupt) {
isCanceled = true;
if (mayInterrupt) {
// synchronized(this) is used as Thread running Fiber will be holding lock
synchronized(this) {
if (currentThread != null)
currentThread.interrupt();
}
}
}
Suspends this fiber's execution until the resume method is invoked.
The call returns immediately, and when the fiber is resumed
the execution picks up from the last scheduled continuation.
Params: - onExitRunnable – runnable to be invoked after fiber is marked for suspension
Returns: if control loop must exit
/**
* Suspends this fiber's execution until the resume method is invoked.
* <p/>
* The call returns immediately, and when the fiber is resumed
* the execution picks up from the last scheduled continuation.
* @param onExitRunnable runnable to be invoked after fiber is marked for suspension
* @return if control loop must exit
*/
private boolean suspend(Holder<Boolean> isRequireUnlock, Runnable onExitRunnable) {
if(isTraceEnabled()) {
LOGGER.log(Level.FINE, "{0} suspending. Will have suspendedCount={1}", new Object[]{getName(), suspendedCount+1});
if (suspendedCount > 0) {
LOGGER.log(Level.FINE, "WARNING - {0} suspended more than resumed. Will require more than one resume to actually resume this fiber.", getName());
}
}
List<Listener> listeners = getCurrentListeners();
if (++suspendedCount == 1) {
isInsideSuspendCallbacks = true;
try {
for (Listener listener: listeners) {
try {
listener.fiberSuspended(this);
} catch (Throwable e) {
if(isTraceEnabled())
LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
}
}
} finally {
isInsideSuspendCallbacks = false;
}
}
if (suspendedCount <= 0) {
// suspend callback caused fiber to resume
for (Listener listener: listeners) {
try {
listener.fiberResumed(this);
} catch (Throwable e) {
if(isTraceEnabled())
LOGGER.log(Level.FINE, "Listener {0} threw exception: {1}", new Object[]{listener, e.getMessage()});
}
}
} else if (onExitRunnable != null) {
// synchronous use cases cannot disconnect from the current thread
if (!synchronous) {
/* INTENTIONALLY UNLOCKING EARLY */
synchronized(this) {
// currentThread is protected by the monitor for this fiber so
// that it is accessible to cancel() even when the lock is held
currentThread = null;
}
lock.unlock();
assert(!lock.isHeldByCurrentThread());
isRequireUnlock.value = Boolean.FALSE;
try {
onExitRunnable.run();
} catch(Throwable t) {
throw new OnExitRunnableException(t);
}
return true;
} else {
// for synchronous we will stay with current thread, so do not disconnect
if (isTraceEnabled())
LOGGER.fine("onExitRunnable used with synchronous Fiber execution -- not exiting current thread");
onExitRunnable.run();
}
}
return false;
}
private static final class OnExitRunnableException extends RuntimeException {
private static final long serialVersionUID = 1L;
Throwable target;
public OnExitRunnableException(Throwable target) {
super((Throwable)null); // see pattern for InvocationTargetException
this.target = target;
}
}
Adds a new FiberContextSwitchInterceptor
to this fiber.
The newly installed fiber will take effect immediately after the current tube returns from its Tube.processRequest(Packet)
or Tube.processResponse(Packet)
, before the next tube begins processing.
So when the tubeline consists of X and Y, and when X installs an interceptor,
the order of execution will be as follows:
- X.processRequest()
- interceptor gets installed
- interceptor.execute() is invoked
- Y.processRequest()
/**
* Adds a new {@link FiberContextSwitchInterceptor} to this fiber.
* <p/>
* <p/>
* The newly installed fiber will take effect immediately after the current
* tube returns from its {@link Tube#processRequest(Packet)} or
* {@link Tube#processResponse(Packet)}, before the next tube begins processing.
* <p/>
* <p/>
* So when the tubeline consists of X and Y, and when X installs an interceptor,
* the order of execution will be as follows:
* <p/>
* <ol>
* <li>X.processRequest()
* <li>interceptor gets installed
* <li>interceptor.execute() is invoked
* <li>Y.processRequest()
* </ol>
*/
public synchronized void addInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
if (interceptors == null) {
interceptors = new ArrayList<FiberContextSwitchInterceptor>();
} else {
List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
l.addAll(interceptors);
interceptors = l;
}
interceptors.add(interceptor);
}
Removes a FiberContextSwitchInterceptor
from this fiber.
The removal of the interceptor takes effect immediately after the current tube returns from its Tube.processRequest(Packet)
or Tube.processResponse(Packet)
, before the next tube begins processing.
So when the tubeline consists of X and Y, and when Y uninstalls an interceptor
on the way out, then the order of execution will be as follows:
- Y.processResponse() (notice that this happens with interceptor.execute() in the callstack)
- interceptor gets uninstalled
- interceptor.execute() returns
- X.processResponse()
Returns: true if the specified interceptor was removed. False if
the specified interceptor was not registered with this fiber to begin with.
/**
* Removes a {@link FiberContextSwitchInterceptor} from this fiber.
* <p/>
* <p/>
* The removal of the interceptor takes effect immediately after the current
* tube returns from its {@link Tube#processRequest(Packet)} or
* {@link Tube#processResponse(Packet)}, before the next tube begins processing.
* <p/>
* <p/>
* <p/>
* So when the tubeline consists of X and Y, and when Y uninstalls an interceptor
* on the way out, then the order of execution will be as follows:
* <p/>
* <ol>
* <li>Y.processResponse() (notice that this happens with interceptor.execute() in the callstack)
* <li>interceptor gets uninstalled
* <li>interceptor.execute() returns
* <li>X.processResponse()
* </ol>
*
* @return true if the specified interceptor was removed. False if
* the specified interceptor was not registered with this fiber to begin with.
*/
public synchronized boolean removeInterceptor(@NotNull FiberContextSwitchInterceptor interceptor) {
if (interceptors != null) {
boolean result = interceptors.remove(interceptor);
if (interceptors.isEmpty())
interceptors = null;
else {
List<FiberContextSwitchInterceptor> l = new ArrayList<FiberContextSwitchInterceptor>();
l.addAll(interceptors);
interceptors = l;
}
return result;
}
return false;
}
Gets the context ClassLoader
of this fiber. /**
* Gets the context {@link ClassLoader} of this fiber.
*/
public
@Nullable
ClassLoader getContextClassLoader() {
return contextClassLoader;
}
Sets the context ClassLoader
of this fiber. /**
* Sets the context {@link ClassLoader} of this fiber.
*/
public ClassLoader setContextClassLoader(@Nullable ClassLoader contextClassLoader) {
ClassLoader r = this.contextClassLoader;
this.contextClassLoader = contextClassLoader;
return r;
}
DO NOT CALL THIS METHOD. This is an implementation detail of Fiber
. /**
* DO NOT CALL THIS METHOD. This is an implementation detail
* of {@link Fiber}.
*/
@Deprecated
@Override
public void run() {
Container old = ContainerResolver.getDefault().enterContainer(owner.getContainer());
try {
assert !synchronous;
// doRun returns true to indicate an early exit from fiber processing
if (!doRun()) {
if (startedSync && suspendedCount == 0 &&
(next != null || contsSize > 0)) {
// We bailed out of running this fiber we started as sync, and now
// want to finish running it async
startedSync = false;
// Start back up as an async fiber
dumpFiberContext("restarting (async) after startSync");
owner.addRunnable(this);
} else {
completionCheck();
}
}
} finally {
ContainerResolver.getDefault().exitContainer(old);
}
}
Runs a given Tube
(and everything thereafter) synchronously.
This method blocks and returns only when all the successive Tube
s complete their request/response processing. This method can be used if a Tube
needs to fallback to synchronous processing.
Example:
class FooTube extends AbstractFilterTubeImpl
{ NextAction processRequest(Packet request) { // run everything synchronously and return with the response packet return doReturnWith(Fiber.current().runSync(next,request)); } NextAction processResponse(Packet response) { // never be invoked } }
Params: - tubeline – The first tube of the tubeline that will act on the packet.
- request – The request packet to be passed to
startPoint.processRequest()
.
See Also: Returns: The response packet to the request
.
/**
* Runs a given {@link Tube} (and everything thereafter) synchronously.
* <p/>
* <p/>
* This method blocks and returns only when all the successive {@link Tube}s
* complete their request/response processing. This method can be used
* if a {@link Tube} needs to fallback to synchronous processing.
* <p/>
* <h3>Example:</h3>
* <pre>
* class FooTube extends {@link AbstractFilterTubeImpl} {
* NextAction processRequest(Packet request) {
* // run everything synchronously and return with the response packet
* return doReturnWith(Fiber.current().runSync(next,request));
* }
* NextAction processResponse(Packet response) {
* // never be invoked
* }
* }
* </pre>
*
* @param tubeline The first tube of the tubeline that will act on the packet.
* @param request The request packet to be passed to {@code startPoint.processRequest()}.
* @return The response packet to the {@code request}.
* @see #start(Tube, Packet, CompletionCallback)
*/
public
@NotNull
Packet runSync(@NotNull Tube tubeline, @NotNull Packet request) {
lock.lock();
try {
// save the current continuation, so that we return runSync() without executing them.
final Tube[] oldCont = conts;
final int oldContSize = contsSize;
final boolean oldSynchronous = synchronous;
final Tube oldNext = next;
if (oldContSize > 0) {
conts = new Tube[16];
contsSize = 0;
}
try {
synchronous = true;
this.packet = request;
next = tubeline;
doRun();
if (throwable != null) {
if (isDeliverThrowableInPacket) {
packet.addSatellite(new ThrowableContainerPropertySet(throwable));
} else {
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
}
if (throwable instanceof Error) {
throw (Error) throwable;
}
// our system is supposed to only accept Error or RuntimeException
throw new AssertionError(throwable);
}
}
return this.packet;
} finally {
conts = oldCont;
contsSize = oldContSize;
synchronous = oldSynchronous;
next = oldNext;
if(interrupted) {
Thread.currentThread().interrupt();
interrupted = false;
}
if(!started && !startedSync)
completionCheck();
}
} finally {
lock.unlock();
}
}
private void completionCheck() {
lock.lock();
try {
// Don't trigger completion and callbacks if fiber is suspended
if(!isCanceled && contsSize==0 && suspendedCount == 0) {
if(isTraceEnabled())
LOGGER.log(Level.FINE, "{0} completed", getName());
clearListeners();
condition.signalAll();
if (completionCallback != null) {
if (throwable != null) {
if (isDeliverThrowableInPacket) {
packet.addSatellite(new ThrowableContainerPropertySet(throwable));
completionCallback.onCompletion(packet);
} else
completionCallback.onCompletion(throwable);
} else
completionCallback.onCompletion(packet);
}
}
} finally {
lock.unlock();
}
}
Invokes all registered InterceptorHandler
s and then call into Fiber.__doRun()
. /**
* Invokes all registered {@link InterceptorHandler}s and then call into
* {@link Fiber#__doRun()}.
*/
private class InterceptorHandler implements FiberContextSwitchInterceptor.Work<Tube, Tube> {
private final Holder<Boolean> isUnlockRequired;
private final List<FiberContextSwitchInterceptor> ints;
Index in Fiber.interceptors
to invoke next. /**
* Index in {@link Fiber#interceptors} to invoke next.
*/
private int idx;
public InterceptorHandler(Holder<Boolean> isUnlockRequired, List<FiberContextSwitchInterceptor> ints) {
this.isUnlockRequired = isUnlockRequired;
this.ints = ints;
}
Initiate the interception, and eventually invokes Fiber.__doRun()
. /**
* Initiate the interception, and eventually invokes {@link Fiber#__doRun()}.
*/
Tube invoke(Tube next) {
idx = 0;
return execute(next);
}
@Override
public Tube execute(Tube next) {
if (idx == ints.size()) {
Fiber.this.next = next;
if (__doRun(isUnlockRequired, ints))
return PLACEHOLDER;
} else {
FiberContextSwitchInterceptor interceptor = ints.get(idx++);
return interceptor.execute(Fiber.this, next, this);
}
return Fiber.this.next;
}
}
private static final PlaceholderTube PLACEHOLDER = new PlaceholderTube();
private static class PlaceholderTube extends AbstractTubeImpl {
@Override
public NextAction processRequest(Packet request) {
throw new UnsupportedOperationException();
}
@Override
public NextAction processResponse(Packet response) {
throw new UnsupportedOperationException();
}
@Override
public NextAction processException(Throwable t) {
return doThrow(t);
}
@Override
public void preDestroy() {
}
@Override
public PlaceholderTube copy(TubeCloner cloner) {
throw new UnsupportedOperationException();
}
}
Executes the fiber as much as possible.
/**
* Executes the fiber as much as possible.
*
*/
private boolean doRun() {
dumpFiberContext("running");
if (serializeExecution) {
serializedExecutionLock.lock();
try {
return _doRun(next);
} finally {
serializedExecutionLock.unlock();
}
} else {
return _doRun(next);
}
}
private boolean _doRun(Tube next) {
// isRequireUnlock will contain Boolean.FALSE when lock has already been released in suspend
Holder<Boolean> isRequireUnlock = new Holder<Boolean>(Boolean.TRUE);
lock.lock();
try {
List<FiberContextSwitchInterceptor> ints;
ClassLoader old;
synchronized(this) {
ints = interceptors;
// currentThread is protected by the monitor for this fiber so
// that it is accessible to cancel() even when the lock is held
currentThread = Thread.currentThread();
if (isTraceEnabled()) {
LOGGER.log(Level.FINE, "Thread entering _doRun(): {0}", currentThread);
}
old = currentThread.getContextClassLoader();
currentThread.setContextClassLoader(contextClassLoader);
}
try {
boolean needsToReenter;
do {
// if interceptors are set, go through the interceptors.
if (ints == null) {
this.next = next;
if (__doRun(isRequireUnlock, null /*ints*/)) {
return true;
}
} else {
next = new InterceptorHandler(isRequireUnlock, ints).invoke(next);
if (next == PLACEHOLDER) {
return true;
}
}
synchronized(this) {
needsToReenter = (ints != interceptors);
if (needsToReenter)
ints = interceptors;
}
} while (needsToReenter);
} catch(OnExitRunnableException o) {
// catching this exception indicates onExitRunnable in suspend() threw.
// we must still avoid double unlock
Throwable t = o.target;
if (t instanceof WebServiceException)
throw (WebServiceException) t;
throw new WebServiceException(t);
} finally {
// don't reference currentThread here because fiber processing
// may already be running on a different thread (Note: isAlreadyExited
// tracks this state
Thread thread = Thread.currentThread();
thread.setContextClassLoader(old);
if (isTraceEnabled()) {
LOGGER.log(Level.FINE, "Thread leaving _doRun(): {0}", thread);
}
}
return false;
} finally {
if (isRequireUnlock.value) {
synchronized(this) {
currentThread = null;
}
lock.unlock();
}
}
}
To be invoked from doRun()
. See Also:
/**
* To be invoked from {@link #doRun()}.
*
* @see #doRun()
*/
private boolean __doRun(Holder<Boolean> isRequireUnlock, List<FiberContextSwitchInterceptor> originalInterceptors) {
assert(lock.isHeldByCurrentThread());
final Fiber old = CURRENT_FIBER.get();
CURRENT_FIBER.set(this);
// if true, lots of debug messages to show what's being executed
final boolean traceEnabled = LOGGER.isLoggable(Level.FINER);
try {
boolean abortResponse = false;
while(isReady(originalInterceptors)) {
if (isCanceled) {
next = null;
throwable = null;
contsSize = 0;
break;
}
try {
NextAction na;
Tube last;
if(throwable!=null) {
if(contsSize==0 || abortResponse) {
contsSize = 0; // abortResponse case
// nothing else to execute. we are done.
return false;
}
last = popCont();
if (traceEnabled)
LOGGER.log(Level.FINER, "{0} {1}.processException({2})", new Object[]{getName(), last, throwable});
na = last.processException(throwable);
} else {
if(next!=null) {
if(traceEnabled)
LOGGER.log(Level.FINER, "{0} {1}.processRequest({2})", new Object[]{getName(), next, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
na = next.processRequest(packet);
last = next;
} else {
if(contsSize==0 || abortResponse) {
// nothing else to execute. we are done.
contsSize = 0;
return false;
}
last = popCont();
if(traceEnabled)
LOGGER.log(Level.FINER, "{0} {1}.processResponse({2})", new Object[]{getName(), last, packet != null ? "Packet@"+Integer.toHexString(packet.hashCode()) : "null"});
na = last.processResponse(packet);
}
}
if (traceEnabled)
LOGGER.log(Level.FINER, "{0} {1} returned with {2}", new Object[]{getName(), last, na});
// If resume is called before suspend, then make sure
// resume(Packet) is not lost
if (na.kind != NextAction.SUSPEND) {
// preserve in-flight packet so that processException may inspect
if (na.kind != NextAction.THROW &&
na.kind != NextAction.THROW_ABORT_RESPONSE)
packet = na.packet;
throwable = na.throwable;
}
switch(na.kind) {
case NextAction.INVOKE:
case NextAction.INVOKE_ASYNC:
pushCont(last);
// fall through next
case NextAction.INVOKE_AND_FORGET:
next = na.next;
if (na.kind == NextAction.INVOKE_ASYNC
&& startedSync) {
// Break out here
return false;
}
break;
case NextAction.THROW_ABORT_RESPONSE:
case NextAction.ABORT_RESPONSE:
abortResponse = true;
if (isTraceEnabled()) {
LOGGER.log(Level.FINE, "Fiber {0} is aborting a response due to exception: {1}", new Object[]{this, na.throwable});
}
case NextAction.RETURN:
case NextAction.THROW:
next = null;
break;
case NextAction.SUSPEND:
if (next != null) {
// Only store the 'last' tube when we're processing
// a request, since conts array is for processResponse
pushCont(last);
}
next = na.next;
if(suspend(isRequireUnlock, na.onExitRunnable))
return true; // explicitly exiting control loop
break;
default:
throw new AssertionError();
}
} catch (RuntimeException t) {
if (traceEnabled)
LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
throwable = t;
} catch (Error t) {
if (traceEnabled)
LOGGER.log(Level.FINER, getName() + " Caught " + t + ". Start stack unwinding", t);
throwable = t;
}
dumpFiberContext("After tube execution");
}
// there's nothing we can execute right away.
// we'll be back when this fiber is resumed.
} finally {
CURRENT_FIBER.set(old);
}
return false;
}
private void pushCont(Tube tube) {
conts[contsSize++] = tube;
// expand if needed
int len = conts.length;
if (contsSize == len) {
Tube[] newBuf = new Tube[len * 2];
System.arraycopy(conts, 0, newBuf, 0, len);
conts = newBuf;
}
}
private Tube popCont() {
return conts[--contsSize];
}
private Tube peekCont() {
int index = contsSize - 1;
if (index >= 0 && index < conts.length) {
return conts[index];
} else {
return null;
}
}
Only to be used by Tubes that manipulate the Fiber to create alternate flows
Since: 2.2.6
/**
* Only to be used by Tubes that manipulate the Fiber to create alternate flows
* @since 2.2.6
*/
public void resetCont(Tube[] conts, int contsSize) {
this.conts = conts;
this.contsSize = contsSize;
}
Returns true if the fiber is ready to execute.
/**
* Returns true if the fiber is ready to execute.
*/
private boolean isReady(List<FiberContextSwitchInterceptor> originalInterceptors) {
if (synchronous) {
while (suspendedCount == 1)
try {
if (isTraceEnabled()) {
LOGGER.log(Level.FINE, "{0} is blocking thread {1}", new Object[]{getName(), Thread.currentThread().getName()});
}
condition.await(); // the synchronized block is the whole runSync method.
} catch (InterruptedException e) {
// remember that we are interrupted, but don't respond to it
// right away. This behavior is in line with what happens
// when you are actually running the whole thing synchronously.
interrupted = true;
}
synchronized(this) {
return interceptors == originalInterceptors;
}
}
else {
if (suspendedCount>0)
return false;
synchronized(this) {
return interceptors == originalInterceptors;
}
}
}
private String getName() {
return "engine-" + owner.id + "fiber-" + id;
}
@Override
public String toString() {
return getName();
}
Gets the current Packet
associated with this fiber.
This method returns null if no packet has been associated with the fiber yet.
/**
* Gets the current {@link Packet} associated with this fiber.
* <p/>
* <p/>
* This method returns null if no packet has been associated with the fiber yet.
*/
public
@Nullable
Packet getPacket() {
return packet;
}
Returns completion callback associated with this Fiber
Returns: Completion callback Since: 2.2.6
/**
* Returns completion callback associated with this Fiber
* @return Completion callback
* @since 2.2.6
*/
public CompletionCallback getCompletionCallback() {
return completionCallback;
}
Updates completion callback associated with this Fiber
Params: - completionCallback – Completion callback
Since: 2.2.6
/**
* Updates completion callback associated with this Fiber
* @param completionCallback Completion callback
* @since 2.2.6
*/
public void setCompletionCallback(CompletionCallback completionCallback) {
this.completionCallback = completionCallback;
}
(ADVANCED) Returns true if the current fiber is being executed synchronously.
Fiber may run synchronously for various reasons. Perhaps this is
on client side and application has invoked a synchronous method call.
Perhaps this is on server side and we have deployed on a synchronous
transport (like servlet.)
When a fiber is run synchronously (IOW by runSync(Tube, Packet)
), further invocations to runSync(Tube, Packet)
can be done without degrading the performance.
So this value can be used as a further optimization hint for advanced Tube
s to choose the best strategy to invoke the next Tube
. For example, a tube may want to install a FiberContextSwitchInterceptor
if running async, yet it might find it faster to do runSync(Tube, Packet)
if it's already running synchronously. /**
* (ADVANCED) Returns true if the current fiber is being executed synchronously.
* <p/>
* <p/>
* Fiber may run synchronously for various reasons. Perhaps this is
* on client side and application has invoked a synchronous method call.
* Perhaps this is on server side and we have deployed on a synchronous
* transport (like servlet.)
* <p/>
* <p/>
* When a fiber is run synchronously (IOW by {@link #runSync(Tube, Packet)}),
* further invocations to {@link #runSync(Tube, Packet)} can be done
* without degrading the performance.
* <p/>
* <p/>
* So this value can be used as a further optimization hint for
* advanced {@link Tube}s to choose the best strategy to invoke
* the next {@link Tube}. For example, a tube may want to install
* a {@link FiberContextSwitchInterceptor} if running async, yet
* it might find it faster to do {@link #runSync(Tube, Packet)}
* if it's already running synchronously.
*/
public static boolean isSynchronous() {
return current().synchronous;
}
Returns true if the current Fiber on the current thread was started
synchronously. Note, this is not strictly the same as being synchronous
because the assumption is that the Fiber will ultimately be dispatched
asynchronously, possibly have a completion callback associated with it, etc.
Note, the 'startedSync' flag is cleared once the current Fiber is
converted to running asynchronously.
Since: 2.2.6
/**
* Returns true if the current Fiber on the current thread was started
* synchronously. Note, this is not strictly the same as being synchronous
* because the assumption is that the Fiber will ultimately be dispatched
* asynchronously, possibly have a completion callback associated with it, etc.
* Note, the 'startedSync' flag is cleared once the current Fiber is
* converted to running asynchronously.
* @since 2.2.6
*/
public boolean isStartedSync() {
return startedSync;
}
Gets the current fiber that's running.
This works like Thread.currentThread()
. This method only works when invoked from Tube
. /**
* Gets the current fiber that's running.
* <p/>
* <p/>
* This works like {@link Thread#currentThread()}.
* This method only works when invoked from {@link Tube}.
*/
public static
@NotNull
@SuppressWarnings({"null", "ConstantConditions"})
Fiber current() {
Fiber fiber = CURRENT_FIBER.get();
if (fiber == null)
throw new IllegalStateException("Can be only used from fibers");
return fiber;
}
Gets the current fiber that's running, if set.
/**
* Gets the current fiber that's running, if set.
*/
public static Fiber getCurrentIfSet() {
return CURRENT_FIBER.get();
}
private static final ThreadLocal<Fiber> CURRENT_FIBER = new ThreadLocal<Fiber>();
Used to allocate unique number for each fiber.
/**
* Used to allocate unique number for each fiber.
*/
private static final AtomicInteger iotaGen = new AtomicInteger();
private static boolean isTraceEnabled() {
return LOGGER.isLoggable(Level.FINE);
}
private static final Logger LOGGER = Logger.getLogger(Fiber.class.getName());
private static final ReentrantLock serializedExecutionLock = new ReentrantLock();
Set this boolean to true to execute fibers sequentially one by one.
See class javadoc.
/**
* Set this boolean to true to execute fibers sequentially one by one.
* See class javadoc.
*/
public static volatile boolean serializeExecution = Boolean.getBoolean(Fiber.class.getName() + ".serialize");
private final Set<Component> components = new CopyOnWriteArraySet<Component>();
@Override
public <S> S getSPI(Class<S> spiType) {
for (Component c : components) {
S spi = c.getSPI(spiType);
if (spi != null) {
return spi;
}
}
return null;
}
@Override
public Set<Component> getComponents() {
return components;
}
}