/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.streaming;

import java.net.InetAddress;
import java.util.*;

import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.UUIDGen;

StreamPlan is a helper class that builds StreamOperation of given configuration. This is the class you want to use for building streaming plan and starting streaming.
/** * {@link StreamPlan} is a helper class that builds StreamOperation of given configuration. * * This is the class you want to use for building streaming plan and starting streaming. */
public class StreamPlan { public static final String[] EMPTY_COLUMN_FAMILIES = new String[0]; private final UUID planId = UUIDGen.getTimeUUID(); private final String description; private final List<StreamEventHandler> handlers = new ArrayList<>(); private final long repairedAt; private final StreamCoordinator coordinator; private boolean flushBeforeTransfer = true;
Start building stream plan.
Params:
  • description – Stream type that describes this StreamPlan
/** * Start building stream plan. * * @param description Stream type that describes this StreamPlan */
public StreamPlan(String description) { this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false); } public StreamPlan(String description, boolean keepSSTableLevels, boolean connectSequentially) { this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially); } public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels, boolean isIncremental, boolean connectSequentially) { this.description = description; this.repairedAt = repairedAt; this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), connectSequentially); }
Request data in keyspace and ranges from specific node.
Params:
  • from – endpoint address to fetch data from.
  • connecting – Actual connecting address for the endpoint
  • keyspace – name of keyspace
  • ranges – ranges to fetch
Returns:this object for chaining
/** * Request data in {@code keyspace} and {@code ranges} from specific node. * * @param from endpoint address to fetch data from. * @param connecting Actual connecting address for the endpoint * @param keyspace name of keyspace * @param ranges ranges to fetch * @return this object for chaining */
public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges) { return requestRanges(from, connecting, keyspace, ranges, EMPTY_COLUMN_FAMILIES); }
Request data in columnFamilies under keyspace and ranges from specific node.
Params:
  • from – endpoint address to fetch data from.
  • connecting – Actual connecting address for the endpoint
  • keyspace – name of keyspace
  • ranges – ranges to fetch
  • columnFamilies – specific column families
Returns:this object for chaining
/** * Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from specific node. * * @param from endpoint address to fetch data from. * @param connecting Actual connecting address for the endpoint * @param keyspace name of keyspace * @param ranges ranges to fetch * @param columnFamilies specific column families * @return this object for chaining */
public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { StreamSession session = coordinator.getOrCreateNextSession(from, connecting); session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies), repairedAt); return this; }
Add transfer task to send data of specific columnFamilies under keyspace and ranges.
See Also:
/** * Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}. * * @see #transferRanges(java.net.InetAddress, java.net.InetAddress, String, java.util.Collection, String...) */
public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { return transferRanges(to, to, keyspace, ranges, columnFamilies); }
Add transfer task to send data of specific keyspace and ranges.
Params:
  • to – endpoint address of receiver
  • connecting – Actual connecting address of the endpoint
  • keyspace – name of keyspace
  • ranges – ranges to send
Returns:this object for chaining
/** * Add transfer task to send data of specific keyspace and ranges. * * @param to endpoint address of receiver * @param connecting Actual connecting address of the endpoint * @param keyspace name of keyspace * @param ranges ranges to send * @return this object for chaining */
public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges) { return transferRanges(to, connecting, keyspace, ranges, EMPTY_COLUMN_FAMILIES); }
Add transfer task to send data of specific columnFamilies under keyspace and ranges.
Params:
  • to – endpoint address of receiver
  • connecting – Actual connecting address of the endpoint
  • keyspace – name of keyspace
  • ranges – ranges to send
  • columnFamilies – specific column families
Returns:this object for chaining
/** * Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}. * * @param to endpoint address of receiver * @param connecting Actual connecting address of the endpoint * @param keyspace name of keyspace * @param ranges ranges to send * @param columnFamilies specific column families * @return this object for chaining */
public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies) { StreamSession session = coordinator.getOrCreateNextSession(to, connecting); session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt); return this; }
Add transfer task to send given SSTable files.
Params:
  • to – endpoint address of receiver
  • sstableDetails – sstables with file positions and estimated key count. this collection will be modified to remove those files that are successfully handed off
Returns:this object for chaining
/** * Add transfer task to send given SSTable files. * * @param to endpoint address of receiver * @param sstableDetails sstables with file positions and estimated key count. * this collection will be modified to remove those files that are successfully handed off * @return this object for chaining */
public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) { coordinator.transferFiles(to, sstableDetails); return this; } public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers) { this.handlers.add(handler); if (handlers != null) Collections.addAll(this.handlers, handlers); return this; }
Set custom StreamConnectionFactory to be used for establishing connection
Params:
  • factory – StreamConnectionFactory to use
Returns:self
/** * Set custom StreamConnectionFactory to be used for establishing connection * * @param factory StreamConnectionFactory to use * @return self */
public StreamPlan connectionFactory(StreamConnectionFactory factory) { this.coordinator.setConnectionFactory(factory); return this; }
Returns:true if this plan has no plan to execute
/** * @return true if this plan has no plan to execute */
public boolean isEmpty() { return !coordinator.hasActiveSessions(); }
Execute this StreamPlan asynchronously.
Returns:Future StreamState that you can use to listen on progress of streaming.
/** * Execute this {@link StreamPlan} asynchronously. * * @return Future {@link StreamState} that you can use to listen on progress of streaming. */
public StreamResultFuture execute() { return StreamResultFuture.init(planId, description, handlers, coordinator); }
Set flushBeforeTransfer option. When it's true, will flush before streaming ranges. (Default: true)
Params:
  • flushBeforeTransfer – set to true when the node should flush before transfer
Returns:this object for chaining
/** * Set flushBeforeTransfer option. * When it's true, will flush before streaming ranges. (Default: true) * * @param flushBeforeTransfer set to true when the node should flush before transfer * @return this object for chaining */
public StreamPlan flushBeforeTransfer(boolean flushBeforeTransfer) { this.flushBeforeTransfer = flushBeforeTransfer; return this; } }