/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.http.pool;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.http.annotation.Contract;
import org.apache.http.annotation.ThreadingBehavior;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.util.Args;
import org.apache.http.util.Asserts;

Abstract synchronous (blocking) pool of connections.

Please note that this class does not maintain its own pool of execution Threads. Therefore, one must call Future.get() or Future.get(long, TimeUnit) method on the Future object returned by the lease(Object, Object, FutureCallback) method in order for the lease operation to complete.

Type parameters:
  • <T> – the route type that represents the opposite endpoint of a pooled connection.
  • <C> – the connection type.
  • <E> – the type of the pool entry containing a pooled connection.
Since:4.2
/** * Abstract synchronous (blocking) pool of connections. * <p> * Please note that this class does not maintain its own pool of execution {@link Thread}s. * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)} * method on the {@link Future} object returned by the * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation * to complete. * * @param <T> the route type that represents the opposite endpoint of a pooled * connection. * @param <C> the connection type. * @param <E> the type of the pool entry containing a pooled connection. * @since 4.2 */
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL) public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>> implements ConnPool<T, E>, ConnPoolControl<T> { private final Lock lock; private final Condition condition; private final ConnFactory<T, C> connFactory; private final Map<T, RouteSpecificPool<T, C, E>> routeToPool; private final Set<E> leased; private final LinkedList<E> available; private final LinkedList<Future<E>> pending; private final Map<T, Integer> maxPerRoute; private volatile boolean isShutDown; private volatile int defaultMaxPerRoute; private volatile int maxTotal; private volatile int validateAfterInactivity; public AbstractConnPool( final ConnFactory<T, C> connFactory, final int defaultMaxPerRoute, final int maxTotal) { super(); this.connFactory = Args.notNull(connFactory, "Connection factory"); this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value"); this.maxTotal = Args.positive(maxTotal, "Max total value"); this.lock = new ReentrantLock(); this.condition = this.lock.newCondition(); this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>(); this.leased = new HashSet<E>(); this.available = new LinkedList<E>(); this.pending = new LinkedList<Future<E>>(); this.maxPerRoute = new HashMap<T, Integer>(); }
Creates a new entry for the given connection with the given route.
/** * Creates a new entry for the given connection with the given route. */
protected abstract E createEntry(T route, C conn);
Since:4.3
/** * @since 4.3 */
protected void onLease(final E entry) { }
Since:4.3
/** * @since 4.3 */
protected void onRelease(final E entry) { }
Since:4.4
/** * @since 4.4 */
protected void onReuse(final E entry) { }
Since:4.4
/** * @since 4.4 */
protected boolean validate(final E entry) { return true; } public boolean isShutdown() { return this.isShutDown; }
Shuts down the pool.
/** * Shuts down the pool. */
public void shutdown() throws IOException { if (this.isShutDown) { return ; } this.isShutDown = true; this.lock.lock(); try { for (final E entry: this.available) { entry.close(); } for (final E entry: this.leased) { entry.close(); } for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) { pool.shutdown(); } this.routeToPool.clear(); this.leased.clear(); this.available.clear(); } finally { this.lock.unlock(); } } private RouteSpecificPool<T, C, E> getPool(final T route) { RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route); if (pool == null) { pool = new RouteSpecificPool<T, C, E>(route) { @Override protected E createEntry(final C conn) { return AbstractConnPool.this.createEntry(route, conn); } }; this.routeToPool.put(route, pool); } return pool; } private static Exception operationAborted() { return new CancellationException("Operation aborted"); }
{@inheritDoc}

Please note that this class does not maintain its own pool of execution Threads. Therefore, one must call Future.get() or Future.get(long, TimeUnit) method on the Future returned by this method in order for the lease operation to complete.

/** * {@inheritDoc} * <p> * Please note that this class does not maintain its own pool of execution * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()} * or {@link Future#get(long, TimeUnit)} method on the {@link Future} * returned by this method in order for the lease operation to complete. */
@Override public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) { Args.notNull(route, "Route"); Asserts.check(!this.isShutDown, "Connection pool shut down"); return new Future<E>() { private final AtomicBoolean cancelled = new AtomicBoolean(false); private final AtomicBoolean done = new AtomicBoolean(false); private final AtomicReference<E> entryRef = new AtomicReference<E>(null); @Override public boolean cancel(final boolean mayInterruptIfRunning) { if (done.compareAndSet(false, true)) { cancelled.set(true); lock.lock(); try { condition.signalAll(); } finally { lock.unlock(); } if (callback != null) { callback.cancelled(); } return true; } return false; } @Override public boolean isCancelled() { return cancelled.get(); } @Override public boolean isDone() { return done.get(); } @Override public E get() throws InterruptedException, ExecutionException { try { return get(0L, TimeUnit.MILLISECONDS); } catch (final TimeoutException ex) { throw new ExecutionException(ex); } } @Override public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { for (;;) { synchronized (this) { try { final E entry = entryRef.get(); if (entry != null) { return entry; } if (done.get()) { throw new ExecutionException(operationAborted()); } final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this); if (validateAfterInactivity > 0) { if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) { if (!validate(leasedEntry)) { leasedEntry.close(); release(leasedEntry, false); continue; } } } if (done.compareAndSet(false, true)) { entryRef.set(leasedEntry); done.set(true); onLease(leasedEntry); if (callback != null) { callback.completed(leasedEntry); } return leasedEntry; } else { release(leasedEntry, true); throw new ExecutionException(operationAborted()); } } catch (final IOException ex) { if (done.compareAndSet(false, true)) { if (callback != null) { callback.failed(ex); } } throw new ExecutionException(ex); } } } } }; }
Attempts to lease a connection for the given route and with the given state from the pool.

