/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.streaming;

import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;

StreamCoordinator is a helper class that abstracts away maintaining multiple StreamSession and ProgressInfo instances per peer. This class coordinates multiple SessionStreams per peer in both the outgoing StreamPlan context and on the inbound StreamResultFuture context.
/** * {@link StreamCoordinator} is a helper class that abstracts away maintaining multiple * StreamSession and ProgressInfo instances per peer. * * This class coordinates multiple SessionStreams per peer in both the outgoing StreamPlan context and on the * inbound StreamResultFuture context. */
public class StreamCoordinator { private static final Logger logger = LoggerFactory.getLogger(StreamCoordinator.class); // Executor strictly for establishing the initial connections. Once we're connected to the other end the rest of the // streaming is handled directly by the ConnectionHandler's incoming and outgoing threads. private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", FBUtilities.getAvailableProcessors()); private final boolean connectSequentially; private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>(); private final int connectionsPerHost; private StreamConnectionFactory factory; private final boolean keepSSTableLevel; private final boolean isIncremental; private Iterator<StreamSession> sessionsToConnect = null; public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental, StreamConnectionFactory factory, boolean connectSequentially) { this.connectionsPerHost = connectionsPerHost; this.factory = factory; this.keepSSTableLevel = keepSSTableLevel; this.isIncremental = isIncremental; this.connectSequentially = connectSequentially; } public void setConnectionFactory(StreamConnectionFactory factory) { this.factory = factory; }
Returns:true if any stream session is active
/** * @return true if any stream session is active */
public synchronized boolean hasActiveSessions() { for (HostStreamingData data : peerSessions.values()) { if (data.hasActiveSessions()) return true; } return false; } public synchronized Collection<StreamSession> getAllStreamSessions() { Collection<StreamSession> results = new ArrayList<>(); for (HostStreamingData data : peerSessions.values()) { results.addAll(data.getAllStreamSessions()); } return results; } public boolean isReceiving() { return connectionsPerHost == 0; } public void connect(StreamResultFuture future) { if (this.connectSequentially) connectSequentially(future); else connectAllStreamSessions(); } private void connectAllStreamSessions() { for (HostStreamingData data : peerSessions.values()) data.connectAllStreamSessions(); } private void connectSequentially(StreamResultFuture future) { sessionsToConnect = getAllStreamSessions().iterator(); future.addEventListener(new StreamEventHandler() { public void handleStreamEvent(StreamEvent event) { if (event.eventType == StreamEvent.Type.STREAM_PREPARED || event.eventType == StreamEvent.Type.STREAM_COMPLETE) connectNext(); } public void onSuccess(StreamState result) { } public void onFailure(Throwable t) { } }); connectNext(); } private void connectNext() { if (sessionsToConnect == null) return; if (sessionsToConnect.hasNext()) { StreamSession next = sessionsToConnect.next(); logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.getHostAddress()); streamExecutor.execute(new StreamSessionConnector(next)); } else logger.debug("Finished connecting all sessions"); } public synchronized Set<InetAddress> getPeers() { return new HashSet<>(peerSessions.keySet()); } public synchronized StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting) { return getOrCreateHostData(peer).getOrCreateNextSession(peer, connecting); } public synchronized StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting) { return getOrCreateHostData(peer).getOrCreateSessionById(peer, id, connecting); } public synchronized void updateProgress(ProgressInfo info) { getHostData(info.peer).updateProgress(info); } public synchronized void addSessionInfo(SessionInfo session) { HostStreamingData data = getOrCreateHostData(session.peer); data.addSessionInfo(session); } public synchronized Set<SessionInfo> getAllSessionInfo() { Set<SessionInfo> result = new HashSet<>(); for (HostStreamingData data : peerSessions.values()) { result.addAll(data.getAllSessionInfo()); } return result; } public synchronized void transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) { HostStreamingData sessionList = getOrCreateHostData(to); if (connectionsPerHost > 1) { List<List<StreamSession.SSTableStreamingSections>> buckets = sliceSSTableDetails(sstableDetails); for (List<StreamSession.SSTableStreamingSections> subList : buckets) { StreamSession session = sessionList.getOrCreateNextSession(to, to); session.addTransferFiles(subList); } } else { StreamSession session = sessionList.getOrCreateNextSession(to, to); session.addTransferFiles(sstableDetails); } } private List<List<StreamSession.SSTableStreamingSections>> sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> sstableDetails) { // There's no point in divvying things up into more buckets than we have sstableDetails int targetSlices = Math.min(sstableDetails.size(), connectionsPerHost); int step = Math.round((float) sstableDetails.size() / (float) targetSlices); int index = 0; List<List<StreamSession.SSTableStreamingSections>> result = new ArrayList<>(); List<StreamSession.SSTableStreamingSections> slice = null; Iterator<StreamSession.SSTableStreamingSections> iter = sstableDetails.iterator(); while (iter.hasNext()) { StreamSession.SSTableStreamingSections streamSession = iter.next(); if (index % step == 0) { slice = new ArrayList<>(); result.add(slice); } slice.add(streamSession); ++index; iter.remove(); } return result; } private HostStreamingData getHostData(InetAddress peer) { HostStreamingData data = peerSessions.get(peer); if (data == null) throw new IllegalArgumentException("Unknown peer requested: " + peer); return data; } private HostStreamingData getOrCreateHostData(InetAddress peer) { HostStreamingData data = peerSessions.get(peer); if (data == null) { data = new HostStreamingData(); peerSessions.put(peer, data); } return data; } private static class StreamSessionConnector implements Runnable { private final StreamSession session; public StreamSessionConnector(StreamSession session) { this.session = session; } @Override public void run() { session.start(); logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer); } } private class HostStreamingData { private Map<Integer, StreamSession> streamSessions = new HashMap<>(); private Map<Integer, SessionInfo> sessionInfos = new HashMap<>(); private int lastReturned = -1; public boolean hasActiveSessions() { for (StreamSession session : streamSessions.values()) { StreamSession.State state = session.state(); if (state != StreamSession.State.COMPLETE && state != StreamSession.State.FAILED) return true; } return false; } public StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting) { // create if (streamSessions.size() < connectionsPerHost) { StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental); streamSessions.put(++lastReturned, session); return session; } // get else { if (lastReturned >= streamSessions.size() - 1) lastReturned = 0; return streamSessions.get(lastReturned++); } } public void connectAllStreamSessions() { for (StreamSession session : streamSessions.values()) { streamExecutor.execute(new StreamSessionConnector(session)); } } public Collection<StreamSession> getAllStreamSessions() { return Collections.unmodifiableCollection(streamSessions.values()); } public StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting) { StreamSession session = streamSessions.get(id); if (session == null) { session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental); streamSessions.put(id, session); } return session; } public void updateProgress(ProgressInfo info) { sessionInfos.get(info.sessionIndex).updateProgress(info); } public void addSessionInfo(SessionInfo info) { sessionInfos.put(info.sessionIndex, info); } public Collection<SessionInfo> getAllSessionInfo() { return sessionInfos.values(); } } @VisibleForTesting public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { ExecutorUtils.shutdownAndWait(timeout, unit, streamExecutor); } }