package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.util.IOUtils;
/**
 * An ordered queue of {@link FlushTicket}s.
 *
 * <p>Tickets are appended by {@link #addDeletes} and {@link #addFlushTicket} and are handed to a
 * consumer strictly in insertion order by {@link #forcePurge} / {@link #tryPurge}. A purge stops
 * at the first ticket that is not yet publishable, so a later ticket is never consumed before an
 * earlier one.
 *
 * <p>Locking: the queue itself is only touched while holding this instance's monitor. Purging is
 * additionally serialized by {@code purgeLock}; the consumer callback runs while {@code purgeLock}
 * is held but <em>outside</em> this instance's monitor, so threads adding tickets are not blocked
 * by a running consumer.
 */
final class DocumentsWriterFlushQueue {
  // FIFO of pending tickets; every access happens under this instance's monitor. ArrayDeque
  // rejects null elements, which is fine here: only freshly constructed tickets are ever added.
  private final Queue<FlushTicket> queue = new ArrayDeque<>();
  // Number of outstanding tickets. Kept in an atomic so hasTickets()/getTicketCount() can read
  // it without taking the monitor.
  private final AtomicInteger ticketCount = new AtomicInteger();
  // Ensures that at most one thread drains the queue at any time.
  private final ReentrantLock purgeLock = new ReentrantLock();

