/*
* XAPool: Open Source XA JDBC Pool
* Copyright (C) 2003 Objectweb.org
* Initial Developer: Lutris Technologies Inc.
* Contact: xapool-public@lists.debian-sf.objectweb.org
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA
*/
package org.enhydra.jdbc.standard;
import org.enhydra.jdbc.util.Logger;
import org.apache.commons.logging.LogFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Vector;
import java.util.Iterator;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.Status;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import javax.transaction.TransactionManager;
Data source for creating StandardXAConnections.
/**
* Data source for creating StandardXAConnections.
*/
public class StandardXADataSource
extends StandardConnectionPoolDataSource
implements XADataSource {
public int minCon; // minimum number of connections
public int maxCon; // maximum number of connections
public long deadLockMaxWait;
// time (in ms) to wait before return an exception
Vector freeConnections; // connections not currently associated with an XID
Hashtable xidConnections; // connections currently associated with an XID
Hashtable deadConnections;
// connections which should be discarded when the transaction finishes
public int connectionCount = 0; // total number of connections created
public long deadLockRetryWait; // time to wait before 2 try of loop
transient public TransactionManager transactionManager;
public static final int DEFAULT_MIN_CON = 50;
// minimum number of connections
public static final int DEFAULT_MAX_CON = 0;
// maximum number of connections
public static final long DEFAULT_DEADLOCKMAXWAIT = 300000; // 5 minutes
public static final int DEFAULT_DEADLOCKRETRYWAIT = 10000; // 10 seconds
Constructor
/**
* Constructor
*/
public StandardXADataSource() {
super();
minCon = DEFAULT_MIN_CON;
maxCon = DEFAULT_MAX_CON;
deadLockMaxWait = DEFAULT_DEADLOCKMAXWAIT;
deadLockRetryWait = DEFAULT_DEADLOCKRETRYWAIT;
freeConnections = new Vector(minCon, 1);
// allow a reasonable size for free connections
xidConnections = new Hashtable(minCon * 2, 0.5f);
// ...and same for used connections
log = new Logger(LogFactory.getLog("org.enhydra.jdbc.xapool"));
log.debug("StandardXADataSource is created");
}
public int getConnectionCount() {
return connectionCount;
}
public Hashtable getXidConnections() {
return xidConnections;
}
Creates an XA connection using the default username and password.
/**
* Creates an XA connection using the default username and password.
*/
public XAConnection getXAConnection() throws SQLException {
log.debug(
"StandardXADataSource:getXAConnection(0) XA connection returned");
return getXAConnection(user, password);
}
Creates an XA connection using the supplied username and password.
/**
* Creates an XA connection using the supplied username and password.
*/
public synchronized XAConnection getXAConnection(
String user,
String password)
throws SQLException {
log.debug("StandardXADataSource:getXAConnection(user, password)");
StandardXAConnection xac =
new StandardXAConnection(this, user, password);
xac.setTransactionManager(transactionManager);
xac.setLogger(log);
connectionCount++;
return xac;
}
public void setTransactionManager(TransactionManager tm) {
log.debug("StandardXADataSource:setTransactionManager");
this.transactionManager = tm;
}
public TransactionManager getTransactionManager() {
return transactionManager;
}
public void setUser(String user) {
log.debug("StandardXADataSource:setUser");
if (((user == null) || (getUser() == null))
? user != getUser()
: !user.equals(getUser())) {
super.setUser(user);
resetCache();
}
}
public void setPassword(String password) {
log.debug("StandardXADataSource:setPassword");
if (((password == null) || (getPassword() == null))
? password != getPassword()
: !password.equals(getPassword())) {
super.setPassword(password);
resetCache();
}
}
public void setUrl(String url) {
if (((url == null) || (getUrl() == null))
? url != getUrl()
: !url.equals(getUrl())) {
super.setUrl(url);
resetCache();
}
}
public void setDriverName(String driverName) throws SQLException {
if ((driverName == null && getDriverName() != null)
|| (!driverName.equals(getDriverName()))) {
super.setDriverName(driverName);
resetCache();
}
}
private synchronized void resetCache() {
log.debug("StandardXADataSource:resetCache");
// deadConnections will temporarily hold pointers to the
// current ongoing transactions. These will be discarded when
// freed
deadConnections = (Hashtable) xidConnections.clone();
deadConnections.putAll(xidConnections);
// now we'll just clear out the freeConnections
Enumeration enum = freeConnections.elements();
while (enum.hasMoreElements()) {
StandardXAStatefulConnection xasc =
(StandardXAStatefulConnection) enum.nextElement();
try {
log.debug(
"StandardXADataSource:resetCache closing Connection:"
+ xasc.con);
xasc.con.close();
} catch (SQLException e) {
log.error(
"StandardXADataSource:resetCache Error closing connection:"
+ xasc.con);
}
freeConnections.removeElement(xasc);
}
}
Called when an XA connection gets closed. When they have all
been closed then any remaining physical connections are also
closed.
/**
* Called when an XA connection gets closed. When they have all
* been closed then any remaining physical connections are also
* closed.
*/
synchronized void connectionClosed() throws SQLException {
log.debug("StandardXADataSource:connectionClosed");
connectionCount--; // one more connection closed
if (connectionCount == 0) { // if no connections left
// Close any connections still associated with XIDs.
Enumeration cons = xidConnections.keys();
// used to iterate through the used connections
while (cons.hasMoreElements()) {
// while there are more connections
Object key = cons.nextElement(); // get the next connection
StandardXAStatefulConnection cur =
(StandardXAStatefulConnection)xidConnections.remove(key);
if(cur != null)
{
cur.con.close(); // close the physical connection
}
// cast to something more convenient
log.debug(
"StandardXADataSource:connectionClosed close physical connection");
}
Iterator connIterator = freeConnections.iterator();
while (connIterator.hasNext())
{
StandardXAStatefulConnection cur =(StandardXAStatefulConnection)connIterator.next();
cur.con.close();
connIterator.remove();
log.debug(
"StandardXADataSource:connectionClosed close any free connections");
}
}
}
Returns the number of connections that are either
prepared or heuristically completed.
/**
* Returns the number of connections that are either
* prepared or heuristically completed.
*/
public int getXidCount() {
int count = 0; // the return value
Enumeration cons = xidConnections.elements();
// used to iterate through the used connections
while (cons.hasMoreElements()) { // while there are more connections
Object o = cons.nextElement(); // get the next connection
StandardXAStatefulConnection cur = (StandardXAStatefulConnection) o;
// cast to something more convenient
if ((cur.getState() == Status.STATUS_PREPARED)
|| // if prepared
(
cur.getState() == Status.STATUS_PREPARING)) {
// ...or heuristically committed
count++; // one more connection with a valid xid
}
}
log.debug(
"StandardXADataSource:getXidCount return XidCount=<" + count + ">");
return count;
}
Constructs a list of all prepared connections' xids.
/**
* Constructs a list of all prepared connections' xids.
*/
Xid[] recover() {
int nodeCount = getXidCount();
// get number of connections in transactions
Xid[] xids = new Xid[nodeCount]; // create the return array
int i = 0; // used as xids index
Enumeration cons = xidConnections.elements();
// used to iterate through the used connections
while (cons.hasMoreElements()) { // while there are more connections
Object o = cons.nextElement(); // get the next connection
StandardXAStatefulConnection cur = (StandardXAStatefulConnection) o;
// cast to something more convenient
if ((cur.getState() == Status.STATUS_PREPARED)
|| // if prepared
(
cur.getState() == Status.STATUS_PREPARING)) {
// ...or heuristically committed
xids[i++] = cur.xid; // save in list
}
}
return xids;
}
Frees a connection to make it eligible for reuse. The free list
is normally a last in, first out list (LIFO). This is efficient.
However, timed out connections are nice to hang onto for error
reporting, so they can be placed at the start. This is less
efficient, but hopefully is a rare occurence.
Here, no need to verify the number of connections, we remove an
object from the xidConnections to put it in th freeConnections
/**
* Frees a connection to make it eligible for reuse. The free list
* is normally a last in, first out list (LIFO). This is efficient.
* However, timed out connections are nice to hang onto for error
* reporting, so they can be placed at the start. This is less
* efficient, but hopefully is a rare occurence.
*
* Here, no need to verify the number of connections, we remove an
* object from the xidConnections to put it in th freeConnections
*
*/
public synchronized void freeConnection(Xid id, boolean placeAtStart) {
log.debug("StandardXADataSource:freeConnection");
Object o = xidConnections.get(id); // lookup the connection by XID
StandardXAStatefulConnection cur = (StandardXAStatefulConnection) o;
// cast to something more convenient
xidConnections.remove(id); // remove connection from in use list
log.debug(
"StandardXADataSource:freeConnection remove id from xidConnections");
if (!deadConnections.containsKey(id)) {
// if this isn't to be discarded
/*
try {
log.debug("StandardXADataSource:freeConnection setAutoCommit(true):" + cur.id);
log.debug("con='"+cur.con.toString()+"'");
cur.con.setAutoCommit(acommit);
} catch(SQLException e) {
log.error("ERROR: Failed while autocommiting a connection: "+e);
}
*/
cur.setState(Status.STATUS_NO_TRANSACTION);
// set its new internal state
if (!freeConnections.contains(cur)) {
if (placeAtStart) { // if we want to keep for as long as possible
freeConnections.insertElementAt(cur, 0);
// then place it at the start of the list
} else {
freeConnections.addElement(cur);
// otherwise it's a LIFO list
}
}
} else {
deadConnections.remove(id);
try {
cur.con.close();
} catch (SQLException e) {
//ignore
}
}
notify();
}
Invoked by the timer thread to check all transactions
for timeouts. Returns the time of the next timeout event
after current timeouts have expired.
/**
* Invoked by the timer thread to check all transactions
* for timeouts. Returns the time of the next timeout event
* after current timeouts have expired.
*/
synchronized long checkTimeouts(long curTime) throws SQLException {
//log.debug("StandardXADataSource:checkTimeouts");
long nextTimeout = 0; // the earliest non-expired timeout in the list
Enumeration cons = xidConnections.elements();
// used to iterate through the used connections
while (cons.hasMoreElements()) { // while there are more connections
Object o = cons.nextElement(); // get the next connection
StandardXAStatefulConnection cur = (StandardXAStatefulConnection) o;
// cast to something more convenient
if ((cur.timeout != 0)
&& // if connection has a timeout
(
curTime > cur.timeout)) {
// ...and transaction has timed out
//log.debug("StandardXADataSource:checkTimeouts connection timeout");
cur.con.rollback();
// undo everything to do with this transaction
cur.timedOut = true; // flag that it has timed out
//log.debug(cur.toString()+" timed out");
freeConnection(cur.xid, true);
// make the connection eligible for reuse
// The timed out connection is eligible for reuse. The Xid and timedOut
// flag will nevertheless remain valid until it is reallocated to another
// global transaction. This gives the TM a *chance* to get a timeout
// exception, but we won't hang on to it forever.
} else { // transaction has not timed out
if (cur.timeout != 0) { // but it has a timeout scheduled
if ((cur.timeout < nextTimeout)
|| // and it's the next timeout to expire
(
nextTimeout == 0)) {
// ...or first timeout we've found
nextTimeout = cur.timeout; // set up next timeout
}
}
}
}
return nextTimeout;
}
Checks the start of the free list to see if the connection
previously associated with the supplied Xid has timed out.
Note that this can be an expensive operation as it has to
scan all free connections. so it should only be called in
the event of an error.
/**
* Checks the start of the free list to see if the connection
* previously associated with the supplied Xid has timed out.
* <P>
* Note that this can be an expensive operation as it has to
* scan all free connections. so it should only be called in
* the event of an error.
*/
synchronized private void checkTimeouts(Xid xid) throws XAException {
log.debug("StandardXADataSource:checkTimeouts");
for (int i = 0;
i < freeConnections.size();
i++) { // check each free connection
Object o = freeConnections.elementAt(i); // get next connection
StandardXAStatefulConnection cur = (StandardXAStatefulConnection) o;
// cast to something more convenient
if (!cur.timedOut) { // if it hasn't timed out
continue; // skip it
}
log.debug(
"StandardXADataSource:checkTimeouts ("
+ i
+ "/"
+ freeConnections.size()
+ ") xid = "
+ xid);
log.debug(
"StandardXADataSource:checkTimeouts cur.xid = " + cur.xid);
if (xid.equals(cur.xid)) { // if we've found our xid
cur.timedOut = false; // cancel time out
throw new XAException(XAException.XA_RBTIMEOUT);
}
}
}
Returns the connection associated with a given XID.
is reached, the Xid is found or an exception is thrown.
/**
* Returns the connection associated with a given XID.
* is reached, the Xid is found or an exception is thrown.
*/
synchronized StandardXAStatefulConnection getConnection(
Xid xid,
boolean mustFind)
throws XAException {
log.debug(
"StandardXADataSource:getConnection (xid="
+ xid
+ ", mustFind="
+ mustFind
+ ")");
Object o = xidConnections.get(xid); // lookup the connection by XID
log.debug("XID: " + o);
StandardXAStatefulConnection cur = (StandardXAStatefulConnection) o;
// cast to something more convenient
if (mustFind) { // if we expected to find the connection
if (cur == null) { // and we didn't
log.debug(
"StandardXADataSource:getConnection (StatefulConnection is null)");
checkTimeouts(xid); // see if it's been freed during a timeout
throw new XAException(XAException.XAER_NOTA);
// not a valid XID
}
} else { // didn't expect to find the connection
if (cur != null) { // but we found it anyway
throw new XAException(XAException.XAER_DUPID); // duplicate XID
}
}
log.debug(
"StandardXADataSource:getConnection return connection associated with a given XID");
return cur;
}
Returns a connection from the free list, removing it
in the process. If none area available then a new
connection is created.
/**
* Returns a connection from the free list, removing it
* in the process. If none area available then a new
* connection is created.
*/
synchronized StandardXAStatefulConnection getFreeConnection()
throws SQLException {
log.debug("StandardXADataSource:getFreeConnection");
StandardXAStatefulConnection cur = null;
// this will be the return value
int freeCount = freeConnections.size();
// get number of free connections
if (freeCount == 0) { // if there are no free connections
log.debug(
"StandardXADataSource:getFreeConnection there are no free connections, get a new database connection");
Connection con = super.getConnection(user, password);
// get a new database connection
cur = new StandardXAStatefulConnection(this, con);
// make the connection stateful
} else {
Object o = freeConnections.lastElement(); // get the last element
cur = (StandardXAStatefulConnection) o;
// cast to something more convenient
freeConnections.removeElementAt(freeCount - 1);
// remove from free list
cur.timeout = 0; // no timeout until start() called
cur.timedOut = false; // cancel any time old out
}
log.debug(
"StandardXADataSource:getFreeConnection return a connection from the free list");
try {
log.debug(
"StandardXADataSource:getFreeConnection setAutoCommit(true)");
// changed by cney - was false
cur.con.setAutoCommit(true);
} catch (SQLException e) {
log.error(
"StandardXADataSource:getFreeConnection ERROR: Failed while autocommiting a connection: "
+ e);
}
return cur;
}
public void closeFreeConnection() {
log.debug("StandardXADataSource:closeFreeConnection empty method TBD");
}
public void setMinCon(int min) {
this.minCon = min;
}
public void setMaxCon(int max) {
this.maxCon = max;
}
public void setDeadLockMaxWait(long deadLock) {
this.deadLockMaxWait = deadLock;
}
public int getMinCon() {
return this.minCon;
}
public int getMaxCon() {
return this.maxCon;
}
public long getDeadLockMaxWait() {
return this.deadLockMaxWait;
}
public int getAllConnections() {
return xidConnections.size() + freeConnections.size();
}
public synchronized void processToWait() throws Exception {
log.debug("StandardXADataSource:processToWait");
int currentWait = 0;
if (maxCon != 0) {
while ((getAllConnections() >= maxCon)
&& (currentWait < getDeadLockMaxWait())) {
dump();
try {
synchronized (this) {
wait(getDeadLockRetryWait());
}
} catch (InterruptedException e) {
log.error(
"StandardXADataSource:processToWait ERROR: Failed while waiting for an object: "
+ e);
}
currentWait += getDeadLockRetryWait();
}
if (getAllConnections() >= getMaxCon())
throw new Exception("StandardXADataSource:processToWait ERROR : impossible to obtain a new xa connection");
}
}
public void dump() {
for (int i = 0; i < freeConnections.size(); i++) {
log.debug(
"freeConnection:<"
+ freeConnections.elementAt(i).toString()
+ ">");
}
for (Enumeration enum = xidConnections.elements();
enum.hasMoreElements();
) {
log.debug("xidConnection:<" + enum.nextElement().toString() + ">");
}
}
public void setDeadLockRetryWait(long deadLockRetryWait) {
this.deadLockRetryWait = deadLockRetryWait;
}
public long getDeadLockRetryWait() {
return this.deadLockRetryWait;
}
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("StandardXADataSource:\n");
sb.append(" connection count=<"+this.connectionCount+">\n");
if (deadConnections != null)
sb.append(" number of dead connection=<"+this.deadConnections.size()+">\n");
sb.append(" dead lock max wait=<"+this.deadLockMaxWait+">\n");
sb.append(" dead lock retry wait=<"+this.deadLockRetryWait+">\n");
if (driver != null)
sb.append(" driver=<"+this.driver.toString()+">\n");
sb.append(" driver name=<"+this.driverName+">\n");
if (freeConnections != null)
sb.append(" number of *free* connections=<"+this.freeConnections.size()+">\n");
sb.append(" max con=<"+this.maxCon+">\n");
sb.append(" min con=<"+this.minCon+">\n");
sb.append(" prepared stmt cache size=<"+this.preparedStmtCacheSize+">\n");
sb.append(" transaction manager=<"+this.transactionManager+">\n");
sb.append(" xid connection size=<"+this.xidConnections.size()+">\n");
sb.append(super.toString());
return sb.toString();
}
}