/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.logging.log4j.core.async;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.lmax.disruptor.EventTranslatorVararg;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.core.AbstractLifeCycle;
import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
import org.apache.logging.log4j.core.util.Log4jThreadFactory;
import org.apache.logging.log4j.core.util.Throwables;

import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.apache.logging.log4j.message.Message;

Helper class for async loggers: AsyncLoggerDisruptor handles the mechanics of working with the LMAX Disruptor, and works with its associated AsyncLoggerContext to synchronize the life cycle of the Disruptor and its thread with the life cycle of the context. The AsyncLoggerDisruptor of the context is shared by all AsyncLogger objects created by that AsyncLoggerContext.
/**
 * Helper class for async loggers: {@code AsyncLoggerDisruptor} handles the mechanics of working with the LMAX
 * Disruptor, and works with its associated {@code AsyncLoggerContext} to synchronize the life cycle of the Disruptor
 * and its thread with the life cycle of the context. The {@code AsyncLoggerDisruptor} of the context is shared by all
 * {@code AsyncLogger} objects created by that {@code AsyncLoggerContext}.
 */
class AsyncLoggerDisruptor extends AbstractLifeCycle {

    /** Upper bound on the number of sleep-and-recheck rounds {@code stop} performs while the ring buffer drains. */
    private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;

    /** Length, in milliseconds, of each sleep between two drain checks in {@code stop}. */
    private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;

    /** Monitor used to serialize blocking enqueue attempts when the queue is full. */
    private final Object queueFullEnqueueLock = new Object();

    /** The current Disruptor; {@code null} before {@code start()} and again after {@code stop}. */
    private volatile Disruptor<RingBufferLogEvent> disruptor;

    /** Name of the owning context; used as a prefix in this class's status log messages. */
    private String contextName;

    /** Whether the thread-local translator may be used; {@code true} unless changed via the setter. */
    private boolean useThreadLocalTranslator = true;

    /** Id of the most recent thread handed out by the thread factory that {@code start()} installs. */
    private long backgroundThreadId;

    /** Policy consulted by {@code getEventRoute}; assigned in {@code start()}. */
    private AsyncQueueFullPolicy asyncQueueFullPolicy;

    /** Ring buffer size calculated in {@code start()}. */
    private int ringBufferSize;

    /**
     * Creates a disruptor helper for the context with the given name. No Disruptor exists until {@code start()}
     * is called.
     *
     * @param contextName name of the owning context
     */
    AsyncLoggerDisruptor(final String contextName) {
        this.contextName = contextName;
    }

    /**
     * Returns the context name used in this object's log messages.
     *
     * @return the current context name
     */
    public String getContextName() {
        return this.contextName;
    }

    /**
     * Replaces the context name used in this object's log messages.
     *
     * @param name the new context name
     */
    public void setContextName(final String name) {
        this.contextName = name;
    }

    /**
     * Returns the current value of the volatile disruptor field.
     *
     * @return the Disruptor, or {@code null} if none is currently set
     */
    Disruptor<RingBufferLogEvent> getDisruptor() {
        return this.disruptor;
    }
Creates and starts a new Disruptor and associated thread if none currently exists.
See Also:
  • stop()
/**
 * Creates and starts a new Disruptor and associated thread if none currently exists.
 * <p>
 * When a Disruptor is already present this method only logs a trace message and returns. Otherwise it calculates
 * the ring buffer size, creates the wait strategy, thread factory and queue-full policy, builds the Disruptor,
 * installs the exception handler and the event handler, starts the Disruptor and finally delegates to
 * {@code super.start()}.
 * </p>
 *
 * @see #stop()
 */
@Override
public synchronized void start() {
    if (disruptor != null) {
        LOGGER.trace(
                "[{}] AsyncLoggerDisruptor not starting new disruptor for this context, using existing object.",
                contextName);
        return;
    }

    LOGGER.trace("[{}] AsyncLoggerDisruptor creating new disruptor for this context.", contextName);
    ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
    final WaitStrategy strategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");

    // The factory records the id of every thread it creates so that other methods of this class can
    // recognise when they are running on the background thread.
    final ThreadFactory factory =
            new Log4jThreadFactory("AsyncLogger[" + contextName + "]", true, Thread.NORM_PRIORITY) {
                @Override
                public Thread newThread(final Runnable r) {
                    final Thread created = super.newThread(r);
                    backgroundThreadId = created.getId();
                    return created;
                }
            };
    asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();

    disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, factory, ProducerType.MULTI, strategy);

    final ExceptionHandler<RingBufferLogEvent> exceptionHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
    disruptor.setDefaultExceptionHandler(exceptionHandler);

    final RingBufferLogEventHandler[] eventHandlers = {new RingBufferLogEventHandler()};
    disruptor.handleEventsWith(eventHandlers);

