package com.mchange.v2.resourcepool;
import java.util.*;
import com.mchange.v2.async.*;
import com.mchange.v2.log.*;
import com.mchange.v2.lang.ThreadUtils;
import com.mchange.v2.util.ResourceClosedException;
class BasicResourcePool implements ResourcePool
{
// Logger shared by every BasicResourcePool instance.
private final static MLogger logger = MLog.getLogger( BasicResourcePool.class );
// Divisor applied to the smallest configured expiration time when an
// expiration-enforcement period must be computed automatically.
final static int AUTO_CULL_FREQUENCY_DIVISOR = 4;
// Upper bound (15 minutes, in milliseconds) on the automatically computed period.
final static int AUTO_MAX_CULL_FREQUENCY = (15 * 60 * 1000);
// Lower bound (1 second, in milliseconds) on the automatically computed period.
final static int AUTO_MIN_CULL_FREQUENCY = (1 * 1000);
// VM config property consulted once, in the static initializer, to choose the acquire task flavor.
final static String USE_SCATTERED_ACQUIRE_TASK_KEY = "com.mchange.v2.resourcepool.experimental.useScatteredAcquireTask";
// When true, expandPool() posts ScatteredAcquireTask instances; otherwise AcquireTask instances.
final static boolean USE_SCATTERED_ACQUIRE_TASK;
static
{
String checkScattered = com.mchange.v2.cfg.MConfig.readVmConfig().getProperty(USE_SCATTERED_ACQUIRE_TASK_KEY);
if (checkScattered != null && checkScattered.trim().toLowerCase().equals("false"))
{
USE_SCATTERED_ACQUIRE_TASK = false;
if ( logger.isLoggable( MLevel.INFO ) )
logger.info(BasicResourcePool.class.getName() + " using traditional, Thread-blocking AcquireTask. Yuk. Why? It's no longer supported.");
}
else
USE_SCATTERED_ACQUIRE_TASK = true;
}
// ---- immutable configuration, assigned once in the constructor ----
// Acquires and destroys the underlying resources on behalf of the pool.
final Manager mgr;
final int start;
final int min;
final int max;
final int inc;
final int num_acq_attempts;
final int acq_attempt_delay;
// A positive value schedules a periodic CheckIdleResourcesTask with this delay and period.
final long check_idle_resources_delay;
// If any of the next four values is positive, a CullTask is scheduled (see mustEnforceExpiration()).
final long max_resource_age;
final long max_idle_time;
final long excess_max_idle_time;
final long destroy_unreturned_resc_time;
// Effective period of the CullTask; computed automatically when the configured value is <= 0
// and expiration must be enforced.
final long expiration_enforcement_delay;
final boolean break_on_acquisition_failure;
// Only ever true when destroy_unreturned_resc_time > 0 as well (see constructor).
final boolean debug_store_checkout_exceptions;
// When true, checkin refurbishment runs on the calling thread instead of on taskRunner.
final boolean force_synchronous_checkins;
// Wall-clock time (System.currentTimeMillis()) at which this object was created.
final long pool_start_time = System.currentTimeMillis();
// May be null; when non-null it is told via markBroken( this ) that the pool has closed.
final BasicResourcePoolFactory factory;
// Runs acquire, remove, destroy and (asynchronous) checkin tasks.
final AsynchronousRunner taskRunner;
// Null when the pool does not support events; otherwise events are posted here.
final RunnableQueue asyncEventQueue;
// Non-null exactly when asyncEventQueue is non-null.
final ResourcePoolEventSupport rpes;
// ---- mutable state ----
Timer cullAndIdleRefurbishTimer;
TimerTask cullTask;
TimerTask idleRefurbishTask;
// Threads currently inside awaitAvailable().
HashSet acquireWaiters = new HashSet();
// Threads waiting on this pool's monitor for reasons other than awaitAvailable().
HashSet otherWaiters = new HashSet();
int pending_acquires;
int pending_removes;
int target_pool_size;
// Maps each managed resource to its PunchCard.
HashMap managed = new HashMap();
// Managed resources that are currently available; checkouts take from index 0,
// and new or checked-in resources are inserted at index 0.
LinkedList unused = new LinkedList();
// Resources awaiting checkin that will be destroyed rather than reused when checked in.
HashSet excluded = new HashSet();
// Used as a weak set (values are always null) of resources the pool used to know about.
Map formerResources = new WeakHashMap();
Set idleCheckResources = new HashSet();
boolean force_kill_acquires = false;
// Set to true by close( boolean ); never reset in the code visible here.
boolean broken = false;
long failed_checkins = 0;
long failed_checkouts = 0;
long failed_idle_tests = 0;
Throwable lastCheckinFailure = null;
Throwable lastCheckoutFailure = null;
Throwable lastIdleTestFailure = null;
// Updated alongside each of the three test-failure fields above.
Throwable lastResourceTestFailure = null;
Throwable lastAcquisitionFailiure = null;
// First resource assimilated while Debug.DEBUG is on; used only in trace output.
Object exampleResource;
/** Returns the System.currentTimeMillis() value recorded when this pool object was created. */
public long getStartTime()
{
    return pool_start_time;
}
/** Returns the number of milliseconds elapsed since the recorded pool start time. */
public long getUpTime()
{
    long now = System.currentTimeMillis();
    return now - pool_start_time;
}
/** Returns the current value of the failed-checkin counter. */
public synchronized long getNumFailedCheckins()
{
    return failed_checkins;
}
/** Returns the current value of the failed-checkout counter. */
public synchronized long getNumFailedCheckouts()
{
    return failed_checkouts;
}
/** Returns the current value of the failed-idle-test counter. */
public synchronized long getNumFailedIdleTests()
{
    return failed_idle_tests;
}
/** Returns the most recently recorded checkin failure, or null if none has been recorded. */
public synchronized Throwable getLastCheckinFailure()
{
    return lastCheckinFailure;
}
/**
 * Records t as the latest checkin failure and also as the latest
 * resource-test failure. The caller must hold this pool's lock.
 */
private void setLastCheckinFailure(Throwable t)
{
    assert Thread.holdsLock( this );
    this.lastResourceTestFailure = t;
    this.lastCheckinFailure = t;
}
/** Returns the most recently recorded checkout failure, or null if none has been recorded. */
public synchronized Throwable getLastCheckoutFailure()
{
    return lastCheckoutFailure;
}
/**
 * Records t as the latest checkout failure and also as the latest
 * resource-test failure. The caller must hold this pool's lock.
 */
private void setLastCheckoutFailure(Throwable t)
{
    assert Thread.holdsLock( this );
    this.lastResourceTestFailure = t;
    this.lastCheckoutFailure = t;
}
/** Returns the most recently recorded idle-test failure, or null if none has been recorded. */
public synchronized Throwable getLastIdleCheckFailure()
{
    return lastIdleTestFailure;
}
/**
 * Records t as the latest idle-test failure and also as the latest
 * resource-test failure. The caller must hold this pool's lock.
 */
private void setLastIdleCheckFailure(Throwable t)
{
    assert Thread.holdsLock( this );
    this.lastResourceTestFailure = t;
    this.lastIdleTestFailure = t;
}
/**
 * Returns the failure most recently recorded by any of the checkin,
 * checkout or idle-check failure setters, or null if none has been recorded.
 */
public synchronized Throwable getLastResourceTestFailure()
{
    return lastResourceTestFailure;
}
/** Returns the most recently recorded acquisition failure, or null if none has been recorded. */
public synchronized Throwable getLastAcquisitionFailure()
{
    return lastAcquisitionFailiure;
}
/** Records t as the most recent acquisition failure. */
private synchronized void setLastAcquisitionFailure( Throwable t )
{
    this.lastAcquisitionFailiure = t;
}
/** Returns the number of threads currently registered in acquireWaiters. */
public synchronized int getNumCheckoutWaiters()
{
    return acquireWaiters.size();
}
/** Returns the current value of the pending-acquires counter. */
public synchronized int getNumPendingAcquireTasks()
{
    return pending_acquires;
}
/** Returns the current value of the pending-removes counter. */
public synchronized int getNumPendingRemoveTasks()
{
    return pending_removes;
}
/** Returns the number of threads currently registered in acquireWaiters (same value as getNumCheckoutWaiters()). */
public synchronized int getNumThreadsWaitingForResources()
{
    return acquireWaiters.size();
}
/**
 * Returns the names of all threads registered in acquireWaiters,
 * sorted with Arrays.sort. The array is empty when no thread is waiting.
 */
public synchronized String[] getThreadNamesWaitingForResources()
{
    String[] names = new String[ acquireWaiters.size() ];
    int idx = 0;
    Iterator waiters = acquireWaiters.iterator();
    while ( waiters.hasNext() )
    {
        Thread waiter = (Thread) waiters.next();
        names[idx] = waiter.getName();
        ++idx;
    }
    Arrays.sort( names );
    return names;
}
/** Returns the number of threads currently registered in otherWaiters. */
public synchronized int getNumThreadsWaitingForAdministrativeTasks()
{
    return otherWaiters.size();
}
/**
 * Returns the names of all threads registered in otherWaiters,
 * sorted with Arrays.sort. The array is empty when no thread is waiting.
 */
public synchronized String[] getThreadNamesWaitingForAdministrativeTasks()
{
    String[] names = new String[ otherWaiters.size() ];
    int idx = 0;
    Iterator waiters = otherWaiters.iterator();
    while ( waiters.hasNext() )
    {
        Thread waiter = (Thread) waiters.next();
        names[idx] = waiter.getName();
        ++idx;
    }
    Arrays.sort( names );
    return names;
}
/** Remembers resc as a former resource; the map is used as a set, so the value is always null. */
private void addToFormerResources( Object resc )
{
    formerResources.put( resc, null );
}
/** Returns true if resc is currently a key of formerResources. */
private boolean isFormerResource( Object resc )
{
    return formerResources.containsKey( resc );
}
/**
 * Creates a pool: sanitizes the start/min/max sizes, stores the configuration,
 * calls ensureStartResources(), and schedules the cull and idle-check timer
 * tasks when the configuration calls for them.
 *
 * @throws ResourcePoolException any Exception raised during construction,
 *         converted via ResourcePoolUtils.convertThrowable
 */
public BasicResourcePool(
Manager mgr,
int start,
int min,
int max,
int inc,
int num_acq_attempts,
int acq_attempt_delay,
long check_idle_resources_delay,
long max_resource_age,
long max_idle_time,
long excess_max_idle_time,
long destroy_unreturned_resc_time,
long expiration_enforcement_delay,
boolean break_on_acquisition_failure,
boolean debug_store_checkout_exceptions,
boolean force_synchronous_checkins,
AsynchronousRunner taskRunner,
RunnableQueue asyncEventQueue,
Timer cullAndIdleRefurbishTimer,
BasicResourcePoolFactory factory)
throws ResourcePoolException
{
try
{
// Inconsistent size settings are repaired (with a warning) rather than rejected:
// min is capped at max, then start is clamped into [min, max].
if ( min > max )
{
if ( logger.isLoggable( MLevel.WARNING ) )
logger.log( MLevel.WARNING, "Bad pool size config, min " + min + " > max " + max + ". Using " + max + " as min." );
min = max;
}
if ( start < min )
{
if ( logger.isLoggable( MLevel.WARNING ) )
logger.log( MLevel.WARNING, "Bad pool size config, start " + start + " < min " + min + ". Using " + min + " as start." );
start = min;
}
if ( start > max )
{
if ( logger.isLoggable( MLevel.WARNING ) )
logger.log( MLevel.WARNING, "Bad pool size config, start " + start + " > max " + max + ". Using " + max + " as start." );
start = max;
}
this.mgr = mgr;
this.start = start;
this.min = min;
this.max = max;
this.inc = inc;
this.num_acq_attempts = num_acq_attempts;
this.acq_attempt_delay = acq_attempt_delay;
this.check_idle_resources_delay = check_idle_resources_delay;
this.max_resource_age = max_resource_age;
this.max_idle_time = max_idle_time;
this.excess_max_idle_time = excess_max_idle_time;
this.destroy_unreturned_resc_time = destroy_unreturned_resc_time;
this.break_on_acquisition_failure = break_on_acquisition_failure;
// Checkout stack traces are only stored when unreturned resources can actually be destroyed.
this.debug_store_checkout_exceptions = (debug_store_checkout_exceptions && destroy_unreturned_resc_time > 0);
this.force_synchronous_checkins = force_synchronous_checkins;
this.taskRunner = taskRunner;
this.asyncEventQueue = asyncEventQueue;
this.cullAndIdleRefurbishTimer = cullAndIdleRefurbishTimer;
this.factory = factory;
this.pending_acquires = 0;
this.pending_removes = 0;
this.target_pool_size = this.start;
// Event support exists only when an event queue was supplied.
if (asyncEventQueue != null)
this.rpes = new ResourcePoolEventSupport(this);
else
this.rpes = null;
ensureStartResources();
if (mustEnforceExpiration())
{
// A non-positive configured delay means "derive the period automatically".
if (expiration_enforcement_delay <= 0)
this.expiration_enforcement_delay = automaticExpirationEnforcementDelay();
else
this.expiration_enforcement_delay = expiration_enforcement_delay;
this.cullTask = new CullTask();
// First run after the smallest configured expiration time, then at the enforcement period.
cullAndIdleRefurbishTimer.schedule( cullTask, minExpirationTime(), this.expiration_enforcement_delay );
}
else
this.expiration_enforcement_delay = expiration_enforcement_delay;
if (check_idle_resources_delay > 0)
{
this.idleRefurbishTask = new CheckIdleResourcesTask();
cullAndIdleRefurbishTimer.schedule( idleRefurbishTask,
check_idle_resources_delay,
check_idle_resources_delay );
}
if ( logger.isLoggable( MLevel.FINER ) )
logger.finer( this + " config: [start -> " + this.start + "; min -> " + this.min + "; max -> " + this.max + "; inc -> " + this.inc +
"; num_acq_attempts -> " + this.num_acq_attempts + "; acq_attempt_delay -> " + this.acq_attempt_delay +
"; check_idle_resources_delay -> " + this.check_idle_resources_delay + "; max_resource_age -> " + this.max_resource_age +
"; max_idle_time -> " + this.max_idle_time + "; excess_max_idle_time -> " + this.excess_max_idle_time +
"; destroy_unreturned_resc_time -> " + this.destroy_unreturned_resc_time +
"; expiration_enforcement_delay -> " + this.expiration_enforcement_delay +
"; break_on_acquisition_failure -> " + this.break_on_acquisition_failure +
"; debug_store_checkout_exceptions -> " + this.debug_store_checkout_exceptions +
"; force_synchronous_checkins -> " + this.force_synchronous_checkins +
"]");
}
catch (Exception e)
{
throw ResourcePoolUtils.convertThrowable( e );
}
}
/** Returns true when a positive idle-check delay has been configured. */
private boolean mustTestIdleResources()
{
    return check_idle_resources_delay > 0;
}
/**
 * Returns true when at least one of max_resource_age, max_idle_time,
 * excess_max_idle_time or destroy_unreturned_resc_time is positive.
 */
private boolean mustEnforceExpiration()
{
    if ( max_resource_age > 0 )
        return true;
    if ( max_idle_time > 0 )
        return true;
    if ( excess_max_idle_time > 0 )
        return true;
    return destroy_unreturned_resc_time > 0;
}
/**
 * Returns the smallest positive value among max_resource_age, max_idle_time,
 * excess_max_idle_time and destroy_unreturned_resc_time, or Long.MAX_VALUE
 * when none of them is positive.
 */
private long minExpirationTime()
{
    long[] candidates = { max_resource_age, max_idle_time, excess_max_idle_time, destroy_unreturned_resc_time };
    long smallest = Long.MAX_VALUE;
    for ( int i = 0; i < candidates.length; ++i )
    {
        long candidate = candidates[i];
        if ( candidate > 0 && candidate < smallest )
            smallest = candidate;
    }
    return smallest;
}
/**
 * Derives an enforcement period from the smallest expiration time: that time
 * divided by AUTO_CULL_FREQUENCY_DIVISOR, capped at AUTO_MAX_CULL_FREQUENCY
 * and then raised to at least AUTO_MIN_CULL_FREQUENCY.
 */
private long automaticExpirationEnforcementDelay()
{
    long period = minExpirationTime() / AUTO_CULL_FREQUENCY_DIVISOR;
    if ( period > AUTO_MAX_CULL_FREQUENCY )
        period = AUTO_MAX_CULL_FREQUENCY;
    if ( period < AUTO_MIN_CULL_FREQUENCY )
        period = AUTO_MIN_CULL_FREQUENCY;
    return period;
}
/** Returns the expiration enforcement delay chosen in the constructor (configured or automatically derived). */
public long getEffectiveExpirationEnforcementDelay()
{
    return expiration_enforcement_delay;
}
/** Reads the broken flag while holding this pool's lock. */
private synchronized boolean isBroken()
{
    return broken;
}
/** Returns true when an event queue was supplied at construction time. */
private boolean supportsEvents()
{
    return asyncEventQueue != null;
}
/**
 * Checks out a resource by delegating to checkoutResource( 0 ).
 * A TimeoutException from that call is logged at WARNING and rethrown
 * wrapped in a ResourcePoolException.
 */
public Object checkoutResource()
throws ResourcePoolException, InterruptedException
{
    Object resc;
    try
    {
        resc = checkoutResource( 0 );
    }
    catch (TimeoutException unexpected)
    {
        final String msg = "Huh??? TimeoutException with no timeout set!!!";
        if ( logger.isLoggable( MLevel.WARNING ) )
        {
            logger.log( MLevel.WARNING, msg, unexpected);
        }
        throw new ResourcePoolException(msg, unexpected);
    }
    return resc;
}
/**
 * Compares the managed size (adjusted for pending removes / acquires) with
 * target_pool_size and posts shrink or expand work accordingly. Does nothing
 * when the pool is broken. The caller must hold this pool's lock.
 */
private void _recheckResizePool()
{
    assert Thread.holdsLock(this);
    if ( broken )
        return;

    int managedCount = managed.size();

    // Shrinking takes precedence; expansion is only considered when there is no surplus.
    int surplus = managedCount - pending_removes - target_pool_size;
    if ( surplus > 0 )
    {
        shrinkPool( surplus );
        return;
    }

    int shortfall = target_pool_size - (managedCount + pending_acquires);
    if ( shortfall > 0 )
        expandPool( shortfall );
}
/** Adds one to pending_acquires and reports the new value at FINEST. */
private synchronized void incrementPendingAcquires()
{
    pending_acquires += 1;
    if ( logger.isLoggable( MLevel.FINEST ) )
    {
        logger.finest("incremented pending_acquires: " + pending_acquires);
    }
}
/** Adds one to pending_removes and reports the new value at FINEST. */
private synchronized void incrementPendingRemoves()
{
    pending_removes += 1;
    if ( logger.isLoggable( MLevel.FINEST ) )
    {
        logger.finest("incremented pending_removes: " + pending_removes);
    }
}
/** Lock-acquiring wrapper around _decrementPendingAcquires(). */
private synchronized void decrementPendingAcquires()
{
    _decrementPendingAcquires();
}
/**
 * Subtracts one from pending_acquires and reports the new value at FINEST.
 * Does not synchronize itself; decrementPendingAcquires() is the synchronized variant.
 */
private void _decrementPendingAcquires()
{
    pending_acquires -= 1;
    if ( logger.isLoggable( MLevel.FINEST ) )
    {
        logger.finest("decremented pending_acquires: " + pending_acquires);
    }
}
/** Subtracts one from pending_removes and reports the new value at FINEST. */
private synchronized void decrementPendingRemoves()
{
    pending_removes -= 1;
    if ( logger.isLoggable( MLevel.FINEST ) )
    {
        logger.finest("decremented pending_removes: " + pending_removes);
    }
}
/** Lock-acquiring wrapper around _recheckResizePool(). */
private synchronized void recheckResizePool()
{
    _recheckResizePool();
}
/**
 * Posts count acquire tasks to taskRunner. The task class is
 * ScatteredAcquireTask when USE_SCATTERED_ACQUIRE_TASK is set, AcquireTask
 * otherwise. The caller must hold this pool's lock.
 */
private void expandPool(int count)
{
    assert Thread.holdsLock(this);
    int remaining = count;
    if ( USE_SCATTERED_ACQUIRE_TASK )
    {
        while ( remaining > 0 )
        {
            taskRunner.postRunnable( new ScatteredAcquireTask() );
            --remaining;
        }
    }
    else
    {
        while ( remaining > 0 )
        {
            taskRunner.postRunnable( new AcquireTask() );
            --remaining;
        }
    }
}
/** Posts count RemoveTask instances to taskRunner. The caller must hold this pool's lock. */
private void shrinkPool(int count)
{
    assert Thread.holdsLock(this);
    int remaining = count;
    while ( remaining > 0 )
    {
        taskRunner.postRunnable( new RemoveTask() );
        --remaining;
    }
}
/**
 * Checks out a resource, passing timeout through to prelimCheckoutResource.
 *
 * A candidate is first taken via prelimCheckoutResource, then refurbished
 * via attemptRefurbishResourceOnCheckout before the synchronized section below
 * is entered. If refurbishment fails, or the candidate is no longer managed
 * by the time the lock is taken, this method calls itself again with the same
 * timeout value to find another resource.
 *
 * @throws NoGoodResourcesException if the retry recursion overflows the stack
 */
public Object checkoutResource( long timeout )
throws TimeoutException, ResourcePoolException, InterruptedException
{
try
{
Object resc = prelimCheckoutResource( timeout );
boolean refurb = attemptRefurbishResourceOnCheckout( resc );
synchronized( this )
{
if (!refurb)
{
if (Debug.DEBUG && logger.isLoggable( MLevel.FINER))
logger.log( MLevel.FINER, "Resource [" + resc + "] could not be refurbished in preparation for checkout. Will try to find a better resource." );
// Discard the bad resource, top the pool back up, and signal a retry via resc == null.
removeResource( resc );
ensureMinResources();
resc = null;
}
else
{
asyncFireResourceCheckedOut( resc, managed.size(), unused.size(), excluded.size() );
if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX) trace();
PunchCard card = (PunchCard) managed.get( resc );
// A missing PunchCard means the resource left the managed map in the meantime.
if (card == null)
{
if (Debug.DEBUG && logger.isLoggable( MLevel.FINER ))
logger.finer("Resource " + resc + " was removed from the pool while it was being checked out " +
" or refurbished for checkout. Will try to find a replacement resource.");
resc = null;
}
else
{
card.checkout_time = System.currentTimeMillis();
if (debug_store_checkout_exceptions)
card.checkoutStackTraceException = new Exception("DEBUG STACK TRACE: Overdue resource check-out stack trace.");
}
}
}
// Retry outside the synchronized block; each retry is a new recursive call.
if (resc == null)
return checkoutResource( timeout );
else
return resc;
}
catch ( StackOverflowError e )
{
throw new NoGoodResourcesException( "After checking so many resources we blew the stack, no resources tested acceptable for checkout. " +
"See logger com.mchange.v2.resourcepool.BasicResourcePool output at FINER/DEBUG for information on individual failures.",
e );
}
}
/**
 * Selects the resource at the head of the unused list and removes it from
 * that list, waiting (via awaitAvailable) when no resource is available.
 *
 * If nothing is available and the pool is below max, the target pool size may
 * first be raised to provoke acquisition. A head resource that is currently
 * under idle-check causes a wait on this pool's monitor followed by a retry;
 * a head resource for which shouldExpire returns true is removed and the
 * selection is retried. Retries are recursive calls with the same timeout value.
 *
 * @throws NoGoodResourcesException if the retry recursion overflows the stack
 */
