package com.datastax.oss.driver.internal.core.cql;
import com.datastax.oss.driver.api.core.PagingIterable;
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import net.jcip.annotations.NotThreadSafe;
/**
 * A {@link Spliterator} over the elements of a {@link PagingIterable}.
 *
 * <p>Elements are pulled from the iterable one at a time; a {@code null} element returned by the
 * iterable is treated as the end of the data. Splitting is implemented by draining up to {@code
 * chunkSize} elements into an array and handing that array off as a separate, array-backed
 * spliterator, while this instance keeps covering whatever remains in the iterable.
 *
 * <p>This spliterator always reports {@link Spliterator#ORDERED}, {@link Spliterator#IMMUTABLE}
 * and {@link Spliterator#NONNULL}. When it is created with an estimated size lower than {@link
 * Long#MAX_VALUE}, it additionally reports {@link Spliterator#SIZED} and {@link
 * Spliterator#SUBSIZED}; {@link Long#MAX_VALUE} means "size unknown".
 *
 * <p>Instances are not thread-safe.
 *
 * @param <ElementT> the type of elements produced by the underlying iterable.
 */
@NotThreadSafe
public class PagingIterableSpliterator<ElementT> implements Spliterator<ElementT> {

  /** The chunk size used when none is specified through the {@link Builder}. */
  public static final int DEFAULT_CHUNK_SIZE = 128;

  /**
   * Creates a builder for a spliterator over the given iterable.
   *
   * @param iterable the iterable to traverse; cannot be null.
   * @return a new builder, initialized with an unknown size and {@link #DEFAULT_CHUNK_SIZE}.
   */
  @NonNull
  public static <ElementT> Builder<ElementT> builder(@NonNull PagingIterable<ElementT> iterable) {
    return new Builder<>(iterable);
  }

  private final PagingIterable<ElementT> iterable;

  /** Remaining element count, or {@link Long#MAX_VALUE} when the size is unknown. */
  private long estimatedSize;

  /** Maximum number of elements handed off by a single call to {@link #trySplit()}. */
  private final int chunkSize;

  private final int characteristics;

  /**
   * Creates a spliterator with an unknown size and {@link #DEFAULT_CHUNK_SIZE}.
   *
   * @param iterable the iterable to traverse; cannot be null.
   * @throws NullPointerException if {@code iterable} is null.
   */
  public PagingIterableSpliterator(@NonNull PagingIterable<ElementT> iterable) {
    this(iterable, Long.MAX_VALUE, DEFAULT_CHUNK_SIZE);
  }

  private PagingIterableSpliterator(
      @NonNull PagingIterable<ElementT> iterable, long estimatedSize, int chunkSize) {
    this.iterable = Objects.requireNonNull(iterable, "iterable cannot be null");
    this.estimatedSize = estimatedSize;
    this.chunkSize = chunkSize;
    int base = Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.NONNULL;
    // SIZED and SUBSIZED are only advertised when the caller supplied an actual size.
    this.characteristics =
        estimatedSize < Long.MAX_VALUE ? base | Spliterator.SIZED | Spliterator.SUBSIZED : base;
  }

  /**
   * Passes the next element, if any, to the given action.
   *
   * <p>When the size is known, the estimate is decremented (never below zero) so that {@link
   * #estimateSize()} and later {@link #trySplit()} decisions keep reflecting the elements that
   * are actually left.
   *
   * @param action the action to run on the next element; cannot be null.
   * @return {@code false} if the iterable had no more elements, {@code true} otherwise.
   * @throws NullPointerException if {@code action} is null.
   */
  @Override
  public boolean tryAdvance(Consumer<? super ElementT> action) {
    Objects.requireNonNull(action, "action cannot be null");
    ElementT row = iterable.one();
    if (row == null) {
      return false;
    }
    // The element has left the iterable whether or not the action completes normally, so the
    // estimate is adjusted before invoking the action.
    if (estimatedSize != Long.MAX_VALUE && estimatedSize > 0) {
      estimatedSize--;
    }
    action.accept(row);
    return true;
  }

