/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.cassandra.cql3;

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.AbstractIterator;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.utils.FBUtilities;

a utility for doing internal cql-based queries
/** a utility for doing internal cql-based queries */
public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row> { public static UntypedResultSet create(ResultSet rs) { return new FromResultSet(rs); } public static UntypedResultSet create(List<Map<String, ByteBuffer>> results) { return new FromResultList(results); } public static UntypedResultSet create(SelectStatement select, QueryPager pager, int pageSize) { return new FromPager(select, pager, pageSize); }
This method is intended for testing purposes, since it executes query on cluster and not on the local node only.
/** * This method is intended for testing purposes, since it executes query on cluster * and not on the local node only. */
@VisibleForTesting public static UntypedResultSet create(SelectStatement select, ConsistencyLevel cl, ClientState clientState, QueryPager pager, int pageSize) { return new FromDistributedPager(select, cl, clientState, pager, pageSize); } public boolean isEmpty() { return size() == 0; } public abstract int size(); public abstract Row one(); // No implemented by all subclasses, but we use it when we know it's there (for tests) public abstract List<ColumnSpecification> metadata(); private static class FromResultSet extends UntypedResultSet { private final ResultSet cqlRows; private FromResultSet(ResultSet cqlRows) { this.cqlRows = cqlRows; } public int size() { return cqlRows.size(); } public Row one() { if (cqlRows.size() != 1) throw new IllegalStateException("One row required, " + cqlRows.size() + " found"); return new Row(cqlRows.metadata.requestNames(), cqlRows.rows.get(0)); } public Iterator<Row> iterator() { return new AbstractIterator<Row>() { Iterator<List<ByteBuffer>> iter = cqlRows.rows.iterator(); protected Row computeNext() { if (!iter.hasNext()) return endOfData(); return new Row(cqlRows.metadata.requestNames(), iter.next()); } }; } public List<ColumnSpecification> metadata() { return cqlRows.metadata.requestNames(); } }
Pager that calls `execute` rather than `executeInternal`
/** * Pager that calls `execute` rather than `executeInternal` */
private static class FromDistributedPager extends UntypedResultSet { private final SelectStatement select; private final ConsistencyLevel cl; private final ClientState clientState; private final QueryPager pager; private final int pageSize; private final List<ColumnSpecification> metadata; private FromDistributedPager(SelectStatement select, ConsistencyLevel cl, ClientState clientState, QueryPager pager, int pageSize) { this.select = select; this.cl = cl; this.clientState = clientState; this.pager = pager; this.pageSize = pageSize; this.metadata = select.getResultMetadata().requestNames(); } public int size() { throw new UnsupportedOperationException(); } public Row one() { throw new UnsupportedOperationException(); } public Iterator<Row> iterator() { return new AbstractIterator<Row>() { private Iterator<List<ByteBuffer>> currentPage; protected Row computeNext() { int nowInSec = FBUtilities.nowInSeconds(); while (currentPage == null || !currentPage.hasNext()) { if (pager.isExhausted()) return endOfData(); try (PartitionIterator iter = pager.fetchPage(pageSize, cl, clientState, System.nanoTime())) { currentPage = select.process(iter, nowInSec).rows.iterator(); } } return new Row(metadata, currentPage.next()); } }; } public List<ColumnSpecification> metadata() { return metadata; } } private static class FromResultList extends UntypedResultSet { private final List<Map<String, ByteBuffer>> cqlRows; private FromResultList(List<Map<String, ByteBuffer>> cqlRows) { this.cqlRows = cqlRows; } public int size() { return cqlRows.size(); } public Row one() { if (cqlRows.size() != 1) throw new IllegalStateException("One row required, " + cqlRows.size() + " found"); return new Row(cqlRows.get(0)); } public Iterator<Row> iterator() { return new AbstractIterator<Row>() { Iterator<Map<String, ByteBuffer>> iter = cqlRows.iterator(); protected Row computeNext() { if (!iter.hasNext()) return endOfData(); return new Row(iter.next()); } }; } public List<ColumnSpecification> metadata() { throw new UnsupportedOperationException(); } } private static class FromPager extends UntypedResultSet { private final SelectStatement select; private final QueryPager pager; private final int pageSize; private final List<ColumnSpecification> metadata; private FromPager(SelectStatement select, QueryPager pager, int pageSize) { this.select = select; this.pager = pager; this.pageSize = pageSize; this.metadata = select.getResultMetadata().requestNames(); } public int size() { throw new UnsupportedOperationException(); } public Row one() { throw new UnsupportedOperationException(); } public Iterator<Row> iterator() { return new AbstractIterator<Row>() { private Iterator<List<ByteBuffer>> currentPage; protected Row computeNext() { int nowInSec = FBUtilities.nowInSeconds(); while (currentPage == null || !currentPage.hasNext()) { if (pager.isExhausted()) return endOfData(); try (ReadExecutionController executionController = pager.executionController(); PartitionIterator iter = pager.fetchPageInternal(pageSize, executionController)) { currentPage = select.process(iter, nowInSec).rows.iterator(); } } return new Row(metadata, currentPage.next()); } }; } public List<ColumnSpecification> metadata() { return metadata; } } public static class Row { private final Map<String, ByteBuffer> data = new HashMap<>(); private final List<ColumnSpecification> columns = new ArrayList<>(); public Row(Map<String, ByteBuffer> data) { this.data.putAll(data); } public Row(List<ColumnSpecification> names, List<ByteBuffer> columns) { this.columns.addAll(names); for (int i = 0; i < names.size(); i++) data.put(names.get(i).name.toString(), columns.get(i)); } public static Row fromInternalRow(CFMetaData metadata, DecoratedKey key, org.apache.cassandra.db.rows.Row row) { Map<String, ByteBuffer> data = new HashMap<>(); ByteBuffer[] keyComponents = SelectStatement.getComponents(metadata, key); for (ColumnDefinition def : metadata.partitionKeyColumns()) data.put(def.name.toString(), keyComponents[def.position()]); Clustering clustering = row.clustering(); for (ColumnDefinition def : metadata.clusteringColumns()) data.put(def.name.toString(), clustering.get(def.position())); for (ColumnDefinition def : metadata.partitionColumns()) { if (def.isSimple()) { Cell cell = row.getCell(def); if (cell != null) data.put(def.name.toString(), cell.value()); } else { ComplexColumnData complexData = row.getComplexColumnData(def); if (complexData != null) data.put(def.name.toString(), ((CollectionType)def.type).serializeForNativeProtocol(complexData.iterator(), ProtocolVersion.V3)); } } return new Row(data); } public boolean has(String column) { // Note that containsKey won't work because we may have null values return data.get(column) != null; } public ByteBuffer getBlob(String column) { return data.get(column); } public String getString(String column) { return UTF8Type.instance.compose(data.get(column)); } public boolean getBoolean(String column) { return BooleanType.instance.compose(data.get(column)); } public byte getByte(String column) { return ByteType.instance.compose(data.get(column)); } public short getShort(String column) { return ShortType.instance.compose(data.get(column)); } public int getInt(String column) { return Int32Type.instance.compose(data.get(column)); } public double getDouble(String column) { return DoubleType.instance.compose(data.get(column)); } public ByteBuffer getBytes(String column) { return data.get(column); } public InetAddress getInetAddress(String column) { return InetAddressType.instance.compose(data.get(column)); } public UUID getUUID(String column) { return UUIDType.instance.compose(data.get(column)); } public Date getTimestamp(String column) { return TimestampType.instance.compose(data.get(column)); } public long getLong(String column) { return LongType.instance.compose(data.get(column)); } public <T> Set<T> getSet(String column, AbstractType<T> type) { ByteBuffer raw = data.get(column); return raw == null ? null : SetType.getInstance(type, true).compose(raw); } public <T> List<T> getList(String column, AbstractType<T> type) { ByteBuffer raw = data.get(column); return raw == null ? null : ListType.getInstance(type, true).compose(raw); } public <K, V> Map<K, V> getMap(String column, AbstractType<K> keyType, AbstractType<V> valueType) { ByteBuffer raw = data.get(column); return raw == null ? null : MapType.getInstance(keyType, valueType, true).compose(raw); } public Map<String, String> getTextMap(String column) { return getMap(column, UTF8Type.instance, UTF8Type.instance); } public <T> Set<T> getFrozenSet(String column, AbstractType<T> type) { ByteBuffer raw = data.get(column); return raw == null ? null : SetType.getInstance(type, false).compose(raw); } public <T> List<T> getFrozenList(String column, AbstractType<T> type) { ByteBuffer raw = data.get(column); return raw == null ? null : ListType.getInstance(type, false).compose(raw); } public <K, V> Map<K, V> getFrozenMap(String column, AbstractType<K> keyType, AbstractType<V> valueType) { ByteBuffer raw = data.get(column); return raw == null ? null : MapType.getInstance(keyType, valueType, false).compose(raw); } public Map<String, String> getFrozenTextMap(String column) { return getFrozenMap(column, UTF8Type.instance, UTF8Type.instance); } public List<ColumnSpecification> getColumns() { return columns; } @Override public String toString() { return data.toString(); } } }