private synchronized Object prelimCheckoutResource( long timeout )
throws TimeoutException, ResourcePoolException, InterruptedException
{
try
{
ensureNotBroken();
int available = unused.size();
if (available == 0)
{
int msz = managed.size();
if (msz < max)
{
// Room for this caller plus every thread already waiting for a resource.
int desired_target = msz + acquireWaiters.size() + 1;
if (logger.isLoggable(MLevel.FINER))
logger.log(MLevel.FINER, "acquire test -- pool size: " + msz + "; target_pool_size: " + target_pool_size + "; desired target? " + desired_target);
if (desired_target >= target_pool_size)
{
// Grow by at least inc, but keep the new target within [min, max].
desired_target = Math.max(desired_target, target_pool_size + inc);
target_pool_size = Math.max( Math.min( max, desired_target ), min );
_recheckResizePool();
}
}
else
{
if (logger.isLoggable(MLevel.FINER))
logger.log(MLevel.FINER, "acquire test -- pool is already maxed out. [managed: " + msz + "; max: " + max + "]");
}
awaitAvailable(timeout);
}
Object resc = unused.get(0);
if ( idleCheckResources.contains( resc ) )
{
if (Debug.DEBUG && logger.isLoggable( MLevel.FINER))
logger.log( MLevel.FINER, "Resource we want to check out is in idleCheck! (waiting until idle-check completes.) [" + this + "]");
Thread t = Thread.currentThread();
try
{
// Registered in otherWaiters so close( boolean ) can interrupt this wait.
otherWaiters.add ( t );
this.wait( timeout );
ensureNotBroken();
}
finally
{ otherWaiters.remove( t ); }
return prelimCheckoutResource( timeout );
}
else if ( shouldExpire( resc ) )
{
if (Debug.DEBUG && logger.isLoggable( MLevel.FINER))
logger.log( MLevel.FINER, "Resource we want to check out has expired already. Trying again.");
removeResource( resc );
ensureMinResources();
return prelimCheckoutResource( timeout );
}
else
{
unused.remove(0);
return resc;
}
}
catch ( ResourceClosedException e )
{
if (logger.isLoggable( MLevel.SEVERE ))
logger.log( MLevel.SEVERE, this + " -- the pool was found to be closed or broken during an attempt to check out a resource.", e );
this.unexpectedBreak();
throw e;
}
catch ( InterruptedException e )
{
// An interrupt on a broken pool is expected (close( boolean ) interrupts waiters) and logged quietly;
// an interrupt on a live pool is logged at WARNING. Either way the exception is rethrown.
if (broken)
{
if (logger.isLoggable( MLevel.FINER ))
logger.log(MLevel.FINER,
this + " -- an attempt to checkout a resource was interrupted, because the pool is now closed. " +
"[Thread: " + Thread.currentThread().getName() + ']',
e );
else if (logger.isLoggable( MLevel.INFO ))
logger.log(MLevel.INFO,
this + " -- an attempt to checkout a resource was interrupted, because the pool is now closed. " +
"[Thread: " + Thread.currentThread().getName() + ']');
}
else
{
if (logger.isLoggable( MLevel.WARNING ))
{
logger.log(MLevel.WARNING,
this + " -- an attempt to checkout a resource was interrupted, and the pool is still live: some other thread " +
"must have interrupted the Thread attempting checkout!",
e );
}
}
throw e;
}
catch ( StackOverflowError e )
{
throw new NoGoodResourcesException( "After checking so many resources we blew the stack, no resources tested acceptable for checkout. " +
"See logger com.mchange.v2.resourcepool.BasicResourcePool output at FINER/DEBUG for information on individual failures.",
e );
}
}
/**
 * Returns a resource to the pool.
 *
 * Managed resources are handed to doCheckinManaged after the lock has been
 * released; excluded resources are handled by doCheckinExcluded under the
 * lock; former resources are only logged at FINER.
 *
 * @throws ResourcePoolException if resc is not managed, excluded, or a former resource
 */
