/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.lucene.index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntConsumer;

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.RamUsageEstimator;

Holds buffered deletes and updates by term or query, once pushed. Pushed deletes/updates are write-once, so we shift to a more memory-efficient data structure to hold them. We don't hold docIDs because these are applied on flush.
/**
 * Holds buffered deletes and updates by term or query, once pushed. Pushed
 * deletes/updates are write-once, so we shift to a more memory-efficient data
 * structure to hold them. We don't hold docIDs because these are applied on
 * flush.
 */
final class FrozenBufferedUpdates {

  /* NOTE: we now apply this frozen packet immediately on creation, yet this process is heavy, and runs
   * in multiple threads, and this compression is sizable (~8.3% of the original size), so it's important
   * we run this before applying the deletes/updates. */

  /* Query we often undercount (say 24 bytes), plus int. */
  final static int BYTES_PER_DEL_QUERY = RamUsageEstimator.NUM_BYTES_OBJECT_REF + Integer.BYTES + 24;

  // Terms, in sorted order (sorted and prefix-coded by the constructor):
  final PrefixCodedTerms deleteTerms;

  // Parallel arrays: deleteQueries[i] is a deleted query and deleteQueryLimits[i] is the
  // docIDUpto recorded for it.
  final Query[] deleteQueries;
  final int[] deleteQueryLimits;
Counts down once all deletes/updates have been applied
/** Counts down once all deletes/updates have been applied */
public final CountDownLatch applied = new CountDownLatch(1);

// Lock exposed through tryLock()/lock()/unlock(); apply() and isApplied() assert that the
// calling thread holds it.
private final ReentrantLock applyLock = new ReentrantLock();

// Doc values updates keyed by String; populated by the constructor as an unmodifiable copy
// of the map held by the BufferedUpdates it was built from.
private final Map<String, FieldUpdatesBuffer> fieldUpdates;
How many total documents were deleted/updated.
/** How many total documents were deleted/updated. */
public long totalDelCount; private final int fieldUpdatesCount; final int bytesUsed; final int numTermDeletes; private long delGen = -1; // assigned by BufferedUpdatesStream once pushed final SegmentCommitInfo privateSegment; // non-null iff this frozen packet represents // a segment private deletes. in that case is should // only have Queries and doc values updates private final InfoStream infoStream; public FrozenBufferedUpdates(InfoStream infoStream, BufferedUpdates updates, SegmentCommitInfo privateSegment) { this.infoStream = infoStream; this.privateSegment = privateSegment; assert privateSegment == null || updates.deleteTerms.isEmpty() : "segment private packet should only have del queries"; Term termsArray[] = updates.deleteTerms.keySet().toArray(new Term[updates.deleteTerms.size()]); ArrayUtil.timSort(termsArray); PrefixCodedTerms.Builder builder = new PrefixCodedTerms.Builder(); for (Term term : termsArray) { builder.add(term); } deleteTerms = builder.finish(); deleteQueries = new Query[updates.deleteQueries.size()]; deleteQueryLimits = new int[updates.deleteQueries.size()]; int upto = 0; for(Map.Entry<Query,Integer> ent : updates.deleteQueries.entrySet()) { deleteQueries[upto] = ent.getKey(); deleteQueryLimits[upto] = ent.getValue(); upto++; } // TODO if a Term affects multiple fields, we could keep the updates key'd by Term // so that it maps to all fields it affects, sorted by their docUpto, and traverse // that Term only once, applying the update to all fields that still need to be // updated. 
updates.fieldUpdates.values().forEach(FieldUpdatesBuffer::finish); this.fieldUpdates = Collections.unmodifiableMap(new HashMap<>(updates.fieldUpdates)); this.fieldUpdatesCount = updates.numFieldUpdates.get(); bytesUsed = (int) ((deleteTerms.ramBytesUsed() + deleteQueries.length * BYTES_PER_DEL_QUERY) + updates.fieldUpdatesBytesUsed.get()); numTermDeletes = updates.numTermDeletes.get(); if (infoStream != null && infoStream.isEnabled("BD")) { infoStream.message("BD", String.format(Locale.ROOT, "compressed %d to %d bytes (%.2f%%) for deletes/updates; private segment %s", updates.ramBytesUsed(), bytesUsed, 100.*bytesUsed/updates.ramBytesUsed(), privateSegment)); } }
Tries to lock this buffered update instance
Returns: true if the lock was successfully acquired, otherwise false.
/**
 * Attempts to take the lock of this buffered update instance.
 *
 * @return {@code true} if the lock was successfully acquired, otherwise {@code false}
 */
