/*
* JBoss, Home of Professional Open Source.
* Copyright 2014 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.undertow.websockets.core;
import org.xnio.ChannelListener;
import io.undertow.connector.PooledByteBuffer;
import java.io.IOException;
import java.nio.ByteBuffer;
A buffered text message.
Author: Stuart Douglas
/**
* A buffered text message.
*
* @author Stuart Douglas
*/
public class BufferedTextMessage {
private final UTF8Output data = new UTF8Output();
private final boolean bufferFullMessage;
private final long maxMessageSize;
private boolean complete;
long currentSize;
Params: - maxMessageSize – The maximum message size
- bufferFullMessage – If the complete message should be buffered
/**
* @param maxMessageSize The maximum message size
* @param bufferFullMessage If the complete message should be buffered
*/
public BufferedTextMessage(long maxMessageSize, boolean bufferFullMessage) {
this.maxMessageSize = maxMessageSize;
this.bufferFullMessage = bufferFullMessage;
}
public BufferedTextMessage(boolean bufferFullMessage) {
this(-1, bufferFullMessage);
}
private void checkMaxSize(StreamSourceFrameChannel channel, int res) throws IOException {
if(res > 0) {
currentSize += res;
}
if (maxMessageSize > 0 && currentSize > maxMessageSize) {
WebSockets.sendClose(new CloseMessage(CloseMessage.MSG_TOO_BIG, WebSocketMessages.MESSAGES.messageToBig(maxMessageSize)), channel.getWebSocketChannel(), null);
throw new IOException(WebSocketMessages.MESSAGES.messageToBig(maxMessageSize));
}
}
public void readBlocking(StreamSourceFrameChannel channel) throws IOException {
PooledByteBuffer pooled = channel.getWebSocketChannel().getBufferPool().allocate();
final ByteBuffer buffer = pooled.getBuffer();
try {
for (; ; ) {
int res = channel.read(buffer);
if (res == -1) {
buffer.flip();
data.write(buffer);
this.complete = true;
return;
} else if (res == 0) {
channel.awaitReadable();
}
checkMaxSize(channel, res);
if (!buffer.hasRemaining()) {
buffer.flip();
data.write(buffer);
buffer.compact();
if (!bufferFullMessage) {
//if we are not reading the full message we return
return;
}
}
}
} finally {
pooled.close();
}
}
public void read(final StreamSourceFrameChannel channel, final WebSocketCallback<BufferedTextMessage> callback) {
PooledByteBuffer pooled = channel.getWebSocketChannel().getBufferPool().allocate();
final ByteBuffer buffer = pooled.getBuffer();
try {
try {
for (; ; ) {
int res = channel.read(buffer);
if (res == -1) {
this.complete = true;
buffer.flip();
data.write(buffer);
callback.complete(channel.getWebSocketChannel(), this);
return;
} else if (res == 0) {
buffer.flip();
if (buffer.hasRemaining()) {
data.write(buffer);
if (!bufferFullMessage) {
callback.complete(channel.getWebSocketChannel(), this);
}
}
channel.getReadSetter().set(new ChannelListener<StreamSourceFrameChannel>() {
@Override
public void handleEvent(StreamSourceFrameChannel channel) {
if(complete ) {
return;
}
PooledByteBuffer pooled = channel.getWebSocketChannel().getBufferPool().allocate();
final ByteBuffer buffer = pooled.getBuffer();
try {
try {
for (; ; ) {
int res = channel.read(buffer);
if (res == -1) {
checkMaxSize(channel, res);
buffer.flip();
data.write(buffer);
complete = true;
callback.complete(channel.getWebSocketChannel(), BufferedTextMessage.this);
return;
} else if (res == 0) {
buffer.flip();
if (buffer.hasRemaining()) {
data.write(buffer);
if (!bufferFullMessage) {
callback.complete(channel.getWebSocketChannel(), BufferedTextMessage.this);
}
}
return;
}
if (!buffer.hasRemaining()) {
buffer.flip();
data.write(buffer);
buffer.clear();
if (!bufferFullMessage) {
callback.complete(channel.getWebSocketChannel(), BufferedTextMessage.this);
}
}
}
} catch (IOException e) {
callback.onError(channel.getWebSocketChannel(), BufferedTextMessage.this, e);
}
} finally {
pooled.close();
}
}
});
channel.resumeReads();
return;
}
checkMaxSize(channel, res);
if (!buffer.hasRemaining()) {
buffer.flip();
data.write(buffer);
buffer.clear();
if (!bufferFullMessage) {
callback.complete(channel.getWebSocketChannel(), this);
}
}
}
} catch (IOException e) {
callback.onError(channel.getWebSocketChannel(), this, e);
}
} finally {
pooled.close();
}
}
Gets the buffered data and clears the buffered text message. If this is not called on a UTF8
character boundary there may be partial code point data that is still buffered.
Returns: The data
/**
* Gets the buffered data and clears the buffered text message. If this is not called on a UTF8
* character boundary there may be partial code point data that is still buffered.
*
* @return The data
*/
public String getData() {
return data.extract();
}
public boolean isComplete() {
return complete;
}
}