package org.I0Itec.zkclient;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class GatewayThread extends Thread {
protected final static Logger LOG = LoggerFactory.getLogger(GatewayThread.class);
private final int _port;
private final int _destinationPort;
private ServerSocket _serverSocket;
private Lock _lock = new ReentrantLock();
private Condition _runningCondition = _lock.newCondition();
private boolean _running = false;
public GatewayThread(int port, int destinationPort) {
_port = port;
_destinationPort = destinationPort;
setDaemon(true);
}
@Override
public void run() {
final List<Thread> runningThreads = new Vector<Thread>();
try {
LOG.info("Starting gateway on port " + _port + " pointing to port " + _destinationPort);
_serverSocket = new ServerSocket(_port);
_lock.lock();
try {
_running = true;
_runningCondition.signalAll();
} finally {
_lock.unlock();
}
while (true) {
final Socket socket = _serverSocket.accept();
LOG.info("new client is connected " + socket.getInetAddress());
final InputStream incomingInputStream = socket.getInputStream();
final OutputStream incomingOutputStream = socket.getOutputStream();
final Socket outgoingSocket;
try {
outgoingSocket = new Socket("localhost", _destinationPort);
} catch (Exception e) {
LOG.warn("could not connect to " + _destinationPort);
continue;
}
final InputStream outgoingInputStream = outgoingSocket.getInputStream();
final OutputStream outgoingOutputStream = outgoingSocket.getOutputStream();
Thread writeThread = new Thread() {
@Override
public void run() {
runningThreads.add(this);
try {
int read = -1;
while ((read = incomingInputStream.read()) != -1) {
outgoingOutputStream.write(read);
}
} catch (IOException e) {
} finally {
closeQuietly(outgoingOutputStream);
runningThreads.remove(this);
}
}
@Override
public void interrupt() {
try {
socket.close();
outgoingSocket.close();
} catch (IOException e) {
LOG.error("error on stopping closing sockets", e);
}
super.interrupt();
}
};
Thread readThread = new Thread() {
@Override
public void run() {
runningThreads.add(this);
try {
int read = -1;
while ((read = outgoingInputStream.read()) != -1) {
incomingOutputStream.write(read);
}
} catch (IOException e) {
} finally {
closeQuietly(incomingOutputStream);
runningThreads.remove(this);
}
}
};
writeThread.setDaemon(true);
readThread.setDaemon(true);
writeThread.start();
readThread.start();
}
} catch (SocketException e) {
if (!_running) {
throw ExceptionUtil.convertToRuntimeException(e);
}
LOG.info("Stopping gateway");
} catch (Exception e) {
LOG.error("error on gateway execution", e);
}
for (Thread thread : new ArrayList<Thread>(runningThreads)) {
thread.interrupt();
try {
thread.join();
} catch (InterruptedException e) {
}
}
}
protected void closeQuietly(Closeable closable) {
try {
closable.close();
} catch (IOException e) {
}
}
@Override
public void interrupt() {
try {
_serverSocket.close();
} catch (Exception cE) {
LOG.error("error on stopping gateway", cE);
}
super.interrupt();
}
public void interruptAndJoin() throws InterruptedException {
interrupt();
join();
}
public void awaitUp() {
_lock.lock();
try {
while (!_running) {
_runningCondition.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
_lock.unlock();
}
}
}