//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.server;

import java.util.concurrent.Semaphore;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Blocking implementation of ContentProducer. Calling nextContent() will block when there is no available content but will never return null.
/** * Blocking implementation of {@link ContentProducer}. Calling {@link #nextContent()} will block when * there is no available content but will never return null. */
class BlockingContentProducer implements ContentProducer { private static final Logger LOG = LoggerFactory.getLogger(BlockingContentProducer.class); private final Semaphore _semaphore = new Semaphore(0); private final AsyncContentProducer _asyncContentProducer; BlockingContentProducer(AsyncContentProducer delegate) { _asyncContentProducer = delegate; } @Override public void recycle() { if (LOG.isDebugEnabled()) LOG.debug("recycling {}", this); _asyncContentProducer.recycle(); _semaphore.drainPermits(); } @Override public int available() { return _asyncContentProducer.available(); } @Override public boolean hasContent() { return _asyncContentProducer.hasContent(); } @Override public boolean isError() { return _asyncContentProducer.isError(); } @Override public void checkMinDataRate() { _asyncContentProducer.checkMinDataRate(); } @Override public long getRawContentArrived() { return _asyncContentProducer.getRawContentArrived(); } @Override public boolean consumeAll(Throwable x) { return _asyncContentProducer.consumeAll(x); } @Override public HttpInput.Content nextContent() { while (true) { HttpInput.Content content = _asyncContentProducer.nextContent(); if (LOG.isDebugEnabled()) LOG.debug("nextContent async producer returned {}", content); if (content != null) return content; // IFF isReady() returns false then HttpChannel.needContent() has been called, // thus we know that eventually a call to onContentProducible will come. if (_asyncContentProducer.isReady()) { if (LOG.isDebugEnabled()) LOG.debug("nextContent async producer is ready, retrying"); continue; } if (LOG.isDebugEnabled()) LOG.debug("nextContent async producer is not ready, waiting on semaphore {}", _semaphore); try { _semaphore.acquire(); } catch (InterruptedException e) { return new HttpInput.ErrorContent(e); } } } @Override public void reclaim(HttpInput.Content content) { _asyncContentProducer.reclaim(content); } @Override public boolean isReady() { boolean ready = available() > 0; if (LOG.isDebugEnabled()) LOG.debug("isReady = {}", ready); return ready; } @Override public HttpInput.Interceptor getInterceptor() { return _asyncContentProducer.getInterceptor(); } @Override public void setInterceptor(HttpInput.Interceptor interceptor) { _asyncContentProducer.setInterceptor(interceptor); } @Override public boolean onContentProducible() { // In blocking mode, the dispatched thread normally does not have to be rescheduled as it is normally in state // DISPATCHED blocked on the semaphore that just needs to be released for the dispatched thread to resume. This is why // this method always returns false. // But async errors can occur while the dispatched thread is NOT blocked reading (i.e.: in state WAITING), // so the WAITING to WOKEN transition must be done by the error-notifying thread which then has to reschedule the // dispatched thread after HttpChannelState.asyncError() is called. // Calling _asyncContentProducer.wakeup() changes the channel state from WAITING to WOKEN which would prevent the // subsequent call to HttpChannelState.asyncError() from rescheduling the thread. // AsyncServletTest.testStartAsyncThenClientStreamIdleTimeout() tests this. if (LOG.isDebugEnabled()) LOG.debug("onContentProducible releasing semaphore {}", _semaphore); _semaphore.release(); return false; } }