package org.hibernate.engine.spi;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.hibernate.AssertionFailure;
import org.hibernate.HibernateException;
import org.hibernate.PropertyValueException;
import org.hibernate.action.internal.AbstractEntityInsertAction;
import org.hibernate.action.internal.BulkOperationCleanupAction;
import org.hibernate.action.internal.CollectionRecreateAction;
import org.hibernate.action.internal.CollectionRemoveAction;
import org.hibernate.action.internal.CollectionUpdateAction;
import org.hibernate.action.internal.EntityDeleteAction;
import org.hibernate.action.internal.EntityIdentityInsertAction;
import org.hibernate.action.internal.EntityInsertAction;
import org.hibernate.action.internal.EntityUpdateAction;
import org.hibernate.action.internal.OrphanRemovalAction;
import org.hibernate.action.internal.QueuedOperationCollectionAction;
import org.hibernate.action.internal.UnresolvedEntityInsertActions;
import org.hibernate.action.spi.AfterTransactionCompletionProcess;
import org.hibernate.action.spi.BeforeTransactionCompletionProcess;
import org.hibernate.action.spi.Executable;
import org.hibernate.cache.CacheException;
import org.hibernate.engine.internal.NonNullableTransientDependencies;
import org.hibernate.internal.CoreLogging;
import org.hibernate.internal.CoreMessageLogger;
import org.hibernate.proxy.HibernateProxy;
import org.hibernate.proxy.LazyInitializer;
import org.hibernate.type.Type;
public class ActionQueue {
// Logger used by this class and by the nested completion-process queues.
private static final CoreMessageLogger LOG = CoreLogging.messageLogger( ActionQueue.class );
// Session this queue was created for; used to reach factory settings and handed to the completion-process queues.
private SessionImplementor session;
// Insert actions parked by addInsertAction() because they reported non-nullable transient dependencies.
private UnresolvedEntityInsertActions unresolvedInsertions;
// One list per kind of queued action; each is filled by the matching addAction() overload.
private final ExecutableList<AbstractEntityInsertAction> insertions;
private final ExecutableList<EntityDeleteAction> deletions;
private final ExecutableList<EntityUpdateAction> updates;
private final ExecutableList<CollectionRecreateAction> collectionCreations;
private final ExecutableList<CollectionUpdateAction> collectionUpdates;
private final ExecutableList<QueuedOperationCollectionAction> collectionQueuedOps;
private final ExecutableList<CollectionRemoveAction> collectionRemovals;
private final ExecutableList<OrphanRemovalAction> orphanRemovals;
// Unmodifiable list of all the action lists above, in the order executeActions() walks them.
private final List<ExecutableList<?>> executableLists;
// When true, before/afterTransactionCompletion() do nothing and hasBefore/AfterTransactionActions() return false.
private transient boolean isTransactionCoordinatorShared;
// Completion callback queues; both can be swapped through setTransactionCompletionProcesses().
private AfterTransactionCompletionProcessQueue afterTransactionProcesses;
private BeforeTransactionCompletionProcessQueue beforeTransactionProcesses;
public ActionQueue(SessionImplementor session) {
this.session = session;
unresolvedInsertions = new UnresolvedEntityInsertActions();
insertions = new ExecutableList<AbstractEntityInsertAction>( new InsertActionSorter() );
deletions = new ExecutableList<EntityDeleteAction>();
updates = new ExecutableList<EntityUpdateAction>();
collectionCreations = new ExecutableList<CollectionRecreateAction>();
collectionRemovals = new ExecutableList<CollectionRemoveAction>();
collectionUpdates = new ExecutableList<CollectionUpdateAction>();
collectionQueuedOps = new ExecutableList<QueuedOperationCollectionAction>();
orphanRemovals = new ExecutableList<OrphanRemovalAction>();
List<ExecutableList<?>> tmp = Arrays.<ExecutableList<?>>asList(
orphanRemovals,
insertions,
updates,
collectionQueuedOps,
collectionRemovals,
collectionUpdates,
collectionCreations,
deletions
);
executableLists = Collections.unmodifiableList( tmp );
isTransactionCoordinatorShared = false;
afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session );
beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session );
}
/**
 * Empties every action list and then the set of unresolved insert actions.
 */
public void clear() {
    final int listCount = executableLists.size();
    for ( int i = 0; i < listCount; i++ ) {
        executableLists.get( i ).clear();
    }
    unresolvedInsertions.clear();
}
/**
 * Queues a regular entity insert, after trace-logging the entity name.
 *
 * @param action the insert action to add
 */
