/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.streaming;

import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;

/** * {@link StreamCoordinator} is a helper class that abstracts away maintaining multiple * StreamSession and ProgressInfo instances per peer. * * This class coordinates multiple SessionStreams per peer in both the outgoing StreamPlan context and on the * inbound StreamResultFuture context. */
public class StreamCoordinator { private static final Logger logger = LoggerFactory.getLogger(StreamCoordinator.class); // Executor strictly for establishing the initial connections. Once we're connected to the other end the rest of the // streaming is handled directly by the ConnectionHandler's incoming and outgoing threads. private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", FBUtilities.getAvailableProcessors()); private final boolean connectSequentially; private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>(); private final int connectionsPerHost; private StreamConnectionFactory factory; private final boolean keepSSTableLevel; private final boolean isIncremental; private Iterator<StreamSession> sessionsToConnect = null; public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental, StreamConnectionFactory factory, boolean connectSequentially) { this.connectionsPerHost = connectionsPerHost; this.factory = factory; this.keepSSTableLevel = keepSSTableLevel; this.isIncremental = isIncremental; this.connectSequentially = connectSequentially; } public void setConnectionFactory(StreamConnectionFactory factory) { this.factory = factory; }
Returns:true if any stream session is active
/** * @return true if any stream session is active */
public synchronized boolean hasActiveSessions() { for (HostStreamingData data : peerSessions.values()) { if (data.hasActiveSessions()) return true; } return false; } public synchronized Collection<StreamSession> getAllStreamSessions() { Collection<StreamSession> results = new ArrayList<>(); for (HostStreamingData data : peerSessions.values()) { results.addAll(data.getAllStreamSessions()); } return results; } public boolean isReceiving() { return connectionsPerHost == 0; } public void connect(StreamResultFuture future) { if (this.connectSequentially) connectSequentially(future); else connectAllStreamSessions(); } private void connectAllStreamSessions() { for (HostStreamingData data : peerSessions.values()) data.connectAllStreamSessions(); } private void connectSequentially(StreamResultFuture future) { sessionsToConnect = getAllStreamSessions().iterator(); future.addEventListener(new StreamEventHandler() { public void handleStreamEvent(StreamEvent event) { if (event.eventType == StreamEvent.Type.STREAM_PREPARED || event.eventType == StreamEvent.Type.STREAM_COMPLETE) connectNext(); } public void onSuccess(StreamState result) { } public void onFailure(Throwable t) { } }); connectNext(); } private void connectNext() { if (sessionsToConnect == null) return; if (sessionsToConnect.hasNext()) { StreamSession next = sessionsToConnect.next(); logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.getHostAddress()); streamExecutor.execute(new StreamSessionConnector(next)); } else logger.debug("Finished connecting all sessions"); } public synchronized Set<InetAddress> getPeers() { return new HashSet<>(peerSessions.keySet()); } public synchronized StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting) { return getOrCreateHostData(peer).getOrCreateNextSession(peer, connecting); } public synchronized StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting) { return 
getOrCreateHostData(peer).getOrCreateSessionById(peer, id, connecting); } public synchronized void updateProgress(ProgressInfo info) { getHostData(info.peer).updateProgress(info); } public synchronized void addSessionInfo(SessionInfo session) { HostStreamingData data = getOrCreateHostData(session.peer); data.addSessionInfo(session); } public synchronized Set<SessionInfo> getAllSessionInfo() { Set<SessionInfo> result = new HashSet<>(); for (HostStreamingData data : peerSessions.values()) { result.addAll(data.getAllSessionInfo()); } return result; } public synchronized void transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails) { HostStreamingData sessionList = getOrCreateHostData(to); if (connectionsPerHost > 1) { List<List<StreamSession.SSTableStreamingSections>> buckets = sliceSSTableDetails(sstableDetails); for (List<StreamSession.SSTableStreamingSections> subList : buckets) { StreamSession session = sessionList.getOrCreateNextSession(to, to); session.addTransferFiles(subList); } } else { StreamSession session = sessionList.getOrCreateNextSession(to, to); session.addTransferFiles(sstableDetails); } } private List<List<StreamSession.SSTableStreamingSections>> sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> sstableDetails) { // There's no point in divvying things up into more buckets than we have sstableDetails int targetSlices = Math.min(sstableDetails.size(), connectionsPerHost); int step = Math.round((float) sstableDetails.size() / (float) targetSlices); int index = 0; List<List<StreamSession.SSTableStreamingSections>> result = new ArrayList<>(); List<StreamSession.SSTableStreamingSections> slice = null; Iterator<StreamSession.SSTableStreamingSections> iter = sstableDetails.iterator(); while (iter.hasNext()) { StreamSession.SSTableStreamingSections streamSession = iter.next(); if (index % step == 0) { slice = new ArrayList<>(); result.add(slice); } slice.add(streamSession); ++index; iter.remove(); 
} return result; } private HostStreamingData getHostData(InetAddress peer) { HostStreamingData data = peerSessions.get(peer); if (data == null) throw new IllegalArgumentException("Unknown peer requested: " + peer); return data; } private HostStreamingData getOrCreateHostData(InetAddress peer) { HostStreamingData data = peerSessions.get(peer); if (data == null) { data = new HostStreamingData(); peerSessions.put(peer, data); } return data; } private static class StreamSessionConnector implements Runnable { private final StreamSession session; public StreamSessionConnector(StreamSession session) { this.session = session; } @Override public void run() { session.start(); logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer); } } private class HostStreamingData { private Map<Integer, StreamSession> streamSessions = new HashMap<>(); private Map<Integer, SessionInfo> sessionInfos = new HashMap<>(); private int lastReturned = -1; public boolean hasActiveSessions() { for (StreamSession session : streamSessions.values()) { StreamSession.State state = session.state(); if (state != StreamSession.State.COMPLETE && state != StreamSession.State.FAILED) return true; } return false; } public StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting) { // create if (streamSessions.size() < connectionsPerHost) { StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental); streamSessions.put(++lastReturned, session); return session; } // get else { if (lastReturned >= streamSessions.size() - 1) lastReturned = 0; return streamSessions.get(lastReturned++); } } public void connectAllStreamSessions() { for (StreamSession session : streamSessions.values()) { streamExecutor.execute(new StreamSessionConnector(session)); } } public Collection<StreamSession> getAllStreamSessions() { return 
Collections.unmodifiableCollection(streamSessions.values()); } public StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting) { StreamSession session = streamSessions.get(id); if (session == null) { session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental); streamSessions.put(id, session); } return session; } public void updateProgress(ProgressInfo info) { sessionInfos.get(info.sessionIndex).updateProgress(info); } public void addSessionInfo(SessionInfo info) { sessionInfos.put(info.sessionIndex, info); } public Collection<SessionInfo> getAllSessionInfo() { return sessionInfos.values(); } } @VisibleForTesting public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { ExecutorUtils.shutdownAndWait(timeout, unit, streamExecutor); } }