public void checkinResource( Object resc ) throws ResourcePoolException
{
    try
    {
        boolean isManaged;
        synchronized ( this )
        {
            isManaged = managed.containsKey( resc );
            if ( !isManaged )
            {
                if ( excluded.contains( resc ) )
                {
                    doCheckinExcluded( resc );
                }
                else if ( isFormerResource( resc ) )
                {
                    if ( logger.isLoggable( MLevel.FINER ) )
                        logger.finer("Resource " + resc + " checked-in after having been checked-in already, or checked-in after " +
                                     " having being destroyed for being checked-out too long.");
                }
                else
                {
                    throw new ResourcePoolException("ResourcePool" + (broken ? " [BROKEN!]" : "") + ": Tried to check-in a foreign resource!");
                }
            }
        }

        // doCheckinManaged must run without the pool's lock held.
        if ( isManaged )
            doCheckinManaged( resc );

        if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX) syncTrace();
    }
    catch ( ResourceClosedException e )
    {
        if ( logger.isLoggable( MLevel.SEVERE ) )
        {
            logger.log( MLevel.SEVERE,
                        this + " - checkinResource( ... ) -- even broken pools should allow checkins without exception. probable resource pool bug.",
                        e);
        }
        this.unexpectedBreak();
        throw e;
    }
}
/**
 * Checks in every resource that is currently awaiting checkin.
 *
 * Excluded resources are processed by doCheckinExcluded while the lock is
 * held; managed resources that are not in the unused list are passed to
 * doCheckinManaged after the lock has been released.
 *
 * @throws ResourcePoolException if a checkin fails; a ResourceClosedException
 *         additionally triggers unexpectedBreak() before being rethrown
 */
public void checkinAll() throws ResourcePoolException
{
    try
    {
        Set checkedOutNotExcluded = null;
        synchronized ( this )
        {
            checkedOutNotExcluded = new HashSet( managed.keySet() );
            checkedOutNotExcluded.removeAll( unused );

            // doCheckinExcluded removes its argument from 'excluded', so iterate over a
            // snapshot: iterating the live set would throw ConcurrentModificationException
            // as soon as more than one resource is excluded.
            List excludedSnapshot = new ArrayList( excluded );
            for (Iterator ii = excludedSnapshot.iterator(); ii.hasNext(); )
                doCheckinExcluded( ii.next() );
        }
        for (Iterator ii = checkedOutNotExcluded.iterator(); ii.hasNext(); )
            doCheckinManaged( ii.next() );
    }
    catch ( ResourceClosedException e )
    {
        if ( logger.isLoggable( MLevel.SEVERE ) )
            logger.log( MLevel.SEVERE,
                        this + " - checkinAll() -- even broken pools should allow checkins without exception. probable resource pool bug.",
                        e );
        this.unexpectedBreak();
        throw e;
    }
}
/**
 * Classifies resc: KNOWN_AND_AVAILABLE when it is in the unused list,
 * KNOWN_AND_CHECKED_OUT when it is otherwise managed or excluded, and
 * UNKNOWN_OR_PURGED in every other case.
 */
public synchronized int statusInPool( Object resc )
throws ResourcePoolException
{
    try
    {
        if ( unused.contains( resc ) )
            return KNOWN_AND_AVAILABLE;

        boolean awaitingCheckin = managed.containsKey( resc ) || excluded.contains( resc );
        return awaitingCheckin ? KNOWN_AND_CHECKED_OUT : UNKNOWN_OR_PURGED;
    }
    catch ( ResourceClosedException e )
    {
        if ( logger.isLoggable( MLevel.SEVERE ) )
        {
            logger.log( MLevel.SEVERE, "Apparent pool break.", e );
        }
        this.unexpectedBreak();
        throw e;
    }
}
/**
 * Marks resc as broken via _markBroken and then calls ensureMinResources().
 * A ResourceClosedException is logged at SEVERE and answered with
 * unexpectedBreak(); it is not rethrown.
 */
