package org.apache.cassandra.thrift;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.utils.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.partitions.*;
/**
 * Transformation that rewrites the partitions of tables whose metadata reports
 * {@code isStaticCompactTable()} or {@code isSuper()}, folding statically defined
 * cells into the regular ("dynamic") part of the result.
 * <p>
 * Two strategies are used, chosen per partition from its metadata:
 * <ul>
 *   <li>super tables go through {@link SuperColumnsPartitionMerger}, which works row by row;</li>
 *   <li>every other wrapped partition goes through {@link PartitionMerger}, which turns each
 *       cell of the static row into a row of its own.</li>
 * </ul>
 * NOTE(review): judging by the package and class name this exists for the benefit of thrift
 * queries; the callers are not visible in this file, so confirm there.
 */
public class ThriftResultsMerger extends Transformation<UnfilteredRowIterator>
{
    private final int nowInSec;

    private ThriftResultsMerger(int nowInSec)
    {
        this.nowInSec = nowInSec;
    }

    /**
     * Wraps a partition iterator so that each of its partitions is merged, if the table requires it.
     *
     * @param iterator the partitions to (maybe) wrap.
     * @param metadata the metadata used to decide whether merging is needed.
     * @param nowInSec the current time in seconds, passed on to the row/cell merging calls.
     * @return {@code iterator} itself when the table is neither a static compact table nor a
     * super table, otherwise {@code iterator} with this transformation applied.
     */
    public static UnfilteredPartitionIterator maybeWrap(UnfilteredPartitionIterator iterator, CFMetaData metadata, int nowInSec)
    {
        if (metadata.isStaticCompactTable() || metadata.isSuper())
            return Transformation.apply(iterator, new ThriftResultsMerger(nowInSec));

        return iterator;
    }

    /**
     * Single-partition variant of {@link #maybeWrap(UnfilteredPartitionIterator, CFMetaData, int)};
     * the decision is based on the iterator's own metadata.
     *
     * @param iterator the partition to (maybe) wrap.
     * @param nowInSec the current time in seconds, passed on to the row/cell merging calls.
     * @return {@code iterator} itself when no merging is needed, the merging iterator otherwise.
     */
    public static UnfilteredRowIterator maybeWrap(UnfilteredRowIterator iterator, int nowInSec)
    {
        if (iterator.metadata().isStaticCompactTable() || iterator.metadata().isSuper())
            return mergePartition(iterator, nowInSec);

        return iterator;
    }

    @Override
    public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
    {
        return mergePartition(iter, nowInSec);
    }

    /**
     * Picks the merging strategy for one partition: the row-level transformation for super
     * tables, the wrapping {@link PartitionMerger} for anything else (which itself asserts
     * that the table is a static compact one).
     */
    private static UnfilteredRowIterator mergePartition(UnfilteredRowIterator partition, int nowInSec)
    {
        if (partition.metadata().isSuper())
            return Transformation.apply(partition, new SuperColumnsPartitionMerger(partition, nowInSec));

        return new PartitionMerger(partition, nowInSec);
    }

    /**
     * Row iterator for static compact tables. It reports an empty static row and instead
     * emits, for every cell of the wrapped static row, a row whose clustering is built from
     * that cell's column name and whose only cell belongs to the compact value column.
     * Those synthetic rows are interleaved with the wrapped iterator's content following the
     * table comparator; when both sides produce the same position, the two rows are merged.
     */
    private static class PartitionMerger extends WrappingUnfilteredRowIterator
    {
        private final int nowInSec;
        private final Row.Builder rowBuilder;

        // Set up lazily on first use of hasNext()/next().
        private boolean initialized;
        private Iterator<Cell> pendingStaticCells;

        // Next synthetic row built from a static cell; null once static cells are exhausted.
        private Row nextStaticAsRow;
        // Element already pulled from the wrapped iterator but not returned yet; null if none.
        private Unfiltered nextUnderlying;

        private PartitionMerger(UnfilteredRowIterator results, int nowInSec)
        {
            super(results);
            assert results.metadata().isStaticCompactTable();
            this.nowInSec = nowInSec;
            this.rowBuilder = BTreeRow.sortedBuilder();
        }

        /** On first call only: grabs the wrapped static row's cells and prepares the first synthetic row. */
        private void initIfNeeded()
        {
            if (initialized)
                return;

            Row wrappedStatic = super.staticRow();
            assert !wrappedStatic.hasComplex();

            pendingStaticCells = wrappedStatic.cells().iterator();
            advanceStaticRow();
            initialized = true;
        }

        @Override
        public Row staticRow()
        {
            // The static cells are surfaced as rows by next(), so no static row is exposed.
            return Rows.EMPTY_STATIC_ROW;
        }

        @Override
        public boolean hasNext()
        {
            initIfNeeded();
            return nextUnderlying != null || nextStaticAsRow != null || super.hasNext();
        }

        @Override
        public Unfiltered next()
        {
            initIfNeeded();

            // Make sure we hold the head of the wrapped iterator, if it has one.
            if (nextUnderlying == null && super.hasNext())
                nextUnderlying = super.next();

            boolean underlyingDone = nextUnderlying == null;
            boolean staticDone = nextStaticAsRow == null;

            if (underlyingDone && staticDone)
                throw new NoSuchElementException();
            if (underlyingDone)
                return takeStaticRow();
            if (staticDone)
                return takeUnderlying();

            // Both sides have a candidate: the smallest goes first.
            int order = metadata().comparator.compare(nextStaticAsRow, nextUnderlying);
            if (order == 0)
            {
                // Same position on both sides, so they are combined into a single row.
                assert nextUnderlying instanceof Row;
                return Rows.merge((Row) takeUnderlying(), takeStaticRow(), nowInSec);
            }

            if (order < 0)
                return takeStaticRow();
            else
                return takeUnderlying();
        }