boolean tryLock() {
  final boolean acquired = applyLock.tryLock();
  return acquired;
}
Locks this buffered update instance
/**
 * Acquires the lock of this buffered update instance.
 */
void lock() {
  applyLock.lock();
}
Releases the lock of this buffered update instance
/**
 * Gives up the lock of this buffered update instance.
 */
void unlock() {
  applyLock.unlock();
}
Returns true iff this buffered updates instance was already applied
/**
 * Tells whether this buffered updates instance was already applied, i.e. whether the
 * {@code applied} latch has reached zero. Asserts that the calling thread holds the lock.
 *
 * @return {@code true} iff the updates were already applied
 */
boolean isApplied() {
  assert applyLock.isHeldByCurrentThread();
  final long remaining = applied.getCount();
  return remaining == 0;
}
Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the index, returning the number of new deleted or updated documents.
/** Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the index, returning * the number of new deleted or updated documents. */
long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException { assert applyLock.isHeldByCurrentThread(); if (delGen == -1) { // we were not yet pushed throw new IllegalArgumentException("gen is not yet set; call BufferedUpdatesStream.push first"); } assert applied.getCount() != 0; if (privateSegment != null) { assert segStates.length == 1; assert privateSegment == segStates[0].reader.getOriginalSegmentInfo(); } totalDelCount += applyTermDeletes(segStates); totalDelCount += applyQueryDeletes(segStates); totalDelCount += applyDocValuesUpdates(segStates); return totalDelCount; } private long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState[] segStates) throws IOException { if (fieldUpdates.isEmpty()) { return 0; } long startNS = System.nanoTime(); long updateCount = 0; for (BufferedUpdatesStream.SegmentState segState : segStates) { if (delGen < segState.delGen) { // segment is newer than this deletes packet continue; } if (segState.rld.refCount() == 1) { // This means we are the only remaining reference to this segment, meaning // it was merged away while we were running, so we can safely skip running // because we will run on the newly merged segment next: continue; } final boolean isSegmentPrivateDeletes = privateSegment != null; if (fieldUpdates.isEmpty() == false) { updateCount += applyDocValuesUpdates(segState, fieldUpdates, delGen, isSegmentPrivateDeletes); } } if (infoStream.isEnabled("BD")) { infoStream.message("BD", String.format(Locale.ROOT, "applyDocValuesUpdates %.1f msec for %d segments, %d field updates; %d new updates", (System.nanoTime()-startNS)/1000000., segStates.length, fieldUpdatesCount, updateCount)); } return updateCount; } private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState, Map<String, FieldUpdatesBuffer> updates, long delGen, boolean segmentPrivateDeletes) throws IOException { // TODO: we can process the updates per DV field, from last to first so that // if multiple terms affect same 
document for the same field, we add an update // only once (that of the last term). To do that, we can keep a bitset which // marks which documents have already been updated. So e.g. if term T1 // updates doc 7, and then we process term T2 and it updates doc 7 as well, // we don't apply the update since we know T1 came last and therefore wins // the update. // We can also use that bitset as 'liveDocs' to pass to TermEnum.docs(), so // that these documents aren't even returned. long updateCount = 0; // We first write all our updates private, and only in the end publish to the ReadersAndUpdates */ final List<DocValuesFieldUpdates> resolvedUpdates = new ArrayList<>(); for (Map.Entry<String, FieldUpdatesBuffer> fieldUpdate : updates.entrySet()) { String updateField = fieldUpdate.getKey(); DocValuesFieldUpdates dvUpdates = null; FieldUpdatesBuffer value = fieldUpdate.getValue(); boolean isNumeric = value.isNumeric(); FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator(); FieldUpdatesBuffer.BufferedUpdate bufferedUpdate; TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, iterator.isSortedTerms()); while ((bufferedUpdate = iterator.next()) != null) { // TODO: we traverse the terms in update order (not term order) so that we // apply the updates in the correct order, i.e. if two terms update the // same document, the last one that came in wins, irrespective of the // terms lexical order. // we can apply the updates in terms order if we keep an updatesGen (and // increment it with every update) and attach it to each NumericUpdate. Note // that we cannot rely only on docIDUpto because an app may send two updates // which will get same docIDUpto, yet will still need to respect the order // those updates arrived. // TODO: we could at least *collate* by field? 
final DocIdSetIterator docIdSetIterator = termDocsIterator.nextTerm(bufferedUpdate.termField, bufferedUpdate.termValue); if (docIdSetIterator != null) { final int limit; if (delGen == segState.delGen) { assert segmentPrivateDeletes; limit = bufferedUpdate.docUpTo; } else { limit = Integer.MAX_VALUE; } final BytesRef binaryValue; final long longValue; if (bufferedUpdate.hasValue == false) { longValue = -1; binaryValue = null; } else { longValue = bufferedUpdate.numericValue; binaryValue = bufferedUpdate.binaryValue; } if (dvUpdates == null) { if (isNumeric) { if (value.hasSingleValue()) { dvUpdates = new NumericDocValuesFieldUpdates .SingleValueNumericDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc(), value.getNumericValue(0)); } else { dvUpdates = new NumericDocValuesFieldUpdates(delGen, updateField, value.getMinNumeric(), value.getMaxNumeric(), segState.reader.maxDoc()); } } else { dvUpdates = new BinaryDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc()); } resolvedUpdates.add(dvUpdates); } final IntConsumer docIdConsumer; final DocValuesFieldUpdates update = dvUpdates; if (bufferedUpdate.hasValue == false) { docIdConsumer = doc -> update.reset(doc); } else if (isNumeric) { docIdConsumer = doc -> update.add(doc, longValue); } else { docIdConsumer = doc -> update.add(doc, binaryValue); } final Bits acceptDocs = segState.rld.getLiveDocs(); if (segState.rld.sortMap != null && segmentPrivateDeletes) { // This segment was sorted on flush; we must apply seg-private deletes carefully in this case: int doc; while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { if (acceptDocs == null || acceptDocs.get(doc)) { // The limit is in the pre-sorted doc space: if (segState.rld.sortMap.newToOld(doc) < limit) { docIdConsumer.accept(doc); updateCount++; } } } } else { int doc; while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { if (doc >= limit) { break; // no more docs that can be updated for this 
term } if (acceptDocs == null || acceptDocs.get(doc)) { docIdConsumer.accept(doc); updateCount++; } } } } } } // now freeze & publish: for (DocValuesFieldUpdates update : resolvedUpdates) { if (update.any()) { update.finish(); segState.rld.addDVUpdate(update); } } return updateCount; } // Delete by query private long applyQueryDeletes(BufferedUpdatesStream.SegmentState[] segStates) throws IOException { if (deleteQueries.length == 0) { return 0; } long startNS = System.nanoTime(); long delCount = 0; for (BufferedUpdatesStream.SegmentState segState : segStates) { if (delGen < segState.delGen) { // segment is newer than this deletes packet continue; } if (segState.rld.refCount() == 1) { // This means we are the only remaining reference to this segment, meaning // it was merged away while we were running, so we can safely skip running // because we will run on the newly merged segment next: continue; } final LeafReaderContext readerContext = segState.reader.getContext(); for (int i = 0; i < deleteQueries.length; i++) { Query query = deleteQueries[i]; int limit; if (delGen == segState.delGen) { assert privateSegment != null; limit = deleteQueryLimits[i]; } else { limit = Integer.MAX_VALUE; } final IndexSearcher searcher = new IndexSearcher(readerContext.reader()); searcher.setQueryCache(null); query = searcher.rewrite(query); final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1); final Scorer scorer = weight.scorer(readerContext); if (scorer != null) { final DocIdSetIterator it = scorer.iterator(); if (segState.rld.sortMap != null && limit != Integer.MAX_VALUE) { assert privateSegment != null; // This segment was sorted on flush; we must apply seg-private deletes carefully in this case: int docID; while ((docID = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { // The limit is in the pre-sorted doc space: if (segState.rld.sortMap.newToOld(docID) < limit) { if (segState.rld.delete(docID)) { delCount++; } } } } else { int docID; while 
((docID = it.nextDoc()) < limit) { if (segState.rld.delete(docID)) { delCount++; } } } } } } if (infoStream.isEnabled("BD")) { infoStream.message("BD", String.format(Locale.ROOT, "applyQueryDeletes took %.2f msec for %d segments and %d queries; %d new deletions", (System.nanoTime()-startNS)/1000000., segStates.length, deleteQueries.length, delCount)); } return delCount; } private long applyTermDeletes(BufferedUpdatesStream.SegmentState[] segStates) throws IOException { if (deleteTerms.size() == 0) { return 0; } // We apply segment-private deletes on flush: assert privateSegment == null; long startNS = System.nanoTime(); long delCount = 0; for (BufferedUpdatesStream.SegmentState segState : segStates) { assert segState.delGen != delGen: "segState.delGen=" + segState.delGen + " vs this.gen=" + delGen; if (segState.delGen > delGen) { // our deletes don't apply to this segment continue; } if (segState.rld.refCount() == 1) { // This means we are the only remaining reference to this segment, meaning // it was merged away while we were running, so we can safely skip running // because we will run on the newly merged segment next: continue; } FieldTermIterator iter = deleteTerms.iterator(); BytesRef delTerm; TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, true); while ((delTerm = iter.next()) != null) { final DocIdSetIterator iterator = termDocsIterator.nextTerm(iter.field(), delTerm); if (iterator != null) { int docID; while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { // NOTE: there is no limit check on the docID // when deleting by Term (unlike by Query) // because on flush we apply all Term deletes to // each segment. 
So all Term deleting here is // against prior segments: if (segState.rld.delete(docID)) { delCount++; } } } } } if (infoStream.isEnabled("BD")) { infoStream.message("BD", String.format(Locale.ROOT, "applyTermDeletes took %.2f msec for %d segments and %d del terms; %d new deletions", (System.nanoTime()-startNS)/1000000., segStates.length, deleteTerms.size(), delCount)); } return delCount; } public void setDelGen(long delGen) { assert this.delGen == -1: "delGen was already previously set to " + this.delGen; this.delGen = delGen; deleteTerms.setDelGen(delGen); } public long delGen() { assert delGen != -1; return delGen; } @Override public String toString() { String s = "delGen=" + delGen; if (numTermDeletes != 0) { s += " numDeleteTerms=" + numTermDeletes; if (numTermDeletes != deleteTerms.size()) { s += " (" + deleteTerms.size() + " unique)"; } } if (deleteQueries.length != 0) { s += " numDeleteQueries=" + deleteQueries.length; } if (fieldUpdates.size() > 0) { s += " fieldUpdates=" + fieldUpdatesCount; } if (bytesUsed != 0) { s += " bytesUsed=" + bytesUsed; } if (privateSegment != null) { s += " privateSegment=" + privateSegment; } return s; } boolean any() { return deleteTerms.size() > 0 || deleteQueries.length > 0 || fieldUpdatesCount > 0 ; }
This class helps iterating a term dictionary and consuming all the docs for each term. It accepts a (field, value) tuple and returns a DocIdSetIterator if the field has an entry for the given value. It has an optimized way of iterating the term dictionary if the terms are passed in sorted order, and makes sure terms and postings are reused as much as possible.
/** * This class helps iterating a term dictionary and consuming all the docs for each terms. * It accepts a field, value tuple and returns a {@link DocIdSetIterator} if the field has an entry * for the given value. It has an optimized way of iterating the term dictionary if the terms are * passed in sorted order and makes sure terms and postings are reused as much as possible. */
static final class TermDocsIterator { private final TermsProvider provider; private String field; private TermsEnum termsEnum; private PostingsEnum postingsEnum; private final boolean sortedTerms; private BytesRef readerTerm; private BytesRef lastTerm; // only set with asserts @FunctionalInterface interface TermsProvider { Terms terms(String field) throws IOException; } TermDocsIterator(Fields fields, boolean sortedTerms) { this(fields::terms, sortedTerms); } TermDocsIterator(LeafReader reader, boolean sortedTerms) { this(reader::terms, sortedTerms); } private TermDocsIterator(TermsProvider provider, boolean sortedTerms) { this.sortedTerms = sortedTerms; this.provider = provider; } private void setField(String field) throws IOException { if (this.field == null || this.field.equals(field) == false) { this.field = field; Terms terms = provider.terms(field); if (terms != null) { termsEnum = terms.iterator(); if (sortedTerms) { assert (lastTerm = null) == null; // need to reset otherwise we fail the assertSorted below since we sort per field readerTerm = termsEnum.next(); } } else { termsEnum = null; } } } DocIdSetIterator nextTerm(String field, BytesRef term) throws IOException { setField(field); if (termsEnum != null) { if (sortedTerms) { assert assertSorted(term); // in the sorted case we can take advantage of the "seeking forward" property // this allows us depending on the term dict impl to reuse data-structures internally // which speed up iteration over terms and docs significantly. 
int cmp = term.compareTo(readerTerm); if (cmp < 0) { return null; // requested term does not exist in this segment } else if (cmp == 0) { return getDocs(); } else { TermsEnum.SeekStatus status = termsEnum.seekCeil(term); switch (status) { case FOUND: return getDocs(); case NOT_FOUND: readerTerm = termsEnum.term(); return null; case END: // no more terms in this segment termsEnum = null; return null; default: throw new AssertionError("unknown status"); } } } else if (termsEnum.seekExact(term)) { return getDocs(); } } return null; } private boolean assertSorted(BytesRef term) { assert sortedTerms; assert lastTerm == null || term.compareTo(lastTerm) >= 0 : "boom: " + term.utf8ToString() + " last: " + lastTerm.utf8ToString(); lastTerm = BytesRef.deepCopyOf(term); return true; } private DocIdSetIterator getDocs() throws IOException { assert termsEnum != null; return postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE); } } }