public synchronized void markBroken(Object resc)
{
    try
    {
        boolean traceMax = Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX;
        if ( traceMax && logger.isLoggable( MLevel.FINER ) )
        {
            logger.log( MLevel.FINER, "Resource " + resc + " marked broken by pool (" + this + ").");
        }
        _markBroken( resc );
        ensureMinResources();
    }
    catch ( ResourceClosedException e )
    {
        if ( logger.isLoggable( MLevel.SEVERE ) )
        {
            logger.log( MLevel.SEVERE, "Apparent pool break.", e );
        }
        this.unexpectedBreak();
    }
}
/** Returns the (sanitized) minimum pool size stored by the constructor. */
public int getMinPoolSize()
{
    return min;
}
/** Returns the maximum pool size stored by the constructor. */
public int getMaxPoolSize()
{
    return max;
}
/** Returns the number of resources currently in the managed map. */
public synchronized int getPoolSize()
throws ResourcePoolException
{
    int managedCount = managed.size();
    return managedCount;
}
/** Returns the number of resources currently in the unused list. */
public synchronized int getAvailableCount()
{
    int unusedCount = unused.size();
    return unusedCount;
}
/** Returns the number of resources currently in the excluded set. */
public synchronized int getExcludedCount()
{
    int excludedCount = excluded.size();
    return excludedCount;
}
/** Returns managed size minus unused size, plus the excluded size. */
public synchronized int getAwaitingCheckinCount()
{
    int managedNotUnused = managed.size() - unused.size();
    return managedNotUnused + excluded.size();
}
/** Returns managed size minus unused size. */
public synchronized int getAwaitingCheckinNotExcludedCount()
{
    int managedCount = managed.size();
    int unusedCount = unused.size();
    return managedCount - unusedCount;
}
/**
 * Marks every currently managed resource as broken (iterating over a clone of
 * the managed map) and then calls ensureMinResources() once. A
 * ResourceClosedException is logged at SEVERE and answered with
 * unexpectedBreak(); it is not rethrown.
 */
public synchronized void resetPool()
{
    try
    {
        Iterator resources = cloneOfManaged().keySet().iterator();
        while ( resources.hasNext() )
        {
            markBrokenNoEnsureMinResources( resources.next() );
        }
        ensureMinResources();
    }
    catch ( ResourceClosedException e )
    {
        if ( logger.isLoggable( MLevel.SEVERE ) )
        {
            logger.log( MLevel.SEVERE, "Apparent pool break.", e );
        }
        this.unexpectedBreak();
    }
}
/** Closes the pool, including checked-out resources; equivalent to close( true ). */
public synchronized void close()
throws ResourcePoolException
{
    final boolean close_checked_out_resources = true;
    close( close_checked_out_resources );
}
/** Closes the pool on finalization unless the broken flag is already set. */
public void finalize() throws Throwable
{
    if ( broken )
        return;
    this.close();
}
/**
 * Registers rpl with this pool's event support.
 *
 * @throws RuntimeException if this pool was built without event support
 */
public void addResourcePoolListener(ResourcePoolListener rpl)
{
    if ( ! supportsEvents() )
    {
        throw new RuntimeException(this + " does not support ResourcePoolEvents. " +
                                   "Probably it was constructed by a BasicResourceFactory configured not to support such events.");
    }
    rpes.addResourcePoolListener(rpl);
}
/**
 * Unregisters rpl from this pool's event support.
 *
 * @throws RuntimeException if this pool was built without event support
 */
public void removeResourcePoolListener(ResourcePoolListener rpl)
{
    if ( ! supportsEvents() )
    {
        throw new RuntimeException(this + " does not support ResourcePoolEvents. " +
                                   "Probably it was constructed by a BasicResourceFactory configured not to support such events.");
    }
    rpes.removeResourcePoolListener(rpl);
}
/** Reads the force_kill_acquires flag while holding this pool's lock. */
private synchronized boolean isForceKillAcquiresPending()
{
    return force_kill_acquires;
}
/**
 * Forces every thread waiting in awaitAvailable() to give up.
 *
 * Sets force_kill_acquires, wakes all waiters with notifyAll(), and then
 * waits on this pool's monitor until acquireWaiters is empty before clearing
 * the flag again. If that wait is interrupted or fails unexpectedly, the
 * remaining acquire waiters are interrupted instead and the flag is cleared
 * before the problem is rethrown.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private synchronized void forceKillAcquires() throws InterruptedException
{
if (logger.isLoggable(MLevel.WARNING))
logger.log(MLevel.WARNING,
"Having failed to acquire a resource, " +
this +
" is interrupting all Threads waiting on a resource to check out. " +
"Will try again in response to new client requests.");
Thread t = Thread.currentThread();
try
{
force_kill_acquires = true;
this.notifyAll();
// awaitAvailable() calls notifyAll() when acquireWaiters becomes empty, which ends this loop.
while (acquireWaiters.size() > 0)
{
otherWaiters.add( t );
this.wait();
}
force_kill_acquires = false;
}
catch ( InterruptedException e )
{
// Fall back to interrupting the waiters that have not left yet.
for (Iterator ii = acquireWaiters.iterator(); ii.hasNext(); )
((Thread) ii.next()).interrupt();
if (logger.isLoggable( MLevel.WARNING ))
logger.log( MLevel.WARNING,
"An interrupt left an attempt to gently clear threads waiting on resource acquisition potentially incomplete! " +
"We have made a best attempt to finish that by interrupt()ing the waiting Threads." );
force_kill_acquires = false;
e.fillInStackTrace();
throw e;
}
catch ( Throwable ick )
{
for (Iterator ii = acquireWaiters.iterator(); ii.hasNext(); )
((Thread) ii.next()).interrupt();
if (logger.isLoggable( MLevel.SEVERE ))
logger.log( MLevel.SEVERE,
"An unexpected problem caused our attempt to gently clear threads waiting on resource acquisition to fail! " +
"We have made a best attempt to finish that by interrupt()ing the waiting Threads.",
ick );
force_kill_acquires = false;
// Rethrow unchecked problems as they are; wrap anything else so the signature stays unchanged.
if ( ick instanceof RuntimeException ) throw (RuntimeException) ick;
else if ( ick instanceof Error ) throw (Error) ick;
else throw new RuntimeException("Wrapped unexpected Throwable.", ick );
}
finally
{ otherWaiters.remove( t ); }
}
/**
 * Logs an unexpected break at SEVERE (attaching a freshly created exception
 * for its stack trace) and closes the pool via close( false ).
 */
private synchronized void unexpectedBreak()
{
    if ( logger.isLoggable( MLevel.SEVERE ) )
    {
        ResourcePoolException stackTraceHolder = new ResourcePoolException("Unexpected Break Stack Trace!");
        logger.log( MLevel.SEVERE, this + " -- Unexpectedly broken!!!", stackTraceHolder );
    }
    close( false );
}
/** Returns true when an event queue exists and the pool is not broken. */
private boolean canFireEvents()
{
    if ( asyncEventQueue == null )
        return false;
    return !isBroken();
}
/**
 * Posts a task to asyncEventQueue that fires a resource-acquired event with
 * the supplied counts. Does nothing when canFireEvents() is false.
 */
private void asyncFireResourceAcquired( final Object resc,
                                        final int pool_size,
                                        final int available_size,
                                        final int removed_but_unreturned_size )
{
    if ( ! canFireEvents() )
        return;

    Runnable fireTask = new Runnable()
    {
        public void run()
        {
            rpes.fireResourceAcquired(resc, pool_size, available_size, removed_but_unreturned_size);
        }
    };
    asyncEventQueue.postRunnable( fireTask );
}
/**
 * Posts a task to asyncEventQueue that fires a resource-checked-in event with
 * the supplied counts. Does nothing when canFireEvents() is false.
 */
private void asyncFireResourceCheckedIn( final Object resc,
                                         final int pool_size,
                                         final int available_size,
                                         final int removed_but_unreturned_size )
{
    if ( ! canFireEvents() )
        return;

    Runnable fireTask = new Runnable()
    {
        public void run()
        {
            rpes.fireResourceCheckedIn(resc, pool_size, available_size, removed_but_unreturned_size);
        }
    };
    asyncEventQueue.postRunnable( fireTask );
}
/**
 * Posts a task to asyncEventQueue that fires a resource-checked-out event with
 * the supplied counts. Does nothing when canFireEvents() is false.
 */
private void asyncFireResourceCheckedOut( final Object resc,
                                          final int pool_size,
                                          final int available_size,
                                          final int removed_but_unreturned_size )
{
    if ( ! canFireEvents() )
        return;

    Runnable fireTask = new Runnable()
    {
        public void run()
        {
            rpes.fireResourceCheckedOut(resc,pool_size,available_size,removed_but_unreturned_size);
        }
    };
    asyncEventQueue.postRunnable( fireTask );
}
/**
 * Posts a task to asyncEventQueue that fires a resource-removed event with
 * the supplied flag and counts. Does nothing when canFireEvents() is false.
 */
private void asyncFireResourceRemoved( final Object resc,
                                       final boolean checked_out_resource,
                                       final int pool_size,
                                       final int available_size,
                                       final int removed_but_unreturned_size )
{
    if ( ! canFireEvents() )
        return;

    Runnable fireTask = new Runnable()
    {
        public void run()
        {
            rpes.fireResourceRemoved(resc, checked_out_resource,
                                     pool_size,available_size,removed_but_unreturned_size);
        }
    };
    asyncEventQueue.postRunnable( fireTask );
}
/** Destroys resc with synchronous == false (and therefore checked_out == false). */
private void destroyResource(final Object resc)
{
    final boolean synchronous = false;
    destroyResource( resc, synchronous );
}
/** Destroys resc with the given synchronous flag and checked_out == false. */
private void destroyResource(final Object resc, boolean synchronous)
{
    final boolean checked_out = false;
    destroyResource( resc, synchronous, checked_out );
}
/**
 * Destroys resc through mgr.destroyResource( resc, checked_out ).
 *
 * The destruction runs on the calling thread when synchronous is true or the
 * pool is broken; otherwise it is posted to taskRunner. If taskRunner refuses
 * the task, destruction falls back to the synchronous path. Exceptions thrown
 * by the manager are logged at WARNING and not propagated.
 *
 * @param resc        the resource to destroy
 * @param synchronous true to destroy on the calling thread
 * @param checked_out passed through unchanged to mgr.destroyResource
 */
