package io.ebeaninternal.server.transaction;
import io.ebean.ProfileLocation;
import io.ebean.config.ProfilingConfig;
import io.ebean.plugin.Plugin;
import io.ebean.plugin.SpiServer;
import io.ebeaninternal.api.SpiProfileHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static java.time.temporal.ChronoField.DAY_OF_MONTH;
import static java.time.temporal.ChronoField.HOUR_OF_DAY;
import static java.time.temporal.ChronoField.MILLI_OF_SECOND;
import static java.time.temporal.ChronoField.MINUTE_OF_HOUR;
import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
import static java.time.temporal.ChronoField.YEAR;
Default profile handler.
Uses ConcurrentLinkedQueue to minimise contention on threads calling collectTransactionProfile().
Uses a sleep backoff on the single threaded consumer that reads the profiles and writes them to files.
/**
* Default profile handler.
* <p>
* Uses ConcurrentLinkedQueue to minimise contention on threads calling collectTransactionProfile().
* <p>
* Uses a sleep backoff on the single threaded consumer that reads the profiles and writes them to files.
*/
public class DefaultProfileHandler implements SpiProfileHandler, Plugin {
private static final Logger log = LoggerFactory.getLogger(DefaultProfileHandler.class);
private static final DateTimeFormatter DTF;
static {
DTF = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendValue(YEAR, 4)
.appendValue(MONTH_OF_YEAR, 2)
.appendValue(DAY_OF_MONTH, 2)
.appendLiteral('-')
.appendValue(HOUR_OF_DAY, 2)
.appendValue(MINUTE_OF_HOUR, 2)
.appendValue(SECOND_OF_MINUTE, 2)
.appendLiteral('-')
.appendValue(MILLI_OF_SECOND, 3)
.toFormatter();
}
Low contention choice.
/**
* Low contention choice.
*/
private final Queue<TransactionProfile> queue = new ConcurrentLinkedQueue<>();
private final ExecutorService executor;
private final ReentrantLock lock = new ReentrantLock();
private final File dir;
private final long minMicros;
private final long profilesPerFile;
private final boolean verbose;
private volatile boolean shutdown;
private long profileCounter;
Slow down polling of transaction profiling queue.
/**
* Slow down polling of transaction profiling queue.
*/
private int sleepBackoff;
private Writer out;
public DefaultProfileHandler(ProfilingConfig config) {
this.verbose = config.isVerbose();
this.minMicros = config.getMinimumMicros();
this.profilesPerFile = config.getProfilesPerFile();
// dedicated single threaded executor for consuming the
// profiling and writing it to file(s)
this.executor = Executors.newSingleThreadExecutor();
this.dir = new File(config.getDirectory());
if (!dir.exists() && !dir.mkdirs()) {
log.error("failed to mkdirs " + dir.getAbsolutePath());
}
incrementFile();
}
Low contention adding the transaction profile to the queue.
Minimise the impact to the normal transaction processing (threads).
/**
* Low contention adding the transaction profile to the queue.
* Minimise the impact to the normal transaction processing (threads).
*/
@Override
public void collectTransactionProfile(TransactionProfile transactionProfile) {
queue.add(transactionProfile);
}
Create and return a ProfileStream.
/**
* Create and return a ProfileStream.
*/
@Override
public ProfileStream createProfileStream(ProfileLocation location) {
return new DefaultProfileStream(location, verbose);
}
private void flushCurrentFile() {
lock.lock();
try {
if (out != null) {
try {
out.close();
out = null;
} catch (IOException e) {
log.error("Failed to flush and close transaction profiling file ", e);
}
}
} finally {
lock.unlock();
}
}
Move to the next file to write to.
/**
* Move to the next file to write to.
*/
private void incrementFile() {
lock.lock();
try {
flushCurrentFile();
try {
String now = DTF.format(LocalDateTime.now());
File file = new File(dir, "txprofile-" + now + ".tprofile");
out = new BufferedWriter(new FileWriter(file));
} catch (IOException e) {
log.error("Not expected", e);
}
} finally {
lock.unlock();
}
}
Main loop for polling the queue and processing profiling messages.
/**
* Main loop for polling the queue and processing profiling messages.
*/
private void collect() {
try {
while (!shutdown) {
TransactionProfile profile = queue.poll();
if (profile == null) {
sleep();
} else if (include(profile)) {
write(profile);
}
}
flushCurrentFile();
} catch (Exception e) {
log.warn("Error on collect", e);
}
}
Write the profile to the current file.
/**
* Write the profile to the current file.
*/
private void write(TransactionProfile profile) {
try {
sleepBackoff = 0;
++profileCounter;
StringBuilder sb = new StringBuilder(80);
// header
sb.append(profile.getStartTime()).append(' ')
.append(profile.getLabel()).append(' ')
.append(profile.getTotalMicros()).append(' ');
// summary
appendSummary(profile, sb);
out.write(sb.toString());
if (verbose) {
out.write(' ');
out.write(profile.getData());
}
out.write('\n');
if (profileCounter % profilesPerFile == 0) {
incrementFile();
log.debug("profiled {} transactions", profileCounter);
}
} catch (IOException e) {
log.warn("Error writing transaction profiling", e);
}
}
private void appendSummary(TransactionProfile profile, StringBuilder sb) {
TransactionProfile.Summary summary = profile.getSummary();
sb.append("z:").append(rate(profile.getTotalMicros(), summary.persistCount + summary.queryCount)).append(' ');
sb.append("p:").append(rate(summary.persistMicros, summary.persistBeans)).append(' ');
sb.append("q:").append(rate(summary.queryMicros, summary.queryCount)).append(' ');
sb.append("qm:").append(summary.queryMax).append(' ');
sb.append("qt:").append(summary.queryMicros).append(' ');
sb.append("qc:").append(summary.queryCount).append(' ');
sb.append("qb:").append(summary.queryBeans).append(' ');
sb.append("pt:").append(summary.persistMicros).append(' ');
sb.append("pc:").append(summary.persistCount).append(' ');
sb.append("pb:").append(summary.persistBeans).append(' ');
sb.append("po:").append(summary.persistOneCount).append(' ');
sb.append("pz:").append(rate(summary.persistBeans, summary.persistCount));
}
private int rate(long micros, long count) {
return count < 1 ? 0 : (int) (micros / count);
}
Return true if the profile should be included (or false for ignored).
/**
* Return true if the profile should be included (or false for ignored).
*/
private boolean include(TransactionProfile profile) {
return profile.getTotalMicros() >= minMicros;
}
Sleep backing off towards 250 millis when there is no activity.
This seems to be simple and decent for our queue consumer.
/**
* Sleep backing off towards 250 millis when there is no activity.
* This seems to be simple and decent for our queue consumer.
*/
private void sleep() {
try {
// backoff sleep when nothing is happening
int sleepFor = Math.min(++sleepBackoff, 250);
Thread.sleep(sleepFor);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void configure(SpiServer server) {
// do nothing
}
@Override
public void online(boolean online) {
if (online) {
executor.submit(this::collect);
}
}
@Override
public void shutdown() {
shutdown = true;
log.trace("shutting down");
try {
executor.shutdown();
if (!executor.awaitTermination(4, TimeUnit.SECONDS)) {
log.info("Shut down timeout exceeded. Terminating profiling consumer thread.");
executor.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupt on shutdown", e);
}
flushCurrentFile();
}
}