/*
 * Copyright 2017 Andrew Rucker Jones.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.opencsv.bean.concurrent;

import com.opencsv.ICSVParser;
import com.opencsv.bean.util.OrderedObject;
import com.opencsv.exceptions.CsvException;
import org.apache.commons.collections4.ListValuedMap;
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.ObjectUtils;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

This ThreadPoolExecutor automatically shuts down on any failed thread.

This is the historically established precedent for dealing with input errors in opencsv. This implementation expects all uncaught exceptions from its threads to be wrapped in a RuntimeException. The number of threads in the pool is fixed.

It is not intended for this executor to be instantiated and receive jobs directly. There are function-specific derived classes for that purpose.

This executor adds significant logic to the basic idea of an Executor, and thus must be used differently from other executors. Usage follows this pattern:

  1. prepare()
  2. Submit tasks. This is not intended to be done directly to this class, but rather to one of the submission methods of the derived classes.
  3. complete()
  4. The results are obtained by creating a Stream from the executor itself. This is most easily done with StreamSupport.stream(Spliterator<Object>, boolean).
  5. Possibly getCapturedExceptions()

The execution structure of this class is:

  1. The main thread (outside of this executor) parses input and passes it on to
  2. This executor, which performs a number of conversions in parallel and passes these results and any resultant errors to
  3. The accumulator, which creates an ordered list of the results.

The threads in the executor queue their results in a thread-safe queue, which should be O(1), minimizing wait time due to synchronization. The accumulator then removes items from the queue and inserts them into a sorted data structure, which is O(log n) on average and O(n) in the worst case. If the user has told us she doesn't need sorted data, the accumulator is not necessary, and thus is not started.

Author:Andrew Rucker Jones
Type parameters:
  • <T> – The type of the object being created by the threads run
Since:4.0
/** * This ThreadPoolExecutor automatically shuts down on any failed thread. * <p>This is the historically established precedent for dealing with input errors * in opencsv. This implementation expects all uncaught exceptions from its * threads to be wrapped in a {@link java.lang.RuntimeException}. The number of * threads in the pool is fixed.</p> * <p>It is not intended for this executor to be instantiated and receive jobs * directly. There are function-specific derived classes for that purpose.</p> * <p>This executor adds significant logic to the basic idea of an * {@link java.util.concurrent.Executor}, and thus must be used differently * from other executors. Usage follows this pattern: * <ol><li>{@link #prepare()}</li> * <li>Submit tasks. This is not intended to be done directly to this class, but * rather to one of the submission methods of the derived classes.</li> * <li>{@link #complete()}</li> * <li>The results are had by creating a {@link java.util.stream.Stream} out of * the executor itself. This is most easily done with * {@link java.util.stream.StreamSupport#stream(Spliterator, boolean)}</li> * <li>Possibly {@link #getCapturedExceptions()}</li></ol></p> * <p>The execution structure of this class is: * <ol><li>The main thread (outside of this executor) parses input and passes * it on to</li> * <li>This executor, which performs a number of conversions in parallel and * passes these results and any resultant errors to</li> * <li>The accumulator, which creates an ordered list of the results.</li></ol></p> * <p>The threads in the executor queue their results in a thread-safe * queue, which should be O(1), minimizing wait time due to synchronization. * The accumulator then removes items from the queue and inserts them into a * sorted data structure, which is O(log n) on average and O(n) in the worst * case. 
If the user has told us she doesn't need sorted data, the * accumulator is not necessary, and thus is not started.</p> * * @param <T> The type of the object being created by the threads run * @author Andrew Rucker Jones * @since 4.0 */
class IntolerantThreadPoolExecutor<T> extends ThreadPoolExecutor implements Spliterator<T> {
A queue of the beans created.
/** A queue of the beans created. */
protected final BlockingQueue<OrderedObject<T>> resultQueue = new LinkedBlockingQueue<>();
A queue of exceptions thrown by threads during processing.
/** A queue of exceptions thrown by threads during processing. */
protected final BlockingQueue<OrderedObject<CsvException>> thrownExceptionsQueue = new LinkedBlockingQueue<>();
A sorted, concurrent map for the beans created.
/** A sorted, concurrent map for the beans created. */
private ConcurrentNavigableMap<Long, T> resultantBeansMap = null;
A multi-valued map for any exceptions captured.

The multi-valued part is important because the same line can throw more than one exception.

All access to this variable must be synchronized.

/** * A multi-valued map for any exceptions captured. * <p>The multi-valued part is important because the same line can throw more * than one exception.</p> * <p><em>All access to this variable must be synchronized.</em></p> * */
private ListValuedMap<Long, CsvException> thrownExceptionsMap = null;
A separate thread that accumulates and orders results.
/** A separate thread that accumulates and orders results. */
protected AccumulateCsvResults<T> accumulateThread = null;
A list of the ordinals of data records still to be expected by the accumulator.
/** A list of the ordinals of data records still to be expected by the accumulator. */
protected final SortedSet<Long> expectedRecords = new ConcurrentSkipListSet<>();
Determines whether resulting data sets have to be in the same order as the input.
/** * Determines whether resulting data sets have to be in the same order as * the input. */
private final boolean orderedResults;
The locale for error messages.
/** The locale for error messages. */
protected final Locale errorLocale;
The exception that caused this Executor to stop executing.
/** The exception that caused this Executor to stop executing. */
private Throwable terminalException;
Constructor for a thread pool executor that stops by itself as soon as any thread throws an exception. Threads never time out and the queue for inbound work is unbounded.
Params:
  • orderedResults – Whether order should be preserved in the results
  • errorLocale – The errorLocale to use for error messages.
/** * Constructor for a thread pool executor that stops by itself as soon as * any thread throws an exception. * Threads never time out and the queue for inbound work is unbounded. * @param orderedResults Whether order should be preserved in the results * @param errorLocale The errorLocale to use for error messages. */
IntolerantThreadPoolExecutor(boolean orderedResults, Locale errorLocale) { super(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), Long.MAX_VALUE, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>()); this.orderedResults = orderedResults; this.errorLocale = ObjectUtils.defaultIfNull(errorLocale, Locale.getDefault()); }
Prepares this Executor to receive jobs.
/** * Prepares this Executor to receive jobs. */
public void prepare() { prestartAllCoreThreads(); // The ordered maps and accumulator are only necessary if ordering is // stipulated. After this, the presence or absence of the accumulator is // used to indicate ordering or not so as to guard against the unlikely // problem that someone sets orderedResults right in the middle of // processing. if(orderedResults) { resultantBeansMap = new ConcurrentSkipListMap<>(); thrownExceptionsMap = new ArrayListValuedHashMap<>(); // Start the process for accumulating results and cleaning up accumulateThread = new AccumulateCsvResults<>( resultQueue, thrownExceptionsQueue, expectedRecords, resultantBeansMap, thrownExceptionsMap); accumulateThread.start(); } }
Sends a signal to the Executor that it should shut down once all threads have completed.
Throws:
  • InterruptedException – If the current thread is interrupted while waiting. Shouldn't be thrown, since the Executor waits indefinitely for all threads to end.
  • RejectedExecutionException – If an exception during processing forced this Executor to shut down.
/** * Sends a signal to the Executor that it should shut down once all threads * have completed. * * @throws InterruptedException If the current thread is interrupted while * waiting. Shouldn't be thrown, since the Executor * waits indefinitely for all threads to end. * @throws RejectedExecutionException If an exception during processing * forced this Executor to shut down. */
public void complete() throws InterruptedException { // Normal termination shutdown(); awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // Wait indefinitely if(accumulateThread != null) { accumulateThread.setMustStop(true); accumulateThread.join(); } // There's one more possibility: The very last bean caused a problem. if(terminalException != null) { // Trigger a catch in the calling method throw new RejectedExecutionException(); } }
Returns exceptions captured during the conversion process if the conversion process was set not to propagate these errors up the call stack. The call is nondestructive.
Returns:All exceptions captured
/** * Returns exceptions captured during the conversion process if * the conversion process was set not to propagate these errors * up the call stack. * The call is nondestructive. * * @return All exceptions captured */
public List<CsvException> getCapturedExceptions() { List<CsvException> returnList = null; if(thrownExceptionsMap == null) { returnList = thrownExceptionsQueue.stream() .filter(Objects::nonNull) .map(OrderedObject::getElement) .collect(Collectors.toList()); } else { returnList = new LinkedList<>(); synchronized (thrownExceptionsMap) { final List<CsvException> finalReturnList = returnList; thrownExceptionsMap.keySet().stream() .sorted() .forEach(l -> finalReturnList.addAll(thrownExceptionsMap.get(l))); } } return returnList; } @Override public List<Runnable> shutdownNow() { if(accumulateThread != null) { accumulateThread.setMustStop(true); try { accumulateThread.join(); } catch (InterruptedException e) { // Do nothing. Best faith effort. } } return super.shutdownNow(); }
Shuts the Executor down if the thread ended in an exception.
Params:
  • r – {@inheritDoc}
  • t – {@inheritDoc}
/** * Shuts the Executor down if the thread ended in an exception. * @param r {@inheritDoc} * @param t {@inheritDoc} */
@Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if(t != null) { if(t.getCause() != null) { // Normally, everything that gets to this point should be // wrapped in a RuntimeException to get past the lack of checked // exceptions in Runnable.run(). terminalException = t.getCause(); } else { terminalException = t; } shutdownNow(); } }
If an unrecoverable exception was thrown during processing, it can be retrieved here.
Returns:The exception that halted one of the threads, which caused the executor to shut itself down
/** * If an unrecoverable exception was thrown during processing, it can be * retrieved here. * @return The exception that halted one of the threads, which caused the * executor to shut itself down */
public Throwable getTerminalException() { return terminalException; }
Checks whether exceptions are available that should halt processing. This is the case with unrecoverable errors, such as parsing the input, or if exceptions in conversion should be thrown by request of the user.
/** * Checks whether exceptions are available that should halt processing. * This is the case with unrecoverable errors, such as parsing the input, * or if exceptions in conversion should be thrown by request of the user. */
protected void checkExceptions() { if(terminalException != null) { if(terminalException instanceof CsvException) { CsvException csve = (CsvException) terminalException; throw new RuntimeException(String.format(ResourceBundle.getBundle(ICSVParser.DEFAULT_BUNDLE_NAME, errorLocale).getString("parsing.error.linenumber"), csve.getLineNumber(), String.join(",", ArrayUtils.nullToEmpty(csve.getLine()))), csve); } throw new RuntimeException(terminalException); } } private boolean isConversionComplete() { return isTerminated() && (accumulateThread == null || !accumulateThread.isAlive()); }
Determines whether more conversion results can be expected. Since Spliterators have no way of indicating that they don't have a result at the moment, but might in the future, we must ensure that every call to tryAdvance(Consumer) or trySplit() only returns null if the entire conversion apparatus has shut down and all result queues are cleared. Thus, this method waits until either that is true, or there is truly at least one result that can be returned to users of the Spliterator interface.
Returns:false if conversion is complete and no more results can ever be expected out of this Spliterator, true otherwise. If true is returned, it is guaranteed that at least one result is available immediately to the caller.
/** * Determines whether more conversion results can be expected. * Since {@link Spliterator}s have no way of indicating that they don't * have a result at the moment, but might in the future, we must ensure * that every call to {@link #tryAdvance(Consumer)} or {@link #trySplit()} * only returns {@code null} if the entire conversion apparatus has shut * down and all result queues are cleared. Thus, this method waits until * either that is true, or there is truly at least one result that can be * returned to users of the {@link Spliterator} interface. * * @return {@code false} if conversion is complete and no more results * can ever be expected out of this {@link Spliterator}, {@code true} * otherwise. If {@code true} is returned, it is guaranteed that at * least one result is available immediately to the caller. */
private boolean areMoreResultsAvailable() { // If an exception has been thrown that needs to be passed on, // throw it here. checkExceptions(); // Check conditions for completion boolean elementFound = false; while(!elementFound && !isConversionComplete()) { if(accumulateThread == null) { if(resultQueue.isEmpty()) { Thread.yield(); } else { elementFound = true; } } else { if(resultantBeansMap.isEmpty()) { Thread.yield(); } else { elementFound = true; } } // If an exception has been thrown that needs to be passed on, // throw it here. checkExceptions(); } return accumulateThread == null ? !resultQueue.isEmpty() : !resultantBeansMap.isEmpty(); } @Override public boolean tryAdvance(Consumer<? super T> action) { T bean = null; if (areMoreResultsAvailable()) { // Since we are now guaranteed to have a result, we don't // really have to do all of the null checking below, but // better safe than sorry. if(accumulateThread == null) { OrderedObject<T> orderedObject = resultQueue.poll(); if(orderedObject != null) { bean = orderedObject.getElement(); } } else { Map.Entry<Long, T> mapEntry = resultantBeansMap.pollFirstEntry(); if(mapEntry != null) { bean = mapEntry.getValue(); } } if(bean != null) { action.accept(bean); } } return bean != null; } // WARNING! This code is untested because I have no way of telling the JDK // streaming code how to do its job. @Override public Spliterator<T> trySplit() { Spliterator<T> s = null; // Check if all threads are through if(areMoreResultsAvailable()) { if(isConversionComplete()) { // Return everything we have if(accumulateThread == null) { s = resultQueue.stream().map(OrderedObject::getElement).spliterator(); } else { s = resultantBeansMap.values().spliterator(); } } else { int size; ArrayList<T> c; if(accumulateThread == null) { // May seem like an odd implementation, but we can't use // resultQueue.drainTo() because bulk operations are not // thread-safe. So, we have to poll each object individually. 
// We don't want to use a LinkedList for the Spliterator // because another split would presumably be inefficient. With // an ArrayList, on the other hand, we have to make sure we // avoid a costly resize operation. size = resultQueue.size(); c = new ArrayList<>(size); for(int i = 0; i < size; i++) { // Result guaranteed to exist through areMoreResultsAvailable() OrderedObject<T> orderedObject = resultQueue.poll(); if(orderedObject != null) { c.add(orderedObject.getElement()); } } } else { size = resultantBeansMap.size(); c = new ArrayList<>(size); for(int i = 0; i < size; i++) { Map.Entry<Long, T> mapEntry = resultantBeansMap.pollFirstEntry(); if(mapEntry != null) { c.add(mapEntry.getValue()); } } } s = c.spliterator(); } } return s; } // WARNING! This code is untested because I have no way of telling the JDK // streaming code how to do its job. @Override public long estimateSize() { return accumulateThread == null ? resultQueue.size() : resultantBeansMap.size(); } @Override public int characteristics() { int characteristics = Spliterator.CONCURRENT | Spliterator.NONNULL; if(accumulateThread != null) { characteristics |= Spliterator.ORDERED; } return characteristics; } }