public void addAction(EntityInsertAction action) {
    final String entityName = action.getEntityName();
    LOG.tracev( "Adding an EntityInsertAction for [{0}] object", entityName );
    addInsertAction( action );
}
/**
 * Routes an insert action either to the resolved path or to the unresolved-insertions holder,
 * depending on whether it reports non-nullable transient dependencies.
 *
 * @param insert the insert action to route
 */
private void addInsertAction(AbstractEntityInsertAction insert) {
    if ( insert.isEarlyInsert() ) {
        // Early inserts flush the already-queued insertions before dependencies are looked up.
        LOG.tracev( "Executing inserts before finding non-nullable transient entities for early insert: [{0}]", insert );
        executeInserts();
    }

    final NonNullableTransientDependencies dependencies = insert.findNonNullableTransientEntities();
    if ( dependencies != null ) {
        if ( LOG.isTraceEnabled() ) {
            LOG.tracev(
                    "Adding insert with non-nullable, transient entities; insert=[{0}], dependencies=[{1}]",
                    insert,
                    dependencies.toLoggableString( insert.getSession() )
            );
        }
        unresolvedInsertions.addUnresolvedEntityInsertAction( insert, dependencies );
        return;
    }

    LOG.tracev( "Adding insert with no non-nullable, transient entities: [{0}]", insert );
    addResolvedEntityInsertAction( insert );
}
/**
 * Handles an insert action that has no outstanding dependencies: a non-early insert is queued,
 * an early insert is executed right away (after flushing queued insertions). The entity is then
 * marked managed, and every action that this insert unblocks is processed recursively.
 *
 * @param insert the resolved insert action
 */
private void addResolvedEntityInsertAction(AbstractEntityInsertAction insert) {
    if ( !insert.isEarlyInsert() ) {
        LOG.trace( "Adding resolved non-early insert action." );
        insertions.add( insert );
    }
    else {
        LOG.trace( "Executing insertions before resolved early-insert" );
        executeInserts();
        LOG.debug( "Executing identity-insert immediately" );
        execute( insert );
    }

    insert.makeEntityManaged();

    // Actions that were waiting on this entity become resolved in turn.
    for ( AbstractEntityInsertAction unblocked : unresolvedInsertions.resolveDependentActions( insert.getInstance(), session ) ) {
        addResolvedEntityInsertAction( unblocked );
    }
}
/**
 * Queues an identity-based entity insert, after trace-logging the entity name.
 *
 * @param action the identity insert action to add
 */
public void addAction(EntityIdentityInsertAction action) {
    final String entityName = action.getEntityName();
    LOG.tracev( "Adding an EntityIdentityInsertAction for [{0}] object", entityName );
    addInsertAction( action );
}
/**
 * Appends an entity delete action to the deletions list.
 *
 * @param action the delete action to queue
 */
public void addAction(EntityDeleteAction action) {
    this.deletions.add( action );
}
/**
 * Appends an orphan removal action to the orphan-removals list.
 *
 * @param action the orphan removal action to queue
 */
public void addAction(OrphanRemovalAction action) {
    this.orphanRemovals.add( action );
}
/**
 * Appends an entity update action to the updates list.
 *
 * @param action the update action to queue
 */
public void addAction(EntityUpdateAction action) {
    this.updates.add( action );
}
/**
 * Appends a collection (re)create action to the collection-creations list.
 *
 * @param action the collection recreate action to queue
 */
public void addAction(CollectionRecreateAction action) {
    this.collectionCreations.add( action );
}
/**
 * Appends a collection remove action to the collection-removals list.
 *
 * @param action the collection remove action to queue
 */
public void addAction(CollectionRemoveAction action) {
    this.collectionRemovals.add( action );
}
/**
 * Appends a collection update action to the collection-updates list.
 *
 * @param action the collection update action to queue
 */
public void addAction(CollectionUpdateAction action) {
    this.collectionUpdates.add( action );
}
/**
 * Appends a queued-operation collection action to its list.
 *
 * @param action the queued-operation collection action to queue
 */
public void addAction(QueuedOperationCollectionAction action) {
    this.collectionQueuedOps.add( action );
}
/**
 * Registers the cleanup work of a bulk operation. The action itself is not added to any
 * action list; only its completion processes and property spaces are registered.
 *
 * @param action the bulk operation cleanup action
 */
