/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.lucene.index;

import java.io.IOException;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;

final class DocumentsWriterPerThread implements Accountable {

  
  /**
   * The IndexingChain must define the
   * {@link #getChain(int, SegmentInfo, Directory, FieldInfos.Builder, LiveIndexWriterConfig, Consumer)}
   * method which returns the DocConsumer that the DocumentsWriter calls to
   * process the documents.
   */
  abstract static class IndexingChain {
    abstract DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
                                  FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
                                  Consumer<Throwable> abortingExceptionConsumer);
  }

  private Throwable abortingException;

  private void onAbortingException(Throwable throwable) {
    assert throwable != null : "aborting exception must not be null";
    assert abortingException == null : "aborting exception has already been set";
    abortingException = throwable;
  }

  final boolean isAborted() {
    return aborted;
  }

  static final IndexingChain defaultIndexingChain = new IndexingChain() {

    @Override
    DocConsumer getChain(int indexCreatedVersionMajor, SegmentInfo segmentInfo, Directory directory,
                         FieldInfos.Builder fieldInfos, LiveIndexWriterConfig indexWriterConfig,
                         Consumer<Throwable> abortingExceptionConsumer) {
      return new DefaultIndexingChain(indexCreatedVersionMajor, segmentInfo, directory, fieldInfos,
          indexWriterConfig, abortingExceptionConsumer);
    }
  };

  static final class FlushedSegment {
    final SegmentCommitInfo segmentInfo;
    final FieldInfos fieldInfos;
    final FrozenBufferedUpdates segmentUpdates;
    final FixedBitSet liveDocs;
    final Sorter.DocMap sortMap;
    final int delCount;

    private FlushedSegment(InfoStream infoStream, SegmentCommitInfo segmentInfo, FieldInfos fieldInfos,
                           BufferedUpdates segmentUpdates, FixedBitSet liveDocs, int delCount, Sorter.DocMap sortMap) {
      this.segmentInfo = segmentInfo;
      this.fieldInfos = fieldInfos;
      this.segmentUpdates = segmentUpdates != null && segmentUpdates.any()
          ? new FrozenBufferedUpdates(infoStream, segmentUpdates, segmentInfo) : null;
      this.liveDocs = liveDocs;
      this.delCount = delCount;
      this.sortMap = sortMap;
    }
  }
  /** Called if we hit an exception at a bad time (when
   *  updating the index files) and must discard all
   *  currently buffered docs. This resets our state,
   *  discarding any docs added since last flush. */
  void abort() throws IOException {
    aborted = true;
    pendingNumDocs.addAndGet(-numDocsInRAM);
    try {
      if (infoStream.isEnabled("DWPT")) {
        infoStream.message("DWPT", "now abort");
      }
      try {
        consumer.abort();
      } finally {
        pendingUpdates.clear();
      }
    } finally {
      if (infoStream.isEnabled("DWPT")) {
        infoStream.message("DWPT", "done abort");
      }
    }
  }

  private final static boolean INFO_VERBOSE = false;
  final Codec codec;
  final TrackingDirectoryWrapper directory;
  private final DocConsumer consumer;

  // Updates for our still-in-RAM (to be flushed next) segment
  private final BufferedUpdates pendingUpdates;
  private final SegmentInfo segmentInfo;     // Current segment we are working on
  private boolean aborted = false;           // True if we aborted
  private SetOnce<Boolean> flushPending = new SetOnce<>();
  private volatile long lastCommittedBytesUsed;
  private SetOnce<Boolean> hasFlushed = new SetOnce<>();
  private final FieldInfos.Builder fieldInfos;
  private final InfoStream infoStream;
  private int numDocsInRAM;
  final DocumentsWriterDeleteQueue deleteQueue;
  private final DeleteSlice deleteSlice;
  private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
  private final AtomicLong pendingNumDocs;
  private final LiveIndexWriterConfig indexWriterConfig;
  private final boolean enableTestPoints;
  private final ReentrantLock lock = new ReentrantLock();
  private int[] deleteDocIDs = new int[0];
  private int numDeletedDocIds = 0;

  DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory,
                           LiveIndexWriterConfig indexWriterConfig, DocumentsWriterDeleteQueue deleteQueue,
                           FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) {
    this.directory = new TrackingDirectoryWrapper(directory);
    this.fieldInfos = fieldInfos;
    this.indexWriterConfig = indexWriterConfig;
    this.infoStream = indexWriterConfig.getInfoStream();
    this.codec = indexWriterConfig.getCodec();
    this.pendingNumDocs = pendingNumDocs;
    pendingUpdates = new BufferedUpdates(segmentName);
    this.deleteQueue = Objects.requireNonNull(deleteQueue);
    assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
    deleteSlice = deleteQueue.newSlice();

    segmentInfo = new SegmentInfo(directoryOrig, Version.LATEST, Version.LATEST, segmentName, -1, false, codec,
        Collections.emptyMap(), StringHelper.randomId(), Collections.emptyMap(), indexWriterConfig.getIndexSort());
    assert numDocsInRAM == 0;
    if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
      infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);
    }
    this.enableTestPoints = enableTestPoints;
    consumer = indexWriterConfig.getIndexingChain().getChain(indexVersionCreated, segmentInfo, this.directory,
        fieldInfos, indexWriterConfig, this::onAbortingException);
  }

  final void testPoint(String message) {
    if (enableTestPoints) {
      assert infoStream.isEnabled("TP"); // don't enable unless you need them.
      infoStream.message("TP", message);
    }
  }
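  // Rough lifecycle sketch (illustrative only, inferred from the methods of this class rather than
  // stated anywhere in the source): the owning DocumentsWriter lock()s a DWPT, feeds it documents
  // via updateDocuments(), and the FlushPolicy may later mark it flush pending via setFlushPending();
  // prepareFlush() then applies the frozen deletes, flush() writes the buffered docs out as a new
  // segment, and the DWPT is discarded afterwards (hasFlushed is a SetOnce and can never be reset).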
  /** Anything that will add N docs to the index should reserve first to
   *  make sure it's allowed. */
  private void reserveOneDoc() {
    if (pendingNumDocs.incrementAndGet() > IndexWriter.getActualMaxDocs()) {
      // Reserve failed: put the one doc back and throw exc:
      pendingNumDocs.decrementAndGet();
      throw new IllegalArgumentException("number of documents in the index cannot exceed " + IndexWriter.getActualMaxDocs());
    }
  }

  long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, DocumentsWriterDeleteQueue.Node<?> deleteNode,
                       DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
    try {
      testPoint("DocumentsWriterPerThread addDocuments start");
      assert abortingException == null : "DWPT has hit aborting exception but is still indexing";
      if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
        infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + numDocsInRAM + " seg=" + segmentInfo.name);
      }
      final int docsInRamBefore = numDocsInRAM;
      boolean allDocsIndexed = false;
      try {
        for (Iterable<? extends IndexableField> doc : docs) {
          // Even on exception, the document is still added (but marked
          // deleted), so we don't need to un-reserve at that point.
          // Aborting exceptions will actually "lose" more than one
          // document, so the counter will be "wrong" in that case, but
          // it's very hard to fix (we can't easily distinguish aborting
          // vs non-aborting exceptions):
          reserveOneDoc();
          consumer.processDocument(numDocsInRAM++, doc);
        }
        allDocsIndexed = true;
        return finishDocuments(deleteNode, docsInRamBefore);
      } finally {
        if (!allDocsIndexed && !aborted) {
          // the iterator threw an exception that is not aborting
          // go and mark all docs from this block as deleted
          deleteLastDocs(numDocsInRAM - docsInRamBefore);
        }
      }
    } finally {
      maybeAbort("updateDocuments", flushNotifications);
    }
  }

  private long finishDocuments(DocumentsWriterDeleteQueue.Node<?> deleteNode, int docIdUpTo) {
    /*
     * here we actually finish the document in two steps
     * 1. push the delete into the queue and update our slice.
     * 2. increment the DWPT private document id.
     *
     * the updated slice we get from 1. holds all the deletes that have occurred
     * since we updated the slice the last time.
     */
    // Apply delTerm only after all indexing has
    // succeeded, but apply it only to docs prior to when
    // this batch started:
    long seqNo;
    if (deleteNode != null) {
      seqNo = deleteQueue.add(deleteNode, deleteSlice);
      assert deleteSlice.isTail(deleteNode) : "expected the delete term as the tail item";
      deleteSlice.apply(pendingUpdates, docIdUpTo);
      return seqNo;
    } else {
      seqNo = deleteQueue.updateSlice(deleteSlice);
      if (seqNo < 0) {
        seqNo = -seqNo;
        deleteSlice.apply(pendingUpdates, docIdUpTo);
      } else {
        deleteSlice.reset();
      }
    }
    return seqNo;
  }

  // This method marks the last N docs as deleted. This is used
  // in the case of a non-aborting exception. There are several cases
  // where we fail a document ie. due to an exception during analysis
  // that causes the doc to be rejected but won't cause the DWPT to be
  // stale nor the entire IW to abort and shutdown. In such a case
  // we only mark these docs as deleted and turn it into a livedocs
  // during flush
  private void deleteLastDocs(int docCount) {
    int from = numDocsInRAM - docCount;
    int to = numDocsInRAM;
    deleteDocIDs = ArrayUtil.grow(deleteDocIDs, numDeletedDocIds + (to - from));
    for (int docId = from; docId < to; docId++) {
      deleteDocIDs[numDeletedDocIds++] = docId;
    }
    // NOTE: we do not trigger flush here. This is
    // potentially a RAM leak, if you have an app that tries
    // to add docs but every single doc always hits a
    // non-aborting exception. Allowing a flush here gets
    // very messy because we are only invoked when handling
    // exceptions so to do this properly, while handling an
    // exception we'd have to go off and flush new deletes
    // which is risky (likely would hit some other
    // confounding exception).
  }
  /**
   * Returns the number of RAM resident documents in this {@link DocumentsWriterPerThread}
   */
  public int getNumDocsInRAM() {
    // public for FlushPolicy
    return numDocsInRAM;
  }
  /**
   * Prepares this DWPT for flushing. This method will freeze and return the
   * {@link DocumentsWriterDeleteQueue}'s global buffer and apply all pending
   * deletes to this DWPT.
   */
  FrozenBufferedUpdates prepareFlush() {
    assert numDocsInRAM > 0;
    final FrozenBufferedUpdates globalUpdates = deleteQueue.freezeGlobalBuffer(deleteSlice);
    /* deleteSlice can possibly be null if we have hit non-aborting exceptions during indexing and
     * never succeeded adding a document. */
    if (deleteSlice != null) {
      // apply all deletes before we flush and release the delete slice
      deleteSlice.apply(pendingUpdates, numDocsInRAM);
      assert deleteSlice.isEmpty();
      deleteSlice.reset();
    }
    return globalUpdates;
  }
  /** Flush all pending docs to a new segment */
  FlushedSegment flush(DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
    assert flushPending.get() == Boolean.TRUE;
    assert numDocsInRAM > 0;
    assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
    segmentInfo.setMaxDoc(numDocsInRAM);
    final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
        pendingUpdates, new IOContext(new FlushInfo(numDocsInRAM, lastCommittedBytesUsed)));
    final double startMBUsed = lastCommittedBytesUsed / 1024. / 1024.;

    // Apply delete-by-docID now (delete-byDocID only
    // happens when an exception is hit processing that
    // doc, eg if analyzer has some problem w/ the text):
    if (numDeletedDocIds > 0) {
      flushState.liveDocs = new FixedBitSet(numDocsInRAM);
      flushState.liveDocs.set(0, numDocsInRAM);
      for (int i = 0; i < numDeletedDocIds; i++) {
        flushState.liveDocs.clear(deleteDocIDs[i]);
      }
      flushState.delCountOnFlush = numDeletedDocIds;
      deleteDocIDs = new int[0];
    }

    if (aborted) {
      if (infoStream.isEnabled("DWPT")) {
        infoStream.message("DWPT", "flush: skip because aborting is set");
      }
      return null;
    }

    long t0 = System.nanoTime();

    if (infoStream.isEnabled("DWPT")) {
      infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
    }
    final Sorter.DocMap sortMap;
    try {
      DocIdSetIterator softDeletedDocs;
      if (indexWriterConfig.getSoftDeletesField() != null) {
        softDeletedDocs = consumer.getHasDocValues(indexWriterConfig.getSoftDeletesField());
      } else {
        softDeletedDocs = null;
      }
      sortMap = consumer.flush(flushState);
      if (softDeletedDocs == null) {
        flushState.softDelCountOnFlush = 0;
      } else {
        flushState.softDelCountOnFlush = PendingSoftDeletes.countSoftDeletes(softDeletedDocs, flushState.liveDocs);
        assert flushState.segmentInfo.maxDoc() >= flushState.softDelCountOnFlush + flushState.delCountOnFlush;
      }
      // We clear this here because we already resolved them (private to this segment) when writing postings:
      pendingUpdates.clearDeleteTerms();
      segmentInfo.setFiles(new HashSet<>(directory.getCreatedFiles()));

      final SegmentCommitInfo segmentInfoPerCommit = new SegmentCommitInfo(segmentInfo, 0, flushState.softDelCountOnFlush,
          -1L, -1L, -1L, StringHelper.randomId());
      if (infoStream.isEnabled("DWPT")) {
        infoStream.message("DWPT", "new segment has " + (flushState.liveDocs == null ? 0 : flushState.delCountOnFlush) + " deleted docs");
        infoStream.message("DWPT", "new segment has " + flushState.softDelCountOnFlush + " soft-deleted docs");
        infoStream.message("DWPT", "new segment has " +
                           (flushState.fieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
                           (flushState.fieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
                           (flushState.fieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " +
                           (flushState.fieldInfos.hasProx() ? "prox" : "no prox") + "; " +
                           (flushState.fieldInfos.hasFreq() ? "freqs" : "no freqs"));
        infoStream.message("DWPT", "flushedFiles=" + segmentInfoPerCommit.files());
        infoStream.message("DWPT", "flushed codec=" + codec);
      }

      final BufferedUpdates segmentDeletes;
      if (pendingUpdates.deleteQueries.isEmpty() && pendingUpdates.numFieldUpdates.get() == 0) {
        pendingUpdates.clear();
        segmentDeletes = null;
      } else {
        segmentDeletes = pendingUpdates;
      }

      if (infoStream.isEnabled("DWPT")) {
        final double newSegmentSize = segmentInfoPerCommit.sizeInBytes() / 1024. / 1024.;
        infoStream.message("DWPT", "flushed: segment=" + segmentInfo.name +
                           " ramUsed=" + nf.format(startMBUsed) + " MB" +
                           " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
                           " docs/MB=" + nf.format(flushState.segmentInfo.maxDoc() / newSegmentSize));
      }

      assert segmentInfo != null;

      FlushedSegment fs = new FlushedSegment(infoStream, segmentInfoPerCommit, flushState.fieldInfos,
                                             segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush, sortMap);
      sealFlushedSegment(fs, sortMap, flushNotifications);
      if (infoStream.isEnabled("DWPT")) {
        infoStream.message("DWPT", "flush time " + ((System.nanoTime() - t0) / 1000000.0) + " msec");
      }
      return fs;
    } catch (Throwable t) {
      onAbortingException(t);
      throw t;
    } finally {
      maybeAbort("flush", flushNotifications);
      hasFlushed.set(Boolean.TRUE);
    }
  }

  private void maybeAbort(String location, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
    if (abortingException != null && aborted == false) {
      // if we are already aborted don't do anything here
      try {
        abort();
      } finally {
        // whatever we do here we have to fire this tragic event up.
        flushNotifications.onTragicEvent(abortingException, location);
      }
    }
  }

  private final Set<String> filesToDelete = new HashSet<>();

  Set<String> pendingFilesToDelete() {
    return filesToDelete;
  }

  private FixedBitSet sortLiveDocs(Bits liveDocs, Sorter.DocMap sortMap) {
    assert liveDocs != null && sortMap != null;
    FixedBitSet sortedLiveDocs = new FixedBitSet(liveDocs.length());
    sortedLiveDocs.set(0, liveDocs.length());
    for (int i = 0; i < liveDocs.length(); i++) {
      if (liveDocs.get(i) == false) {
        sortedLiveDocs.clear(sortMap.oldToNew(i));
      }
    }
    return sortedLiveDocs;
  }
  /**
   * Seals the {@link SegmentInfo} for the new flushed segment and persists
   * the deleted documents {@link FixedBitSet}.
   */
  void sealFlushedSegment(FlushedSegment flushedSegment, Sorter.DocMap sortMap, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
    assert flushedSegment != null;
    SegmentCommitInfo newSegment = flushedSegment.segmentInfo;

    IndexWriter.setDiagnostics(newSegment.info, IndexWriter.SOURCE_FLUSH);

    IOContext context = new IOContext(new FlushInfo(newSegment.info.maxDoc(), newSegment.sizeInBytes()));

    boolean success = false;
    try {
      if (indexWriterConfig.getUseCompoundFile()) {
        Set<String> originalFiles = newSegment.info.files();
        // TODO: like addIndexes, we are relying on createCompoundFile to successfully cleanup...
        IndexWriter.createCompoundFile(infoStream, new TrackingDirectoryWrapper(directory), newSegment.info, context,
            flushNotifications::deleteUnusedFiles);
        filesToDelete.addAll(originalFiles);
        newSegment.info.setUseCompoundFile(true);
      }

      // Have codec write SegmentInfo. Must do this after
      // creating CFS so that 1) .si isn't slurped into CFS,
      // and 2) .si reflects useCompoundFile=true change
      // above:
      codec.segmentInfoFormat().write(directory, newSegment.info, context);

      // TODO: ideally we would freeze newSegment here!!
      // because any changes after writing the .si will be
      // lost...

      // Must write deleted docs after the CFS so we don't
      // slurp the del file into CFS:
      if (flushedSegment.liveDocs != null) {
        final int delCount = flushedSegment.delCount;
        assert delCount > 0;
        if (infoStream.isEnabled("DWPT")) {
          infoStream.message("DWPT", "flush: write " + delCount + " deletes gen=" + flushedSegment.segmentInfo.getDelGen());
        }

        // TODO: we should prune the segment if it's 100%
        // deleted... but merge will also catch it.

        // TODO: in the NRT case it'd be better to hand
        // this del vector over to the
        // shortly-to-be-opened SegmentReader and let it
        // carry the changes; there's no reason to use
        // filesystem as intermediary here.

        SegmentCommitInfo info = flushedSegment.segmentInfo;
        Codec codec = info.info.getCodec();
        final FixedBitSet bits;
        if (sortMap == null) {
          bits = flushedSegment.liveDocs;
        } else {
          bits = sortLiveDocs(flushedSegment.liveDocs, sortMap);
        }
        codec.liveDocsFormat().writeLiveDocs(bits, directory, info, delCount, context);
        newSegment.setDelCount(delCount);
        newSegment.advanceDelGen();
      }

      success = true;
    } finally {
      if (!success) {
        if (infoStream.isEnabled("DWPT")) {
          infoStream.message("DWPT", "hit exception creating compound file for newly flushed segment " + newSegment.info.name);
        }
      }
    }
  }
  /** Get current segment info we are writing. */
  SegmentInfo getSegmentInfo() {
    return segmentInfo;
  }

  @Override
  public long ramBytesUsed() {
    assert lock.isHeldByCurrentThread();
    return (deleteDocIDs.length * Integer.BYTES) + pendingUpdates.ramBytesUsed() + consumer.ramBytesUsed();
  }

  @Override
  public Collection<Accountable> getChildResources() {
    assert lock.isHeldByCurrentThread();
    return Collections.unmodifiableList(Arrays.asList(pendingUpdates, consumer));
  }

  @Override
  public String toString() {
    return "DocumentsWriterPerThread [pendingDeletes=" + pendingUpdates
        + ", segment=" + (segmentInfo != null ? segmentInfo.name : "null")
        + ", aborted=" + aborted
        + ", numDocsInRAM=" + numDocsInRAM
        + ", deleteQueue=" + deleteQueue
        + ", " + numDeletedDocIds + " deleted docIds"
        + "]";
  }
  /**
   * Returns true iff this DWPT is marked as flush pending
   */
  boolean isFlushPending() {
    return flushPending.get() == Boolean.TRUE;
  }
  /**
   * Sets this DWPT as flush pending. This can only be set once.
   */
  void setFlushPending() {
    flushPending.set(Boolean.TRUE);
  }
  /**
   * Returns the last committed bytes for this DWPT. This method can be called
   * without acquiring the DWPT's lock.
   */
  long getLastCommittedBytesUsed() {
    return lastCommittedBytesUsed;
  }
  /**
   * Commits the current {@link #ramBytesUsed()} and stores its value for later reuse.
   * The last committed bytes used can be retrieved via {@link #getLastCommittedBytesUsed()}
   * @return the delta between the current {@link #ramBytesUsed()} and the previously committed
   *         {@link #getLastCommittedBytesUsed()}
   */
  long commitLastBytesUsed() {
    assert isHeldByCurrentThread();
    long delta = ramBytesUsed() - lastCommittedBytesUsed;
    lastCommittedBytesUsed += delta;
    return delta;
  }
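  // Illustrative example of the accounting above (the numbers are hypothetical, not taken from the
  // source): if ramBytesUsed() currently reports 5 MB while getLastCommittedBytesUsed() still holds
  // the 3 MB committed earlier, commitLastBytesUsed() returns a 2 MB delta and advances
  // lastCommittedBytesUsed to 5 MB, so an immediate second call with unchanged RAM usage returns 0.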
  /**
   * Locks this DWPT for exclusive access.
   * @see ReentrantLock#lock()
   */
  void lock() {
    lock.lock();
  }
  /**
   * Acquires the DWPT's lock only if it is not held by another thread at the time
   * of invocation.
   * @return true if the lock was acquired.
   * @see ReentrantLock#tryLock()
   */
  boolean tryLock() {
    return lock.tryLock();
  }
  /**
   * Returns true if the DWPT's lock is held by the current thread
   * @see ReentrantLock#isHeldByCurrentThread()
   */
  boolean isHeldByCurrentThread() {
    return lock.isHeldByCurrentThread();
  }
  /**
   * Unlocks the DWPT's lock
   * @see ReentrantLock#unlock()
   */
  void unlock() {
    lock.unlock();
  }
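  // Minimal usage sketch for the lock methods above (illustrative only; DocumentsWriter is the
  // expected caller and the local names below are hypothetical):
  //
  //   if (dwpt.tryLock()) {            // or dwpt.lock() to block until the DWPT is available
  //     try {
  //       seqNo = dwpt.updateDocuments(docs, deleteNode, flushNotifications);
  //     } finally {
  //       dwpt.unlock();
  //     }
  //   }
  //
  // Note that ramBytesUsed() and commitLastBytesUsed() assert that this lock is held, so RAM
  // accounting calls must happen inside the same lock()/unlock() bracket.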
  /**
   * Returns <code>true</code> iff this DWPT has been flushed
   */
  boolean hasFlushed() {
    return hasFlushed.get() == Boolean.TRUE;
  }
}