/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.lucene.index;

import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.util.IOUtils;

@lucene.internal
/** * @lucene.internal */
final class DocumentsWriterFlushQueue { private final Queue<FlushTicket> queue = new LinkedList<>(); // we track tickets separately since count must be present even before the ticket is // constructed ie. queue.size would not reflect it. private final AtomicInteger ticketCount = new AtomicInteger(); private final ReentrantLock purgeLock = new ReentrantLock(); synchronized boolean addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException { incTickets();// first inc the ticket count - freeze opens // a window for #anyChanges to fail boolean success = false; try { FrozenBufferedUpdates frozenBufferedUpdates = deleteQueue.maybeFreezeGlobalBuffer(); if (frozenBufferedUpdates != null) { // no need to publish anything if we don't have any frozen updates queue.add(new FlushTicket(frozenBufferedUpdates, false)); success = true; } } finally { if (!success) { decTickets(); } } return success; } private void incTickets() { int numTickets = ticketCount.incrementAndGet(); assert numTickets > 0; } private void decTickets() { int numTickets = ticketCount.decrementAndGet(); assert numTickets >= 0; } synchronized FlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException { // Each flush is assigned a ticket in the order they acquire the ticketQueue // lock incTickets(); boolean success = false; try { // prepare flush freezes the global deletes - do in synced block! final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true); queue.add(ticket); success = true; return ticket; } finally { if (!success) { decTickets(); } } } synchronized void addSegment(FlushTicket ticket, FlushedSegment segment) { assert ticket.hasSegment; // the actual flush is done asynchronously and once done the FlushedSegment // is passed to the flush ticket ticket.setSegment(segment); } synchronized void markTicketFailed(FlushTicket ticket) { assert ticket.hasSegment; // to free the queue we mark tickets as failed just to clean up the queue. ticket.setFailed(); } boolean hasTickets() { assert ticketCount.get() >= 0 : "ticketCount should be >= 0 but was: " + ticketCount.get(); return ticketCount.get() != 0; } private void innerPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException { assert purgeLock.isHeldByCurrentThread(); while (true) { final FlushTicket head; final boolean canPublish; synchronized (this) { head = queue.peek(); canPublish = head != null && head.canPublish(); // do this synced } if (canPublish) { try { /* * if we block on publish -> lock IW -> lock BufferedDeletes we don't block * concurrent segment flushes just because they want to append to the queue. * the downside is that we need to force a purge on fullFlush since there could * be a ticket still in the queue. */ consumer.accept(head); } finally { synchronized (this) { // finally remove the published ticket from the queue final FlushTicket poll = queue.poll(); decTickets(); // we hold the purgeLock so no other thread should have polled: assert poll == head; } } } else { break; } } } void forcePurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException { assert !Thread.holdsLock(this); purgeLock.lock(); try { innerPurge(consumer); } finally { purgeLock.unlock(); } } void tryPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException { assert !Thread.holdsLock(this); if (purgeLock.tryLock()) { try { innerPurge(consumer); } finally { purgeLock.unlock(); } } } int getTicketCount() { return ticketCount.get(); } static final class FlushTicket { private final FrozenBufferedUpdates frozenUpdates; private final boolean hasSegment; private FlushedSegment segment; private boolean failed = false; private boolean published = false; FlushTicket(FrozenBufferedUpdates frozenUpdates, boolean hasSegment) { this.frozenUpdates = frozenUpdates; this.hasSegment = hasSegment; } boolean canPublish() { return hasSegment == false || segment != null || failed; } synchronized void markPublished() { assert published == false: "ticket was already published - can not publish twice"; published = true; } private void setSegment(FlushedSegment segment) { assert !failed; this.segment = segment; } private void setFailed() { assert segment == null; failed = true; }
Returns the flushed segment or null if this flush ticket doesn't have a segment. This can be the case if this ticket represents a flushed global frozen updates package.
/** * Returns the flushed segment or <code>null</code> if this flush ticket doesn't have a segment. This can be the * case if this ticket represents a flushed global frozen updates package. */
FlushedSegment getFlushedSegment() { return segment; }
Returns a frozen global deletes package.
/** * Returns a frozen global deletes package. */
FrozenBufferedUpdates getFrozenUpdates() { return frozenUpdates; } } }