package org.apache.lucene.index;
import java.io.IOException;
import java.text.NumberFormat;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.ByteBlockPool.Allocator;
import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.IntBlockPool;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
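/**
 * A single indexing thread's private writer: it buffers added documents and the
 * deletes that apply to them in RAM, then flushes them as a new segment. Each
 * DWPT owns a private {@link DeleteSlice} into the shared
 * {@link DocumentsWriterDeleteQueue}.
 */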
final class DocumentsWriterPerThread {
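/**
 * The IndexingChain must define the {@link #getChain(DocumentsWriterPerThread)} method
 * which returns the DocConsumer that the DocumentsWriter calls to process the documents.
 */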
abstract static class IndexingChain {
abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) throws IOException;
}
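// Set once a non-recoverable exception is hit while indexing; after that, this DWPT may only be aborted.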
private Throwable abortingException;
final void onAbortingException(Throwable throwable) {
assert abortingException == null : "aborting exception has already been set";
abortingException = throwable;
}
final boolean hasHitAbortingException() {
return abortingException != null;
}
final boolean isAborted() {
return aborted;
}
static final IndexingChain defaultIndexingChain = new IndexingChain() {
@Override
DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) throws IOException {
return new DefaultIndexingChain(documentsWriterPerThread);
}
};
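/** Transient state of the document currently being indexed; cleared and reused for each document. */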
static class DocState {
final DocumentsWriterPerThread docWriter;
Analyzer analyzer;
InfoStream infoStream;
Similarity similarity;
int docID;
Iterable<? extends IndexableField> doc;
DocState(DocumentsWriterPerThread docWriter, InfoStream infoStream) {
this.docWriter = docWriter;
this.infoStream = infoStream;
}
public void clear() {
doc = null;
analyzer = null;
}
}
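/** The result of flushing a DWPT: the new segment together with its frozen updates, live docs, delete count and optional sort map. */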
static final class FlushedSegment {
final SegmentCommitInfo segmentInfo;
final FieldInfos fieldInfos;
final FrozenBufferedUpdates segmentUpdates;
final FixedBitSet liveDocs;
final Sorter.DocMap sortMap;
final int delCount;
private FlushedSegment(InfoStream infoStream, SegmentCommitInfo segmentInfo, FieldInfos fieldInfos,
BufferedUpdates segmentUpdates, FixedBitSet liveDocs, int delCount, Sorter.DocMap sortMap)
throws IOException {
this.segmentInfo = segmentInfo;
this.fieldInfos = fieldInfos;
this.segmentUpdates = segmentUpdates != null && segmentUpdates.any() ? new FrozenBufferedUpdates(infoStream, segmentUpdates, segmentInfo) : null;
this.liveDocs = liveDocs;
this.delCount = delCount;
this.sortMap = sortMap;
}
}
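/** Called if we hit an exception at a bad time (when
 *  updating the index files) and must discard all
 *  currently buffered docs. This resets our state,
 *  discarding any docs added since last flush. */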
void abort() throws IOException {
aborted = true;
pendingNumDocs.addAndGet(-numDocsInRAM);
try {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "now abort");
}
try {
consumer.abort();
} finally {
pendingUpdates.clear();
}
} finally {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "done abort");
}
}
}
private static final boolean INFO_VERBOSE = false;
final Codec codec;
final TrackingDirectoryWrapper directory;
final Directory directoryOrig;
final DocState docState;
final DocConsumer consumer;
final Counter bytesUsed;
final BufferedUpdates pendingUpdates;
final SegmentInfo segmentInfo;
private boolean aborted = false;
private final FieldInfos.Builder fieldInfos;
private final InfoStream infoStream;
private int numDocsInRAM;
final DocumentsWriterDeleteQueue deleteQueue;
private final DeleteSlice deleteSlice;
private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
final Allocator byteBlockAllocator;
final IntBlockPool.Allocator intBlockAllocator;
private final AtomicLong pendingNumDocs;
private final LiveIndexWriterConfig indexWriterConfig;
private final boolean enableTestPoints;
private final int indexVersionCreated;
public DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
this.directoryOrig = directoryOrig;
this.directory = new TrackingDirectoryWrapper(directory);
this.fieldInfos = fieldInfos;
this.indexWriterConfig = indexWriterConfig;
this.infoStream = infoStream;
this.codec = indexWriterConfig.getCodec();
this.docState = new DocState(this, infoStream);
this.docState.similarity = indexWriterConfig.getSimilarity();
this.pendingNumDocs = pendingNumDocs;
bytesUsed = Counter.newCounter();
byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
pendingUpdates = new BufferedUpdates(segmentName);
intBlockAllocator = new IntBlockAllocator(bytesUsed);
this.deleteQueue = deleteQueue;
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
deleteSlice = deleteQueue.newSlice();
segmentInfo = new SegmentInfo(directoryOrig, Version.LATEST, Version.LATEST, segmentName, -1, false, codec, Collections.emptyMap(), StringHelper.randomId(), Collections.emptyMap(), indexWriterConfig.getIndexSort());
assert numDocsInRAM == 0;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);
}
consumer = indexWriterConfig.getIndexingChain().getChain(this);
this.enableTestPoints = enableTestPoints;
this.indexVersionCreated = indexVersionCreated;
}
public FieldInfos.Builder getFieldInfosBuilder() {
return fieldInfos;
}
public int getIndexCreatedVersionMajor() {
return indexVersionCreated;
}
final void testPoint(String message) {
if (enableTestPoints) {
assert infoStream.isEnabled("TP");
infoStream.message("TP", message);
}
}
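/** Anything that will add N docs to the index should reserve first to
 *  make sure it's allowed. */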
private void reserveOneDoc() {
if (pendingNumDocs.incrementAndGet() > IndexWriter.getActualMaxDocs()) {
pendingNumDocs.decrementAndGet();
throw new IllegalArgumentException("number of documents in the index cannot exceed " + IndexWriter.getActualMaxDocs());
}
}
public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
try {
assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
testPoint("DocumentsWriterPerThread addDocument start");
assert deleteQueue != null;
reserveOneDoc();
docState.doc = doc;
docState.analyzer = analyzer;
docState.docID = numDocsInRAM;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name);
}
boolean success = false;
try {
try {
consumer.processDocument();
} finally {
docState.clear();
}
success = true;
} finally {
if (!success) {
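// mark document as deleted; the docID was already consumed, so still count it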
deleteDocID(docState.docID);
numDocsInRAM++;
}
}
return finishDocument(deleteNode);
} finally {
maybeAbort("updateDocument", flushNotifications);
}
}
public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
try {
testPoint("DocumentsWriterPerThread addDocuments start");
assert hasHitAbortingException() == false: "DWPT has hit aborting exception but is still indexing";
assert deleteQueue != null;
docState.analyzer = analyzer;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name);
}
int docCount = 0;
boolean allDocsIndexed = false;
try {
for (Iterable<? extends IndexableField> doc : docs) {
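// Even on exception, the document is still added (but marked
// deleted), so we don't need to un-reserve at that point.
// Aborting exceptions will actually "lose" more than one
// document, so the counter will be "wrong" in that case, but
// it's very hard to fix (we can't easily distinguish aborting
// vs non-aborting exceptions):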
reserveOneDoc();
docState.doc = doc;
docState.docID = numDocsInRAM;
docCount++;
boolean success = false;
try {
consumer.processDocument();
success = true;
} finally {
if (!success) {
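// Incr here because finishDocument will not
// be called (because an exc is being thrown):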
numDocsInRAM++;
}
}
numDocsInRAM++;
}
allDocsIndexed = true;
long seqNo;
if (deleteNode != null) {
seqNo = deleteQueue.add(deleteNode, deleteSlice);
assert deleteSlice.isTail(deleteNode) : "expected the delete term as the tail item";
deleteSlice.apply(pendingUpdates, numDocsInRAM - docCount);
return seqNo;
} else {
seqNo = deleteQueue.updateSlice(deleteSlice);
if (seqNo < 0) {
seqNo = -seqNo;
deleteSlice.apply(pendingUpdates, numDocsInRAM - docCount);
} else {
deleteSlice.reset();
}
}
return seqNo;
} finally {
if (!allDocsIndexed && !aborted) {
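// the iterator threw a non-aborting exception: mark all docs from this block as deleted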
int docID = numDocsInRAM - 1;
final int endDocID = docID - docCount;
while (docID > endDocID) {
deleteDocID(docID);
docID--;
}
}
docState.clear();
}
} finally {
maybeAbort("updateDocuments", flushNotifications);
}
}
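/**
 * Finishes the current document in two steps: first pushes the optional delete node
 * into the queue and updates the private slice; then applies any deletes the slice
 * now holds and increments the DWPT-private document id. Returns the sequence number.
 */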
private long finishDocument(DocumentsWriterDeleteQueue.Node<?> deleteNode) {
boolean applySlice = numDocsInRAM != 0;
long seqNo;
if (deleteNode != null) {
seqNo = deleteQueue.add(deleteNode, deleteSlice);
assert deleteSlice.isTail(deleteNode) : "expected the delete node as the tail";
} else {
seqNo = deleteQueue.updateSlice(deleteSlice);
if (seqNo < 0) {
seqNo = -seqNo;
} else {
applySlice = false;
}
}
if (applySlice) {
deleteSlice.apply(pendingUpdates, numDocsInRAM);
} else {
deleteSlice.reset();
}
++numDocsInRAM;
return seqNo;
}
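// Buffer a specific docID for deletion. Currently only
// used when we hit an exception when adding a document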
void deleteDocID(int docIDUpto) {
pendingUpdates.addDocID(docIDUpto);
}
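/** Returns the number of RAM resident documents in this {@link DocumentsWriterPerThread} */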
public int getNumDocsInRAM() {
return numDocsInRAM;
}
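/**
 * Prepares this DWPT for flushing. This method will freeze and return the
 * {@link DocumentsWriterDeleteQueue}s global buffer and apply all pending
 * deletes to this DWPT.
 */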
FrozenBufferedUpdates prepareFlush() throws IOException {
assert numDocsInRAM > 0;
final FrozenBufferedUpdates globalUpdates = deleteQueue.freezeGlobalBuffer(deleteSlice);
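// deleteSlice can possibly be null if we have hit non-aborting exceptions during indexing and never succeeded adding a document.
// apply all deletes before we flush and release the delete slice: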
if (deleteSlice != null) {
deleteSlice.apply(pendingUpdates, numDocsInRAM);
assert deleteSlice.isEmpty();
deleteSlice.reset();
}
return globalUpdates;
}
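/** Flush all pending docs to a new segment */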
FlushedSegment flush(DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
assert numDocsInRAM > 0;
assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
segmentInfo.setMaxDoc(numDocsInRAM);
final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
pendingUpdates, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
final double startMBUsed = bytesUsed() / 1024. / 1024.;
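// Apply delete-by-docID now (delete-by-docID only happens when an exception
// is hit processing that doc, e.g. if the analyzer has some problem with the text):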
if (pendingUpdates.deleteDocIDs.size() > 0) {
flushState.liveDocs = new FixedBitSet(numDocsInRAM);
flushState.liveDocs.set(0, numDocsInRAM);
for(int delDocID : pendingUpdates.deleteDocIDs) {
flushState.liveDocs.clear(delDocID);
}
flushState.delCountOnFlush = pendingUpdates.deleteDocIDs.size();
pendingUpdates.clearDeletedDocIds();
}
if (aborted) {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush: skip because aborting is set");
}
return null;
}
long t0 = System.nanoTime();
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
}
final Sorter.DocMap sortMap;
try {
DocIdSetIterator softDeletedDocs;
if (indexWriterConfig.getSoftDeletesField() != null) {
softDeletedDocs = consumer.getHasDocValues(indexWriterConfig.getSoftDeletesField());
} else {
softDeletedDocs = null;
}
sortMap = consumer.flush(flushState);
if (softDeletedDocs == null) {
flushState.softDelCountOnFlush = 0;
} else {
flushState.softDelCountOnFlush = PendingSoftDeletes.countSoftDeletes(softDeletedDocs, flushState.liveDocs);
assert flushState.segmentInfo.maxDoc() >= flushState.softDelCountOnFlush + flushState.delCountOnFlush;
}
pendingUpdates.clearDeleteTerms();
segmentInfo.setFiles(new HashSet<>(directory.getCreatedFiles()));
final SegmentCommitInfo segmentInfoPerCommit = new SegmentCommitInfo(segmentInfo, 0, flushState.softDelCountOnFlush, -1L, -1L, -1L);
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "new segment has " + (flushState.liveDocs == null ? 0 : flushState.delCountOnFlush) + " deleted docs");
infoStream.message("DWPT", "new segment has " + flushState.softDelCountOnFlush + " soft-deleted docs");
infoStream.message("DWPT", "new segment has " +
(flushState.fieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
(flushState.fieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
(flushState.fieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " +
(flushState.fieldInfos.hasProx() ? "prox" : "no prox") + "; " +
(flushState.fieldInfos.hasFreq() ? "freqs" : "no freqs"));
infoStream.message("DWPT", "flushedFiles=" + segmentInfoPerCommit.files());
infoStream.message("DWPT", "flushed codec=" + codec);
}
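// Carry the buffered updates forward only if they still contain work to resolve later
// (delete queries or doc-values updates); doc-ID deletes were already folded into
// liveDocs, and delete terms were cleared above.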
final BufferedUpdates segmentDeletes;
if (pendingUpdates.deleteQueries.isEmpty() && pendingUpdates.numFieldUpdates.get() == 0) {
pendingUpdates.clear();
segmentDeletes = null;
} else {
segmentDeletes = pendingUpdates;
}
if (infoStream.isEnabled("DWPT")) {
final double newSegmentSize = segmentInfoPerCommit.sizeInBytes() / 1024. / 1024.;
infoStream.message("DWPT", "flushed: segment=" + segmentInfo.name +
" ramUsed=" + nf.format(startMBUsed) + " MB" +
" newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
" docs/MB=" + nf.format(flushState.segmentInfo.maxDoc() / newSegmentSize));
}
assert segmentInfo != null;
FlushedSegment fs = new FlushedSegment(infoStream, segmentInfoPerCommit, flushState.fieldInfos,
segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush, sortMap);
sealFlushedSegment(fs, sortMap, flushNotifications);
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush time " + ((System.nanoTime() - t0) / 1000000.0) + " msec");
}
return fs;
} catch (Throwable t) {
onAbortingException(t);
throw t;
} finally {
maybeAbort("flush", flushNotifications);
}
}
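// If an aborting exception was hit, abort this DWPT exactly once and report the tragedy to the writer.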
private void maybeAbort(String location, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
if (hasHitAbortingException() && aborted == false) {
try {
abort();
} finally {
flushNotifications.onTragicEvent(abortingException, location);
}
}
}
private final Set<String> filesToDelete = new HashSet<>();
public Set<String> pendingFilesToDelete() {
return filesToDelete;
}
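// Rewrites the live docs through the sort map so each deletion is cleared at its sorted docID.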
private FixedBitSet sortLiveDocs(Bits liveDocs, Sorter.DocMap sortMap) throws IOException {
assert liveDocs != null && sortMap != null;
FixedBitSet sortedLiveDocs = new FixedBitSet(liveDocs.length());
sortedLiveDocs.set(0, liveDocs.length());
for (int i = 0; i < liveDocs.length(); i++) {
if (liveDocs.get(i) == false) {
sortedLiveDocs.clear(sortMap.oldToNew(i));
}
}
return sortedLiveDocs;
}
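/**
 * Seals the {@link SegmentInfo} for the new flushed segment and persists
 * the deleted documents {@link FixedBitSet}.
 */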
void sealFlushedSegment(FlushedSegment flushedSegment, Sorter.DocMap sortMap, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
assert flushedSegment != null;
SegmentCommitInfo newSegment = flushedSegment.segmentInfo;
IndexWriter.setDiagnostics(newSegment.info, IndexWriter.SOURCE_FLUSH);
IOContext context = new IOContext(new FlushInfo(newSegment.info.maxDoc(), newSegment.sizeInBytes()));
boolean success = false;
try {
if (indexWriterConfig.getUseCompoundFile()) {
Set<String> originalFiles = newSegment.info.files();
IndexWriter.createCompoundFile(infoStream, new TrackingDirectoryWrapper(directory), newSegment.info, context, flushNotifications::deleteUnusedFiles);
filesToDelete.addAll(originalFiles);
newSegment.info.setUseCompoundFile(true);
}
codec.segmentInfoFormat().write(directory, newSegment.info, context);
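// Must write deleted docs after the CFS so we don't
// slurp the del file into CFS: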
if (flushedSegment.liveDocs != null) {
final int delCount = flushedSegment.delCount;
assert delCount > 0;
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", "flush: write " + delCount + " deletes gen=" + flushedSegment.segmentInfo.getDelGen());
}
SegmentCommitInfo info = flushedSegment.segmentInfo;
Codec codec = info.info.getCodec();
final FixedBitSet bits;
if (sortMap == null) {
bits = flushedSegment.liveDocs;
} else {
bits = sortLiveDocs(flushedSegment.liveDocs, sortMap);
}
codec.liveDocsFormat().writeLiveDocs(bits, directory, info, delCount, context);
newSegment.setDelCount(delCount);
newSegment.advanceDelGen();
}
success = true;
} finally {
if (!success) {
if (infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT",
"hit exception creating compound file for newly flushed segment " + newSegment.info.name);
}
}
}
}
SegmentInfo getSegmentInfo() {
return segmentInfo;
}
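/** Returns the RAM currently used by this DWPT, including its buffered updates. */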
long bytesUsed() {
return bytesUsed.get() + pendingUpdates.ramBytesUsed();
}
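// Complement of the byte-block mask; used to round addresses down to block boundaries.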
static final int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
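// Maximum length, in UTF-8 bytes, of a single indexed term; longer terms are rejected.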
static final int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE - 2;
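/** Allocates int blocks and charges them against the DWPT's RAM accounting. */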
private static class IntBlockAllocator extends IntBlockPool.Allocator {
private final Counter bytesUsed;
public IntBlockAllocator(Counter bytesUsed) {
super(IntBlockPool.INT_BLOCK_SIZE);
this.bytesUsed = bytesUsed;
}
@Override
public int[] getIntBlock() {
int[] b = new int[IntBlockPool.INT_BLOCK_SIZE];
bytesUsed.addAndGet(IntBlockPool.INT_BLOCK_SIZE * Integer.BYTES);
return b;
}
@Override
public void recycleIntBlocks(int[][] blocks, int offset, int length) {
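// Blocks are not reused; just return the RAM to the accounting.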
bytesUsed.addAndGet(-(length * (IntBlockPool.INT_BLOCK_SIZE * Integer.BYTES)));
}
}
@Override
public String toString() {
return "DocumentsWriterPerThread [pendingDeletes=" + pendingUpdates
+ ", segment=" + (segmentInfo != null ? segmentInfo.name : "null") + ", aborted=" + aborted + ", numDocsInRAM="
+ numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
}
}