package io.vertx.kafka.client.common.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.impl.VertxInternal;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class CloseHandler {
private Closeable closeable;
private Runnable closeableHookCleanup;
private final BiConsumer<Long, Handler<AsyncResult<Void>>> close;
public CloseHandler(BiConsumer<Long, Handler<AsyncResult<Void>>> close) {
this.close = close;
}
public void registerCloseHook(VertxInternal vertx) {
registerCloseHook(vertx::addCloseHook, vertx::removeCloseHook);
}
public void registerCloseHook(Context context) {
registerCloseHook(context::addCloseHook, context::removeCloseHook);
}
private synchronized void registerCloseHook(Consumer<Closeable> addCloseHook, Consumer<Closeable> removeCloseHook) {
if (closeable == null) {
closeable = ar -> {
synchronized (CloseHandler.this) {
if (closeable == null) {
ar.handle(Future.succeededFuture());
return;
}
closeable = null;
}
close.accept(0L, ar);
};
closeableHookCleanup = () -> {
synchronized (CloseHandler.this) {
if (closeable != null) {
removeCloseHook.accept(closeable);
closeable = null;
}
}
};
addCloseHook.accept(closeable);
}
}
public synchronized void unregisterCloseHook() {
if (closeableHookCleanup != null) {
closeableHookCleanup.run();
}
}
public void close() {
unregisterCloseHook();
close.accept(0L, ar -> {});
}
public void close(Handler<AsyncResult<Void>> completionHandler) {
unregisterCloseHook();
close.accept(0L, completionHandler);
}
public void close(long timeout, Handler<AsyncResult<Void>> completionHandler) {
unregisterCloseHook();
close.accept(timeout, completionHandler);
}
}