    LOGGER.debug(
            "[{}] Starting AsyncLogger disruptor for this context with ringbufferSize={}, waitStrategy={}, "
                    + "exceptionHandler={}...",
            contextName,
            disruptor.getRingBuffer().getBufferSize(),
            strategy.getClass().getSimpleName(),
            exceptionHandler);
    disruptor.start();

    final String translatorKind = useThreadLocalTranslator ? "threadlocal" : "vararg";
    LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, translatorKind);
    super.start();
}
Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are shut down and their references set to null.
/** * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are * shut down and their references set to {@code null}. */
@Override public boolean stop(final long timeout, final TimeUnit timeUnit) { final Disruptor<RingBufferLogEvent> temp = getDisruptor(); if (temp == null) { LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor for this context already shut down.", contextName); return true; // disruptor was already shut down by another thread } setStopping(); LOGGER.debug("[{}] AsyncLoggerDisruptor: shutting down disruptor for this context.", contextName); // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown(). disruptor = null; // client code fails with NPE if log after stop. This is by design. // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed, // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU, // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain. for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) { try { Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while } catch (final InterruptedException e) { // ignored } } try { // busy-spins until all events currently in the disruptor have been processed, or timeout temp.shutdown(timeout, timeUnit); } catch (final TimeoutException e) { LOGGER.warn("[{}] AsyncLoggerDisruptor: shutdown timed out after {} {}", contextName, timeout, timeUnit); temp.halt(); // give up on remaining log events, if any } LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor has been shut down.", contextName); if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) { LOGGER.trace("AsyncLoggerDisruptor: {} discarded {} events.", asyncQueueFullPolicy, DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy)); } setStopped(); return true; }
Returns true if the specified disruptor still has unprocessed events.
/**
 * Returns {@code true} if the specified disruptor still has unprocessed events.
 * <p>
 * The check asks the ring buffer whether a number of slots equal to its full buffer size is available; if not,
 * a backlog is reported.
 * </p>
 *
 * @param theDisruptor the disruptor whose ring buffer is inspected
 * @return {@code true} if the ring buffer does not have its full capacity available
 */
private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
    final RingBuffer<?> ring = theDisruptor.getRingBuffer();
    final int totalSlots = ring.getBufferSize();
    final boolean fullyAvailable = ring.hasAvailableCapacity(totalSlots);
    return !fullyAvailable;
}
Creates and returns a new RingBufferAdmin that instruments the ringbuffer of the AsyncLogger.
Params:
  • jmxContextName – name of the AsyncLoggerContext
Returns: a new RingBufferAdmin that instruments the ringbuffer
/**
 * Creates and returns a new {@code RingBufferAdmin} that instruments the ringbuffer of the {@code AsyncLogger}.
 * <p>
 * If no Disruptor is currently set, {@code null} is passed as the ring buffer.
 * </p>
 *
 * @param jmxContextName name of the {@code AsyncLoggerContext}
 * @return a new {@code RingBufferAdmin} that instruments the ringbuffer
 */
public RingBufferAdmin createRingBufferAdmin(final String jmxContextName) {
    // Read the volatile field exactly once: stop() may set it to null between a null check and a second read.
    final Disruptor<RingBufferLogEvent> temp = disruptor;
    final RingBuffer<RingBufferLogEvent> ring = temp == null ? null : temp.getRingBuffer();
    return RingBufferAdmin.forAsyncLogger(ring, jmxContextName);
}

/**
 * Determines how an event of the given level should be routed.
 *
 * @param logLevel the level of the event to be logged
 * @return {@code EventRoute.DISCARD} if no Disruptor is currently set, otherwise the route chosen by the
 *         queue-full policy for the background thread id and the given level
 */
EventRoute getEventRoute(final Level logLevel) {
    final int remainingCapacity = remainingDisruptorCapacity();
    if (remainingCapacity < 0) {
        return EventRoute.DISCARD;
    }
    return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
}

/**
 * Returns the remaining capacity of the ring buffer.
 *
 * @return the remaining capacity, or {@code -1} (after logging a warning) if no Disruptor is currently set
 */