private void destroyResource(final Object resc, boolean synchronous, final boolean checked_out)
{
    class DestroyResourceTask implements Runnable
    {
        public void run()
        {
            try
            {
                if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX && logger.isLoggable( MLevel.FINER ))
                    logger.log(MLevel.FINER, "Preparing to destroy resource: " + resc);
                mgr.destroyResource(resc, checked_out);
                if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX && logger.isLoggable( MLevel.FINER ))
                    logger.log(MLevel.FINER, "Successfully destroyed resource: " + resc);
            }
            catch ( Exception e )
            {
                if ( logger.isLoggable( MLevel.WARNING ) )
                    logger.log( MLevel.WARNING, "Failed to destroy resource: " + resc, e );
            }
        }
    }
    Runnable r = new DestroyResourceTask();
    if ( synchronous || broken )
    {
        // Purely diagnostic: flags synchronous destruction performed while the pool's lock is held.
        if ( logger.isLoggable(MLevel.FINEST) && !broken && Boolean.TRUE.equals( ThreadUtils.reflectiveHoldsLock( this ) ) )
            logger.log( MLevel.FINEST,
                        this + ": Destroyiong a resource on an active pool, synchronousy while holding pool's lock! " +
                        "(not a bug, but a potential bottleneck... is there a good reason for this?)",
                        new Exception("DEBUG STACK TRACE: resource destruction while holding lock.") );
        r.run();
    }
    else
    {
        try { taskRunner.postRunnable( r ); }
        catch (Exception e)
        {
            if (logger.isLoggable(MLevel.FINER))
                logger.log( MLevel.FINER,
                            "AsynchronousRunner refused to accept task to destroy resource. " +
                            "It is probably shared, and has probably been closed underneath us. " +
                            "Reverting to synchronous destruction. This is not usually a problem.",
                            e );
            // Keep the caller's checked_out flag on the fallback path; the two-argument
            // overload would silently reset it to false.
            destroyResource( resc, true, checked_out );
        }
    }
}
/** Acquires one resource using the NO_DECREMENT policy. */
private void doAcquire() throws Exception
{
    doAcquire( NO_DECREMENT );
}
/** Acquires one resource using the DECREMENT_ON_SUCCESS policy. */
private void doAcquireAndDecrementPendingAcquiresWithinLockOnSuccess() throws Exception
{
    doAcquire( DECREMENT_ON_SUCCESS );
}
/** Acquires one resource using the DECREMENT_WITH_CERTAINTY policy. */
private void doAcquireAndDecrementPendingAcquiresWithinLockAlways() throws Exception
{
    doAcquire( DECREMENT_WITH_CERTAINTY );
}
// Policies for doAcquire( int ), controlling whether pending_acquires is decremented:
// never touched by doAcquire( int )
private final static int NO_DECREMENT = 0;
// decremented only when the assimilate-or-discard step completes without throwing
private final static int DECREMENT_ON_SUCCESS =1;
// decremented in a finally block once the synchronized section has been entered
private final static int DECREMENT_WITH_CERTAINTY = 2;
/**
 * Acquires one resource from the manager and either adds it to the pool or
 * destroys it again when it is not needed.
 *
 * Must be called without this pool's lock: the resource is acquired first,
 * and the lock is only taken for the bookkeeping afterwards.
 *
 * @param decrement_policy one of NO_DECREMENT, DECREMENT_ON_SUCCESS or
 *        DECREMENT_WITH_CERTAINTY; selects if and when pending_acquires is
 *        decremented inside the synchronized section
 * @throws Exception whatever mgr.acquireResource() or assimilateResource throws
 */
private void doAcquire( int decrement_policy ) throws Exception
{
assert !Thread.holdsLock( this );
// NOTE: if this call throws, the synchronized section below is never entered,
// so no decrement happens here under any policy.
Object resc = mgr.acquireResource();
boolean destroy = false;
int msz;
synchronized(this)
{
try
{
msz = managed.size();
// Keep the resource only if the pool is live and still below its target size.
if (!broken && msz < target_pool_size)
assimilateResource(resc);
else
destroy = true;
if (decrement_policy == DECREMENT_ON_SUCCESS)
_decrementPendingAcquires();
}
finally
{
if (decrement_policy == DECREMENT_WITH_CERTAINTY)
_decrementPendingAcquires();
}
}
// The surplus resource is destroyed after the lock has been released.
if (destroy)
{
try
{
mgr.destroyResource( resc, false );
if (logger.isLoggable( MLevel.FINER))
logger.log(MLevel.FINER, "destroying overacquired resource: " + resc);
}
catch (Exception e)
{
if (logger.isLoggable( MLevel.FINE))
logger.log(MLevel.FINE, "An exception occurred while trying to destroy an overacquired resource: " + resc, e);
}
}
}
/**
 * Sets the target pool size to sz and blocks on this pool's monitor until the
 * managed size equals sz.
 *
 * @param sz the desired pool size; must lie within [min, max]
 * @throws ResourcePoolException if the size is rejected or the wait fails.
 *         If the wait was interrupted, the calling thread's interrupt status
 *         is restored before the converted exception is thrown.
 */
public synchronized void setPoolSize( int sz ) throws ResourcePoolException
{
    try
    {
        setTargetPoolSize( sz );
        while ( managed.size() != sz )
            this.wait();
    }
    catch (Exception e)
    {
        // The InterruptedException is converted below, so re-assert the interrupt
        // status; otherwise callers further up could never observe the interrupt.
        if ( e instanceof InterruptedException )
            Thread.currentThread().interrupt();
        String msg = "An exception occurred while trying to set the pool size!";
        if ( logger.isLoggable( MLevel.FINER ) )
            logger.log( MLevel.FINER, msg, e );
        throw ResourcePoolUtils.convertThrowable( msg, e );
    }
}
/**
 * Sets target_pool_size to sz and triggers a resize check.
 *
 * @throws IllegalArgumentException if sz is greater than max or less than min
 */
public synchronized void setTargetPoolSize(int sz)
{
    if ( sz > max )
        throw new IllegalArgumentException("Requested size [" + sz +
                                           "] is greater than max [" + max +
                                           "].");
    if ( sz < min )
        throw new IllegalArgumentException("Requested size [" + sz +
                                           "] is less than min [" + min +
                                           "].");

    this.target_pool_size = sz;
    _recheckResizePool();
}
/**
 * Marks resc as broken via _markBroken without calling ensureMinResources().
 * A ResourceClosedException is logged at SEVERE and answered with
 * unexpectedBreak(). The caller must hold this pool's lock.
 */
private void markBrokenNoEnsureMinResources(Object resc)
{
    assert Thread.holdsLock( this );
    try
    {
        _markBroken( resc );
    }
    catch ( ResourceClosedException e )
    {
        if ( logger.isLoggable( MLevel.SEVERE ) )
        {
            logger.log( MLevel.SEVERE, "Apparent pool break.", e );
        }
        this.unexpectedBreak();
    }
}
/**
 * Removes resc when it is in the unused list; otherwise hands it to
 * excludeResource. The caller must hold this pool's lock.
 */
private void _markBroken( Object resc )
{
    assert Thread.holdsLock( this );
    boolean isIdle = unused.contains( resc );
    if ( !isIdle )
    {
        excludeResource( resc );
    }
    else
    {
        removeResource( resc );
    }
}
/**
 * Closes the pool: sets the broken flag, cancels the timer tasks, forgets the
 * resources to be cleaned up, destroys them on a separate thread, interrupts
 * all waiting threads, and notifies the factory (if any).
 *
 * A second call only logs a warning.
 *
 * @param close_checked_out_resources when true every managed resource is
 *        cleaned up; when false only the resources in the unused list are
 */
public synchronized void close( boolean close_checked_out_resources )
{
if (! broken )
{
this.broken = true;
// Work on copies so the live collections can be modified below.
final Collection cleanupResources = ( close_checked_out_resources ? (Collection) cloneOfManaged().keySet() : (Collection) cloneOfUnused() );
if ( cullTask != null )
cullTask.cancel();
if (idleRefurbishTask != null)
idleRefurbishTask.cancel();
// Remember the resources as "former" so that late checkins are recognized by checkinResource.
for ( Iterator ii = cleanupResources.iterator(); ii.hasNext(); )
addToFormerResources( ii.next() );
managed.keySet().removeAll( cleanupResources );
unused.removeAll( cleanupResources );
// Destruction happens on a dedicated thread rather than on the closing thread.
Thread resourceDestroyer = new Thread("Resource Destroyer in BasicResourcePool.close()")
{
public void run()
{
for (Iterator ii = cleanupResources.iterator(); ii.hasNext();)
{
try
{
Object resc = ii.next();
destroyResource( resc, true );
}
catch (Exception e)
{
if (Debug.DEBUG)
{
if ( logger.isLoggable( MLevel.FINE ) )
logger.log( MLevel.FINE, "BasicResourcePool -- A resource couldn't be cleaned up on close()", e );
}
}
}
}
};
resourceDestroyer.start();
// Wake every thread blocked on this pool so it can observe the broken state.
for (Iterator ii = acquireWaiters.iterator(); ii.hasNext(); )
((Thread) ii.next()).interrupt();
for (Iterator ii = otherWaiters.iterator(); ii.hasNext(); )
((Thread) ii.next()).interrupt();
if (factory != null)
factory.markBroken( this );
}
else
{
if ( logger.isLoggable( MLevel.WARNING ) )
logger.warning(this + " -- close() called multiple times.");
}
}
/**
 * Checks a managed resource back in. Must be called without this pool's lock.
 *
 * On a broken pool the resource is simply removed (synchronously). Otherwise
 * a refurbish task is run, either on the calling thread (when
 * force_synchronous_checkins is set) or on taskRunner. That task returns the
 * resource to the head of the unused list when refurbishment succeeded and
 * the resource is still managed, and removes it otherwise.
 *
 * @throws ResourcePoolException when Debug.DEBUG is on and resc is already available
 */
private void doCheckinManaged( final Object resc ) throws ResourcePoolException
{
assert !Thread.holdsLock( this );
if ( Debug.DEBUG && this.statusInPool( resc ) == KNOWN_AND_AVAILABLE )
throw new ResourcePoolException("Tried to check-in an already checked-in resource: " + resc);
synchronized ( this )
{
if (broken)
{
removeResource( resc, true );
return;
}
}
class RefurbishCheckinResourceTask implements Runnable
{
public void run()
{
// Refurbishment happens before the pool's lock is taken below.
boolean resc_okay = attemptRefurbishResourceOnCheckin( resc );
synchronized( BasicResourcePool.this )
{
PunchCard card = (PunchCard) managed.get( resc );
if ( resc_okay && card != null)
{
unused.add(0, resc );
card.last_checkin_time = System.currentTimeMillis();
// -1 marks the resource as not checked out.
card.checkout_time = -1;
}
else
{
if (card != null)
card.checkout_time = -1;
removeResource( resc );
ensureMinResources();
// card == null: the resource left the managed map while it was being refurbished.
if (card == null && logger.isLoggable( MLevel.FINE ))
logger.fine("Resource " + resc + " was removed from the pool during its refurbishment for checkin.");
}
asyncFireResourceCheckedIn( resc, managed.size(), unused.size(), excluded.size() );
// Wake threads waiting on the pool's monitor (e.g. in awaitAvailable).
BasicResourcePool.this.notifyAll();
}
}
}
Runnable doMe = new RefurbishCheckinResourceTask();
if ( force_synchronous_checkins ) doMe.run();
else taskRunner.postRunnable( doMe );
}
/**
 * Checks in an excluded resource: it is dropped from the excluded set and
 * then destroyed. The caller must hold this pool's lock.
 */
private void doCheckinExcluded( Object resc )
{
    assert Thread.holdsLock( this );

    excluded.remove( resc );
    destroyResource( resc );
}
/**
 * Blocks on this pool's monitor until the unused list is non-empty.
 * The caller must hold this pool's lock.
 *
 * While waiting, the calling thread is registered in acquireWaiters; when the
 * last waiter leaves, all threads waiting on the monitor are notified.
 *
 * @param timeout maximum time to wait in milliseconds when positive; other
 *        values are passed to Object.wait unchanged (0 waits indefinitely)
 * @throws TimeoutException if a positive timeout elapses before the wait ends
 * @throws CannotAcquireResourceException if force_kill_acquires is set while waiting
 * @throws ResourcePoolException if force_kill_acquires is already set on entry
 * @throws InterruptedException if the waiting thread is interrupted
 */