public void addAction(BulkOperationCleanupAction action) {
    this.registerCleanupActions( action );
}
/**
 * Registers the before/after transaction completion processes of an executable and, when the
 * query cache is enabled, invalidates its property spaces.
 *
 * @param executable the executable whose cleanup work is registered
 */
private void registerCleanupActions(Executable executable) {
    final BeforeTransactionCompletionProcess beforeProcess = executable.getBeforeTransactionCompletionProcess();
    beforeTransactionProcesses.register( beforeProcess );

    final boolean queryCacheEnabled = session.getFactory().getSettings().isQueryCacheEnabled();
    if ( queryCacheEnabled ) {
        invalidateSpaces( executable.getPropertySpaces() );
    }

    final AfterTransactionCompletionProcess afterProcess = executable.getAfterTransactionCompletionProcess();
    afterTransactionProcesses.register( afterProcess );
}
/**
 * Tells whether any insert action is still waiting on unresolved dependencies.
 *
 * @return {@code true} if the unresolved-insertions holder is not empty
 */
public boolean hasUnresolvedEntityInsertActions() {
    final boolean nothingUnresolved = unresolvedInsertions.isEmpty();
    return !nothingUnresolved;
}
/**
 * Delegates to the unresolved-insertions holder's check of the same name.
 *
 * @throws PropertyValueException as declared by the delegate
 */
public void checkNoUnresolvedActionsAfterOperation() throws PropertyValueException {
    this.unresolvedInsertions.checkNoUnresolvedActionsAfterOperation();
}
/**
 * Registers a callback in the after-transaction-completion queue.
 *
 * @param process the process to register
 */
public void registerProcess(AfterTransactionCompletionProcess process) {
    this.afterTransactionProcesses.register( process );
}
/**
 * Registers a callback in the before-transaction-completion queue.
 *
 * @param process the process to register
 */
public void registerProcess(BeforeTransactionCompletionProcess process) {
    this.beforeTransactionProcesses.register( process );
}
/**
 * Executes the queued insert actions only.
 *
 * @throws HibernateException if executing an insert fails
 */
public void executeInserts() throws HibernateException {
    this.executeActions( this.insertions );
}
/**
 * Executes every queued action, list by list, in the order the lists were registered.
 *
 * @throws IllegalStateException if unresolved entity insert actions remain
 * @throws HibernateException if executing an action fails
 */
public void executeActions() throws HibernateException {
    final boolean nothingUnresolved = unresolvedInsertions.isEmpty();
    if ( !nothingUnresolved ) {
        throw new IllegalStateException( "About to execute actions, but there are unresolved entity insert actions." );
    }

    for ( ExecutableList<?> queue : executableLists ) {
        executeActions( queue );
    }
}
/**
 * Calls {@code beforeExecutions()} on every queued collection action, in this order:
 * removals, updates, creations, then queued operations.
 *
 * @throws HibernateException if preparing an action fails
 */
public void prepareActions() throws HibernateException {
    final ExecutableList<?>[] collectionQueues = {
            collectionRemovals,
            collectionUpdates,
            collectionCreations,
            collectionQueuedOps
    };
    for ( int i = 0; i < collectionQueues.length; i++ ) {
        prepareActions( collectionQueues[i] );
    }
}
/**
 * Calls {@code beforeExecutions()} on each action of the given list.
 *
 * @param queue the list of actions to prepare
 * @throws HibernateException if preparing an action fails
 */
private void prepareActions(ExecutableList<?> queue) throws HibernateException {
    for ( Executable pending : queue ) {
        pending.beforeExecutions();
    }
}
/**
 * Runs the after-transaction-completion callbacks, unless the transaction coordinator is
 * flagged as shared, in which case nothing happens here.
 *
 * @param success whether the transaction completed successfully
 */
public void afterTransactionCompletion(boolean success) {
    if ( isTransactionCoordinatorShared ) {
        return;
    }
    afterTransactionProcesses.afterTransactionCompletion( success );
}
/**
 * Runs the before-transaction-completion callbacks, unless the transaction coordinator is
 * flagged as shared, in which case nothing happens here.
 */
public void beforeTransactionCompletion() {
    if ( isTransactionCoordinatorShared ) {
        return;
    }
    beforeTransactionProcesses.beforeTransactionCompletion();
}
/**
 * Tells whether any insert (resolved or unresolved), delete or orphan removal is queued.
 *
 * @return {@code true} if at least one of those lists is non-empty
 */
