package jdk.jfr.internal.consumer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.time.Instant;
import jdk.jfr.Recording;
import jdk.jfr.RecordingState;
import jdk.jfr.internal.Utils;
import jdk.jfr.internal.SecuritySupport;
import jdk.jfr.internal.SecuritySupport.SafePath;
import jdk.jfr.internal.management.EventByteStream;
import jdk.jfr.internal.management.ManagementSupport;
public final class OngoingStream extends EventByteStream {
private static final byte[] EMPTY_ARRAY = new byte[0];
private static final int = (int)ChunkHeader.HEADER_SIZE;
private static final int = (int)ChunkHeader.FILE_STATE_POSITION;
private static final byte MODIFYING_STATE = ChunkHeader.UPDATING_CHUNK_HEADER;
private final RepositoryFiles repositoryFiles;
private final Recording recording;
private final int blockSize;
private final long endTimeNanos;
private final byte[] = new byte[HEADER_SIZE];
private RecordingInput input;
private ChunkHeader ;
private long position;
private long startTimeNanos;
private Path path;
private boolean first = true;
public OngoingStream(Recording recording, int blockSize, long startTimeNanos, long endTimeNanos) {
super();
this.recording = recording;
this.blockSize = blockSize;
this.startTimeNanos = startTimeNanos;
this.endTimeNanos = endTimeNanos;
this.repositoryFiles = new RepositoryFiles(SecuritySupport.PRIVILEGED, null);
}
public synchronized byte[] read() throws IOException {
try {
return readBytes();
} catch (IOException ioe) {
if (recording.getState() == RecordingState.CLOSED) {
return null;
}
throw ioe;
}
}
private byte[] readBytes() throws IOException {
touch();
if (recording.getState() == RecordingState.NEW) {
return EMPTY_ARRAY;
}
if (recording.getState() == RecordingState.DELAYED) {
return EMPTY_ARRAY;
}
if (first) {
long s = ManagementSupport.getStartTimeNanos(recording);
startTimeNanos = Math.max(s, startTimeNanos);
first = false;
}
while (true) {
if (startTimeNanos > endTimeNanos) {
return null;
}
if (isRecordingClosed()) {
closeInput();
return null;
}
if (!ensurePath()) {
return EMPTY_ARRAY;
}
if (!ensureInput()) {
return EMPTY_ARRAY;
}
if (position < header.getChunkSize()) {
long size = Math.min(header.getChunkSize() - position, blockSize);
return readBytes((int) size);
}
if (header.isFinished()) {
if (header.getDurationNanos() < 1) {
throw new IOException("No progress");
}
startTimeNanos += header.getDurationNanos();
ManagementSupport.removePath(recording, path);
closeInput();
} else {
header.refresh();
if (position >= header.getChunkSize()) {
return EMPTY_ARRAY;
}
}
}
}
private boolean isRecordingClosed() {
return recording != null && recording.getState() == RecordingState.CLOSED;
}
private void closeInput() {
if (input != null) {
try {
input.close();
} catch (IOException ioe) {
}
input = null;
position = 0;
path = null;
}
}
private byte[] readBytes(int size) throws IOException {
if (position == 0) {
return readWithHeader(size);
} else {
return readNonHeader(size);
}
}
private byte[] (int size) throws IOException {
byte[] result = new byte[size];
input.readFully(result);
position += size;
return result;
}
private byte[] (int size) throws IOException {
byte[] bytes = new byte[Math.max(HEADER_SIZE, size)];
for (int attempts = 0; attempts < 25; attempts++) {
input.position(0);
input.readFully(bytes, 0, HEADER_SIZE);
input.position(0);
input.readFully(headerBytes);
if (bytes[HEADER_FILE_STATE_POSITION] != MODIFYING_STATE) {
if (bytes[HEADER_FILE_STATE_POSITION] == headerBytes[HEADER_FILE_STATE_POSITION]) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
buffer.putLong(8, HEADER_SIZE);
buffer.putLong(16, 0);
buffer.putLong(24, 0);
buffer.putLong(40, 0);
buffer.put(64, (byte) 1);
int left = bytes.length - HEADER_SIZE;
input.readFully(bytes, HEADER_SIZE, left);
position += bytes.length;
return bytes;
}
}
takeNap();
}
return EMPTY_ARRAY;
}
private void takeNap() throws IOException {
try {
Thread.sleep(10);
} catch (InterruptedException ie) {
throw new IOException("Read operation interrupted", ie);
}
}
private boolean ensureInput() throws IOException {
if (input == null) {
if (SecuritySupport.getFileSize(new SafePath(path)) < HEADER_SIZE) {
return false;
}
input = new RecordingInput(path.toFile(), SecuritySupport.PRIVILEGED);
header = new ChunkHeader(input);
}
return true;
}
private boolean ensurePath() {
if (path == null) {
path = repositoryFiles.nextPath(startTimeNanos, false);
}
return path != null;
}
@Override
public synchronized void close() throws IOException {
closeInput();
if (recording.getName().startsWith(EventByteStream.NAME)) {
recording.close();
}
}
}