private void awaitAvailable(long timeout) throws InterruptedException, TimeoutException, ResourcePoolException
{
    assert Thread.holdsLock( this );
    if (force_kill_acquires)
        throw new ResourcePoolException("A ResourcePool cannot acquire a new resource -- the factory or source appears to be down.");
    Thread t = Thread.currentThread();
    try
    {
        acquireWaiters.add( t );
        long start = ( timeout > 0 ? System.currentTimeMillis() : -1);
        if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX)
        {
            if ( logger.isLoggable( MLevel.FINE ) )
                logger.fine("awaitAvailable(): " +
                            (exampleResource != null ?
                             exampleResource :
                             "[unknown]") );
            trace();
        }
        while (unused.size() == 0)
        {
            // Make sure acquisition is actually under way before going to sleep.
            if (pending_acquires == 0 && managed.size() < max)
                _recheckResizePool();

            long wait_millis = timeout;
            if (timeout > 0)
            {
                // Only wait for what is left of the timeout: a wake-up that finds no
                // resource must not restart the full timeout. A remaining time of zero
                // must never reach wait(), where 0 means "wait forever".
                long remaining = timeout - (System.currentTimeMillis() - start);
                if (remaining <= 0)
                    throw new TimeoutException("A client timed out while waiting to acquire a resource from " + this + " -- timeout at awaitAvailable()");
                wait_millis = Math.min( remaining, timeout );
            }
            this.wait(wait_millis);

            if (timeout > 0 && System.currentTimeMillis() - start > timeout)
                throw new TimeoutException("A client timed out while waiting to acquire a resource from " + this + " -- timeout at awaitAvailable()");
            if (force_kill_acquires)
                throw new CannotAcquireResourceException("A ResourcePool could not acquire a resource from its primary factory or source.", getLastAcquisitionFailure());
            ensureNotBroken();
        }
    }
    finally
    {
        acquireWaiters.remove( t );
        // Lets forceKillAcquires() notice that every acquire waiter has left.
        if (acquireWaiters.size() == 0)
            this.notifyAll();
    }
}
/**
 * Adopts a newly acquired resource into the pool: registers it in the managed
 * map with a fresh PunchCard, puts it at the head of the unused list, fires
 * the "resource acquired" event and wakes every thread waiting on the pool.
 *
 * The caller must hold this pool's lock (asserted).
 *
 * @param resc the new resource
 * @throws Exception declared by the signature
 */
private void assimilateResource( Object resc ) throws Exception
{
    assert Thread.holdsLock( this );

    final PunchCard card = new PunchCard();
    managed.put( resc, card );
    unused.add( 0, resc );

    asyncFireResourceAcquired( resc, managed.size(), unused.size(), excluded.size() );
    this.notifyAll();

    if (Debug.DEBUG)
    {
        if (Debug.TRACE == Debug.TRACE_MAX)
            trace();

        // remember the first resource ever seen, for use in trace output
        if (exampleResource == null)
            exampleResource = resc;
    }
}
/**
 * Removes one resource from the pool, chosen arbitrarily.
 *
 * Must be called WITHOUT holding this pool's lock (asserted). If an unused
 * resource exists, the first one is unregistered under the lock and then
 * destroyed (synchronously) after the lock is released. Otherwise one of the
 * remaining managed resources is excluded. If there is no managed resource
 * at all, the pool is broken and a severe message is logged.
 */
private void synchronousRemoveArbitraryResource()
{
    assert !Thread.holdsLock( this );

    Object victim = null;

    synchronized ( this )
    {
        if ( !unused.isEmpty() )
        {
            victim = unused.get(0);
            managed.remove( victim );
            unused.remove( victim );
        }
        else
        {
            // nothing is idle, so whatever is still managed is checked out
            final Iterator candidates = cloneOfManaged().keySet().iterator();
            if ( candidates.hasNext() )
                excludeResource( candidates.next() );
            else
            {
                unexpectedBreak();
                logger.severe("A pool from which a resource is requested to be removed appears to have no managed resources?!");
            }
        }
    }

    // destruction happens outside the synchronized block
    if (victim != null)
        destroyResource( victim, true );
}
/**
 * Removes a resource from the pool, passing false for the
 * "synchronous" flag of the two-argument overload.
 *
 * @param resc the resource to remove
 */
private void removeResource(Object resc)
{
    removeResource( resc, false );
}
/**
 * Removes a resource from the pool's bookkeeping and destroys it.
 *
 * The caller must hold this pool's lock (asserted). The resource is dropped
 * from the managed map and the unused list, handed to destroyResource,
 * recorded among the former resources, and a "resource removed" event is
 * fired. Removing a resource that is no longer managed is tolerated and only
 * logged at FINE.
 *
 * @param resc        the resource to remove
 * @param synchronous passed through to destroyResource
 */
private void removeResource(Object resc, boolean synchronous)
{
    assert Thread.holdsLock( this );

    final PunchCard card = (PunchCard) managed.remove(resc);
    boolean checked_out = false;

    if (card == null)
    {
        if ( logger.isLoggable( MLevel.FINE ) )
            logger.fine("Resource " + resc + " was removed twice. (Lotsa reasons a resource can be removed, sometimes simultaneously. It's okay)");
    }
    else
    {
        checked_out = card.checkout_time > 0;

        // a checked-out resource being pulled from a live pool is worth reporting
        if ( checked_out && !broken && logger.isLoggable( MLevel.INFO ) )
        {
            logger.info("A checked-out resource is overdue, and will be destroyed: " + resc);
            if (card.checkoutStackTraceException != null)
            {
                logger.log( MLevel.INFO,
                            "Logging the stack trace by which the overdue resource was checked-out.",
                            card.checkoutStackTraceException );
            }
        }
    }

    unused.remove(resc);
    destroyResource(resc, synchronous, checked_out);
    addToFormerResources( resc );
    asyncFireResourceRemoved( resc, false, managed.size(), unused.size(), excluded.size() );

    if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX)
        trace();
}
/**
 * Moves a resource from the managed map into the excluded set and fires a
 * "resource removed" event flagged as an exclusion.
 *
 * The caller must hold this pool's lock (asserted). In debug builds an
 * InternalError is thrown if the resource turns out to be in the unused
 * list, since only checked-out resources are meant to be excluded.
 *
 * @param resc the resource to exclude
 */
private void excludeResource(Object resc)
{
    assert Thread.holdsLock( this );

    managed.remove( resc );
    excluded.add( resc );

    if (Debug.DEBUG)
    {
        if ( unused.contains(resc) )
            throw new InternalError( "We should only \"exclude\" checked-out resources!" );

        // the exception exists only to capture the excluding stack trace
        if ( logger.isLoggable( MLevel.FINEST ) )
            logger.log( MLevel.FINEST, "Excluded resource " + resc, new Exception("DEBUG STACK TRACE: Excluded resource stack trace"));
    }

    asyncFireResourceRemoved( resc, true, managed.size(), unused.size(), excluded.size() );
}
/**
 * Shrinks the pool towards a target size by removing unused resources.
 *
 * The caller must hold this pool's lock (asserted). The number of removals
 * is fixed up front as (managed size - new_sz) and is further limited by how
 * many unused resources there are; checked-out resources are not touched.
 *
 * @param new_sz the managed-resource count to shrink towards
 */
private void removeTowards( int new_sz )
{
    assert Thread.holdsLock( this );

    int still_to_remove = managed.size() - new_sz;

    // iterate over a snapshot, since removeResource mutates the unused list
    final Iterator idle = cloneOfUnused().iterator();
    while ( still_to_remove > 0 && idle.hasNext() )
    {
        removeResource( idle.next() );
        --still_to_remove;
    }
}
/**
 * Removes every resource that shouldExpire() reports as expired, then asks
 * the pool to re-establish its minimum size.
 *
 * The caller must hold this pool's lock (asserted). When
 * destroy_unreturned_resc_time is positive all managed resources are
 * examined; otherwise only the unused ones are. Each removal also lowers
 * target_pool_size by one, but never below min.
 */
private void cullExpired()
{
    assert Thread.holdsLock( this );

    if ( logger.isLoggable( MLevel.FINER ) )
        logger.log( MLevel.FINER, "BEGIN check for expired resources. [" + this + "]");

    // work on snapshots: removeResource modifies the live collections
    final Collection candidates;
    if ( destroy_unreturned_resc_time > 0 )
        candidates = cloneOfManaged().keySet();
    else
        candidates = cloneOfUnused();

    final Iterator ii = candidates.iterator();
    while ( ii.hasNext() )
    {
        final Object resc = ii.next();
        if ( !shouldExpire( resc ) )
            continue;

        if ( logger.isLoggable( MLevel.FINER ) )
            logger.log( MLevel.FINER, "Removing expired resource: " + resc + " [" + this + "]");

        target_pool_size = Math.max( min, target_pool_size - 1 );
        removeResource( resc );

        if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX)
            trace();
    }

    if ( logger.isLoggable( MLevel.FINER ) )
        logger.log( MLevel.FINER, "FINISHED check for expired resources. [" + this + "]");

    ensureMinResources();
}
/**
 * Schedules an asynchronous idle test for every currently unused resource
 * that does not already have one in flight.
 *
 * The caller must hold this pool's lock (asserted). A resource is posted
 * only if it could be newly added to idleCheckResources.
 */
private void checkIdleResources()
{
    assert Thread.holdsLock( this );

    final Iterator idle = cloneOfUnused().iterator();
    while ( idle.hasNext() )
    {
        final Object resc = idle.next();

        // add() returns false when a test for this resource is already registered
        final boolean newly_registered = idleCheckResources.add( resc );
        if ( newly_registered )
            taskRunner.postRunnable( new AsyncTestIdleResourceTask( resc ) );
    }

    if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX)
        trace();
}
/**
 * Decides whether a resource has outlived one of the pool's time limits.
 *
 * The caller must hold this pool's lock (asserted).
 *
 * For a resource that is not checked out (negative checkout_time) the checks
 * are applied in this order, stopping at the first that fires:
 * excess idle time (only while the pool holds more than min resources),
 * plain idle time, then absolute age. Each limit is ignored unless positive.
 * For a checked-out resource the only test is whether it has been out longer
 * than destroy_unreturned_resc_time.
 *
 * @param resc the resource to test
 * @return true if the resource should be removed; also true if the resource
 *         is no longer managed by this pool
 */