        /** Hands out the buffered element of the wrapped iterator and clears the buffer. */
        private Unfiltered takeUnderlying()
        {
            Unfiltered taken = nextUnderlying;
            nextUnderlying = null;
            return taken;
        }

        /** Hands out the current synthetic row and prepares the following one. */
        private Row takeStaticRow()
        {
            Row taken = nextStaticAsRow;
            advanceStaticRow();
            return taken;
        }

        /** Builds the synthetic row for the next static cell, or sets it to null when none is left. */
        private void advanceStaticRow()
        {
            if (pendingStaticCells.hasNext())
            {
                Cell cell = pendingStaticCells.next();

                // The static column's name becomes the clustering, its content the single value cell.
                rowBuilder.newRow(Clustering.make(cell.column().name.bytes));
                rowBuilder.addCell(new BufferCell(metadata().compactValueColumn(),
                                                  cell.timestamp(),
                                                  cell.ttl(),
                                                  cell.localDeletionTime(),
                                                  cell.value(),
                                                  cell.path()));
                nextStaticAsRow = rowBuilder.build();
            }
            else
            {
                nextStaticAsRow = null;
            }
        }
    }

    /**
     * Row-level transformation for super tables. Within each row, the leading simple cells
     * are converted into cells of the compact value column (asserted to be a map), keyed by
     * the simple column's name, and merged with the cells that column already holds.
     */
    private static class SuperColumnsPartitionMerger extends Transformation
    {
        private final int nowInSec;
        private final Row.Builder rowBuilder;
        private final ColumnDefinition mapColumn;
        private final AbstractType<?> subColumnComparator;

        private SuperColumnsPartitionMerger(UnfilteredRowIterator applyTo, int nowInSec)
        {
            assert applyTo.metadata().isSuper();
            this.nowInSec = nowInSec;

            this.mapColumn = applyTo.metadata().compactValueColumn();
            assert mapColumn != null && mapColumn.type instanceof MapType;

            this.rowBuilder = BTreeRow.sortedBuilder();
            this.subColumnComparator = ((MapType) mapColumn.type).nameComparator();
        }

        /**
         * Merges the simple cells of {@code row} into its map column.
         *
         * @param row the row to transform.
         * @return {@code row} itself when it has no leading simple cell, otherwise a rebuilt row
         * holding the map column's complex deletion (if the column had data) and the merged cells.
         */
        @Override
        public Row applyToRow(Row row)
        {
            PeekingIterator<Cell> promoted = Iterators.peekingIterator(simpleCellsIterator(row));
            if (!promoted.hasNext())
                return row;

            // NOTE(review): the rebuilt row only receives the clustering, the map column's complex
            // deletion and the merged cells below; whether anything else from the input row
            // needs carrying over cannot be determined from this file.
            rowBuilder.newRow(row.clustering());

            ComplexColumnData mapData = row.getComplexColumnData(mapColumn);
            PeekingIterator<Cell> existing;
            if (mapData != null)
            {
                existing = Iterators.peekingIterator(mapData.iterator());
                rowBuilder.addComplexDeletion(mapColumn, mapData.complexDeletion());
            }
            else
            {
                existing = Iterators.peekingIterator(Collections.<Cell>emptyIterator());
            }

            // Merge of the two cell streams, ordered by the map's key comparator.
            while (promoted.hasNext() || existing.hasNext())
            {
                final Cell merged;
                if (!existing.hasNext())
                {
                    merged = makeDynamicCell(promoted.next());
                }
                else if (!promoted.hasNext())
                {
                    merged = existing.next();
                }
                else
                {
                    int order = subColumnComparator.compare(promoted.peek().column().name.bytes,
                                                            existing.peek().path().get(0));
                    if (order < 0)
                        merged = makeDynamicCell(promoted.next());
                    else if (order > 0)
                        merged = existing.next();
                    else // same key on both sides: reconcile the two versions
                        merged = Cells.reconcile(makeDynamicCell(promoted.next()), existing.next(), nowInSec);
                }
                rowBuilder.addCell(merged);
            }

            return rowBuilder.build();
        }

        /** Re-creates a simple cell as a cell of the map column, using the simple column's name as cell path. */
        private Cell makeDynamicCell(Cell staticCell)
        {
            return new BufferCell(mapColumn,
                                  staticCell.timestamp(),
                                  staticCell.ttl(),
                                  staticCell.localDeletionTime(),
                                  staticCell.value(),
                                  CellPath.create(staticCell.column().name.bytes));
        }

        /**
         * Iterates over the cells of {@code row} up to, and excluding, the first cell whose
         * column is not simple; iteration ends there even if more cells follow.
         * NOTE(review): presumably relies on simple cells coming first in {@code row.cells()} — confirm.
         */
        private Iterator<Cell> simpleCellsIterator(Row row)
        {
            final Iterator<Cell> allCells = row.cells().iterator();
            return new AbstractIterator<Cell>()
            {
                protected Cell computeNext()
                {
                    if (!allCells.hasNext())
                        return endOfData();

                    Cell candidate = allCells.next();
                    return candidate.column().isSimple() ? candidate : endOfData();
                }
            };
        }
    }
}