public boolean areInsertionsOrDeletionsQueued() {
    final boolean allEmpty = insertions.isEmpty()
            && unresolvedInsertions.isEmpty()
            && deletions.isEmpty()
            && orphanRemovals.isEmpty();
    return !allEmpty;
}
/**
 * Checks whether any queued action, including unresolved inserts, touches one of the given
 * table spaces.
 *
 * @param tables the table spaces to check; an empty set always yields {@code false}
 * @return {@code true} if a queued action affects at least one of the tables
 */
public boolean areTablesToBeUpdated(@SuppressWarnings("rawtypes") Set tables) {
    if ( tables.isEmpty() ) {
        return false;
    }

    final int listCount = executableLists.size();
    for ( int i = 0; i < listCount; i++ ) {
        final ExecutableList<?> queue = executableLists.get( i );
        if ( areTablesToBeUpdated( queue, tables ) ) {
            return true;
        }
    }

    // Nothing found in the regular lists; the unresolved inserts decide.
    return areTablesToBeUpdated( unresolvedInsertions, tables );
}
/**
 * Checks whether any query space of the given action list is contained in {@code tableSpaces}.
 *
 * @param actions the action list to inspect
 * @param tableSpaces the table spaces of interest
 * @return {@code true} on the first matching space, {@code false} if the list is empty or nothing matches
 */
private static boolean areTablesToBeUpdated(ExecutableList<?> actions, @SuppressWarnings("rawtypes") Set tableSpaces) {
    if ( !actions.isEmpty() ) {
        for ( Serializable querySpace : actions.getQuerySpaces() ) {
            if ( tableSpaces.contains( querySpace ) ) {
                LOG.debugf( "Changes must be flushed to space: %s", querySpace );
                return true;
            }
        }
    }
    return false;
}
/**
 * Checks whether any property space of the dependent (unresolved) insert actions is contained
 * in {@code tableSpaces}.
 *
 * @param actions the unresolved insert actions to inspect
 * @param tableSpaces the table spaces of interest
 * @return {@code true} on the first matching space, {@code false} otherwise
 */
private static boolean areTablesToBeUpdated(UnresolvedEntityInsertActions actions, @SuppressWarnings("rawtypes") Set tableSpaces) {
    for ( Executable dependentInsert : actions.getDependentEntityInsertActions() ) {
        for ( Serializable propertySpace : dependentInsert.getPropertySpaces() ) {
            if ( !tableSpaces.contains( propertySpace ) ) {
                continue;
            }
            LOG.debugf( "Changes must be flushed to space: %s", propertySpace );
            return true;
        }
    }
    return false;
}
/**
 * Executes every action of the given list, then clears the list and executes the JDBC batch.
 * <p>
 * Each action's completion processes are registered even if its execution throws. When the
 * query cache is enabled, all query spaces of the list are invalidated even if the loop was
 * aborted by an exception; in that case the list is neither cleared nor is the batch executed.
 *
 * @param list the actions to execute
 * @throws HibernateException if executing an action fails
 */
private <E extends Executable & Comparable<?> & Serializable> void executeActions(ExecutableList<E> list) throws HibernateException {
    try {
        for ( E action : list ) {
            try {
                action.execute();
            }
            finally {
                beforeTransactionProcesses.register( action.getBeforeTransactionCompletionProcess() );
                afterTransactionProcesses.register( action.getAfterTransactionCompletionProcess() );
            }
        }
    }
    finally {
        final boolean queryCacheEnabled = session.getFactory().getSettings().isQueryCacheEnabled();
        if ( queryCacheEnabled ) {
            final Set<Serializable> querySpaces = list.getQuerySpaces();
            final Serializable[] spaceArray = new Serializable[querySpaces.size()];
            invalidateSpaces( querySpaces.toArray( spaceArray ) );
        }
    }

    list.clear();
    session.getTransactionCoordinator().getJdbcCoordinator().executeBatch();
}
/**
 * Executes a single action immediately. Its cleanup work (completion processes and, when
 * the query cache is enabled, space invalidation) is registered whether or not execution
 * throws.
 *
 * @param executable the action to execute
 */
public <E extends Executable & Comparable<?>> void execute(E executable) {
    try {
        executable.execute();
    }
    finally {
        this.registerCleanupActions( executable );
    }
}
/**
 * Records each space for invalidation after transaction completion and pre-invalidates them
 * in the update-timestamps cache. Does nothing for a {@code null} or empty array.
 *
 * @param spaces the spaces to invalidate; each element is cast to {@code String}
 */