private boolean shouldExpire( Object resc )
{
    assert Thread.holdsLock( this );

    final PunchCard card = (PunchCard) managed.get( resc );
    if (card == null)
    {
        if ( logger.isLoggable( MLevel.FINE ) )
            logger.fine( "Resource " + resc + " was being tested for expiration, but has already been removed from the pool.");
        return true;
    }

    final long now = System.currentTimeMillis();

    // checked-out resource: only the unreturned-resource limit applies
    if (card.checkout_time >= 0)
    {
        final long checkout_age = now - card.checkout_time;
        return checkout_age > destroy_unreturned_resc_time;
    }

    final long idle_age = now - card.last_checkin_time;
    boolean expired = false;

    if (excess_max_idle_time > 0)
    {
        final int pool_size = managed.size();
        expired = (pool_size > min && idle_age > excess_max_idle_time);
        if ( expired && logger.isLoggable( MLevel.FINER ) )
            logger.log(MLevel.FINER,
                       "EXPIRED excess idle resource: " + resc +
                       " ---> idle_time: " + idle_age +
                       "; excess_max_idle_time: " + excess_max_idle_time +
                       "; pool_size: " + pool_size +
                       "; min_pool_size: " + min +
                       " [" + this + "]");
    }

    if (!expired && max_idle_time > 0)
    {
        expired = idle_age > max_idle_time;
        if ( expired && logger.isLoggable( MLevel.FINER ) )
            logger.log(MLevel.FINER,
                       "EXPIRED idle resource: " + resc +
                       " ---> idle_time: " + idle_age +
                       "; max_idle_time: " + max_idle_time +
                       " [" + this + "]");
    }

    if (!expired && max_resource_age > 0)
    {
        final long absolute_age = now - card.acquisition_time;
        expired = ( absolute_age > max_resource_age );
        if ( expired && logger.isLoggable( MLevel.FINER ) )
            logger.log(MLevel.FINER,
                       "EXPIRED old resource: " + resc +
                       " ---> absolute_age: " + absolute_age +
                       "; max_absolute_age: " + max_resource_age +
                       " [" + this + "]");
    }

    return expired;
}
/**
 * Intention-revealing alias: delegates directly to recheckResizePool().
 */
private void ensureStartResources()
{
    recheckResizePool();
}
/**
 * Intention-revealing alias used after removals: delegates directly to
 * recheckResizePool().
 */
private void ensureMinResources()
{
    recheckResizePool();
}
/**
 * Asks the manager to refurbish a resource that is about to be checked out.
 *
 * Must be called WITHOUT holding this pool's lock (asserted). Any exception
 * from the manager is absorbed: it is logged at FINE in debug builds, counted
 * in failed_checkouts and recorded as the last checkout failure.
 *
 * @param resc the resource being checked out
 * @return true if the manager refurbished the resource without throwing
 */
private boolean attemptRefurbishResourceOnCheckout( Object resc )
{
    assert !Thread.holdsLock( this );

    boolean refurbished;
    try
    {
        mgr.refurbishResourceOnCheckout(resc);
        refurbished = true;
    }
    catch (Exception e)
    {
        if (Debug.DEBUG && logger.isLoggable( MLevel.FINE ))
            logger.log( MLevel.FINE, "A resource could not be refurbished for checkout. [" + resc + ']', e );

        // failure statistics are guarded by the pool lock
        synchronized (this)
        {
            ++failed_checkouts;
            setLastCheckoutFailure(e);
        }
        refurbished = false;
    }
    return refurbished;
}
/**
 * Asks the manager to refurbish a resource that is being checked in.
 *
 * Must be called WITHOUT holding this pool's lock (asserted). Any exception
 * from the manager is absorbed: it is logged at FINE in debug builds, counted
 * in failed_checkins and recorded as the last checkin failure.
 *
 * @param resc the resource being checked in
 * @return true if the manager refurbished the resource without throwing
 */
private boolean attemptRefurbishResourceOnCheckin( Object resc )
{
    assert !Thread.holdsLock( this );

    boolean refurbished;
    try
    {
        mgr.refurbishResourceOnCheckin(resc);
        refurbished = true;
    }
    catch (Exception e)
    {
        if (Debug.DEBUG && logger.isLoggable( MLevel.FINE ))
            logger.log( MLevel.FINE, "A resource could not be refurbished on checkin. [" + resc + ']', e );

        // failure statistics are guarded by the pool lock
        synchronized (this)
        {
            ++failed_checkins;
            setLastCheckinFailure(e);
        }
        refurbished = false;
    }
    return refurbished;
}
/**
 * Guard used before operating on the pool.
 *
 * The caller must hold this pool's lock (asserted).
 *
 * @throws ResourcePoolException if the pool's broken flag is set
 */
private void ensureNotBroken() throws ResourcePoolException
{
    assert Thread.holdsLock( this );

    if (!broken)
        return;

    throw new ResourcePoolException("Attempted to use a closed or broken resource pool");
}
/**
 * Variant of trace() for callers that do not already hold the pool's lock:
 * the method is synchronized, which satisfies trace()'s lock assertion.
 */
private synchronized void syncTrace()
{
    trace();
}
/**
 * Logs, at FINEST, a one-line summary of the pool: the sizes of the managed,
 * unused and excluded collections, plus the example resource when one has
 * been recorded.
 *
 * The caller must hold this pool's lock (asserted). Nothing is built or
 * logged unless FINEST is enabled.
 */
private void trace()
{
    assert Thread.holdsLock( this );

    if ( !logger.isLoggable( MLevel.FINEST ) )
        return;

    String example = "";
    if ( exampleResource != null )
        example = " (e.g. " + exampleResource + ")";

    final StringBuffer msg = new StringBuffer( "trace " );
    msg.append( this );
    msg.append( " [managed: " ).append( managed.size() );
    msg.append( ", unused: " ).append( unused.size() );
    msg.append( ", excluded: " ).append( excluded.size() );
    msg.append( ']' ).append( example );

    logger.finest( msg.toString() );
}
/**
 * Takes a snapshot of the managed map so that callers can iterate while the
 * live map is being modified.
 *
 * The caller must hold this pool's lock (asserted).
 *
 * @return the result of managed.clone(), as a HashMap
 */
private final HashMap cloneOfManaged()
{
    assert Thread.holdsLock( this );

    final Object snapshot = managed.clone();
    return (HashMap) snapshot;
}
/**
 * Takes a snapshot of the unused list so that callers can iterate while the
 * live list is being modified.
 *
 * The caller must hold this pool's lock (asserted).
 *
 * @return the result of unused.clone(), as a LinkedList
 */
private final LinkedList cloneOfUnused()
{
    assert Thread.holdsLock( this );

    final Object snapshot = unused.clone();
    return (LinkedList) snapshot;
}
/**
 * Takes a snapshot of the excluded set so that callers can iterate while the
 * live set is being modified.
 *
 * The caller must hold this pool's lock (asserted).
 *
 * @return the result of excluded.clone(), as a HashSet
 */
private final HashSet cloneOfExcluded()
{
    assert Thread.holdsLock( this );

    final Object snapshot = excluded.clone();
    return (HashSet) snapshot;
}
/**
 * One attempt in a series of attempts to acquire a new resource.
 *
 * Instead of sleeping between retries, a failed attempt schedules a timer
 * task (on cullAndIdleRefurbishTimer, after acq_attempt_delay) which posts a
 * follow-up ScatteredAcquireTask to taskRunner with one fewer attempt
 * remaining. pending_acquires is incremented once, when the first task of a
 * series is constructed, and decremented once when the series ends.
 *
 * Log messages in this class name the enclosing pool
 * (BasicResourcePool.this) rather than the task object, so that a reader of
 * the log can tell which pool failed.
 */
class ScatteredAcquireTask implements Runnable
{
    // Attempts left after this one. The series is abandoned when an attempt
    // fails with this at exactly 0; a negative value is never equal to 0, so
    // a negative value keeps the series retrying.
    int attempts_remaining;

    /**
     * Starts a new acquisition series and increments pending_acquires.
     *
     * NOTE(review): a num_acq_attempts of 0 yields attempts_remaining == 0,
     * i.e. the series gives up after its first failure, whereas
     * AcquireTask.goodAttemptNumber() treats num_acq_attempts <= 0 as
     * "retry without limit" -- confirm which behavior is intended.
     */
    ScatteredAcquireTask()
    { this ( (num_acq_attempts >= 0 ? num_acq_attempts : -1) , true ); }

    /**
     * @param attempts_remaining attempts left after the one this task performs
     * @param first_attempt      true only for the task that opens a series;
     *                           only then is pending_acquires incremented
     */
    private ScatteredAcquireTask(int attempts_remaining, boolean first_attempt)
    {
        this.attempts_remaining = attempts_remaining;
        if (first_attempt)
        {
            incrementPendingAcquires();
            if (logger.isLoggable(MLevel.FINEST))
                logger.finest("Starting acquisition series. Incremented pending_acquires [" + pending_acquires + "], " +
                              " attempts_remaining: " + attempts_remaining);
        }
        else
        {
            if (logger.isLoggable(MLevel.FINEST))
                logger.finest("Continuing acquisition series. pending_acquires [" + pending_acquires + "], " +
                              " attempts_remaining: " + attempts_remaining);
        }
    }

