package org.jboss.resteasy.plugins.providers.sse;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;

import org.jboss.resteasy.resteasy_jaxrs.i18n.LogMessages;
import org.jboss.resteasy.resteasy_jaxrs.i18n.Messages;

public class SseBroadcasterImpl implements SseBroadcaster
{
   private ConcurrentLinkedQueue<SseEventSink> outputQueue = new ConcurrentLinkedQueue<>();

   private final List<BiConsumer<SseEventSink, Throwable>> onErrorConsumers = new CopyOnWriteArrayList<>();

   private final List<Consumer<SseEventSink>> closeConsumers = new CopyOnWriteArrayList<>();

   private final AtomicBoolean closed = new AtomicBoolean();

   // Used to perform a mutual exclusion between register and close operations
   // since every registered SseEventSink needs to be closed when
   // SseBroadcaster.close() is invoked to prevent leaks due to SseEventSink
   // never closed.
   // Actually most of the time when a SseEventSink is registered to a
   // SseBroadcaster, user is expected its termination to be handled by the
   // SseBroadcaster itself. So user will never call SseEventSink.close() on
   // each SseEventSink he registers but instead he will just call
   // SseBroadcaster.close().
   private final Lock readLock;
   private final Lock writeLock;

   public SseBroadcasterImpl()
   {
      ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
      this.readLock = readWriteLock.readLock();
      this.writeLock = readWriteLock.writeLock();
   }

   @Override
   public void close()
   {
      if (!closed.compareAndSet(false, true))
      {
         return;
      }
      writeLock.lock();
      try
      {
         //Javadoc says close the broadcaster and all subscribed {@link SseEventSink} instances.
         //is it necessay to close the subsribed SseEventSink ?
         outputQueue.forEach(eventSink -> {
            try {
               eventSink.close();
             } catch (RuntimeException e) {
                LogMessages.LOGGER.debug(e.getLocalizedMessage());
             } finally {
                notifyOnCloseListeners(eventSink);
             }
         });
      }
      finally
      {
         writeLock.unlock();
      }
   }

   private void checkClosed()
   {
      if (closed.get())
      {
         throw new IllegalStateException(Messages.MESSAGES.sseBroadcasterIsClosed());
      }
   }

   private void notifyOnCloseListeners(SseEventSink eventSink)
   {
      // First remove the eventSink from the outputQueue to ensure that
      // concurrent calls to this method will notify listeners only once for a
      // given eventSink instance.
      if (outputQueue.remove(eventSink))
      {
         closeConsumers.forEach(consumer -> {
            consumer.accept(eventSink);
         });
      }
   }

   private void notifyOnErrorListeners(SseEventSink eventSink, Throwable throwable)
   {
      // We have to notify close listeners if the SSE event output has been
      // closed (either by client closing the connection (IOException) or by
      // calling SseEventSink.close() (IllegalStateException) on the server
      // side).
      if (throwable instanceof IOException || throwable instanceof IllegalStateException)
      {
         notifyOnCloseListeners(eventSink);
      }
      onErrorConsumers.forEach(consumer -> {
         consumer.accept(eventSink, throwable);
      });
   }

   @Override
   public void onError(BiConsumer<SseEventSink, Throwable> onError)
   {
      checkClosed();
      onErrorConsumers.add(onError);
   }

   @Override
   public void onClose(Consumer<SseEventSink> onClose)
   {
      checkClosed();
      closeConsumers.add(onClose);
   }

   @Override
   public void register(SseEventSink sseEventSink)
   {
      checkClosed();
      readLock.lock();
      try
      {
         checkClosed();
         outputQueue.add(sseEventSink);
      }
      finally
      {
         readLock.unlock();
      }
   }

   @Override
   public CompletionStage<?> broadcast(OutboundSseEvent event)
   {
      checkClosed();
      //return event immediately and doesn't block anything
      return CompletableFuture.runAsync(() -> {
         outputQueue.forEach(eventSink -> {
            SseEventSink outputImpl = eventSink;
            try
            {
               outputImpl.send(event).whenComplete((object, err) -> {
                  if (err != null)
                  {
                     notifyOnErrorListeners(eventSink, err);
                  }
               });
            }
            catch (IllegalStateException e)
            {
               notifyOnErrorListeners(eventSink, e);
            }
         });
      });
   }

}