private int remainingDisruptorCapacity() {
    // Work on a local copy so the null check and the dereference see the same reference.
    final Disruptor<RingBufferLogEvent> temp = disruptor;
    if (hasLog4jBeenShutDown(temp)) {
        return -1;
    }
    return (int) temp.getRingBuffer().remainingCapacity();
}
Returns true if the specified disruptor is null.
/** * Returns {@code true} if the specified disruptor is null. */
private boolean hasLog4jBeenShutDown(final Disruptor<RingBufferLogEvent> aDisruptor) { if (aDisruptor == null) { // LOG4J2-639 LOGGER.warn("Ignoring log event after log4j was shut down"); return true; } return false; } boolean tryPublish(final RingBufferLogEventTranslator translator) { try { // Note: we deliberately access the volatile disruptor field afresh here. // Avoiding this and using an older reference could result in adding a log event to the disruptor after it // was shut down, which could cause the publishEvent method to hang and never return. return disruptor.getRingBuffer().tryPublishEvent(translator); } catch (final NullPointerException npe) { // LOG4J2-639: catch NPE if disruptor field was set to null in stop() logWarningOnNpeFromDisruptorPublish(translator); return false; } } void enqueueLogMessageWhenQueueFull(final RingBufferLogEventTranslator translator) { try { // Note: we deliberately access the volatile disruptor field afresh here. // Avoiding this and using an older reference could result in adding a log event to the disruptor after it // was shut down, which could cause the publishEvent method to hang and never return. if (synchronizeEnqueueWhenQueueFull()) { synchronized (queueFullEnqueueLock) { disruptor.publishEvent(translator); } } else { disruptor.publishEvent(translator); } } catch (final NullPointerException npe) { // LOG4J2-639: catch NPE if disruptor field was set to null in stop() logWarningOnNpeFromDisruptorPublish(translator); } } void enqueueLogMessageWhenQueueFull( final EventTranslatorVararg<RingBufferLogEvent> translator, final AsyncLogger asyncLogger, final StackTraceElement location, final String fqcn, final Level level, final Marker marker, final Message msg, final Throwable thrown) { try { // Note: we deliberately access the volatile disruptor field afresh here. 
// Avoiding this and using an older reference could result in adding a log event to the disruptor after it // was shut down, which could cause the publishEvent method to hang and never return. if (synchronizeEnqueueWhenQueueFull()) { synchronized (queueFullEnqueueLock) { disruptor.getRingBuffer().publishEvent(translator, asyncLogger, // asyncLogger: 0 location, // location: 1 fqcn, // 2 level, // 3 marker, // 4 msg, // 5 thrown); // 6 } } else { disruptor.getRingBuffer().publishEvent(translator, asyncLogger, // asyncLogger: 0 location, // location: 1 fqcn, // 2 level, // 3 marker, // 4 msg, // 5 thrown); // 6 } } catch (final NullPointerException npe) { // LOG4J2-639: catch NPE if disruptor field was set to null in stop() logWarningOnNpeFromDisruptorPublish(level, fqcn, msg, thrown); } } private boolean synchronizeEnqueueWhenQueueFull() { return DisruptorUtil.ASYNC_LOGGER_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL // Background thread must never block && backgroundThreadId != Thread.currentThread().getId(); } private void logWarningOnNpeFromDisruptorPublish(final RingBufferLogEventTranslator translator) { logWarningOnNpeFromDisruptorPublish( translator.level, translator.loggerName, translator.message, translator.thrown); } private void logWarningOnNpeFromDisruptorPublish( final Level level, final String fqcn, final Message msg, final Throwable thrown) { LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}{}", contextName, level, fqcn, msg.getFormattedMessage(), thrown == null ? "" : Throwables.toStringList(thrown)); }
Returns whether it is allowed to store non-JDK classes in ThreadLocal objects for efficiency.
See Also:
  • LOG4J2-1172 (https://issues.apache.org/jira/browse/LOG4J2-1172)
Returns: whether AsyncLoggers are allowed to use ThreadLocal objects
Since: 2.5
/**
 * Returns whether it is allowed to store non-JDK classes in ThreadLocal objects for efficiency.
 *
 * @return whether AsyncLoggers are allowed to use ThreadLocal objects
 * @since 2.5
 * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-1172">LOG4J2-1172</a>
 */
public boolean isUseThreadLocals() {
    return this.useThreadLocalTranslator;
}
Signals this AsyncLoggerDisruptor whether it is allowed to store non-JDK classes in ThreadLocal objects for efficiency.

This property may be modified after the start() method has been called.

Params:
  • allow – whether AsyncLoggers are allowed to use ThreadLocal objects
See Also:
  • LOG4J2-1172 (https://issues.apache.org/jira/browse/LOG4J2-1172)
Since: 2.5
/**
 * Signals this AsyncLoggerDisruptor whether it is allowed to store non-JDK classes in ThreadLocal objects for
 * efficiency.
 * <p>
 * This property may be modified after the {@link #start()} method has been called.
 * </p>
 *
 * @param allow whether AsyncLoggers are allowed to use ThreadLocal objects
 * @since 2.5
 * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-1172">LOG4J2-1172</a>
 */
public void setUseThreadLocals(final boolean allow) {
    useThreadLocalTranslator = allow;

    // Name of the translator kind reported in the trace message below.
    final String translatorKind;
    if (useThreadLocalTranslator) {
        translatorKind = "threadlocal";
    } else {
        translatorKind = "vararg";
    }
    LOGGER.trace("[{}] AsyncLoggers have been modified to use a {} translator", contextName, translatorKind);
}
}