/*
 * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import jdk.internal.net.http.common.Demand;
import java.net.http.HttpResponse.BodySubscriber;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.SequentialScheduler;

An adapter between BodySubscriber and Flow.Subscriber<String>.
/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R> implements BodySubscriber<R> { private final CompletableFuture<R> cf = new MinimalFuture<>(); private final S subscriber; private final Function<? super S, ? extends R> finisher; private final Charset charset; private final String eol; private final AtomicBoolean subscribed = new AtomicBoolean(); private volatile LineSubscription downstream; private LineSubscriberAdapter(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, String eol) { if (eol != null && eol.isEmpty()) throw new IllegalArgumentException("empty line separator"); this.subscriber = Objects.requireNonNull(subscriber); this.finisher = Objects.requireNonNull(finisher); this.charset = Objects.requireNonNull(charset); this.eol = eol; } @Override public void onSubscribe(Subscription subscription) { Objects.requireNonNull(subscription); if (!subscribed.compareAndSet(false, true)) { subscription.cancel(); return; } downstream = LineSubscription.create(subscription, charset, eol, subscriber, cf); subscriber.onSubscribe(downstream); } @Override public void onNext(List<ByteBuffer> item) { Objects.requireNonNull(item); try { downstream.submit(item); } catch (Throwable t) { onError(t); } } @Override public void onError(Throwable throwable) { Objects.requireNonNull(throwable); try { downstream.signalError(throwable); } finally { cf.completeExceptionally(throwable); } } @Override public void onComplete() { try { downstream.signalComplete(); } finally { cf.complete(finisher.apply(subscriber)); } } @Override public CompletionStage<R> getBody() { return cf; } public static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R> create(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, String eol) { if (eol != null && eol.isEmpty()) throw new IllegalArgumentException("empty line separator"); return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber), Objects.requireNonNull(finisher), Objects.requireNonNull(charset), eol); } static final class LineSubscription implements Flow.Subscription { final Flow.Subscription upstreamSubscription; final CharsetDecoder decoder; final String newline; final Demand downstreamDemand; final ConcurrentLinkedDeque<ByteBuffer> queue; final SequentialScheduler scheduler; final Flow.Subscriber<? super String> upstream; final CompletableFuture<?> cf; private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); private final AtomicLong demanded = new AtomicLong(); private volatile boolean completed; private volatile boolean cancelled; private final char[] chars = new char[1024]; private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]); private final CharBuffer buffer = CharBuffer.wrap(chars); private final StringBuilder builder = new StringBuilder(); private String nextLine; private LineSubscription(Flow.Subscription s, CharsetDecoder dec, String separator, Flow.Subscriber<? super String> subscriber, CompletableFuture<?> completion) { downstreamDemand = new Demand(); queue = new ConcurrentLinkedDeque<>(); upstreamSubscription = Objects.requireNonNull(s); decoder = Objects.requireNonNull(dec); newline = separator; upstream = Objects.requireNonNull(subscriber); cf = Objects.requireNonNull(completion); scheduler = SequentialScheduler.synchronizedScheduler(this::loop); } @Override public void request(long n) { if (cancelled) return; if (downstreamDemand.increase(n)) { scheduler.runOrSchedule(); } } @Override public void cancel() { cancelled = true; upstreamSubscription.cancel(); } public void submit(List<ByteBuffer> list) { queue.addAll(list); demanded.decrementAndGet(); scheduler.runOrSchedule(); } public void signalComplete() { completed = true; scheduler.runOrSchedule(); } public void signalError(Throwable error) { if (errorRef.compareAndSet(null, Objects.requireNonNull(error))) { scheduler.runOrSchedule(); } } // This method looks at whether some bytes where left over (in leftover) // from decoding the previous buffer when the previous buffer was in // underflow. If so, it takes bytes one by one from the new buffer 'in' // and combines them with the leftover bytes until 'in' is exhausted or a // character was produced in 'out', resolving the previous underflow. // Returns true if the buffer is still in underflow, false otherwise. // However, in both situation some chars might have been produced in 'out'. private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput) throws CharacterCodingException { int limit = leftover.position(); if (limit == 0) { // no leftover return false; } else { CoderResult res = null; while (in.hasRemaining()) { leftover.position(limit); leftover.limit(++limit); leftover.put(in.get()); leftover.position(0); res = decoder.decode(leftover, out, endOfInput && !in.hasRemaining()); int remaining = leftover.remaining(); if (remaining > 0) { assert leftover.position() == 0; leftover.position(remaining); } else { leftover.position(0); } leftover.limit(leftover.capacity()); if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) { continue; } if (res.isError()) { res.throwException(); } assert !res.isOverflow(); return false; } return !endOfInput; } } // extract characters from start to end and remove them from // the StringBuilder private static String take(StringBuilder b, int start, int end) { assert start == 0; String line; if (end == start) return ""; line = b.substring(start, end); b.delete(start, end); return line; } // finds end of line, returns -1 if not found, or the position after // the line delimiter if found, removing the delimiter in the process. private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) { int len = b.length(); if (eol != null) { // delimiter explicitly specified int i = b.indexOf(eol); if (i >= 0) { // remove the delimiter and returns the position // of the char after it. b.delete(i, i + eol.length()); return i; } } else { // no delimiter specified, behaves as BufferedReader::readLine boolean crfound = false; for (int i = 0; i < len; i++) { char c = b.charAt(i); if (c == '\n') { // '\n' or '\r\n' found. // remove the delimiter and returns the position // of the char after it. b.delete(crfound ? i - 1 : i, i + 1); return crfound ? i - 1 : i; } else if (crfound) { // previous char was '\r', c != '\n' assert i != 0; // remove the delimiter and returns the position // of the char after it. b.delete(i - 1, i); return i - 1; } crfound = c == '\r'; } if (crfound && endOfInput) { // remove the delimiter and returns the position // of the char after it. b.delete(len - 1, len); return len - 1; } } return endOfInput && len > 0 ? len : -1; } // Looks at whether the StringBuilder contains a line. // Returns null if more character are needed. private static String nextLine(StringBuilder b, String eol, boolean endOfInput) { int next = endOfLine(b, eol, endOfInput); return (next > -1) ? take(b, 0, next) : null; } // Attempts to read the next line. Returns the next line if // the delimiter was found, null otherwise. The delimiters are // consumed. private String nextLine() throws CharacterCodingException { assert nextLine == null; LINES: while (nextLine == null) { boolean endOfInput = completed && queue.isEmpty(); nextLine = nextLine(builder, newline, endOfInput && leftover.position() == 0); if (nextLine != null) return nextLine; ByteBuffer b; BUFFERS: while ((b = queue.peek()) != null) { if (!b.hasRemaining()) { queue.poll(); continue BUFFERS; } BYTES: while (b.hasRemaining()) { buffer.position(0); buffer.limit(buffer.capacity()); boolean endofInput = completed && queue.size() <= 1; if (isUnderFlow(b, buffer, endofInput)) { assert !b.hasRemaining(); if (buffer.position() > 0) { buffer.flip(); builder.append(buffer); } continue BUFFERS; } CoderResult res = decoder.decode(b, buffer, endofInput); if (res.isError()) res.throwException(); if (buffer.position() > 0) { buffer.flip(); builder.append(buffer); continue LINES; } if (res.isUnderflow() && b.hasRemaining()) { //System.out.println("underflow: adding " + b.remaining() + " bytes"); leftover.put(b); assert !b.hasRemaining(); continue BUFFERS; } } } assert queue.isEmpty(); if (endOfInput) { // Time to cleanup: there may be some undecoded leftover bytes // We need to flush them out. // The decoder has been configured to replace malformed/unmappable // chars with some replacement, in order to behave like // InputStreamReader. leftover.flip(); buffer.position(0); buffer.limit(buffer.capacity()); // decode() must be called just before flush, even if there // is nothing to decode. We must do this even if leftover // has no remaining bytes. CoderResult res = decoder.decode(leftover, buffer, endOfInput); if (buffer.position() > 0) { buffer.flip(); builder.append(buffer); } if (res.isError()) res.throwException(); // Now call decoder.flush() buffer.position(0); buffer.limit(buffer.capacity()); res = decoder.flush(buffer); if (buffer.position() > 0) { buffer.flip(); builder.append(buffer); } if (res.isError()) res.throwException(); // It's possible that we reach here twice - just for the // purpose of checking that no bytes were left over, so // we reset leftover/decoder to make the function reentrant. leftover.position(0); leftover.limit(leftover.capacity()); decoder.reset(); // if some chars were produced then this call will // return them. return nextLine = nextLine(builder, newline, endOfInput); } return null; } return null; } // The main sequential scheduler loop. private void loop() { try { while (!cancelled) { Throwable error = errorRef.get(); if (error != null) { cancelled = true; scheduler.stop(); upstream.onError(error); cf.completeExceptionally(error); return; } if (nextLine == null) nextLine = nextLine(); if (nextLine == null) { if (completed) { scheduler.stop(); if (leftover.position() != 0) { // Underflow: not all bytes could be // decoded, but no more bytes will be coming. // This should not happen as we should already // have got a MalformedInputException, or // replaced the unmappable chars. errorRef.compareAndSet(null, new IllegalStateException( "premature end of input (" + leftover.position() + " undecoded bytes)")); continue; } else { upstream.onComplete(); } return; } else if (demanded.get() == 0 && !downstreamDemand.isFulfilled()) { long incr = Math.max(1, downstreamDemand.get()); demanded.addAndGet(incr); upstreamSubscription.request(incr); continue; } else return; } assert nextLine != null; assert newline != null && !nextLine.endsWith(newline) || !nextLine.endsWith("\n") || !nextLine.endsWith("\r"); if (downstreamDemand.tryDecrement()) { String forward = nextLine; nextLine = null; upstream.onNext(forward); } else return; // no demand: come back later } } catch (Throwable t) { try { upstreamSubscription.cancel(); } finally { signalError(t); } } } static LineSubscription create(Flow.Subscription s, Charset charset, String lineSeparator, Flow.Subscriber<? super String> upstream, CompletableFuture<?> cf) { return new LineSubscription(Objects.requireNonNull(s), Objects.requireNonNull(charset).newDecoder() // use the same decoder configuration than // java.io.InputStreamReader .onMalformedInput(CodingErrorAction.REPLACE) .onUnmappableCharacter(CodingErrorAction.REPLACE), lineSeparator, Objects.requireNonNull(upstream), Objects.requireNonNull(cf)); } } }