  /**
   * Splits off up to {@code chunkSize} elements from the front of the iterable.
   *
   * @return an array-backed spliterator covering the drained elements, or {@code null} if the
   *     size is known and not greater than the chunk size, or if the iterable has no more
   *     elements.
   */
  @Override
  @Nullable
  public Spliterator<ElementT> trySplit() {
    if (estimatedSize != Long.MAX_VALUE && estimatedSize <= chunkSize) {
      // There is no point in splitting when what is left fits in a single chunk.
      return null;
    }
    ElementT row = iterable.one();
    if (row == null) {
      return null;
    }
    Object[] array = new Object[chunkSize];
    int i = 0;
    // Fill the array until it is full or the iterable runs dry; the element that fills the last
    // slot must not be followed by another call to one(), otherwise an element would be lost.
    do {
      array[i++] = row;
      if (i < chunkSize) {
        row = iterable.one();
      } else {
        break;
      }
    } while (row != null);
    if (estimatedSize != Long.MAX_VALUE) {
      // Cannot go negative: we only get here when estimatedSize > chunkSize >= i.
      estimatedSize -= i;
    }
    return Spliterators.spliterator(array, 0, i, characteristics());
  }

  /**
   * Runs the given action on every remaining element, by delegating to the iterator of the
   * underlying iterable.
   *
   * <p>When the size is known, the estimate is reset to zero once the traversal completes
   * normally, since nothing is left to traverse at that point.
   *
   * @param action the action to run on each remaining element; cannot be null.
   * @throws NullPointerException if {@code action} is null.
   */
  @Override
  public void forEachRemaining(Consumer<? super ElementT> action) {
    Objects.requireNonNull(action, "action cannot be null");
    iterable.iterator().forEachRemaining(action);
    if (estimatedSize != Long.MAX_VALUE) {
      estimatedSize = 0;
    }
  }

  /**
   * Returns the estimated number of remaining elements.
   *
   * @return the current estimate, or {@link Long#MAX_VALUE} if the size is unknown.
   */
  @Override
  public long estimateSize() {
    return estimatedSize;
  }

  /**
   * Returns the characteristics computed at construction time.
   *
   * @return the characteristics bit set of this spliterator.
   */
  @Override
  public int characteristics() {
    return characteristics;
  }

  /**
   * A builder for {@link PagingIterableSpliterator} instances.
   *
   * @param <ElementT> the type of elements produced by the underlying iterable.
   */
  public static class Builder<ElementT> {

    private final PagingIterable<ElementT> iterable;
    private long estimatedSize = Long.MAX_VALUE;
    private int chunkSize = DEFAULT_CHUNK_SIZE;

    Builder(@NonNull PagingIterable<ElementT> iterable) {
      // Fail fast here rather than at build() time, so the error points at the offending call.
      this.iterable = Objects.requireNonNull(iterable, "iterable cannot be null");
    }

    /**
     * Sets the estimated number of elements in the iterable.
     *
     * <p>Any value lower than {@link Long#MAX_VALUE} makes the resulting spliterator report
     * {@link Spliterator#SIZED} and {@link Spliterator#SUBSIZED}; {@link Long#MAX_VALUE} (the
     * default) means that the size is unknown.
     *
     * @param estimatedSize the estimated size; must be {@code >= 0}.
     * @return this builder.
     * @throws IllegalArgumentException if {@code estimatedSize} is negative.
     */
    @NonNull
    public Builder<ElementT> withEstimatedSize(long estimatedSize) {
      Preconditions.checkArgument(estimatedSize >= 0, "estimatedSize must be >= 0");
      this.estimatedSize = estimatedSize;
      return this;
    }

    /**
     * Sets the maximum number of elements handed off by each split.
     *
     * @param chunkSize the chunk size; must be {@code > 0}. Defaults to {@link
     *     #DEFAULT_CHUNK_SIZE}.
     * @return this builder.
     * @throws IllegalArgumentException if {@code chunkSize} is not strictly positive.
     */
    @NonNull
    public Builder<ElementT> withChunkSize(int chunkSize) {
      Preconditions.checkArgument(chunkSize > 0, "chunkSize must be > 0");
      this.chunkSize = chunkSize;
      return this;
    }

    /**
     * Creates the spliterator from the current settings of this builder.
     *
     * @return a new spliterator over the iterable given to this builder.
     */
    @NonNull
    public PagingIterableSpliterator<ElementT> build() {
      return new PagingIterableSpliterator<>(iterable, estimatedSize, chunkSize);
    }
  }
}