private void invalidateSpaces(Serializable... spaces) {
    if ( spaces == null || spaces.length == 0 ) {
        return;
    }

    for ( int i = 0; i < spaces.length; i++ ) {
        afterTransactionProcesses.addSpaceToInvalidate( (String) spaces[i] );
    }
    session.getFactory().getUpdateTimestampsCache().preInvalidate( spaces, session );
}
/**
 * Renders the content of every action list plus the unresolved insert dependencies.
 *
 * @return a diagnostic string of the form {@code ActionQueue[insertions=... ...]}
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder( "ActionQueue[insertions=" );
    text.append( insertions );
    text.append( " updates=" ).append( updates );
    text.append( " deletions=" ).append( deletions );
    text.append( " orphanRemovals=" ).append( orphanRemovals );
    text.append( " collectionCreations=" ).append( collectionCreations );
    text.append( " collectionRemovals=" ).append( collectionRemovals );
    text.append( " collectionUpdates=" ).append( collectionUpdates );
    text.append( " collectionQueuedOps=" ).append( collectionQueuedOps );
    text.append( " unresolvedInsertDependencies=" ).append( unresolvedInsertions );
    text.append( "]" );
    return text.toString();
}
/**
 * @return the number of queued collection remove actions
 */
public int numberOfCollectionRemovals() {
    final int queued = collectionRemovals.size();
    return queued;
}
/**
 * @return the number of queued collection update actions
 */
public int numberOfCollectionUpdates() {
    final int queued = collectionUpdates.size();
    return queued;
}
/**
 * @return the number of queued collection (re)create actions
 */
public int numberOfCollectionCreations() {
    final int queued = collectionCreations.size();
    return queued;
}
/**
 * @return the number of queued entity deletions plus the number of queued orphan removals
 */
public int numberOfDeletions() {
    final int plainDeletions = deletions.size();
    final int orphanDeletions = orphanRemovals.size();
    return plainDeletions + orphanDeletions;
}
/**
 * @return the number of queued entity update actions
 */
public int numberOfUpdates() {
    final int queued = updates.size();
    return queued;
}
/**
 * @return the number of queued (resolved) insert actions; unresolved inserts are not counted
 */
public int numberOfInsertions() {
    final int queued = insertions.size();
    return queued;
}
/**
 * Wraps the current before/after completion-process queues in a new holder object.
 *
 * @return a holder referencing this queue's current completion-process queues
 */
public TransactionCompletionProcesses getTransactionCompletionProcesses() {
    final TransactionCompletionProcesses holder =
            new TransactionCompletionProcesses( beforeTransactionProcesses, afterTransactionProcesses );
    return holder;
}
/**
 * Replaces this queue's completion-process queues with the ones held by {@code processes}
 * and records whether the transaction coordinator is shared.
 *
 * @param processes holder of the before/after completion-process queues to adopt
 * @param isTransactionCoordinatorShared the new value of the shared-coordinator flag
 */
public void setTransactionCompletionProcesses(TransactionCompletionProcesses processes, boolean isTransactionCoordinatorShared) {
    this.afterTransactionProcesses = processes.afterTransactionCompletionProcesses;
    this.beforeTransactionProcesses = processes.beforeTransactionCompletionProcesses;
    this.isTransactionCoordinatorShared = isTransactionCoordinatorShared;
}
/**
 * Sorts the four collection action lists, but only when the order-updates setting is enabled.
 */
public void sortCollectionActions() {
    if ( !session.getFactory().getSettings().isOrderUpdatesEnabled() ) {
        return;
    }
    collectionCreations.sort();
    collectionUpdates.sort();
    collectionQueuedOps.sort();
    collectionRemovals.sort();
}
/**
 * Sorts the entity updates when the order-updates setting is enabled, and the entity
 * insertions when the order-inserts setting is enabled.
 */
public void sortActions() {
    final boolean orderUpdates = session.getFactory().getSettings().isOrderUpdatesEnabled();
    if ( orderUpdates ) {
        updates.sort();
    }

    final boolean orderInserts = session.getFactory().getSettings().isOrderInsertsEnabled();
    if ( orderInserts ) {
        insertions.sort();
    }
}
/**
 * Clears the collection creations, collection updates, queued collection operations and
 * entity updates, and trims the collection removals back to a previously recorded size.
 *
 * @param previousCollectionRemovalSize the size the collection-removals list should be cut
 * back to; nothing is removed if the list is not larger than this
 */
