package org.eclipse.collections.impl.parallel;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collection;
import java.util.List;
import java.util.RandomAccess;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.collections.api.block.function.Function;
import org.eclipse.collections.api.block.function.Function0;
import org.eclipse.collections.api.block.function.Function2;
import org.eclipse.collections.api.block.function.primitive.DoubleFunction;
import org.eclipse.collections.api.block.function.primitive.FloatFunction;
import org.eclipse.collections.api.block.function.primitive.IntFunction;
import org.eclipse.collections.api.block.function.primitive.LongFunction;
import org.eclipse.collections.api.block.predicate.Predicate;
import org.eclipse.collections.api.block.procedure.Procedure;
import org.eclipse.collections.api.block.procedure.Procedure2;
import org.eclipse.collections.api.block.procedure.primitive.ObjectIntProcedure;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ListIterable;
import org.eclipse.collections.api.map.MutableMap;
import org.eclipse.collections.api.map.MutableMapIterable;
import org.eclipse.collections.api.map.primitive.ObjectDoubleMap;
import org.eclipse.collections.api.map.primitive.ObjectLongMap;
import org.eclipse.collections.api.multimap.MutableMultimap;
import org.eclipse.collections.api.tuple.primitive.DoubleDoublePair;
import org.eclipse.collections.impl.block.factory.Functions0;
import org.eclipse.collections.impl.block.procedure.MultimapPutProcedure;
import org.eclipse.collections.impl.block.procedure.MutatingAggregationProcedure;
import org.eclipse.collections.impl.block.procedure.NonMutatingAggregationProcedure;
import org.eclipse.collections.impl.list.fixed.ArrayAdapter;
import org.eclipse.collections.impl.map.mutable.ConcurrentHashMap;
import org.eclipse.collections.impl.map.mutable.UnifiedMap;
import org.eclipse.collections.impl.map.mutable.primitive.ObjectDoubleHashMap;
import org.eclipse.collections.impl.map.mutable.primitive.ObjectLongHashMap;
import org.eclipse.collections.impl.multimap.list.SynchronizedPutFastListMultimap;
import org.eclipse.collections.impl.tuple.primitive.PrimitiveTuples;
import org.eclipse.collections.impl.utility.Iterate;
import static org.eclipse.collections.impl.factory.Iterables.iList;
public final class ParallelIterate
{
static final int DEFAULT_MIN_FORK_SIZE = 10000;
static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
static final int TASK_RATIO = 2;
static final int DEFAULT_PARALLEL_TASK_COUNT = ParallelIterate.getDefaultTaskCount();
static final ExecutorService EXECUTOR_SERVICE = ParallelIterate.newPooledExecutor(ParallelIterate.class.getSimpleName(), true);
private ParallelIterate()
{
throw new AssertionError("Suppress default constructor for noninstantiability");
}
static boolean isExecutorShutdown()
{
return ParallelIterate.EXECUTOR_SERVICE.isShutdown();
}
static void shutdownExecutor()
{
ParallelIterate.EXECUTOR_SERVICE.shutdown();
}
public static <T> void forEachWithIndex(
Iterable<T> iterable,
ObjectIntProcedure<? super T> objectIntProcedure)
{
ParallelIterate.forEachWithIndex(iterable, objectIntProcedure, ParallelIterate.EXECUTOR_SERVICE);
}
public static <T, BT extends ObjectIntProcedure<? super T>> void forEachWithIndex(
Iterable<T> iterable,
BT procedure,
Executor executor)
{
ParallelIterate.forEachWithIndex(
iterable,
new PassThruObjectIntProcedureFactory<>(procedure),
new PassThruCombiner<>(), executor);
}
public static <T, BT extends ObjectIntProcedure<? super T>> void forEachWithIndex(
Iterable<T> iterable,
BT procedure,
int minForkSize,
int taskCount)
{
ParallelIterate.forEachWithIndex(
iterable,
new PassThruObjectIntProcedureFactory<>(procedure),
new PassThruCombiner<>(),
minForkSize,
taskCount);
}
public static <T, BT extends ObjectIntProcedure<? super T>> void forEachWithIndex(
Iterable<T> iterable,
ObjectIntProcedureFactory<BT> procedureFactory,
Combiner<BT> combiner,
Executor executor)
{
int taskCount = Math.max(
ParallelIterate.DEFAULT_PARALLEL_TASK_COUNT,
Iterate.sizeOf(iterable) / ParallelIterate.DEFAULT_MIN_FORK_SIZE);
ParallelIterate.forEachWithIndex(
iterable,
procedureFactory,
combiner,
ParallelIterate.DEFAULT_MIN_FORK_SIZE,
taskCount,
executor);
}
public static <T, BT extends ObjectIntProcedure<? super T>> void forEachWithIndex(
Iterable<T> iterable,
ObjectIntProcedureFactory<BT> procedureFactory,
Combiner<BT> combiner,
int minForkSize,
int taskCount)
{
ParallelIterate.forEachWithIndex(iterable, procedureFactory, combiner, minForkSize, taskCount, ParallelIterate.EXECUTOR_SERVICE);
}
public static <T, BT extends ObjectIntProcedure<? super T>> void forEachWithIndex(
Iterable<T> iterable,
ObjectIntProcedureFactory<BT> procedureFactory,
Combiner<BT> combiner,
int minForkSize,
int taskCount,
Executor executor)
{
if (Iterate.notEmpty(iterable))
{
if (iterable instanceof RandomAccess || iterable instanceof ListIterable
&& iterable instanceof List)
{
ParallelIterate.forEachWithIndexInListOnExecutor(
(List<T>) iterable,
procedureFactory,
combiner,
minForkSize,
taskCount,
executor);
}
else
{
ParallelIterate.forEachWithIndexInListOnExecutor(
ArrayAdapter.adapt((T[]) Iterate.toArray(iterable)),
procedureFactory,
combiner,
minForkSize,
taskCount,
executor);
}
}
}
public static <T, BT extends ObjectIntProcedure<? super T>> void forEachWithIndexInListOnExecutor(
List<T> list,
ObjectIntProcedureFactory<BT> procedureFactory,
Combiner<BT> combiner,
int minForkSize,
int taskCount,
Executor executor)
{
int size = list.size();
if (size < minForkSize)
{
BT procedure = procedureFactory.create();
Iterate.forEachWithIndex(list, procedure);
if (combiner.useCombineOne())
{
combiner.combineOne(procedure);
}
else
{
combiner.combineAll(iList(procedure));
}
}
else
{
int threadCount = Math.min(size, taskCount);
ObjectIntProcedureFJTaskRunner<T, BT> runner =
new ObjectIntProcedureFJTaskRunner<>(combiner, threadCount);
runner.executeAndCombine(executor, procedureFactory, list);
}
}
public static <T> void forEach(Iterable<T> iterable, Procedure<? super T> procedure)
{
ParallelIterate.forEach(iterable, procedure, ParallelIterate.EXECUTOR_SERVICE);
}
public static <T> void forEach(Iterable<T> iterable, Procedure<? super T> procedure, int batchSize)
{
ParallelIterate.forEach(iterable, procedure, batchSize, ParallelIterate.EXECUTOR_SERVICE);
}
public static <T> void forEach(Iterable<T> iterable, Procedure<? super T> procedure, int batchSize, Executor executor)
{
ParallelIterate.forEach(iterable, procedure, batchSize, ParallelIterate.calculateTaskCount(iterable, batchSize), executor);
}
public static <T, BT extends Procedure<? super T>> void forEach(
Iterable<T> iterable,
BT procedure,
Executor executor)
{
ParallelIterate.forEach(
iterable,
new PassThruProcedureFactory<>(procedure),
new PassThruCombiner<>(),
executor);
}
public static <T, BT extends Procedure<? super T>> void forEach(
Iterable<T> iterable,
BT procedure,
int minForkSize,
int taskCount)
{
ParallelIterate.forEach(iterable, procedure, minForkSize, taskCount, ParallelIterate.EXECUTOR_SERVICE);
}
public static <T, BT extends Procedure<? super T>> void forEach(
Iterable<T> iterable,
BT procedure,
int minForkSize,
int taskCount,
Executor executor)
{
ParallelIterate.forEach(
iterable,
new PassThruProcedureFactory<>(procedure),
new PassThruCombiner<>(),
minForkSize,
taskCount,
executor);
}
public static <T, BT extends Procedure<? super T>> void forEach(
Iterable<T> iterable,
ProcedureFactory<BT> procedureFactory,
Combiner<BT> combiner,
Executor executor)
{
ParallelIterate.forEach(iterable, procedureFactory, combiner, ParallelIterate.DEFAULT_MIN_FORK_SIZE, executor);
}
public static <T, BT extends Procedure<? super T>> void forEach(
Iterable<T> iterable,
ProcedureFactory<BT> procedureFactory,
Combiner<BT> combiner)
{
ParallelIterate.forEach(iterable, procedureFactory, combiner, ParallelIterate.EXECUTOR_SERVICE);
}
@SuppressWarnings("JavaDoc")
public static <T, BT extends Procedure<? super T>> void forEach(
Iterable<T> iterable,
ProcedureFactory<BT> procedureFactory,
Combiner<BT> combiner,
int batchSize)
{
ParallelIterate.forEach(iterable, procedureFactory, combiner, batchSize, ParallelIterate.EXECUTOR_SERVICE);
}
public static <T, BT extends Procedure<? super T>> void forEach(
Iterable<T> iterable,
ProcedureFactory<BT> procedureFactory,
Combiner<BT> combiner,
int batchSize,
Executor executor)
{
ParallelIterate.forEach(iterable, procedureFactory, combiner, batchSize, ParallelIterate.calculateTaskCount(iterable, batchSize), executor);
}
@SuppressWarnings("JavaDoc")
public static <T, BT extends Procedure<? super T>> void forEach(
Iterable<T> iterable,
ProcedureFactory<BT> procedureFactory,
Combiner<BT> combiner,
int minForkSize,
int taskCount)
{
ParallelIterate.forEach(iterable, procedureFactory, combiner, minForkSize, taskCount, ParallelIterate.EXECUTOR_SERVICE);
}
public static <T, BT extends Procedure<? super T>> void forEach(
Iterable<T> iterable,
ProcedureFactory<BT> procedureFactory,
Combiner<BT> combiner,
int minForkSize,
int taskCount,
Executor executor)
{
if (Iterate.notEmpty(iterable))
{
if (iterable instanceof BatchIterable)
{
ParallelIterate.forEachInBatchWithExecutor(
(BatchIterable<T>) iterable,
procedureFactory,
combiner,
minForkSize,
taskCount,
executor);
}
else if ((iterable instanceof RandomAccess || iterable instanceof ListIterable)
&& iterable instanceof List)
{
ParallelIterate.forEachInListOnExecutor(
(List<T>) iterable,
procedureFactory,
combiner,
minForkSize,
taskCount,
executor);
}
else
{
ParallelArrayIterate.forEachOn(
(T[]) Iterate.toArray(iterable),
procedureFactory,
combiner,
minForkSize,
taskCount,
executor);
}
}
}
public static <T, BT extends Procedure<? super T>> void forEachInListOnExecutor(
List<T> list,
ProcedureFactory<BT> procedureFactory,
Combiner<BT> combiner,
int minForkSize,
int taskCount,
Executor executor)
{
int size = list.size();
if (size < minForkSize)
{
BT procedure = procedureFactory.create();
Iterate.forEach(list, procedure);
if (combiner.useCombineOne())
{
combiner.combineOne(procedure);
}
else
{
combiner.combineAll(iList(procedure));
}
}
else
{
int threadCount = Math.min(size, taskCount);
ProcedureFJTaskRunner<T, BT> runner =
new ProcedureFJTaskRunner<>(combiner, threadCount);
runner.executeAndCombine(executor, procedureFactory, list);
}
}
public static <T, BT extends Procedure<? super T>> void forEachInBatchWithExecutor(
BatchIterable<T> set,
ProcedureFactory<BT> procedureFactory,
Combiner<BT> combiner,
int minForkSize,
int taskCount,
Executor executor)
{
int size = set.size();
if (size < minForkSize)
{
BT procedure = procedureFactory.create();
set.forEach(procedure);
if (combiner.useCombineOne())
{
combiner.combineOne(procedure);
}
else
{
combiner.combineAll(iList(procedure));
}
}
else
{
int threadCount = Math.min(size, Math.min(taskCount, set.getBatchCount((int) Math.ceil((double) size / (double) taskCount))));
BatchIterableProcedureFJTaskRunner<T, BT> runner =
new BatchIterableProcedureFJTaskRunner<>(combiner, threadCount);
runner.executeAndCombine(executor, procedureFactory, set);
}
}
public static <T> Collection<T> select(
Iterable<T> iterable,
Predicate<? super T> predicate)
{
return ParallelIterate.select(iterable, predicate, false);
}
public static <T> Collection<T> select(
Iterable<T> iterable,
Predicate<? super T> predicate,
boolean allowReorderedResult)
{
return ParallelIterate.select(iterable, predicate, null, allowReorderedResult);
}
public static <T, R extends Collection<T>> R select(
Iterable<T> iterable,
Predicate<? super T> predicate,
R target,
boolean allowReorderedResult)
{
return ParallelIterate.select(
iterable,
predicate,
target,
ParallelIterate.DEFAULT_MIN_FORK_SIZE,
ParallelIterate.EXECUTOR_SERVICE,
allowReorderedResult);
}
public static <T, R extends Collection<T>> R select(
Iterable<T> iterable,
Predicate<? super T> predicate,
R target,
int batchSize,
Executor executor,
boolean allowReorderedResult)
{
FastListSelectProcedureCombiner<T> combiner = new FastListSelectProcedureCombiner<>(iterable, target, 10, allowReorderedResult);
FastListSelectProcedureFactory<T> procedureFactory = new FastListSelectProcedureFactory<>(predicate, batchSize);
ParallelIterate.forEach(
iterable,
procedureFactory,
combiner,
batchSize,
ParallelIterate.calculateTaskCount(iterable, batchSize),
executor);
return (R) combiner.getResult();
}
private static <T> int calculateTaskCount(Iterable<T> iterable, int batchSize)
{
if (iterable instanceof BatchIterable<?>)
{
return ParallelIterate.calculateTaskCount((BatchIterable<?>) iterable, batchSize);
}
return ParallelIterate.calculateTaskCount(Iterate.sizeOf(iterable), batchSize);
}
private static <T> int calculateTaskCount(BatchIterable<T> batchIterable, int batchSize)
{
return Math.max(2, batchIterable.getBatchCount(batchSize));
}
private static int calculateTaskCount(int size, int batchSize)
{
return Math.max(2, size / batchSize);
}
public static <T> Collection<T> reject(
Iterable<T> iterable,
Predicate<? super T> predicate)
{
return ParallelIterate.reject(iterable, predicate, false);
}
public static <T> Collection<T> reject(
Iterable<T> iterable,
Predicate<? super T> predicate,
boolean allowReorderedResult)
{
return ParallelIterate.reject(iterable, predicate, null, allowReorderedResult);
}
public static <T, R extends Collection<T>> R reject(
Iterable<T> iterable,
Predicate<? super T> predicate,
R target,
boolean allowReorderedResult)
{
return ParallelIterate.reject(
iterable,
predicate,
target,
ParallelIterate.DEFAULT_MIN_FORK_SIZE,
ParallelIterate.EXECUTOR_SERVICE,
allowReorderedResult);
}
public static <T, R extends Collection<T>> R reject(
Iterable<T> iterable,
Predicate<? super T> predicate,
R target,
int batchSize,
Executor executor,
boolean allowReorderedResult)
{
FastListRejectProcedureCombiner<T> combiner = new FastListRejectProcedureCombiner<>(iterable, target, 10, allowReorderedResult);
FastListRejectProcedureFactory<T> procedureFactory = new FastListRejectProcedureFactory<>(predicate, batchSize);
ParallelIterate.forEach(
iterable,
procedureFactory,
combiner,
batchSize,
ParallelIterate.calculateTaskCount(iterable, batchSize),
executor);
return (R) combiner.getResult();
}
public static <T> int count(Iterable<T> iterable, Predicate<? super T> predicate)
{
return ParallelIterate.count(iterable, predicate, ParallelIterate.DEFAULT_MIN_FORK_SIZE, ParallelIterate.EXECUTOR_SERVICE);
}
public static <T> int count(Iterable<T> iterable, Predicate<? super T> predicate, int batchSize, Executor executor)
{
CountCombiner<T> combiner = new CountCombiner<>();
CountProcedureFactory<T> procedureFactory = new CountProcedureFactory<>(predicate);
ParallelIterate.forEach(
iterable,
procedureFactory,
combiner,
batchSize,
executor);
return combiner.getCount();
}
public static <T, V> Collection<V> collect(
Iterable<T> iterable,
Function<? super T, V> function)
{
return ParallelIterate.collect(iterable, function, false);
}
public static <T, V> Collection<V> collect(
Iterable<T> iterable,
Function<? super T, V> function,
boolean allowReorderedResult)
{
return ParallelIterate.collect(iterable, function, null, allowReorderedResult);
}
public static <T, V, R extends Collection<V>> R collect(
Iterable<T> iterable,
Function<? super T, V> function,
R target,
boolean allowReorderedResult)
{
return ParallelIterate.collect(
iterable,
function,
target,
ParallelIterate.DEFAULT_MIN_FORK_SIZE,
ParallelIterate.EXECUTOR_SERVICE,
allowReorderedResult);
}
public static <T, V, R extends Collection<V>> R collect(
Iterable<T> iterable,
Function<? super T, V> function,
R target,
int batchSize,
Executor executor,
boolean allowReorderedResult)
{
int size = Iterate.sizeOf(iterable);
FastListCollectProcedureCombiner<T, V> combiner = new FastListCollectProcedureCombiner<>(iterable, target, size, allowReorderedResult);
int taskCount = ParallelIterate.calculateTaskCount(iterable, batchSize);
FastListCollectProcedureFactory<T, V> procedureFactory = new FastListCollectProcedureFactory<>(function, size / taskCount);
ParallelIterate.forEach(
iterable,
procedureFactory,
combiner,
batchSize,
taskCount,
executor);
return (R) combiner.getResult();
}
public static <T, V> Collection<V> flatCollect(
Iterable<T> iterable,
Function<? super T, Collection<V>> function)
{
return ParallelIterate.flatCollect(iterable, function, false);
}
public static <T, V> Collection<V> flatCollect(
Iterable<T> iterable,
Function<? super T, Collection<V>> function,
boolean allowReorderedResult)
{
return ParallelIterate.flatCollect(iterable, function, null, allowReorderedResult);
}
public static <T, V, R extends Collection<V>> R flatCollect(
Iterable<T> iterable,
Function<? super T, Collection<V>> function,
R target,
boolean allowReorderedResult)
{
return ParallelIterate.flatCollect(
iterable,
function,
target,
ParallelIterate.DEFAULT_MIN_FORK_SIZE,
ParallelIterate.EXECUTOR_SERVICE,
allowReorderedResult);
}
public static <T, V, R extends Collection<V>> R flatCollect(
Iterable<T> iterable,
Function<? super T, Collection<V>> function,
R target,
int batchSize,
Executor executor,
boolean allowReorderedResult)
{
int size = Iterate.sizeOf(iterable);
int taskCount = ParallelIterate.calculateTaskCount(iterable, batchSize);
int taskSize = size / taskCount;
FlatCollectProcedureCombiner<T, V> combiner =
new FlatCollectProcedureCombiner<>(iterable, target, size, allowReorderedResult);
FlatCollectProcedureFactory<T, V> procedureFactory = new FlatCollectProcedureFactory<>(function, taskSize);
ParallelIterate.forEach(
iterable,
procedureFactory,
combiner,
batchSize,
taskCount,
executor);
return (R) combiner.getResult();
}
public static <T, V> Collection<V> collectIf(
Iterable<T> iterable,
Predicate<? super T> predicate,
Function<? super T, V> function)
{
return ParallelIterate.collectIf(iterable, predicate, function, false);
}
public static <T, V> Collection<V> collectIf(
Iterable<T> iterable,
Predicate<? super T> predicate,
Function<? super T, V> function,
boolean allowReorderedResult)
{
return ParallelIterate.collectIf(iterable, predicate, function, null, allowReorderedResult);
}
public static <T, V, R extends Collection<V>> R collectIf(
Iterable<T> iterable,
Predicate<? super T> predicate,
Function<? super T, V> function,
R target,
boolean allowReorderedResult)
{
return ParallelIterate.collectIf(
iterable,
predicate,
function,
target,
ParallelIterate.DEFAULT_MIN_FORK_SIZE,
ParallelIterate.EXECUTOR_SERVICE,
allowReorderedResult);
}
public static <T, V, R extends Collection<V>> R collectIf(
Iterable<T> iterable,
Predicate<? super T> predicate,
Function<? super T, V> function,
R target,
int batchSize,
Executor executor,
boolean allowReorderedResult)
{
FastListCollectIfProcedureCombiner<T, V> combiner = new FastListCollectIfProcedureCombiner<>(iterable, target, 10, allowReorderedResult);
FastListCollectIfProcedureFactory<T, V> procedureFactory = new FastListCollectIfProcedureFactory<>(function, predicate, batchSize);
ParallelIterate.forEach(
iterable,
procedureFactory,
combiner,
batchSize,
ParallelIterate.calculateTaskCount(iterable, batchSize),
executor);
return (R) combiner.getResult();
}
public static <K, V> MutableMultimap<K, V> groupBy(
Iterable<V> iterable,
Function<? super V, ? extends K> function)
{
return ParallelIterate.groupBy(iterable, function, ParallelIterate.DEFAULT_MIN_FORK_SIZE, ParallelIterate.EXECUTOR_SERVICE);
}
public static <T, K, V> MutableMap<K, V> aggregateBy(
Iterable<T> iterable,
Function<? super T, ? extends K> groupBy,
Function0<? extends V> zeroValueFactory,
Function2<? super V, ? super T, ? extends V> nonMutatingAggregator)
{
return ParallelIterate.aggregateBy(
iterable,
groupBy,
zeroValueFactory,
nonMutatingAggregator,
ParallelIterate.DEFAULT_MIN_FORK_SIZE);
}
public static <T, K, V, R extends MutableMapIterable<K, V>> R aggregateBy(
Iterable<T> iterable,
Function<? super T, ? extends K> groupBy,
Function0<? extends V> zeroValueFactory,
Function2<? super V, ? super T, ? extends V> nonMutatingAggregator,
R mutableMap)
{
return ParallelIterate.aggregateBy(
iterable,
groupBy,
zeroValueFactory,
nonMutatingAggregator,
mutableMap,
ParallelIterate.DEFAULT_MIN_FORK_SIZE);
}
public static <T, K, V> MutableMap<K, V> aggregateBy(
Iterable<T> iterable,
Function<? super T, ? extends K> groupBy,
Function0<? extends V> zeroValueFactory,
Function2<? super V, ? super T, ? extends V> nonMutatingAggregator,
int batchSize)
{
return ParallelIterate.aggregateBy(
iterable,
groupBy,
zeroValueFactory,
nonMutatingAggregator,
batchSize,
ParallelIterate.EXECUTOR_SERVICE);
}
public static <T, K, V, R extends MutableMapIterable<K, V>> R aggregateBy(
Iterable<T> iterable,
Function<? super T, ? extends K> groupBy,
Function0<? extends V> zeroValueFactory,
Function2<? super V, ? super T, ? extends V> nonMutatingAggregator,
R mutableMap,
int batchSize)
{
return ParallelIterate.aggregateBy(
iterable,
groupBy,
zeroValueFactory,
nonMutatingAggregator,
mutableMap,
batchSize,
ParallelIterate.EXECUTOR_SERVICE);
}
public static <T, K, V> MutableMap<K, V> aggregateBy(
Iterable<T> iterable,
Function<? super T, ? extends K> groupBy,
Function0<? extends V> zeroValueFactory,
Function2<? super V, ? super T, ? extends V> nonMutatingAggregator,
int batchSize,
Executor executor)
{
return ParallelIterate.aggregateBy(
iterable,
groupBy,
zeroValueFactory,
nonMutatingAggregator,
ConcurrentHashMap.newMap(),
batchSize,
executor);
}
public static <T, K, V, R extends MutableMapIterable<K, V>> R aggregateBy(
Iterable<T> iterable,
Function<? super T, ? extends K> groupBy,
Function0<? extends V> zeroValueFactory,
Function2<? super V, ? super T, ? extends V> nonMutatingAggregator,
R mutableMap,
int batchSize,
Executor executor)
{
NonMutatingAggregationProcedure<T, K, V> nonMutatingAggregationProcedure =
new NonMutatingAggregationProcedure<>(mutableMap, groupBy, zeroValueFactory, nonMutatingAggregator);
ParallelIterate.forEach(
iterable,
new PassThruProcedureFactory<>(nonMutatingAggregationProcedure),
Combiners.<Procedure<T>>passThru(),
batchSize,
executor);
return mutableMap;
}
public static <T, K, V> MutableMap<K, V> aggregateInPlaceBy(
Iterable<T> iterable,
Function<? super T, ? extends K> groupBy,
Function0<? extends V> zeroValueFactory,
Procedure2<? super V, ? super T> mutatingAggregator)
{
return ParallelIterate.aggregateInPlaceBy(
iterable,
groupBy,
zeroValueFactory,
mutatingAggregator,
ParallelIterate.DEFAULT_MIN_FORK_SIZE);
}
public static <T, K, V, R extends MutableMapIterable<K, V>> R aggregateInPlaceBy(
Iterable<T> iterable,
Function<? super T, ? extends K> groupBy,
Function0<? extends V> zeroValueFactory,
Procedure2<? super V, ? super T> mutatingAggregator,
R mutableMap)
{
return ParallelIterate.aggregateInPlaceBy(
iterable,
groupBy,
zeroValueFactory,
mutatingAggregator,
mutableMap,
ParallelIterate.DEFAULT_MIN_FORK_SIZE);
}
public static <T, K, V> MutableMap<K, V> aggregateInPlaceBy(
Iterable<T> iterable,
Function<? super T, ? extends K> groupBy,
Function0<? extends V> zeroValueFactory,
Procedure2<? super V, ? super T> mutatingAggregator,
int batchSize)
{
return ParallelIterate.aggregateInPlaceBy(
iterable,
groupBy,
zeroValueFactory,
mutatingAggregator,
batchSize,
ParallelIterate.EXECUTOR_SERVICE);
}
public static <T, K, V, R extends MutableMapIterable<K, V>> R aggregateInPlaceBy(
Iterable<T> iterable,
Function<? super T, ? extends K> groupBy,
Function0<? extends V> zeroValueFactory,
Procedure2<? super V, ? super T> mutatingAggregator,
R mutableMap,
int batchSize)
{
return ParallelIterate.aggregateInPlaceBy(
iterable,
groupBy,
zeroValueFactory,
mutatingAggregator,
mutableMap,
batchSize,
ParallelIterate.EXECUTOR_SERVICE);
}
public static <T, K, V> MutableMap<K, V> aggregateInPlaceBy(
Iterable<T> iterable,
Function<? super T, ? extends K> groupBy,
Function0<? extends V> zeroValueFactory,
Procedure2<? super V, ? super T> mutatingAggregator,
int batchSize,
Executor executor)
{
MutableMap<K, V> map = ConcurrentHashMap.newMap();
MutatingAggregationProcedure<T, K, V> mutatingAggregationProcedure =
new MutatingAggregationProcedure<>(map, groupBy, zeroValueFactory, mutatingAggregator);
ParallelIterate.forEach(
iterable,
new PassThruProcedureFactory<>(mutatingAggregationProcedure),
Combiners.<Procedure<T>>passThru(),
batchSize,
executor);
return map;
}
public static <T, K, V, R extends MutableMapIterable<K, V>> R aggregateInPlaceBy(
Iterable<T> iterable,
Function<? super T, ? extends K> groupBy,
Function0<? extends V> zeroValueFactory,
Procedure2<? super V, ? super T> mutatingAggregator,
R mutableMap,
int batchSize,
Executor executor)
{
MutatingAggregationProcedure<T, K, V> mutatingAggregationProcedure =
new MutatingAggregationProcedure<>(mutableMap, groupBy, zeroValueFactory, mutatingAggregator);
ParallelIterate.forEach(
iterable,
new PassThruProcedureFactory<>(mutatingAggregationProcedure),
Combiners.<Procedure<T>>passThru(),
batchSize,
executor);
return mutableMap;
}
public static <K, V, R extends MutableMultimap<K, V>> MutableMultimap<K, V> groupBy(
Iterable<V> iterable,
Function<? super V, ? extends K> function,
R concurrentMultimap)
{
return ParallelIterate.groupBy(iterable, function, concurrentMultimap, ParallelIterate.DEFAULT_MIN_FORK_SIZE);
}
public static <K, V, R extends MutableMultimap<K, V>> MutableMultimap<K, V> groupBy(
Iterable<V> iterable,
Function<? super V, ? extends K> function,
R concurrentMultimap,
int batchSize)
{
return ParallelIterate.groupBy(iterable, function, concurrentMultimap, batchSize, ParallelIterate.EXECUTOR_SERVICE);
}
public static <K, V> MutableMultimap<K, V> groupBy(
Iterable<V> iterable,
Function<? super V, ? extends K> function,
int batchSize)
{
return ParallelIterate.groupBy(iterable, function, batchSize, ParallelIterate.EXECUTOR_SERVICE);
}
public static <K, V> MutableMultimap<K, V> groupBy(
Iterable<V> iterable,
Function<? super V, ? extends K> function,
int batchSize,
Executor executor)
{
return ParallelIterate.groupBy(iterable, function, SynchronizedPutFastListMultimap.newMultimap(), batchSize, executor);
}
public static <K, V, R extends MutableMultimap<K, V>> MutableMultimap<K, V> groupBy(
Iterable<V> iterable,
Function<? super V, ? extends K> function,
R concurrentMultimap,
int batchSize,
Executor executor)
{
ParallelIterate.forEach(
iterable,
new PassThruProcedureFactory<>(new MultimapPutProcedure<>(concurrentMultimap, function)),
Combiners.<Procedure<V>>passThru(),
batchSize,
executor);
return concurrentMultimap;
}
public static <T, V> ObjectDoubleMap<V> sumByDouble(
Iterable<T> iterable,
Function<? super T, ? extends V> groupBy,
DoubleFunction<? super T> function)
{
ObjectDoubleHashMap<V> result = ObjectDoubleHashMap.newMap();
ParallelIterate.forEach(
iterable,
new SumByDoubleProcedure<>(groupBy, function),
new SumByDoubleCombiner<>(result),
ParallelIterate.DEFAULT_MIN_FORK_SIZE,
ParallelIterate.EXECUTOR_SERVICE);
return result;
}
public static <T, V> ObjectDoubleMap<V> sumByFloat(
Iterable<T> iterable,
Function<? super T, ? extends V> groupBy,
FloatFunction<? super T> function)
{
ObjectDoubleHashMap<V> result = ObjectDoubleHashMap.newMap();
ParallelIterate.forEach(
iterable,
new SumByFloatProcedure<>(groupBy, function),
new SumByFloatCombiner<>(result),
ParallelIterate.DEFAULT_MIN_FORK_SIZE,
ParallelIterate.EXECUTOR_SERVICE);
return result;
}
public static <T, V> ObjectLongMap<V> sumByLong(
Iterable<T> iterable,
Function<? super T, ? extends V> groupBy,
LongFunction<? super T> function)
{
ObjectLongHashMap<V> result = ObjectLongHashMap.newMap();
ParallelIterate.forEach(
iterable,
new SumByLongProcedure<>(groupBy, function),
new SumByLongCombiner<>(result),
ParallelIterate.DEFAULT_MIN_FORK_SIZE,
ParallelIterate.EXECUTOR_SERVICE);
return result;
}
public static <T, V> ObjectLongMap<V> sumByInt(
Iterable<T> iterable,
Function<? super T, ? extends V> groupBy,
IntFunction<? super T> function)
{
ObjectLongHashMap<V> result = ObjectLongHashMap.newMap();
ParallelIterate.forEach(
iterable,
new SumByIntProcedure<>(groupBy, function),
new SumByIntCombiner<>(result),
ParallelIterate.DEFAULT_MIN_FORK_SIZE,
ParallelIterate.EXECUTOR_SERVICE);
return result;
}
public static <V, T> MutableMap<V, BigDecimal> sumByBigDecimal(
Iterable<T> iterable,
Function<? super T, ? extends V> groupBy,
Function<? super T, BigDecimal> function)
{
MutableMap<V, BigDecimal> result = UnifiedMap.newMap();
ParallelIterate.forEach(
iterable,
new SumByBigDecimalProcedure<>(groupBy, function),
new SumByBigDecimalCombiner<>(result),
ParallelIterate.DEFAULT_MIN_FORK_SIZE,
ParallelIterate.EXECUTOR_SERVICE);
return result;
}
public static <V, T> MutableMap<V, BigInteger> sumByBigInteger(
Iterable<T> iterable,
Function<? super T, ? extends V> groupBy,
Function<? super T, BigInteger> function)
{
MutableMap<V, BigInteger> result = UnifiedMap.newMap();
ParallelIterate.forEach(
iterable,
new SumByBigIntegerProcedure<>(groupBy, function),
new SumByBigIntegerCombiner<>(result),
ParallelIterate.DEFAULT_MIN_FORK_SIZE,
ParallelIterate.EXECUTOR_SERVICE);
return result;
}
public static ExecutorService newPooledExecutor(int newPoolSize, String poolName, boolean useDaemonThreads)
{
return new ThreadPoolExecutor(
newPoolSize,
newPoolSize,
0L,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new CollectionsThreadFactory(poolName, useDaemonThreads),
new ThreadPoolExecutor.CallerRunsPolicy());
}
public static ExecutorService newPooledExecutor(String poolName, boolean useDaemonThreads)
{
return ParallelIterate.newPooledExecutor(ParallelIterate.getDefaultMaxThreadPoolSize(), poolName, useDaemonThreads);
}
public static int getDefaultTaskCount()
{
return ParallelIterate.getDefaultMaxThreadPoolSize() * ParallelIterate.getTaskRatio();
}
public static int getDefaultMaxThreadPoolSize()
{
return Math.min(AVAILABLE_PROCESSORS + 1, 100);
}
public static int getTaskRatio()
{
return TASK_RATIO;
}
private static final class SumByDoubleProcedure<T, V> implements Procedure<T>, ProcedureFactory<SumByDoubleProcedure<T, V>>
{
private final MutableMap<V, DoubleDoublePair> map = Maps.mutable.of();
private final Function<? super T, ? extends V> groupBy;
private final DoubleFunction<? super T> function;
private SumByDoubleProcedure(Function<? super T, ? extends V> groupBy, DoubleFunction<? super T> function)
{
this.groupBy = groupBy;
this.function = function;
}
@Override
public void value(T each)
{
V groupKey = this.groupBy.valueOf(each);
DoubleDoublePair sumCompensation = this.map.getIfAbsentPut(groupKey, () -> PrimitiveTuples.pair(0.0d, 0.0d));
double sum = sumCompensation.getOne();
double compensation = sumCompensation.getTwo();
double adjustedValue = this.function.doubleValueOf(each) - compensation;
double nextSum = sum + adjustedValue;
compensation = nextSum - sum - adjustedValue;
sum = nextSum;
this.map.put(groupKey, PrimitiveTuples.pair(sum, compensation));
}
public MutableMap<V, DoubleDoublePair> getResult()
{
return this.map;
}
@Override
public SumByDoubleProcedure<T, V> create()
{
return new SumByDoubleProcedure<>(this.groupBy, this.function);
}
}
private static final class SumByDoubleCombiner<T, V> extends AbstractProcedureCombiner<SumByDoubleProcedure<T, V>>
{
private final ObjectDoubleHashMap<V> result;
private final ObjectDoubleHashMap<V> compensation = ObjectDoubleHashMap.newMap();
private SumByDoubleCombiner(ObjectDoubleHashMap<V> result)
{
super(true);
this.result = result;
}
@Override
public void combineOne(SumByDoubleProcedure<T, V> thingToCombine)
{
if (this.result.isEmpty())
{
thingToCombine.getResult().forEachKeyValue((each, sumCompensation) -> {
this.result.put(each, sumCompensation.getOne());
this.compensation.put(each, sumCompensation.getTwo());
});
}
else
{
thingToCombine.getResult().forEachKeyValue((each, sumCompensation) -> {
double sum = this.result.get(each);
double currentCompensation = this.compensation.getIfAbsentPut(each, () -> 0.0d) + sumCompensation.getTwo();
double adjustedValue = sumCompensation.getOne() - currentCompensation;
double nextSum = sum + adjustedValue;
this.compensation.put(each, nextSum - sum - adjustedValue);
this.result.put(each, nextSum);
});
}
}
}
private static final class SumByFloatProcedure<T, V> implements Procedure<T>, ProcedureFactory<SumByFloatProcedure<T, V>>
{
private final MutableMap<V, DoubleDoublePair> map = Maps.mutable.of();
private final Function<? super T, ? extends V> groupBy;
private final FloatFunction<? super T> function;
private SumByFloatProcedure(Function<? super T, ? extends V> groupBy, FloatFunction<? super T> function)
{
this.groupBy = groupBy;
this.function = function;
}
@Override
public void value(T each)
{
V groupKey = this.groupBy.valueOf(each);
DoubleDoublePair sumCompensation = this.map.getIfAbsentPut(groupKey, () -> PrimitiveTuples.pair(0.0d, 0.0d));
double sum = sumCompensation.getOne();
double compensation = sumCompensation.getTwo();
double adjustedValue = this.function.floatValueOf(each) - compensation;
double nextSum = sum + adjustedValue;
compensation = nextSum - sum - adjustedValue;
sum = nextSum;
this.map.put(groupKey, PrimitiveTuples.pair(sum, compensation));
}
public MutableMap<V, DoubleDoublePair> getResult()
{
return this.map;
}
@Override
public SumByFloatProcedure<T, V> create()
{
return new SumByFloatProcedure<>(this.groupBy, this.function);
}
}
private static final class SumByFloatCombiner<T, V> extends AbstractProcedureCombiner<SumByFloatProcedure<T, V>>
{
private final ObjectDoubleHashMap<V> result;
private final ObjectDoubleHashMap<V> compensation = ObjectDoubleHashMap.newMap();
private SumByFloatCombiner(ObjectDoubleHashMap<V> result)
{
super(true);
this.result = result;
}
@Override
public void combineOne(SumByFloatProcedure<T, V> thingToCombine)
{
if (this.result.isEmpty())
{
thingToCombine.getResult().forEachKeyValue((each, sumCompensation) -> {
this.result.put(each, sumCompensation.getOne());
this.compensation.put(each, sumCompensation.getTwo());
});
}
else
{
thingToCombine.getResult().forEachKeyValue((each, sumCompensation) -> {
double sum = this.result.get(each);
double currentCompensation = this.compensation.getIfAbsentPut(each, () -> 0.0d) + sumCompensation.getTwo();
double adjustedValue = sumCompensation.getOne() - currentCompensation;
double nextSum = sum + adjustedValue;
this.compensation.put(each, nextSum - sum - adjustedValue);
this.result.put(each, nextSum);
});
}
}
}
private static final class SumByLongProcedure<T, V> implements Procedure<T>, ProcedureFactory<SumByLongProcedure<T, V>>
{
private final ObjectLongHashMap<V> map = ObjectLongHashMap.newMap();
private final Function<? super T, ? extends V> groupBy;
private final LongFunction<? super T> function;
private SumByLongProcedure(Function<? super T, ? extends V> groupBy, LongFunction<? super T> function)
{
this.groupBy = groupBy;
this.function = function;
}
@Override
public void value(T each)
{
this.map.addToValue(this.groupBy.valueOf(each), this.function.longValueOf(each));
}
public ObjectLongHashMap<V> getResult()
{
return this.map;
}
@Override
public SumByLongProcedure<T, V> create()
{
return new SumByLongProcedure<>(this.groupBy, this.function);
}
}
private static final class SumByLongCombiner<T, V> extends AbstractProcedureCombiner<SumByLongProcedure<T, V>>
{
private final ObjectLongHashMap<V> result;
private SumByLongCombiner(ObjectLongHashMap<V> result)
{
super(true);
this.result = result;
}
@Override
public void combineOne(SumByLongProcedure<T, V> thingToCombine)
{
if (this.result.isEmpty())
{
this.result.putAll(thingToCombine.getResult());
}
else
{
thingToCombine.getResult().forEachKeyValue(this.result::addToValue);
}
}
}
private static final class SumByIntProcedure<T, V> implements Procedure<T>, ProcedureFactory<SumByIntProcedure<T, V>>
{
private final ObjectLongHashMap<V> map = ObjectLongHashMap.newMap();
private final Function<? super T, ? extends V> groupBy;
private final IntFunction<? super T> function;
private SumByIntProcedure(Function<? super T, ? extends V> groupBy, IntFunction<? super T> function)
{
this.groupBy = groupBy;
this.function = function;
}
@Override
public void value(T each)
{
this.map.addToValue(this.groupBy.valueOf(each), (long) this.function.intValueOf(each));
}
public ObjectLongHashMap<V> getResult()
{
return this.map;
}
@Override
public SumByIntProcedure<T, V> create()
{
return new SumByIntProcedure<>(this.groupBy, this.function);
}
}
private static final class SumByIntCombiner<T, V> extends AbstractProcedureCombiner<SumByIntProcedure<T, V>>
{
private final ObjectLongHashMap<V> result;
private SumByIntCombiner(ObjectLongHashMap<V> result)
{
super(true);
this.result = result;
}
@Override
public void combineOne(SumByIntProcedure<T, V> thingToCombine)
{
if (this.result.isEmpty())
{
this.result.putAll(thingToCombine.getResult());
}
else
{
thingToCombine.getResult().forEachKeyValue(this.result::addToValue);
}
}
}
private static final class SumByBigDecimalProcedure<T, V> implements Procedure<T>, ProcedureFactory<SumByBigDecimalProcedure<T, V>>
{
private final MutableMap<V, BigDecimal> map = UnifiedMap.newMap();
private final Function<? super T, ? extends V> groupBy;
private final Function<? super T, BigDecimal> function;
private SumByBigDecimalProcedure(Function<? super T, ? extends V> groupBy, Function<? super T, BigDecimal> function)
{
this.groupBy = groupBy;
this.function = function;
}
@Override
public void value(T each)
{
this.map.updateValue(this.groupBy.valueOf(each), Functions0.zeroBigDecimal(), original -> original.add(this.function.valueOf(each)));
}
public MutableMap<V, BigDecimal> getResult()
{
return this.map;
}
@Override
public SumByBigDecimalProcedure<T, V> create()
{
return new SumByBigDecimalProcedure<>(this.groupBy, this.function);
}
}
private static final class SumByBigDecimalCombiner<T, V> extends AbstractProcedureCombiner<SumByBigDecimalProcedure<T, V>>
{
private final MutableMap<V, BigDecimal> result;
private SumByBigDecimalCombiner(MutableMap<V, BigDecimal> result)
{
super(true);
this.result = result;
}
@Override
public void combineOne(SumByBigDecimalProcedure<T, V> thingToCombine)
{
if (this.result.isEmpty())
{
this.result.putAll(thingToCombine.getResult());
}
else
{
thingToCombine.getResult().forEachKeyValue((key, value) -> this.result.updateValue(key, Functions0.zeroBigDecimal(), original -> original.add(value)));
}
}
}
private static final class SumByBigIntegerProcedure<T, V> implements Procedure<T>, ProcedureFactory<SumByBigIntegerProcedure<T, V>>
{
private final MutableMap<V, BigInteger> map = UnifiedMap.newMap();
private final Function<? super T, ? extends V> groupBy;
private final Function<? super T, BigInteger> function;
private SumByBigIntegerProcedure(Function<? super T, ? extends V> groupBy, Function<? super T, BigInteger> function)
{
this.groupBy = groupBy;
this.function = function;
}
@Override
public void value(T each)
{
this.map.updateValue(this.groupBy.valueOf(each), Functions0.zeroBigInteger(), original -> original.add(this.function.valueOf(each)));
}
public MutableMap<V, BigInteger> getResult()
{
return this.map;
}
@Override
public SumByBigIntegerProcedure<T, V> create()
{
return new SumByBigIntegerProcedure<>(this.groupBy, this.function);
}
}
private static final class SumByBigIntegerCombiner<T, V> extends AbstractProcedureCombiner<SumByBigIntegerProcedure<T, V>>
{
private final MutableMap<V, BigInteger> result;
private SumByBigIntegerCombiner(MutableMap<V, BigInteger> result)
{
super(true);
this.result = result;
}
@Override
public void combineOne(SumByBigIntegerProcedure<T, V> thingToCombine)
{
if (this.result.isEmpty())
{
this.result.putAll(thingToCombine.getResult());
}
else
{
thingToCombine.getResult().forEachKeyValue((key, value) -> this.result.updateValue(key, Functions0.zeroBigInteger(), original -> original.add(value)));
}
}
}
}