    /**
     * Performs one acquisition attempt, or ends the series immediately if the
     * pool is broken or acquisitions are being force-killed.
     */
    public void run()
    {
        boolean recheck = false;
        try
        {
            boolean fkap;
            boolean bkn;
            // read both flags under the pool lock
            synchronized( BasicResourcePool.this )
            {
                fkap = BasicResourcePool.this.force_kill_acquires;
                bkn = BasicResourcePool.this.broken;
            }
            if (!bkn && !fkap)
            {
                // presumably decrements pending_acquires itself on success (per its name)
                BasicResourcePool.this.doAcquireAndDecrementPendingAcquiresWithinLockOnSuccess();
            }
            else
            {
                decrementPendingAcquires();
                recheck = true;
            }
            // a failure while logging must not be mistaken for an acquisition failure
            try
            {
                if (logger.isLoggable(MLevel.FINEST))
                    logger.finest("Acquisition series terminated " +
                                  (bkn ? "because this resource pool has been close()ed" : (fkap ? "because force-kill-acquires is pending" : "successfully") ) +
                                  ". Decremented pending_acquires [" + pending_acquires + "], " +
                                  " attempts_remaining: " + attempts_remaining);
            }
            catch (Exception e)
            {
                System.err.println("Exception during logging:");
                e.printStackTrace();
            }
        }
        catch (Exception e)
        {
            BasicResourcePool.this.setLastAcquisitionFailure(e);
            if (attempts_remaining == 0)
            {
                // out of attempts: the series ends here
                decrementPendingAcquires();
                if ( logger.isLoggable( MLevel.WARNING ) )
                {
                    logger.log( MLevel.WARNING,
                                BasicResourcePool.this + " -- Acquisition Attempt Failed!!! Clearing pending acquires. " +
                                "While trying to acquire a needed new resource, we failed " +
                                "to succeed more than the maximum number of allowed " +
                                "acquisition attempts (" + num_acq_attempts + "). " +
                                "Last acquisition attempt exception: ",
                                e);
                }
                if (break_on_acquisition_failure)
                {
                    if ( logger.isLoggable( MLevel.SEVERE ) )
                        logger.severe("A RESOURCE POOL IS PERMANENTLY BROKEN! [" + BasicResourcePool.this + "] " +
                                      "(because a series of " + num_acq_attempts + " acquisition attempts " +
                                      "failed.)");
                    unexpectedBreak();
                }
                else
                {
                    try { forceKillAcquires(); }
                    catch (InterruptedException ie)
                    {
                        if ( logger.isLoggable(MLevel.WARNING) )
                            logger.log(MLevel.WARNING,
                                       "Failed to force-kill pending acquisition attempts after acquisition failue, " +
                                       " due to an InterruptedException!",
                                       ie );
                        recheck = true;
                    }
                }
                if (logger.isLoggable(MLevel.FINEST))
                    logger.finest("Acquisition series terminated unsuccessfully. Decremented pending_acquires [" + pending_acquires + "], " +
                                  " attempts_remaining: " + attempts_remaining);
            }
            else
            {
                MLevel logLevel = (attempts_remaining > 0 ? MLevel.FINE : MLevel.INFO);
                if (logger.isLoggable( logLevel ))
                    logger.log( logLevel, "An exception occurred while acquiring a poolable resource. Will retry.", e );
                // schedule the retry instead of blocking this thread for the delay
                TimerTask doNextAcquire = new TimerTask()
                {
                    public void run()
                    { taskRunner.postRunnable( new ScatteredAcquireTask( attempts_remaining - 1, false ) ); }
                };
                cullAndIdleRefurbishTimer.schedule( doNextAcquire, acq_attempt_delay );
            }
        }
        finally
        {
            if (recheck)
                recheckResizePool();
        }
    }
}
/**
 * The thread-blocking way of acquiring a new resource: a single task that
 * loops over all permitted attempts, sleeping acq_attempt_delay between them
 * on the thread that runs it.
 *
 * pending_acquires is incremented when the task is constructed and is
 * decremented exactly once, either by the acquisition helper or by the
 * finally block of run().
 *
 * Log messages in this class name the enclosing pool
 * (BasicResourcePool.this) rather than the task object, so that a reader of
 * the log can tell which pool failed.
 */
class AcquireTask implements Runnable
{
    // set once an attempt completes without throwing
    boolean success = false;

    public AcquireTask()
    { incrementPendingAcquires(); }

    /**
     * Runs attempts until one succeeds, the attempt limit is reached, or
     * force-kill-acquires becomes pending; then either rechecks the pool
     * size (success) or breaks the pool / force-kills acquires (failure).
     */
    public void run()
    {
        boolean decremented = false;
        boolean recheck = false;
        try
        {
            Exception lastException = null;
            for (int i = 0; shouldTry( i ); ++i)
            {
                try
                {
                    if (i > 0)
                        Thread.sleep(acq_attempt_delay);
                    if ( goodAttemptNumber( i + 1 ) )
                    {
                        // not the last permitted attempt: the helper presumably
                        // decrements only if it succeeds (per its name)
                        BasicResourcePool.this.doAcquireAndDecrementPendingAcquiresWithinLockOnSuccess();
                        decremented = true;
                    }
                    else
                    {
                        // last permitted attempt: flags are set before the call
                        // because the helper presumably decrements whether or
                        // not it throws (per its name)
                        decremented = true;
                        recheck = true;
                        BasicResourcePool.this.doAcquireAndDecrementPendingAcquiresWithinLockAlways();
                    }
                    success = true;
                }
                catch (InterruptedException e)
                {
                    // interruption ends the whole series; handled by the outer catch
                    throw e;
                }
                catch (Exception e)
                {
                    MLevel logLevel = (num_acq_attempts > 0 ? MLevel.FINE : MLevel.INFO);
                    if (logger.isLoggable( logLevel ))
                        logger.log( logLevel, "An exception occurred while acquiring a poolable resource. Will retry.", e );
                    lastException = e;
                    setLastAcquisitionFailure(e);
                }
            }
            if (!success)
            {
                if ( logger.isLoggable( MLevel.WARNING ) )
                {
                    logger.log( MLevel.WARNING,
                                BasicResourcePool.this + " -- Acquisition Attempt Failed!!! Clearing pending acquires. " +
                                "While trying to acquire a needed new resource, we failed " +
                                "to succeed more than the maximum number of allowed " +
                                "acquisition attempts (" + num_acq_attempts + "). " +
                                (lastException == null ? "" : "Last acquisition attempt exception: "),
                                lastException);
                }
                if (break_on_acquisition_failure)
                {
                    if ( logger.isLoggable( MLevel.SEVERE ) )
                        logger.severe("A RESOURCE POOL IS PERMANENTLY BROKEN! [" + BasicResourcePool.this + "]");
                    unexpectedBreak();
                }
                else
                    forceKillAcquires();
            }
            else
                recheckResizePool();
        }
        catch ( ResourceClosedException e )
        {
            if ( Debug.DEBUG )
            {
                if ( logger.isLoggable( MLevel.FINE ) )
                    logger.log( MLevel.FINE, "a resource pool async thread died.", e );
            }
            unexpectedBreak();
        }
        catch (InterruptedException e)
        {
            if ( logger.isLoggable( MLevel.WARNING ) )
            {
                logger.log( MLevel.WARNING,
                            BasicResourcePool.this + " -- Thread unexpectedly interrupted while performing an acquisition attempt.",
                            e );
            }
            recheckResizePool();
        }
        finally
        {
            // make sure the increment done in the constructor is always undone
            if (! decremented)
                decrementPendingAcquires();
            if (recheck)
                recheckResizePool();
        }
    }

    /**
     * @return true while no attempt has succeeded, no force-kill is pending
     *         and attempt_num is still within the permitted range
     */
    private boolean shouldTry(int attempt_num)
    {
        return
            !success &&
            !isForceKillAcquiresPending() &&
            goodAttemptNumber( attempt_num );
    }

    /**
     * @return true if attempts are unlimited (num_acq_attempts <= 0) or
     *         attempt_num is below num_acq_attempts
     */
    private boolean goodAttemptNumber(int attempt_num)
    { return (num_acq_attempts <= 0 || attempt_num < num_acq_attempts); }
}
/**
 * Task that removes one arbitrary resource from the pool and then rechecks
 * the pool size.
 *
 * The pending-removes counter is incremented when the task is constructed
 * and decremented when run() finishes, whether or not it threw.
 */
class RemoveTask implements Runnable
{
    public RemoveTask()
    {
        incrementPendingRemoves();
    }

    public void run()
    {
        try
        {
            synchronousRemoveArbitraryResource();
            recheckResizePool();
        }
        finally
        {
            // balance the increment performed in the constructor
            decrementPendingRemoves();
        }
    }
}
/**
 * Timer task that runs cullExpired() while holding the pool's lock.
 *
 * A ResourceClosedException raised along the way is logged at FINE in debug
 * builds and then breaks the pool via unexpectedBreak().
 */
class CullTask extends TimerTask
{
    public void run()
    {
        final BasicResourcePool pool = BasicResourcePool.this;
        try
        {
            if (Debug.DEBUG && Debug.TRACE >= Debug.TRACE_MED && logger.isLoggable( MLevel.FINER ))
                logger.log( MLevel.FINER, "Checking for expired resources - " + new Date() + " [" + pool + "]");

            synchronized ( pool )
            {
                cullExpired();
            }
        }
        catch ( ResourceClosedException e )
        {
            if ( Debug.DEBUG && logger.isLoggable( MLevel.FINE ) )
                logger.log( MLevel.FINE, "a resource pool async thread died.", e );

            unexpectedBreak();
        }
    }
}
/**
 * Timer task that runs checkIdleResources() while holding the pool's lock.
 *
 * A ResourceClosedException raised along the way is logged at FINE in debug
 * builds and then breaks the pool via unexpectedBreak().
 */
class CheckIdleResourcesTask extends TimerTask
{
    public void run()
    {
        final BasicResourcePool pool = BasicResourcePool.this;
        try
        {
            if (Debug.DEBUG && Debug.TRACE >= Debug.TRACE_MED && logger.isLoggable(MLevel.FINER))
                logger.log(MLevel.FINER, "Refurbishing idle resources - " + new Date() + " [" + pool + "]");

            synchronized ( pool )
            {
                checkIdleResources();
            }
        }
        catch ( ResourceClosedException e )
        {
            if ( Debug.DEBUG && logger.isLoggable( MLevel.FINE ) )
                logger.log( MLevel.FINE, "a resource pool async thread died.", e );

            unexpectedBreak();
        }
    }
}
/**
 * Task that tests a single idle resource by asking the manager to refurbish
 * it, outside the pool's lock.
 *
 * If the manager throws, the resource is removed from the pool (when it is
 * still managed), the failure is counted in failed_idle_tests and recorded as
 * the last idle-check failure. In every case the resource is finally dropped
 * from idleCheckResources and all threads waiting on the pool are notified.
 */
class AsyncTestIdleResourceTask implements Runnable
{
    Object resc;
    boolean pending = true;
    boolean failed;

    AsyncTestIdleResourceTask( Object resc )
    {
        this.resc = resc;
    }

    public void run()
    {
        assert !Thread.holdsLock( BasicResourcePool.this );

        try
        {
            mgr.refurbishIdleResource( resc );
        }
        catch ( Exception e )
        {
            if ( logger.isLoggable( MLevel.FINE ) )
                logger.log( MLevel.FINE, "BasicResourcePool: An idle resource is broken and will be purged. [" + resc + ']', e);

            synchronized (BasicResourcePool.this)
            {
                // the resource may have been removed while it was being tested
                if ( managed.containsKey( resc ) )
                {
                    removeResource( resc );
                    ensureMinResources();
                }
                ++failed_idle_tests;
                setLastIdleCheckFailure(e);
            }
        }
        finally
        {
            // the test is over, whatever its outcome: unregister and wake waiters
            synchronized (BasicResourcePool.this)
            {
                idleCheckResources.remove( resc );
                BasicResourcePool.this.notifyAll();
            }
        }
    }
}
/**
 * Per-resource bookkeeping record. All times are values of
 * System.currentTimeMillis(); a checkout_time of -1 marks the resource as
 * not checked out.
 */
final static class PunchCard
{
    // when this card (and so the resource's pool membership) was created
    long acquisition_time;
    // initially equal to acquisition_time
    long last_checkin_time;
    // -1 while the resource is not checked out
    long checkout_time;
    // optional exception kept for the stack trace of the checkout; null by default
    Exception checkoutStackTraceException;

    PunchCard()
    {
        final long now = System.currentTimeMillis();
        acquisition_time = now;
        last_checkin_time = now;
        checkout_time = -1;
        checkoutStackTraceException = null;
    }
}
}