public void clearFromFlushNeededCheck(int previousCollectionRemovalSize) {
    collectionCreations.clear();
    collectionUpdates.clear();
    collectionQueuedOps.clear();
    updates.clear();

    final int currentRemovalSize = collectionRemovals.size();
    if ( currentRemovalSize > previousCollectionRemovalSize ) {
        // Drop only the removals added since the recorded size.
        collectionRemovals.removeLastN( currentRemovalSize - previousCollectionRemovalSize );
    }
}
/**
 * @return {@code false} when the transaction coordinator is flagged as shared; otherwise
 * whether the after-transaction-completion queue holds any process
 */
public boolean hasAfterTransactionActions() {
    if ( isTransactionCoordinatorShared ) {
        return false;
    }
    return afterTransactionProcesses.hasActions();
}
/**
 * @return {@code false} when the transaction coordinator is flagged as shared; otherwise
 * whether the before-transaction-completion queue holds any process
 */
public boolean hasBeforeTransactionActions() {
    if ( isTransactionCoordinatorShared ) {
        return false;
    }
    return beforeTransactionProcesses.hasActions();
}
/**
 * Tells whether any action at all is waiting in this queue.
 * <p>
 * Every action list is consulted, including orphan removals: they are queued by
 * {@code addAction(OrphanRemovalAction)} and run by {@code executeActions()}, so a queue
 * holding only orphan removals must not be reported as empty.
 *
 * @return {@code true} if at least one action list or the unresolved insertions is non-empty
 */
public boolean hasAnyQueuedActions() {
    return !updates.isEmpty()
            || !insertions.isEmpty()
            || !unresolvedInsertions.isEmpty()
            || !deletions.isEmpty()
            || !orphanRemovals.isEmpty()
            || !collectionUpdates.isEmpty()
            || !collectionQueuedOps.isEmpty()
            || !collectionRemovals.isEmpty()
            || !collectionCreations.isEmpty();
}
/**
 * Removes the queued deletion (regular first, then orphan removal) whose instance is the
 * given entity, compared by reference. An initialized proxy is unwrapped to its
 * implementation before the lookup.
 *
 * @param entry the entity entry, used only to name the entity in the failure message
 * @param rescuedEntity the entity (or proxy) whose deletion is to be cancelled
 * @throws AssertionFailure if no queued deletion refers to the entity
 */
public void unScheduleDeletion(EntityEntry entry, Object rescuedEntity) {
    Object target = rescuedEntity;
    if ( target instanceof HibernateProxy ) {
        final LazyInitializer lazyInitializer = ( (HibernateProxy) target ).getHibernateLazyInitializer();
        if ( !lazyInitializer.isUninitialized() ) {
            target = lazyInitializer.getImplementation( session );
        }
    }

    if ( removeFirstDeletionOf( deletions, target ) ) {
        return;
    }
    if ( removeFirstDeletionOf( orphanRemovals, target ) ) {
        return;
    }
    throw new AssertionFailure( "Unable to perform un-delete for instance " + entry.getEntityName() );
}

/**
 * Removes the first action in {@code queue} whose instance is {@code target} (reference equality).
 *
 * @return {@code true} if an action was removed
 */
private static boolean removeFirstDeletionOf(ExecutableList<? extends EntityDeleteAction> queue, Object target) {
    for ( int index = 0; index < queue.size(); index++ ) {
        final EntityDeleteAction candidate = queue.get( index );
        if ( candidate.getInstance() == target ) {
            queue.remove( index );
            return true;
        }
    }
    return false;
}
/**
 * Writes this queue to the stream: the unresolved insertions first, then every action list
 * in its registered order.
 *
 * @param oos the stream to write to
 * @throws IOException if writing to the stream fails
 */
public void serialize(ObjectOutputStream oos) throws IOException {
    LOG.trace( "Serializing action-queue" );

    unresolvedInsertions.serialize( oos );

    final int listCount = executableLists.size();
    for ( int i = 0; i < listCount; i++ ) {
        executableLists.get( i ).writeExternal( oos );
    }
}
/**
 * Rebuilds an action queue from a stream, reading in the same order {@code serialize}
 * writes: the unresolved insertions first, then every action list.
 *
 * @param ois the stream to read from
 * @param session the session the restored queue is bound to
 * @return the restored queue
 * @throws IOException if reading from the stream fails
 * @throws ClassNotFoundException if a serialized class cannot be resolved
 */