  /**
   * Freezes the global buffer of the given delete queue and, if that yields any updates,
   * enqueues a segment-less ticket carrying them.
   *
   * <p>The ticket count is incremented before the freeze is attempted and rolled back again if
   * nothing was enqueued (no frozen updates, or the freeze threw).
   *
   * @param deleteQueue the delete queue whose global buffer should be frozen
   * @return {@code true} if a ticket was added, {@code false} if there were no updates to publish
   * @throws IOException if freezing the global buffer fails
   */
  synchronized boolean addDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
    incTickets();
    boolean success = false;
    try {
      FrozenBufferedUpdates frozenBufferedUpdates = deleteQueue.maybeFreezeGlobalBuffer();
      if (frozenBufferedUpdates != null) {
        queue.add(new FlushTicket(frozenBufferedUpdates, false));
        success = true;
      }
    } finally {
      if (!success) {
        // nothing was enqueued: undo the optimistic increment
        decTickets();
      }
    }
    return success;
  }

  /** Increments the outstanding-ticket counter. */
  private void incTickets() {
    int numTickets = ticketCount.incrementAndGet();
    assert numTickets > 0;
  }

  /** Decrements the outstanding-ticket counter. */
  private void decTickets() {
    int numTickets = ticketCount.decrementAndGet();
    assert numTickets >= 0;
  }

  /**
   * Enqueues a ticket for a segment that the given writer is about to flush. The ticket is
   * created from {@code dwpt.prepareFlush()} and cannot be published until either
   * {@link #addSegment} or {@link #markTicketFailed} has been called for it.
   *
   * @param dwpt the per-thread writer that is being flushed
   * @return the newly enqueued ticket
   * @throws IOException if preparing the flush fails; the ticket count is rolled back in that case
   */
  synchronized FlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) throws IOException {
    incTickets();
    boolean success = false;
    try {
      final FlushTicket ticket = new FlushTicket(dwpt.prepareFlush(), true);
      queue.add(ticket);
      success = true;
      return ticket;
    } finally {
      if (!success) {
        // prepareFlush() (or the enqueue) threw: undo the optimistic increment
        decTickets();
      }
    }
  }

  /**
   * Attaches the flushed segment to a ticket previously obtained from {@link #addFlushTicket}.
   *
   * @param ticket a ticket that expects a segment
   * @param segment the flushed segment to attach
   */
  synchronized void addSegment(FlushTicket ticket, FlushedSegment segment) {
    assert ticket.hasSegment;
    ticket.setSegment(segment);
  }

  /**
   * Marks a ticket previously obtained from {@link #addFlushTicket} as failed, which makes it
   * publishable without a segment.
   *
   * @param ticket a ticket that expects a segment
   */
  synchronized void markTicketFailed(FlushTicket ticket) {
    assert ticket.hasSegment;
    ticket.setFailed();
  }

  /** Returns {@code true} if at least one ticket is outstanding. Does not take the monitor. */
  boolean hasTickets() {
    assert ticketCount.get() >= 0 : "ticketCount should be >= 0 but was: " + ticketCount.get();
    return ticketCount.get() != 0;
  }

  /**
   * Drains publishable tickets from the head of the queue, passing each one to {@code consumer}.
   * Stops as soon as the queue is empty or its head cannot be published yet. The caller must hold
   * {@code purgeLock}.
   *
   * @param consumer receives each publishable ticket, in queue order
   * @throws IOException if the consumer throws; the ticket that was being consumed is still
   *     removed from the queue
   */
  private void innerPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
    assert purgeLock.isHeldByCurrentThread();
    while (true) {
      final FlushTicket head;
      final boolean canPublish;
      synchronized (this) {
        // inspect the head and its state under the monitor, since tickets are mutated under it
        head = queue.peek();
        canPublish = head != null && head.canPublish();
      }
      if (canPublish) {
        try {
          // invoked outside the monitor so that threads adding tickets are not blocked
          consumer.accept(head);
        } finally {
          synchronized (this) {
            // remove the ticket whether or not the consumer succeeded
            final FlushTicket poll = queue.poll();
            decTickets();
            // this is the only place that polls and purgeLock is held, so the head is unchanged
            assert poll == head;
          }
        }
      } else {
        break;
      }
    }
  }

  /**
   * Purges all publishable tickets, waiting for {@code purgeLock} if another thread is currently
   * purging. Must not be called while holding this instance's monitor.
   *
   * @param consumer receives each publishable ticket, in queue order
   * @throws IOException if the consumer throws
   */
  void forcePurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
    assert !Thread.holdsLock(this);
    purgeLock.lock();
    try {
      innerPurge(consumer);
    } finally {
      purgeLock.unlock();
    }
  }

  /**
   * Purges all publishable tickets if {@code purgeLock} can be acquired without waiting;
   * otherwise returns immediately without purging. Must not be called while holding this
   * instance's monitor.
   *
   * @param consumer receives each publishable ticket, in queue order
   * @throws IOException if the consumer throws
   */
  void tryPurge(IOUtils.IOConsumer<FlushTicket> consumer) throws IOException {
    assert !Thread.holdsLock(this);
    if (purgeLock.tryLock()) {
      try {
        innerPurge(consumer);
      } finally {
        purgeLock.unlock();
      }
    }
  }

  /** Returns the current number of outstanding tickets. Does not take the monitor. */
  int getTicketCount() {
    return ticketCount.get();
  }

  /**
   * A single entry of the flush queue: a set of frozen updates plus, if the ticket was created
   * with {@code hasSegment == true}, the flushed segment once it has been attached.
   */
  static final class FlushTicket {
    private final FrozenBufferedUpdates frozenUpdates;
    // true if this ticket must wait for a segment (or a failure) before it can be published
    private final boolean hasSegment;
    private FlushedSegment segment;
    private boolean failed = false;
    private boolean published = false;

    /**
     * @param frozenUpdates the frozen updates carried by this ticket
     * @param hasSegment whether a flushed segment is expected to be attached later
     */
    FlushTicket(FrozenBufferedUpdates frozenUpdates, boolean hasSegment) {
      this.frozenUpdates = frozenUpdates;
      this.hasSegment = hasSegment;
    }

    /**
     * Returns {@code true} if this ticket expects no segment, already has its segment, or has
     * been marked as failed.
     */
    boolean canPublish() {
      return hasSegment == false || segment != null || failed;
    }

    /** Records that this ticket has been published; asserts that this happens only once. */
    synchronized void markPublished() {
      assert published == false: "ticket was already published - can not publish twice";
      published = true;
    }

    /** Attaches the flushed segment; the ticket must not have been marked as failed. */
    private void setSegment(FlushedSegment segment) {
      assert !failed;
      this.segment = segment;
    }

    /** Marks this ticket as failed; no segment must have been attached. */
    private void setFailed() {
      assert segment == null;
      failed = true;
    }

    /** Returns the attached segment, or {@code null} if none has been attached. */
    FlushedSegment getFlushedSegment() {
      return segment;
    }

    /** Returns the frozen updates this ticket was created with. */
    FrozenBufferedUpdates getFrozenUpdates() {
      return frozenUpdates;
    }
  }
}