package io.vertx.ext.shell.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.ext.shell.Shell;
import io.vertx.ext.shell.ShellServer;
import io.vertx.ext.shell.ShellServerOptions;
import io.vertx.ext.shell.command.CommandBuilder;
import io.vertx.ext.shell.command.CommandResolver;
import io.vertx.ext.shell.session.impl.SessionImpl;
import io.vertx.ext.shell.system.Process;
import io.vertx.ext.shell.system.impl.InternalCommandManager;
import io.vertx.ext.shell.term.Term;
import io.vertx.ext.shell.term.TermServer;
import io.vertx.ext.shell.term.impl.SSHServer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class ShellServerImpl implements ShellServer {
private final Vertx vertx;
private final CopyOnWriteArrayList<CommandResolver> resolvers;
private final InternalCommandManager commandManager;
private final List<TermServer> termServers;
private final long timeoutMillis;
private final long reaperInterval;
private String welcomeMessage;
private boolean closed = true;
private long timerID = -1;
private final Map<String, ShellImpl> sessions;
private final Promise<Void> sessionsClosed = Promise.promise();
private Handler<Shell> shellHandler;
public ShellServerImpl(Vertx vertx, ShellServerOptions options) {
this.vertx = vertx;
this.welcomeMessage = options.getWelcomeMessage();
this.termServers = new ArrayList<>();
this.timeoutMillis = options.getSessionTimeout();
this.sessions = new ConcurrentHashMap<>();
this.reaperInterval = options.getReaperInterval();
this.resolvers = new CopyOnWriteArrayList<>();
this.commandManager = new InternalCommandManager(resolvers);
resolvers.add(() -> Arrays.asList(
CommandBuilder.command("exit").processHandler(process -> {}).build(vertx),
CommandBuilder.command("logout").processHandler(process -> {}).build(vertx),
CommandBuilder.command("jobs").processHandler(process -> {}).build(vertx),
CommandBuilder.command("fg").processHandler(process -> {}).build(vertx),
CommandBuilder.command("bg").processHandler(process -> {}).build(vertx)
));
}
@Override
public synchronized ShellServer registerCommandResolver(CommandResolver resolver) {
resolvers.add(0, resolver);
return this;
}
@Override
public synchronized ShellServer registerTermServer(TermServer termServer) {
termServers.add(termServer);
return this;
}
private void handleTerm(Term term) {
synchronized (this) {
if (closed) {
term.close();
return;
}
}
ShellImpl session = createShell(term);
session.setWelcome(welcomeMessage);
session.closedPromise.future().onComplete(ar -> {
boolean completeSessionClosed;
synchronized (ShellServerImpl.this) {
sessions.remove(session.id);
completeSessionClosed = sessions.isEmpty() && closed;
}
if (completeSessionClosed) {
sessionsClosed.complete();
}
});
session.init();
if (shellHandler != null) {
shellHandler.handle(session);
}
sessions.put(session.id, session);
session.readline();
}
@Override
public ShellServer listen(Handler<AsyncResult<Void>> listenHandler) {
List<TermServer> toStart;
synchronized (this) {
if (!closed) {
throw new IllegalStateException("Server listening");
}
toStart = termServers;
}
AtomicInteger count = new AtomicInteger(toStart.size());
if (count.get() == 0) {
synchronized (this) {
closed = false;
}
listenHandler.handle(Future.succeededFuture());
return this;
}
AtomicBoolean failed = new AtomicBoolean();
Handler<AsyncResult<Void>> handler = ar -> {
if (ar.failed()) {
failed.set(true);
}
if (count.decrementAndGet() == 0) {
if (failed.get()) {
listenHandler.handle(Future.failedFuture(ar.cause()));
toStart.forEach(TermServer::close);
} else {
synchronized (this) {
closed = false;
}
setTimer();
listenHandler.handle(Future.succeededFuture());
}
}
};
toStart.forEach(termServer -> {
if (termServer instanceof SSHServer) {
((SSHServer)termServer).setExecHandler(exec -> {
Process process = commandManager.createProcess(exec.command());
process.setSession(new SessionImpl());
process.setTty(exec);
process.terminatedHandler(exec::end);
process.run(true);
});
}
termServer.termHandler(this::handleTerm);
termServer.listen(handler);
});
return this;
}
private void evictSessions(long timerID) {
long now = System.currentTimeMillis();
Set<ShellImpl> toClose = new HashSet<>();
for (ShellImpl session: sessions.values()) {
if (now - session.lastAccessedTime() > timeoutMillis) {
toClose.add(session);
}
}
for (ShellImpl session: toClose) {
session.close();
}
setTimer();
}
private synchronized void setTimer() {
if (!closed && reaperInterval > 0) {
timerID = vertx.setTimer(reaperInterval, this::evictSessions);
}
}
@Override
public synchronized Shell createShell() {
return createShell(null);
}
@Override
public synchronized ShellImpl createShell(Term term) {
if (closed) {
throw new IllegalStateException("Closed");
}
return new ShellImpl(term, commandManager);
}
@Override
public void close(Handler<AsyncResult<Void>> completionHandler) {
List<TermServer> toStop;
List<ShellImpl> toClose;
synchronized (this) {
if (closed) {
toStop = Collections.emptyList();
toClose = Collections.emptyList();
} else {
closed = true;
if (timerID != -1) {
vertx.cancelTimer(timerID);
}
toStop = termServers;
toClose = new ArrayList<>(sessions.values());
if (toClose.isEmpty()) {
sessionsClosed.complete();
}
}
}
if (toStop.isEmpty() && toClose.isEmpty()) {
completionHandler.handle(Future.succeededFuture());
} else {
AtomicInteger count = new AtomicInteger(1 + toClose.size());
Handler<AsyncResult<Void>> handler = ar -> {
if (count.decrementAndGet() == 0) {
completionHandler.handle(Future.succeededFuture());
}
};
toClose.forEach(ShellImpl::close);
toStop.forEach(termServer -> termServer.close(handler));
sessionsClosed.future().onComplete(handler);
}
}
@Override
public void shellHandler(Handler<Shell> shellHandler) {
this.shellHandler = shellHandler;
}
}