public static ActionQueue deserialize(ObjectInputStream ois, SessionImplementor session) throws IOException, ClassNotFoundException {
    final boolean tracing = LOG.isTraceEnabled();
    if ( tracing ) {
        LOG.trace( "Deserializing action-queue" );
    }

    final ActionQueue restored = new ActionQueue( session );
    restored.unresolvedInsertions = UnresolvedEntityInsertActions.deserialize( ois, session );

    for ( ExecutableList<?> actionList : restored.executableLists ) {
        actionList.readExternal( ois );
        if ( tracing ) {
            LOG.tracev( "Deserialized [{0}] entries", actionList.size() );
        }
        actionList.afterDeserialize( session );
    }
    return restored;
}
/**
 * Base for the before/after transaction completion queues: holds the session and a queue
 * of registered processes.
 *
 * @param <T> the type of completion process held
 */
private static abstract class AbstractTransactionCompletionProcessQueue<T> {
    protected SessionImplementor session;
    protected Queue<T> processes = new ConcurrentLinkedQueue<T>();

    private AbstractTransactionCompletionProcessQueue(SessionImplementor session) {
        this.session = session;
    }

    /**
     * Adds a process to the queue; {@code null} is silently ignored.
     *
     * @param process the process to register, may be {@code null}
     */
    public void register(T process) {
        if ( process != null ) {
            processes.add( process );
        }
    }

    /**
     * @return {@code true} if at least one process is registered
     */
    public boolean hasActions() {
        final boolean nothingRegistered = processes.isEmpty();
        return !nothingRegistered;
    }
}
/**
 * Queue of processes to run before the transaction completes.
 */
private static class BeforeTransactionCompletionProcessQueue extends AbstractTransactionCompletionProcessQueue<BeforeTransactionCompletionProcess> {
    private BeforeTransactionCompletionProcessQueue(SessionImplementor session) {
        super( session );
    }

    /**
     * Drains the queue, invoking each process in turn. A {@code HibernateException} is
     * propagated unchanged; any other exception is wrapped in an {@code AssertionFailure}.
     * Either way the loop stops at the first failure.
     */
    public void beforeTransactionCompletion() {
        while ( !processes.isEmpty() ) {
            final BeforeTransactionCompletionProcess next = processes.poll();
            try {
                next.doBeforeTransactionCompletion( session );
            }
            catch (HibernateException hibernateFailure) {
                throw hibernateFailure;
            }
            catch (Exception unexpected) {
                throw new AssertionFailure( "Unable to perform beforeTransactionCompletion callback", unexpected );
            }
        }
    }
}
/**
 * Queue of processes to run after the transaction completes, plus the set of query spaces
 * to invalidate at that point.
 */
private static class AfterTransactionCompletionProcessQueue extends AbstractTransactionCompletionProcessQueue<AfterTransactionCompletionProcess> {
    private Set<String> querySpacesToInvalidate = new HashSet<String>();

    private AfterTransactionCompletionProcessQueue(SessionImplementor session) {
        super( session );
    }

    /**
     * Remembers a query space to invalidate once the transaction has completed.
     *
     * @param space the query space name
     */
    public void addSpaceToInvalidate(String space) {
        querySpacesToInvalidate.add( space );
    }

    /**
     * Drains the queue, invoking each process in turn. A {@code CacheException} is logged and
     * the loop continues; any other exception is wrapped in an {@code AssertionFailure} and
     * aborts the method. Afterwards, when the query cache is enabled, the remembered spaces
     * are invalidated in the update-timestamps cache; the remembered set is then cleared.
     *
     * @param success whether the transaction completed successfully
     */
    public void afterTransactionCompletion(boolean success) {
        while ( !processes.isEmpty() ) {
            final AfterTransactionCompletionProcess next = processes.poll();
            try {
                next.doAfterTransactionCompletion( success, session );
            }
            catch (CacheException cacheFailure) {
                // Logged only; remaining processes still run.
                LOG.unableToReleaseCacheLock( cacheFailure );
            }
            catch (Exception unexpected) {
                throw new AssertionFailure( "Exception releasing cache locks", unexpected );
            }
        }

        final boolean queryCacheEnabled = session.getFactory().getSettings().isQueryCacheEnabled();
        if ( queryCacheEnabled ) {
            final String[] spaces = querySpacesToInvalidate.toArray( new String[querySpacesToInvalidate.size()] );
            session.getFactory().getUpdateTimestampsCache().invalidate( spaces, session );
        }
        querySpacesToInvalidate.clear();
    }
}
/**
 * Holder pairing a before- and an after-transaction completion-process queue, so that both
 * can be read from one action queue and installed into another.
 */
