package org.apache.cassandra.streaming;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
public class StreamCoordinator
{
private static final Logger logger = LoggerFactory.getLogger(StreamCoordinator.class);
private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
FBUtilities.getAvailableProcessors());
private final boolean connectSequentially;
private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>();
private final int connectionsPerHost;
private StreamConnectionFactory factory;
private final boolean keepSSTableLevel;
private final boolean isIncremental;
private Iterator<StreamSession> sessionsToConnect = null;
public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental,
StreamConnectionFactory factory, boolean connectSequentially)
{
this.connectionsPerHost = connectionsPerHost;
this.factory = factory;
this.keepSSTableLevel = keepSSTableLevel;
this.isIncremental = isIncremental;
this.connectSequentially = connectSequentially;
}
public void setConnectionFactory(StreamConnectionFactory factory)
{
this.factory = factory;
}
public synchronized boolean hasActiveSessions()
{
for (HostStreamingData data : peerSessions.values())
{
if (data.hasActiveSessions())
return true;
}
return false;
}
public synchronized Collection<StreamSession> getAllStreamSessions()
{
Collection<StreamSession> results = new ArrayList<>();
for (HostStreamingData data : peerSessions.values())
{
results.addAll(data.getAllStreamSessions());
}
return results;
}
public boolean isReceiving()
{
return connectionsPerHost == 0;
}
public void connect(StreamResultFuture future)
{
if (this.connectSequentially)
connectSequentially(future);
else
connectAllStreamSessions();
}
private void connectAllStreamSessions()
{
for (HostStreamingData data : peerSessions.values())
data.connectAllStreamSessions();
}
private void connectSequentially(StreamResultFuture future)
{
sessionsToConnect = getAllStreamSessions().iterator();
future.addEventListener(new StreamEventHandler()
{
public void handleStreamEvent(StreamEvent event)
{
if (event.eventType == StreamEvent.Type.STREAM_PREPARED || event.eventType == StreamEvent.Type.STREAM_COMPLETE)
connectNext();
}
public void onSuccess(StreamState result)
{
}
public void onFailure(Throwable t)
{
}
});
connectNext();
}
private void connectNext()
{
if (sessionsToConnect == null)
return;
if (sessionsToConnect.hasNext())
{
StreamSession next = sessionsToConnect.next();
logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.getHostAddress());
streamExecutor.execute(new StreamSessionConnector(next));
}
else
logger.debug("Finished connecting all sessions");
}
public synchronized Set<InetAddress> getPeers()
{
return new HashSet<>(peerSessions.keySet());
}
public synchronized StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting)
{
return getOrCreateHostData(peer).getOrCreateNextSession(peer, connecting);
}
public synchronized StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting)
{
return getOrCreateHostData(peer).getOrCreateSessionById(peer, id, connecting);
}
public synchronized void updateProgress(ProgressInfo info)
{
getHostData(info.peer).updateProgress(info);
}
public synchronized void addSessionInfo(SessionInfo session)
{
HostStreamingData data = getOrCreateHostData(session.peer);
data.addSessionInfo(session);
}
public synchronized Set<SessionInfo> getAllSessionInfo()
{
Set<SessionInfo> result = new HashSet<>();
for (HostStreamingData data : peerSessions.values())
{
result.addAll(data.getAllSessionInfo());
}
return result;
}
public synchronized void transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
{
HostStreamingData sessionList = getOrCreateHostData(to);
if (connectionsPerHost > 1)
{
List<List<StreamSession.SSTableStreamingSections>> buckets = sliceSSTableDetails(sstableDetails);
for (List<StreamSession.SSTableStreamingSections> subList : buckets)
{
StreamSession session = sessionList.getOrCreateNextSession(to, to);
session.addTransferFiles(subList);
}
}
else
{
StreamSession session = sessionList.getOrCreateNextSession(to, to);
session.addTransferFiles(sstableDetails);
}
}
private List<List<StreamSession.SSTableStreamingSections>> sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> sstableDetails)
{
int targetSlices = Math.min(sstableDetails.size(), connectionsPerHost);
int step = Math.round((float) sstableDetails.size() / (float) targetSlices);
int index = 0;
List<List<StreamSession.SSTableStreamingSections>> result = new ArrayList<>();
List<StreamSession.SSTableStreamingSections> slice = null;
Iterator<StreamSession.SSTableStreamingSections> iter = sstableDetails.iterator();
while (iter.hasNext())
{
StreamSession.SSTableStreamingSections streamSession = iter.next();
if (index % step == 0)
{
slice = new ArrayList<>();
result.add(slice);
}
slice.add(streamSession);
++index;
iter.remove();
}
return result;
}
private HostStreamingData getHostData(InetAddress peer)
{
HostStreamingData data = peerSessions.get(peer);
if (data == null)
throw new IllegalArgumentException("Unknown peer requested: " + peer);
return data;
}
private HostStreamingData getOrCreateHostData(InetAddress peer)
{
HostStreamingData data = peerSessions.get(peer);
if (data == null)
{
data = new HostStreamingData();
peerSessions.put(peer, data);
}
return data;
}
private static class StreamSessionConnector implements Runnable
{
private final StreamSession session;
public StreamSessionConnector(StreamSession session)
{
this.session = session;
}
@Override
public void run()
{
session.start();
logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer);
}
}
private class HostStreamingData
{
private Map<Integer, StreamSession> streamSessions = new HashMap<>();
private Map<Integer, SessionInfo> sessionInfos = new HashMap<>();
private int lastReturned = -1;
public boolean hasActiveSessions()
{
for (StreamSession session : streamSessions.values())
{
StreamSession.State state = session.state();
if (state != StreamSession.State.COMPLETE && state != StreamSession.State.FAILED)
return true;
}
return false;
}
public StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting)
{
if (streamSessions.size() < connectionsPerHost)
{
StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental);
streamSessions.put(++lastReturned, session);
return session;
}
else
{
if (lastReturned >= streamSessions.size() - 1)
lastReturned = 0;
return streamSessions.get(lastReturned++);
}
}
public void connectAllStreamSessions()
{
for (StreamSession session : streamSessions.values())
{
streamExecutor.execute(new StreamSessionConnector(session));
}
}
public Collection<StreamSession> getAllStreamSessions()
{
return Collections.unmodifiableCollection(streamSessions.values());
}
public StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting)
{
StreamSession session = streamSessions.get(id);
if (session == null)
{
session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental);
streamSessions.put(id, session);
}
return session;
}
public void updateProgress(ProgressInfo info)
{
sessionInfos.get(info.sessionIndex).updateProgress(info);
}
public void addSessionInfo(SessionInfo info)
{
sessionInfos.put(info.sessionIndex, info);
}
public Collection<SessionInfo> getAllSessionInfo()
{
return sessionInfos.values();
}
}
@VisibleForTesting
public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
ExecutorUtils.shutdownAndWait(timeout, unit, streamExecutor);
}
}