/*
 * JBoss, Home of Professional Open Source
 *
 * Copyright 2013 Red Hat, Inc. and/or its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.xnio.http;

import static org.xnio.IoUtils.safeClose;
import static org.xnio._private.Messages.msg;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import org.xnio.ChannelExceptionHandler;
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.FutureResult;
import org.xnio.IoFuture;
import org.xnio.OptionMap;
import org.xnio.Pooled;
import org.xnio.StreamConnection;
import org.xnio.XnioWorker;
import org.xnio.channels.BoundChannel;
import org.xnio.ssl.SslConnection;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;
import org.xnio.conduits.PushBackStreamSourceConduit;
import org.xnio.conduits.StreamSourceConduit;
import org.xnio.ssl.XnioSsl;

Simple HTTP client that can perform a HTTP upgrade. This is not a general purpose HTTP client, all it can do is upgrade a HTTP
Author:Stuart Douglas
/** * Simple HTTP client that can perform a HTTP upgrade. This is not a general purpose HTTP * client, all it can do is upgrade a HTTP * * @author Stuart Douglas */
public class HttpUpgrade {
Perform a HTTP upgrade that results in a SSL secured connection. This method should be used if the target endpoint is using https
Params:
  • worker – The worker
  • ssl – The XnioSsl instance
  • bindAddress – The bind address
  • uri – The URI to connect to
  • headers – Any additional headers to include in the upgrade request. This must include an Upgrade header that specifies the type of upgrade being performed
  • openListener – The open listener that is invoked once the HTTP upgrade is done
  • bindListener – The bind listener that is invoked when the socket is bound
  • optionMap – The option map for the connection
  • handshakeChecker – A handshake checker that can be supplied to verify that the server returned a valid response to the upgrade request
Returns:An IoFuture of the connection
/** * Perform a HTTP upgrade that results in a SSL secured connection. This method should be used if the target endpoint is using https * * @param worker The worker * @param ssl The XnioSsl instance * @param bindAddress The bind address * @param uri The URI to connect to * @param headers Any additional headers to include in the upgrade request. This must include an <code>Upgrade</code> header that specifies the type of upgrade being performed * @param openListener The open listener that is invoked once the HTTP upgrade is done * @param bindListener The bind listener that is invoked when the socket is bound * @param optionMap The option map for the connection * @param handshakeChecker A handshake checker that can be supplied to verify that the server returned a valid response to the upgrade request * @return An IoFuture of the connection */
public static IoFuture<SslConnection> performUpgrade(final XnioWorker worker, XnioSsl ssl, InetSocketAddress bindAddress, URI uri, final Map<String, String> headers, ChannelListener<? super SslConnection> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap, HandshakeChecker handshakeChecker) { return new HttpUpgradeState<SslConnection>(worker, ssl, bindAddress, uri, headers, openListener, bindListener, optionMap, handshakeChecker).doUpgrade(); }
Perform a HTTP upgrade that results in a SSL secured connection. This method should be used if the target endpoint is using https
Params:
  • worker – The worker
  • ssl – The XnioSsl instance
  • bindAddress – The bind address
  • uri – The URI to connect to
  • headers – Any additional headers to include in the upgrade request. This must include an Upgrade header that specifies the type of upgrade being performed
  • openListener – The open listener that is invoked once the HTTP upgrade is done
  • bindListener – The bind listener that is invoked when the socket is bound
  • optionMap – The option map for the connection
  • handshakeChecker – A handshake checker that can be supplied to verify that the server returned a valid response to the upgrade request
Returns:An IoFuture of the connection
/** * Perform a HTTP upgrade that results in a SSL secured connection. This method should be used if the target endpoint is using https * * @param worker The worker * @param ssl The XnioSsl instance * @param bindAddress The bind address * @param uri The URI to connect to * @param headers Any additional headers to include in the upgrade request. This must include an <code>Upgrade</code> header that specifies the type of upgrade being performed * @param openListener The open listener that is invoked once the HTTP upgrade is done * @param bindListener The bind listener that is invoked when the socket is bound * @param optionMap The option map for the connection * @param handshakeChecker A handshake checker that can be supplied to verify that the server returned a valid response to the upgrade request * @return An IoFuture of the connection */
public static IoFuture<SslConnection> performUpgrade(final XnioWorker worker, XnioSsl ssl, InetSocketAddress bindAddress, URI uri, final Map<String, List<String>> headers, ChannelListener<? super SslConnection> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap, ExtendedHandshakeChecker handshakeChecker) { return new HttpUpgradeState<SslConnection>(worker, ssl, bindAddress, uri, headers, openListener, bindListener, optionMap, handshakeChecker).doUpgrade(); }
Connects to the target server using HTTP upgrade.
Params:
  • worker – The worker
  • bindAddress – The bind address
  • uri – The URI to connect to
  • headers – Any additional headers to include in the upgrade request. This must include an Upgrade header that specifies the type of upgrade being performed
  • openListener – The open listener that is invoked once the HTTP upgrade is done
  • bindListener – The bind listener that is invoked when the socket is bound
  • optionMap – The option map for the connection
  • handshakeChecker – A handshake checker that can be supplied to verify that the server returned a valid response to the upgrade request
Returns:An IoFuture of the connection
/** * Connects to the target server using HTTP upgrade. * * @param worker The worker * @param bindAddress The bind address * @param uri The URI to connect to * @param headers Any additional headers to include in the upgrade request. This must include an <code>Upgrade</code> header that specifies the type of upgrade being performed * @param openListener The open listener that is invoked once the HTTP upgrade is done * @param bindListener The bind listener that is invoked when the socket is bound * @param optionMap The option map for the connection * @param handshakeChecker A handshake checker that can be supplied to verify that the server returned a valid response to the upgrade request * @return An IoFuture of the connection */
public static IoFuture<StreamConnection> performUpgrade(final XnioWorker worker, InetSocketAddress bindAddress, URI uri, final Map<String, String> headers, ChannelListener<? super StreamConnection> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap, HandshakeChecker handshakeChecker) { return new HttpUpgradeState<StreamConnection>(worker, null, bindAddress, uri, headers, openListener, bindListener, optionMap, handshakeChecker).doUpgrade(); }
Connects to the target server using HTTP upgrade.
Params:
  • worker – The worker
  • bindAddress – The bind address
  • uri – The URI to connect to
  • headers – Any additional headers to include in the upgrade request. This must include an Upgrade header that specifies the type of upgrade being performed
  • openListener – The open listener that is invoked once the HTTP upgrade is done
  • bindListener – The bind listener that is invoked when the socket is bound
  • optionMap – The option map for the connection
  • handshakeChecker – A handshake checker that can be supplied to verify that the server returned a valid response to the upgrade request
Returns:An IoFuture of the connection
/** * Connects to the target server using HTTP upgrade. * * @param worker The worker * @param bindAddress The bind address * @param uri The URI to connect to * @param headers Any additional headers to include in the upgrade request. This must include an <code>Upgrade</code> header that specifies the type of upgrade being performed * @param openListener The open listener that is invoked once the HTTP upgrade is done * @param bindListener The bind listener that is invoked when the socket is bound * @param optionMap The option map for the connection * @param handshakeChecker A handshake checker that can be supplied to verify that the server returned a valid response to the upgrade request * @return An IoFuture of the connection */
public static IoFuture<StreamConnection> performUpgrade(final XnioWorker worker, InetSocketAddress bindAddress, URI uri, final Map<String, List<String>> headers, ChannelListener<? super StreamConnection> openListener, ChannelListener<? super BoundChannel> bindListener, OptionMap optionMap, ExtendedHandshakeChecker handshakeChecker) { return new HttpUpgradeState<StreamConnection>(worker, null, bindAddress, uri, headers, openListener, bindListener, optionMap, handshakeChecker).doUpgrade(); }
Performs a HTTP upgrade on an existing connection.
Params:
  • connection – The existing connection to upgrade
  • uri – The URI to connect to
  • headers – Any additional headers to include in the upgrade request. This must include an Upgrade header that specifies the type of upgrade being performed
  • openListener – The open listener that is invoked once the HTTP upgrade is done
  • handshakeChecker – A handshake checker that can be supplied to verify that the server returned a valid response to the upgrade request
Returns:An IoFuture of the connection
/** * Performs a HTTP upgrade on an existing connection. * * @param connection The existing connection to upgrade * @param uri The URI to connect to * @param headers Any additional headers to include in the upgrade request. This must include an <code>Upgrade</code> header that specifies the type of upgrade being performed * @param openListener The open listener that is invoked once the HTTP upgrade is done * @param handshakeChecker A handshake checker that can be supplied to verify that the server returned a valid response to the upgrade request * @return An IoFuture of the connection */
public static <T extends StreamConnection> IoFuture<T> performUpgrade(final T connection, URI uri, final Map<String, String> headers, ChannelListener<? super StreamConnection> openListener, HandshakeChecker handshakeChecker) { return new HttpUpgradeState<T>(connection, uri, headers, openListener, handshakeChecker).upgradeExistingConnection(); }
Performs a HTTP upgrade on an existing connection.
Params:
  • connection – The existing connection to upgrade
  • uri – The URI to connect to
  • headers – Any additional headers to include in the upgrade request. This must include an Upgrade header that specifies the type of upgrade being performed
  • openListener – The open listener that is invoked once the HTTP upgrade is done
  • handshakeChecker – A handshake checker that can be supplied to verify that the server returned a valid response to the upgrade request
Returns:An IoFuture of the connection
/** * Performs a HTTP upgrade on an existing connection. * * @param connection The existing connection to upgrade * @param uri The URI to connect to * @param headers Any additional headers to include in the upgrade request. This must include an <code>Upgrade</code> header that specifies the type of upgrade being performed * @param openListener The open listener that is invoked once the HTTP upgrade is done * @param handshakeChecker A handshake checker that can be supplied to verify that the server returned a valid response to the upgrade request * @return An IoFuture of the connection */
public static <T extends StreamConnection> IoFuture<T> performUpgrade(final T connection, URI uri, final Map<String, List<String>> headers, ChannelListener<? super StreamConnection> openListener, ExtendedHandshakeChecker handshakeChecker) { return new HttpUpgradeState<T>(connection, uri, headers, openListener, handshakeChecker).upgradeExistingConnection(); } private HttpUpgrade() { } private static class HttpUpgradeState<T extends StreamConnection> { private final XnioWorker worker; private final XnioSsl ssl; private final InetSocketAddress bindAddress; private final URI uri; private final Map<String, List<String>> headers; private final ChannelListener<? super T> openListener; private final ChannelListener<? super BoundChannel> bindListener; private final OptionMap optionMap; private final Object handshakeChecker; private final FutureResult<T> future = new FutureResult<T>(); private T connection; private HttpUpgradeState(final XnioWorker worker, final XnioSsl ssl, final InetSocketAddress bindAddress, final URI uri, final Map<String, String> headers, final ChannelListener<? super T> openListener, final ChannelListener<? super BoundChannel> bindListener, final OptionMap optionMap, final HandshakeChecker handshakeChecker) { this.worker = worker; this.ssl = ssl; this.bindAddress = bindAddress; this.uri = uri; this.openListener = openListener; this.bindListener = bindListener; this.optionMap = optionMap; this.handshakeChecker = handshakeChecker; Map<String, List<String>> newHeaders = new HashMap<>(); for(Map.Entry<String, String> entry : headers.entrySet()) { newHeaders.put(entry.getKey(), Collections.singletonList(entry.getValue())); } this.headers = newHeaders; } private HttpUpgradeState(final XnioWorker worker, final XnioSsl ssl, final InetSocketAddress bindAddress, final URI uri, final Map<String, List<String>> headers, final ChannelListener<? super T> openListener, final ChannelListener<? super BoundChannel> bindListener, final OptionMap optionMap, final ExtendedHandshakeChecker handshakeChecker) { this.worker = worker; this.ssl = ssl; this.bindAddress = bindAddress; this.uri = uri; this.headers = headers; this.openListener = openListener; this.bindListener = bindListener; this.optionMap = optionMap; this.handshakeChecker = handshakeChecker; } public HttpUpgradeState(final T connection, final URI uri, final Map<String, String> headers, final ChannelListener<? super StreamConnection> openListener, final HandshakeChecker handshakeChecker) { this.worker = connection.getWorker(); this.ssl = null; this.bindAddress = null; this.uri = uri; this.openListener = openListener; this.bindListener = null; this.optionMap = OptionMap.EMPTY; this.handshakeChecker = handshakeChecker; this.connection = connection; Map<String, List<String>> newHeaders = new HashMap<>(); for(Map.Entry<String, String> entry : headers.entrySet()) { newHeaders.put(entry.getKey(), Collections.singletonList(entry.getValue())); } this.headers = newHeaders; } public HttpUpgradeState(final T connection, final URI uri, final Map<String, List<String>> headers, final ChannelListener<? super StreamConnection> openListener, final ExtendedHandshakeChecker handshakeChecker) { this.worker = connection.getWorker(); this.ssl = null; this.bindAddress = null; this.uri = uri; this.headers = headers; this.openListener = openListener; this.bindListener = null; this.optionMap = OptionMap.EMPTY; this.handshakeChecker = handshakeChecker; this.connection = connection; } private IoFuture<T> doUpgrade() { InetSocketAddress address = new InetSocketAddress(uri.getHost(), uri.getPort()); final ChannelListener<StreamConnection> connectListener = new ConnectionOpenListener(); final String scheme = uri.getScheme(); if (scheme.equals("http")) { if (bindAddress == null) { worker.openStreamConnection(address, connectListener, bindListener, optionMap).addNotifier(new FailureNotifier(), null); } else { worker.openStreamConnection(bindAddress, address, connectListener, bindListener, optionMap).addNotifier(new FailureNotifier(), null); } } else if (scheme.equals("https")) { if (ssl == null) { throw msg.missingSslProvider(); } if (bindAddress == null) { ssl.openSslConnection(worker, address, connectListener, bindListener, optionMap).addNotifier(new FailureNotifier(), null); } else { ssl.openSslConnection(worker, bindAddress, address, connectListener, bindListener, optionMap).addNotifier(new FailureNotifier(), null); } } else { throw msg.invalidURLScheme(scheme); } return future.getIoFuture(); } private String buildHttpRequest() { final StringBuilder builder = new StringBuilder(); builder.append("GET "); builder.append(uri.getPath().isEmpty() ? "/" : uri.getPath()); if(uri.getQuery() != null && !uri.getQuery().isEmpty()) { builder.append('?'); builder.append(uri.getQuery()); } builder.append(" HTTP/1.1\r\n"); final Set<String> seen = new HashSet<String>(); for (Map.Entry<String, List<String>> headerEntry : headers.entrySet()) { for(String value : headerEntry.getValue()) { builder.append(headerEntry.getKey()); builder.append(": "); builder.append(value); builder.append("\r\n"); seen.add(headerEntry.getKey().toLowerCase(Locale.ENGLISH)); } } if (!seen.contains("host")) { builder.append("Host: "); builder.append(getHost()); builder.append("\r\n"); } if (!seen.contains("connection")) { builder.append("Connection: upgrade\r\n"); } if (!seen.contains("upgrade")) { throw new IllegalArgumentException("Upgrade: header was not supplied in header arguments"); } builder.append("\r\n"); return builder.toString(); } private String getHost() { String scheme = uri.getScheme(); int port = uri.getPort(); if (port < 0 || "http".equals(scheme) && port == 80 || "https".equals(scheme) && port == 443) { // No port or default port. return uri.getHost(); } return uri.getHost() + ":" + port; } public IoFuture<T> upgradeExistingConnection() { final ChannelListener<StreamConnection> connectListener = new ConnectionOpenListener(); connectListener.handleEvent(connection); return future.getIoFuture(); } private class ConnectionOpenListener implements ChannelListener<StreamConnection> { @Override public void handleEvent(final StreamConnection channel) { connection = (T) channel; final ByteBuffer buffer = ByteBuffer.wrap(buildHttpRequest().getBytes()); int r; do { try { r = channel.getSinkChannel().write(buffer); if (r == 0) { channel.getSinkChannel().getWriteSetter().set(new StringWriteListener(buffer)); channel.getSinkChannel().resumeWrites(); return; } } catch (IOException e) { safeClose(channel); future.setException(e); return; } } while (buffer.hasRemaining()); flushUpgradeChannel(); } } private void flushUpgradeChannel() { try { if(!connection.getSinkChannel().flush()) { connection.getSinkChannel().getWriteSetter().set(ChannelListeners.flushingChannelListener(new ChannelListener<StreamSinkChannel>() { @Override public void handleEvent(StreamSinkChannel channel) { channel.suspendWrites(); new UpgradeResultListener().handleEvent(connection.getSourceChannel()); } }, new ChannelExceptionHandler<StreamSinkChannel>() { @Override public void handleException(StreamSinkChannel channel, IOException exception) { safeClose(channel); future.setException(exception); } })); connection.getSinkChannel().resumeWrites(); return; } } catch (IOException e) { safeClose(connection); future.setException(e); return; } new UpgradeResultListener().handleEvent(connection.getSourceChannel()); } private final class StringWriteListener implements ChannelListener<StreamSinkChannel> { final ByteBuffer buffer; private StringWriteListener(final ByteBuffer buffer) { this.buffer = buffer; } @Override public void handleEvent(final StreamSinkChannel channel) { int r; do { try { r = channel.write(buffer); if (r == 0) { return; } } catch (IOException e) { safeClose(channel); future.setException(e); return; } } while (buffer.hasRemaining()); channel.suspendWrites(); flushUpgradeChannel(); } } private final class UpgradeResultListener implements ChannelListener<StreamSourceChannel> { private final HttpUpgradeParser parser = new HttpUpgradeParser(); private ByteBuffer buffer = ByteBuffer.allocate(1024); @Override public void handleEvent(final StreamSourceChannel channel) { int r; do { try { r = channel.read(buffer); if (r == 0) { channel.getReadSetter().set(this); channel.resumeReads(); return; } else if (r == -1) { throw msg.connectionClosedEarly(); } buffer.flip(); parser.parse(buffer); if(!parser.isComplete()) { buffer.compact(); } } catch (IOException e) { safeClose(channel); future.setException(e); return; } } while (!parser.isComplete()); channel.suspendReads(); if (buffer.hasRemaining()) { StreamSourceConduit orig = connection.getSourceChannel().getConduit(); PushBackStreamSourceConduit pushBack = new PushBackStreamSourceConduit(orig); pushBack.pushBack(new Pooled<ByteBuffer>() { @Override public void discard() { buffer = null; } @Override public void free() { buffer = null; } @Override public ByteBuffer getResource() throws IllegalStateException { return buffer; } @Override public void close() { free(); } }); connection.getSourceChannel().setConduit(pushBack); } //ok, we have a response if (parser.getResponseCode() == 101) { // Switching Protocols handleUpgrade(parser); } else if (parser.getResponseCode() == 301 || // Moved Permanently parser.getResponseCode() == 302 || // Found parser.getResponseCode() == 303 || // See Other parser.getResponseCode() == 307 || // Temporary Redirect parser.getResponseCode() == 308) { // Permanent Redirect safeClose(connection); handleRedirect(parser); } else { safeClose(connection); future.setException(new UpgradeFailedException("Invalid response code " + parser.getResponseCode())); } } } private void handleUpgrade(final HttpUpgradeParser parser) { Map<String, String> simpleHeaders = new HashMap<>(); for(Map.Entry<String, List<String>> e : parser.getHeaders().entrySet()) { simpleHeaders.put(e.getKey(), e.getValue().get(0)); } final String contentLength = simpleHeaders.get("content-length"); if (contentLength != null) { if (!"0".equals(contentLength)) { future.setException(new IOException("Upgrade responses must have a content length of zero.")); return; } } final String transferCoding = simpleHeaders.get("transfer-encoding"); if (transferCoding != null) { future.setException(new IOException("Upgrade responses cannot have a transfer coding")); return; } if (handshakeChecker != null) { try { if(handshakeChecker instanceof ExtendedHandshakeChecker) { ((ExtendedHandshakeChecker) handshakeChecker).checkHandshakeExtended(parser.getHeaders()); } else { ((HandshakeChecker)handshakeChecker).checkHandshake(simpleHeaders); } } catch (IOException e) { safeClose(connection); future.setException(e); return; } } future.setResult(connection); ChannelListeners.invokeChannelListener(connection, openListener); } private void handleRedirect(final HttpUpgradeParser parser) { List<String> location = parser.getHeaders().get("location"); future.setException(new RedirectException(msg.redirect(), parser.getResponseCode(), location == null ? null : location.get(0))); } private class FailureNotifier extends IoFuture.HandlingNotifier<StreamConnection, Object> { @Override public void handleFailed(IOException exception, Object attachment) { future.setException(exception); } @Override public void handleCancelled(Object attachment) { future.setCancelled(); } } } }