public static class TransactionCompletionProcesses {
    private final BeforeTransactionCompletionProcessQueue beforeTransactionCompletionProcesses;
    private final AfterTransactionCompletionProcessQueue afterTransactionCompletionProcesses;

    private TransactionCompletionProcesses(
            BeforeTransactionCompletionProcessQueue beforeQueue,
            AfterTransactionCompletionProcessQueue afterQueue) {
        this.afterTransactionCompletionProcesses = afterQueue;
        this.beforeTransactionCompletionProcesses = beforeQueue;
    }
}
/**
 * Sorter that regroups insert actions into numbered batches. Actions of the same entity
 * name share a batch, except that an action referencing (through an entity-typed property)
 * an entity placed in a later batch is moved to a new batch at the end.
 */
private static class InsertActionSorter implements ExecutableList.Sorter<AbstractEntityInsertAction> {
    public static final InsertActionSorter INSTANCE = new InsertActionSorter();

    // entity name -> number of the most recent batch opened for that name
    private Map<String, Integer> latestBatches;
    // entity instance -> number of the batch its insert action was put in
    private Map<Object, Integer> entityBatchNumber;
    // batch number -> actions of that batch, in arrival order
    private Map<Integer, List<AbstractEntityInsertAction>> actionBatches;

    public InsertActionSorter() {
    }

    /**
     * Reorders the given list in place: the list is emptied and refilled batch by batch,
     * in ascending batch number.
     *
     * @param insertions the insert actions to reorder
     */
    public void sort(List<AbstractEntityInsertAction> insertions) {
        // The working maps are rebuilt from scratch on every call.
        latestBatches = new HashMap<String, Integer>();
        entityBatchNumber = new HashMap<Object, Integer>( insertions.size() + 1, 1.0f );
        actionBatches = new HashMap<Integer, List<AbstractEntityInsertAction>>();

        for ( AbstractEntityInsertAction insertion : insertions ) {
            final String name = insertion.getEntityName();
            final Object entity = insertion.getInstance();

            final Integer batch;
            if ( !latestBatches.containsKey( name ) ) {
                // First action for this entity name: open a new batch at the end.
                batch = actionBatches.size();
                latestBatches.put( name, batch );
            }
            else {
                batch = findBatchNumber( insertion, name );
            }

            entityBatchNumber.put( entity, batch );
            addToBatch( batch, insertion );
        }

        insertions.clear();
        for ( int number = 0; number < actionBatches.size(); number++ ) {
            insertions.addAll( actionBatches.get( number ) );
        }
    }

    /**
     * Picks the batch for an action whose entity name already has a batch. It is the latest
     * batch of that name, unless an associated entity sits in a later batch; then a new batch
     * number (the current batch count) is assigned and recorded as the latest for the name.
     */
    private Integer findBatchNumber(AbstractEntityInsertAction action, String entityName) {
        Integer chosen = latestBatches.get( entityName );

        final Object[] state = action.getState();
        final Type[] types = action.getPersister().getClassMetadata().getPropertyTypes();

        for ( int index = 0; index < state.length; index++ ) {
            final Object associated = state[index];
            if ( !types[index].isEntityType() || associated == null ) {
                continue;
            }
            final Integer associatedBatch = entityBatchNumber.get( associated );
            if ( associatedBatch != null && associatedBatch.compareTo( chosen ) > 0 ) {
                // A new last batch comes after every existing one, so no further check is needed.
                chosen = actionBatches.size();
                latestBatches.put( entityName, chosen );
                break;
            }
        }
        return chosen;
    }

    /**
     * Appends the action to the batch with the given number, creating the batch on first use.
     */
    private void addToBatch(Integer batchNumber, AbstractEntityInsertAction action) {
        List<AbstractEntityInsertAction> batch = actionBatches.get( batchNumber );
        if ( batch == null ) {
            batch = new LinkedList<AbstractEntityInsertAction>();
            actionBatches.put( batchNumber, batch );
        }
        batch.add( action );
    }
}
}