Please note that this class does not maintain its own pool of execution Threads. Therefore, one must call Future.get() or Future.get(long, TimeUnit) method on the Future returned by this method in order for the lease operation to complete.

Params:
  • route – route of the connection.
  • state – arbitrary object that represents a particular state (usually a security principal or a unique token identifying the user whose credentials have been used while establishing the connection). May be null.
Returns:future for a leased pool entry.
/** * Attempts to lease a connection for the given route and with the given * state from the pool. * <p> * Please note that this class does not maintain its own pool of execution * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()} * or {@link Future#get(long, TimeUnit)} method on the {@link Future} * returned by this method in order for the lease operation to complete. * * @param route route of the connection. * @param state arbitrary object that represents a particular state * (usually a security principal or a unique token identifying * the user whose credentials have been used while establishing the connection). * May be {@code null}. * @return future for a leased pool entry. */
public Future<E> lease(final T route, final Object state) { return lease(route, state, null); } private E getPoolEntryBlocking( final T route, final Object state, final long timeout, final TimeUnit timeUnit, final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException { Date deadline = null; if (timeout > 0) { deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout)); } this.lock.lock(); try { final RouteSpecificPool<T, C, E> pool = getPool(route); E entry; for (;;) { Asserts.check(!this.isShutDown, "Connection pool shut down"); if (future.isCancelled()) { throw new ExecutionException(operationAborted()); } for (;;) { entry = pool.getFree(state); if (entry == null) { break; } if (entry.isExpired(System.currentTimeMillis())) { entry.close(); } if (entry.isClosed()) { this.available.remove(entry); pool.free(entry, false); } else { break; } } if (entry != null) { this.available.remove(entry); this.leased.add(entry); onReuse(entry); return entry; } // New connection is needed final int maxPerRoute = getMax(route); // Shrink the pool prior to allocating a new connection final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute); if (excess > 0) { for (int i = 0; i < excess; i++) { final E lastUsed = pool.getLastUsed(); if (lastUsed == null) { break; } lastUsed.close(); this.available.remove(lastUsed); pool.remove(lastUsed); } } if (pool.getAllocatedCount() < maxPerRoute) { final int totalUsed = this.leased.size(); final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0); if (freeCapacity > 0) { final int totalAvailable = this.available.size(); if (totalAvailable > freeCapacity - 1) { if (!this.available.isEmpty()) { final E lastUsed = this.available.removeLast(); lastUsed.close(); final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute()); otherpool.remove(lastUsed); } } final C conn = this.connFactory.create(route); entry = pool.add(conn); this.leased.add(entry); return entry; } } boolean success = false; try { pool.queue(future); this.pending.add(future); if (deadline != null) { success = this.condition.awaitUntil(deadline); } else { this.condition.await(); success = true; } if (future.isCancelled()) { throw new ExecutionException(operationAborted()); } } finally { // In case of 'success', we were woken up by the // connection pool and should now have a connection // waiting for us, or else we're shutting down. // Just continue in the loop, both cases are checked. pool.unqueue(future); this.pending.remove(future); } // check for spurious wakeup vs. timeout if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) { break; } } throw new TimeoutException("Timeout waiting for connection"); } finally { this.lock.unlock(); } } @Override public void release(final E entry, final boolean reusable) { this.lock.lock(); try { if (this.leased.remove(entry)) { final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute()); pool.free(entry, reusable); if (reusable && !this.isShutDown) { this.available.addFirst(entry); } else { entry.close(); } onRelease(entry); Future<E> future = pool.nextPending(); if (future != null) { this.pending.remove(future); } else { future = this.pending.poll(); } if (future != null) { this.condition.signalAll(); } } } finally { this.lock.unlock(); } } private int getMax(final T route) { final Integer v = this.maxPerRoute.get(route); return v != null ? v.intValue() : this.defaultMaxPerRoute; } @Override public void setMaxTotal(final int max) { Args.positive(max, "Max value"); this.lock.lock(); try { this.maxTotal = max; } finally { this.lock.unlock(); } } @Override public int getMaxTotal() { this.lock.lock(); try { return this.maxTotal; } finally { this.lock.unlock(); } } @Override public void setDefaultMaxPerRoute(final int max) { Args.positive(max, "Max per route value"); this.lock.lock(); try { this.defaultMaxPerRoute = max; } finally { this.lock.unlock(); } } @Override public int getDefaultMaxPerRoute() { this.lock.lock(); try { return this.defaultMaxPerRoute; } finally { this.lock.unlock(); } } @Override public void setMaxPerRoute(final T route, final int max) { Args.notNull(route, "Route"); this.lock.lock(); try { if (max > -1) { this.maxPerRoute.put(route, Integer.valueOf(max)); } else { this.maxPerRoute.remove(route); } } finally { this.lock.unlock(); } } @Override public int getMaxPerRoute(final T route) { Args.notNull(route, "Route"); this.lock.lock(); try { return getMax(route); } finally { this.lock.unlock(); } } @Override public PoolStats getTotalStats() { this.lock.lock(); try { return new PoolStats( this.leased.size(), this.pending.size(), this.available.size(), this.maxTotal); } finally { this.lock.unlock(); } } @Override public PoolStats getStats(final T route) { Args.notNull(route, "Route"); this.lock.lock(); try { final RouteSpecificPool<T, C, E> pool = getPool(route); return new PoolStats( pool.getLeasedCount(), pool.getPendingCount(), pool.getAvailableCount(), getMax(route)); } finally { this.lock.unlock(); } }
Returns snapshot of all knows routes
Returns:the set of routes
Since:4.4
/** * Returns snapshot of all knows routes * @return the set of routes * * @since 4.4 */
public Set<T> getRoutes() { this.lock.lock(); try { return new HashSet<T>(routeToPool.keySet()); } finally { this.lock.unlock(); } }
Enumerates all available connections.
Since:4.3
/** * Enumerates all available connections. * * @since 4.3 */
protected void enumAvailable(final PoolEntryCallback<T, C> callback) { this.lock.lock(); try { final Iterator<E> it = this.available.iterator(); while (it.hasNext()) { final E entry = it.next(); callback.process(entry); if (entry.isClosed()) { final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute()); pool.remove(entry); it.remove(); } } purgePoolMap(); } finally { this.lock.unlock(); } }
Enumerates all leased connections.
Since:4.3
/** * Enumerates all leased connections. * * @since 4.3 */
protected void enumLeased(final PoolEntryCallback<T, C> callback) { this.lock.lock(); try { final Iterator<E> it = this.leased.iterator(); while (it.hasNext()) { final E entry = it.next(); callback.process(entry); } } finally { this.lock.unlock(); } } private void purgePoolMap() { final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator(); while (it.hasNext()) { final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next(); final RouteSpecificPool<T, C, E> pool = entry.getValue(); if (pool.getPendingCount() + pool.getAllocatedCount() == 0) { it.remove(); } } }
Closes connections that have been idle longer than the given period of time and evicts them from the pool.
Params:
  • idletime – maximum idle time.
  • timeUnit – time unit.
/** * Closes connections that have been idle longer than the given period * of time and evicts them from the pool. * * @param idletime maximum idle time. * @param timeUnit time unit. */
public void closeIdle(final long idletime, final TimeUnit timeUnit) { Args.notNull(timeUnit, "Time unit"); long time = timeUnit.toMillis(idletime); if (time < 0) { time = 0; } final long deadline = System.currentTimeMillis() - time; enumAvailable(new PoolEntryCallback<T, C>() { @Override public void process(final PoolEntry<T, C> entry) { if (entry.getUpdated() <= deadline) { entry.close(); } } }); }
Closes expired connections and evicts them from the pool.
/** * Closes expired connections and evicts them from the pool. */
public void closeExpired() { final long now = System.currentTimeMillis(); enumAvailable(new PoolEntryCallback<T, C>() { @Override public void process(final PoolEntry<T, C> entry) { if (entry.isExpired(now)) { entry.close(); } } }); }
Returns:the number of milliseconds
Since:4.4
/** * @return the number of milliseconds * @since 4.4 */
public int getValidateAfterInactivity() { return this.validateAfterInactivity; }
Params:
  • ms – the number of milliseconds
Since:4.4
/** * @param ms the number of milliseconds * @since 4.4 */
public void setValidateAfterInactivity(final int ms) { this.validateAfterInactivity = ms; } @Override public String toString() { this.lock.lock(); try { final StringBuilder buffer = new StringBuilder(); buffer.append("[leased: "); buffer.append(this.leased); buffer.append("][available: "); buffer.append(this.available); buffer.append("][pending: "); buffer.append(this.pending); buffer.append("]"); return buffer.toString(); } finally { this.lock.unlock(); } } }