/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.search;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.util.ThreadInterruptedException;
Utility class that runs a thread to manage periodicc reopens of a ReferenceManager
, with methods to wait for a specific index changes to become visible. When a given search request needs to see a specific index change, call the {#waitForGeneration} to wait for that change to be visible. Note that this will only scale well if most searches do not need to wait for a specific index generation. @lucene.experimental
/** Utility class that runs a thread to manage periodicc
* reopens of a {@link ReferenceManager}, with methods to wait for a specific
* index changes to become visible. When a given search request needs to see a specific
* index change, call the {#waitForGeneration} to wait for
* that change to be visible. Note that this will only
* scale well if most searches do not need to wait for a
* specific index generation.
*
* @lucene.experimental */
public class ControlledRealTimeReopenThread<T> extends Thread implements Closeable {
private final ReferenceManager<T> manager;
private final long targetMaxStaleNS;
private final long targetMinStaleNS;
private final IndexWriter writer;
private volatile boolean finish;
private volatile long waitingGen;
private volatile long searchingGen;
private long refreshStartGen;
private final ReentrantLock reopenLock = new ReentrantLock();
private final Condition reopenCond = reopenLock.newCondition();
Create ControlledRealTimeReopenThread, to periodically reopen the ReferenceManager
. Params: - targetMaxStaleSec – Maximum time until a new
reader must be opened; this sets the upper bound
on how slowly reopens may occur, when no
caller is waiting for a specific generation to
become visible.
- targetMinStaleSec – Mininum time until a new
reader can be opened; this sets the lower bound
on how quickly reopens may occur, when a caller
is waiting for a specific generation to
become visible.
/**
* Create ControlledRealTimeReopenThread, to periodically
* reopen the {@link ReferenceManager}.
*
* @param targetMaxStaleSec Maximum time until a new
* reader must be opened; this sets the upper bound
* on how slowly reopens may occur, when no
* caller is waiting for a specific generation to
* become visible.
*
* @param targetMinStaleSec Mininum time until a new
* reader can be opened; this sets the lower bound
* on how quickly reopens may occur, when a caller
* is waiting for a specific generation to
* become visible.
*/
public ControlledRealTimeReopenThread(IndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
if (targetMaxStaleSec < targetMinStaleSec) {
throw new IllegalArgumentException("targetMaxScaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
}
this.writer = writer;
this.manager = manager;
this.targetMaxStaleNS = (long) (1000000000*targetMaxStaleSec);
this.targetMinStaleNS = (long) (1000000000*targetMinStaleSec);
manager.addListener(new HandleRefresh());
}
private class HandleRefresh implements ReferenceManager.RefreshListener {
@Override
public void beforeRefresh() {
}
@Override
public void afterRefresh(boolean didRefresh) {
refreshDone();
}
}
private synchronized void refreshDone() {
searchingGen = refreshStartGen;
notifyAll();
}
@Override
public synchronized void close() {
//System.out.println("NRT: set finish");
finish = true;
// So thread wakes up and notices it should finish:
reopenLock.lock();
try {
reopenCond.signal();
} finally {
reopenLock.unlock();
}
try {
join();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
// Max it out so any waiting search threads will return:
searchingGen = Long.MAX_VALUE;
notifyAll();
}
Waits for the target generation to become visible in the searcher. If the current searcher is older than the target generation, this method will block until the searcher is reopened, by another via ReferenceManager.maybeRefresh
or until the ReferenceManager
is closed. Params: - targetGen – the generation to wait for
/**
* Waits for the target generation to become visible in
* the searcher.
* If the current searcher is older than the
* target generation, this method will block
* until the searcher is reopened, by another via
* {@link ReferenceManager#maybeRefresh} or until the {@link ReferenceManager} is closed.
*
* @param targetGen the generation to wait for
*/
public void waitForGeneration(long targetGen) throws InterruptedException {
waitForGeneration(targetGen, -1);
}
Waits for the target generation to become visible in the searcher, up to a maximum specified milli-seconds. If the current searcher is older than the target generation, this method will block until the searcher has been reopened by another thread via ReferenceManager.maybeRefresh
, the given waiting time has elapsed, or until the ReferenceManager
is closed. NOTE: if the waiting time elapses before the requested target generation is available the current SearcherManager
is returned instead.
Params: - targetGen –
the generation to wait for
- maxMS –
maximum milliseconds to wait, or -1 to wait indefinitely
Returns: true if the targetGeneration is now available,
or false if maxMS wait time was exceeded
/**
* Waits for the target generation to become visible in
* the searcher, up to a maximum specified milli-seconds.
* If the current searcher is older than the target
* generation, this method will block until the
* searcher has been reopened by another thread via
* {@link ReferenceManager#maybeRefresh}, the given waiting time has elapsed, or until
* the {@link ReferenceManager} is closed.
* <p>
* NOTE: if the waiting time elapses before the requested target generation is
* available the current {@link SearcherManager} is returned instead.
*
* @param targetGen
* the generation to wait for
* @param maxMS
* maximum milliseconds to wait, or -1 to wait indefinitely
* @return true if the targetGeneration is now available,
* or false if maxMS wait time was exceeded
*/
public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException {
if (targetGen > searchingGen) {
// Notify the reopen thread that the waitingGen has
// changed, so it may wake up and realize it should
// not sleep for much or any longer before reopening:
reopenLock.lock();
// Need to find waitingGen inside lock as it's used to determine
// stale time
waitingGen = Math.max(waitingGen, targetGen);
try {
reopenCond.signal();
} finally {
reopenLock.unlock();
}
long startMS = System.nanoTime()/1000000;
while (targetGen > searchingGen) {
if (maxMS < 0) {
wait();
} else {
long msLeft = (startMS + maxMS) - System.nanoTime()/1000000;
if (msLeft <= 0) {
return false;
} else {
wait(msLeft);
}
}
}
}
return true;
}
@Override
public void run() {
// TODO: maybe use private thread ticktock timer, in
// case clock shift messes up nanoTime?
long lastReopenStartNS = System.nanoTime();
//System.out.println("reopen: start");
while (!finish) {
// TODO: try to guestimate how long reopen might
// take based on past data?
// Loop until we've waiting long enough before the
// next reopen:
while (!finish) {
// Need lock before finding out if has waiting
reopenLock.lock();
try {
// True if we have someone waiting for reopened searcher:
boolean hasWaiting = waitingGen > searchingGen;
final long nextReopenStartNS = lastReopenStartNS + (hasWaiting ? targetMinStaleNS : targetMaxStaleNS);
final long sleepNS = nextReopenStartNS - System.nanoTime();
if (sleepNS > 0) {
reopenCond.awaitNanos(sleepNS);
} else {
break;
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
} finally {
reopenLock.unlock();
}
}
if (finish) {
break;
}
lastReopenStartNS = System.nanoTime();
// Save the gen as of when we started the reopen; the
// listener (HandleRefresh above) copies this to
// searchingGen once the reopen completes:
refreshStartGen = writer.getMaxCompletedSequenceNumber();
try {
manager.maybeRefreshBlocking();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
}
Returns which generation
the current searcher is guaranteed to include. /** Returns which {@code generation} the current searcher is guaranteed to include. */
public long getSearchingGen() {
return searchingGen;
}
}