/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2014 Red Hat, Inc., and individual contributors
 * as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package io.undertow.websockets.extensions;

import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.util.ImmediatePooledByteBuffer;
import io.undertow.websockets.core.StreamSinkFrameChannel;
import io.undertow.websockets.core.StreamSourceFrameChannel;
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSocketLogger;
import io.undertow.websockets.core.WebSocketMessages;
import org.xnio.Buffers;
import org.xnio.IoUtils;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

Implementation of permessage-deflate WebSocket Extension.

This implementation supports parameters: server_no_context_takeover, client_no_context_takeover .

This implementation does not support parameters: server_max_window_bits, client_max_window_bits .

It uses the DEFLATE implementation algorithm packaged on Deflater and Inflater classes.

Author:Lucas Ponce
See Also:
/** * Implementation of {@code permessage-deflate} WebSocket Extension. * <p> * This implementation supports parameters: {@code server_no_context_takeover, client_no_context_takeover} . * <p> * This implementation does not support parameters: {@code server_max_window_bits, client_max_window_bits} . * <p> * It uses the DEFLATE implementation algorithm packaged on {@link Deflater} and {@link Inflater} classes. * * @author Lucas Ponce * @see <a href="http://tools.ietf.org/html/draft-ietf-hybi-permessage-compression-18">Compression Extensions for WebSocket</a> */
public class PerMessageDeflateFunction implements ExtensionFunction { private static final byte[] TAIL = new byte[]{0x00, 0x00, (byte) 0xFF, (byte) 0xFF}; private final int deflaterLevel; private final boolean compressContextTakeover; private final boolean decompressContextTakeover; private final Inflater decompress; private final Deflater compress; private StreamSourceFrameChannel currentReadChannel;
Create a new PerMessageDeflateExtension instance.
Params:
  • deflaterLevel – the level of configuration of DEFLATE algorithm implementation
  • compressContextTakeover – flag for compressor context takeover or without compressor context
  • decompressContextTakeover – flag for decompressor context takeover or without decompressor context
/** * Create a new {@code PerMessageDeflateExtension} instance. * * @param deflaterLevel the level of configuration of DEFLATE algorithm implementation * @param compressContextTakeover flag for compressor context takeover or without compressor context * @param decompressContextTakeover flag for decompressor context takeover or without decompressor context */
public PerMessageDeflateFunction(final int deflaterLevel, boolean compressContextTakeover, boolean decompressContextTakeover) { this.deflaterLevel = deflaterLevel; this.decompress = new Inflater(true); this.compress = new Deflater(this.deflaterLevel, true); this.compressContextTakeover = compressContextTakeover; this.decompressContextTakeover = decompressContextTakeover; } @Override public int writeRsv(int rsv) { return rsv | RSV1; } @Override public boolean hasExtensionOpCode() { return false; } @Override public synchronized PooledByteBuffer transformForWrite(PooledByteBuffer pooledBuffer, StreamSinkFrameChannel channel, boolean lastFrame) throws IOException { ByteBuffer buffer = pooledBuffer.getBuffer(); PooledByteBuffer inputBuffer = null; if (buffer.hasArray()) { compress.setInput(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); } else { inputBuffer = toArrayBacked(buffer, channel.getWebSocketChannel().getBufferPool()); compress.setInput(inputBuffer.getBuffer().array(), inputBuffer.getBuffer().arrayOffset() + inputBuffer.getBuffer().position(), inputBuffer.getBuffer().remaining()); } PooledByteBuffer output = allocateBufferWithArray(channel.getWebSocketChannel(), 0); // first pass ByteBuffer outputBuffer = output.getBuffer(); boolean onceOnly = true; try { while ((!compress.needsInput() && !compress.finished()) || !outputBuffer.hasRemaining() || (onceOnly && lastFrame)) { onceOnly = false; //we need the hasRemaining check, because if the inflater fails to flush needsInput() will return false but it may have flushed an incomplete deflate block if (!outputBuffer.hasRemaining()) { output = largerBuffer(output, channel.getWebSocketChannel(), outputBuffer.capacity() * 2); outputBuffer = output.getBuffer(); } int n = compress.deflate( outputBuffer.array(), outputBuffer.arrayOffset() + outputBuffer.position(), outputBuffer.remaining(), lastFrame ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH ); outputBuffer.position(outputBuffer.position() + n); } } finally { // Free the buffer AFTER compression so it doesn't get re-used out from under us IoUtils.safeClose(pooledBuffer, inputBuffer); } if(lastFrame) { outputBuffer.put((byte) 0); if (!compressContextTakeover) { compress.reset(); } } outputBuffer.flip(); return output; } private PooledByteBuffer toArrayBacked(ByteBuffer buffer, ByteBufferPool pool) { if(pool.getBufferSize() < buffer.remaining()) { return new ImmediatePooledByteBuffer(ByteBuffer.wrap(Buffers.take(buffer))); } PooledByteBuffer newBuf = pool.getArrayBackedPool().allocate(); newBuf.getBuffer().put(buffer); newBuf.getBuffer().flip(); return newBuf; } private PooledByteBuffer largerBuffer(PooledByteBuffer smaller, WebSocketChannel channel, int newSize) { ByteBuffer smallerBuffer = smaller.getBuffer(); smallerBuffer.flip(); PooledByteBuffer larger = allocateBufferWithArray(channel, newSize); larger.getBuffer().put(smallerBuffer); smaller.close(); return larger; } private PooledByteBuffer allocateBufferWithArray(WebSocketChannel channel, int size) { if (size > 0) { if(size > channel.getBufferPool().getBufferSize()) { // TODO use newer XNIO sized pool thingies smartly return new ImmediatePooledByteBuffer(ByteBuffer.allocate(size)); } } return channel.getBufferPool().getArrayBackedPool().allocate(); } @Override public synchronized PooledByteBuffer transformForRead(PooledByteBuffer pooledBuffer, StreamSourceFrameChannel channel, boolean lastFragmentOfMessage) throws IOException { if ((channel.getRsv() & 4) == 0) { //rsv bit not set, this message is not compressed return pooledBuffer; } PooledByteBuffer output = allocateBufferWithArray(channel.getWebSocketChannel(), 0); // first pass PooledByteBuffer inputBuffer = null; if (currentReadChannel != null && currentReadChannel != channel) { //new channel, we did not get a last fragment message which can happens sometimes decompress.setInput(TAIL); output = decompress(channel.getWebSocketChannel(), output); } ByteBuffer buffer = pooledBuffer.getBuffer(); if (buffer.hasArray()) { decompress.setInput(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); } else { inputBuffer = toArrayBacked(buffer, channel.getWebSocketChannel().getBufferPool()); decompress.setInput(inputBuffer.getBuffer().array(), inputBuffer.getBuffer().arrayOffset() + inputBuffer.getBuffer().position(), inputBuffer.getBuffer().remaining()); } try { output = decompress(channel.getWebSocketChannel(), output); } finally { // Free the buffer AFTER decompression so it doesn't get re-used out from under us IoUtils.safeClose(inputBuffer, pooledBuffer); } if (lastFragmentOfMessage) { decompress.setInput(TAIL); output = decompress(channel.getWebSocketChannel(), output); currentReadChannel = null; } else { currentReadChannel = channel; } output.getBuffer().flip(); return output; } private PooledByteBuffer decompress(WebSocketChannel channel, PooledByteBuffer pooled) throws IOException { ByteBuffer buffer = pooled.getBuffer(); while (!decompress.needsInput() && !decompress.finished()) { if (!buffer.hasRemaining()) { pooled = largerBuffer(pooled, channel, buffer.capacity() * 2); buffer = pooled.getBuffer(); } int n; try { n = decompress.inflate(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); } catch (DataFormatException e) { WebSocketLogger.EXTENSION_LOGGER.debug(e.getMessage(), e); throw WebSocketMessages.MESSAGES.badCompressedPayload(e); } buffer.position(buffer.position() + n); } return pooled; } @Override public void dispose() { // Call end so that native zlib resources can be immediately released rather than relying on finalizer compress.end(); decompress.end(); } }