Logback logs are sent to Kafka, blocking the spring boot main application startup problem

Posted Jun 27, 20203 min read

I encountered a problem today. I added an appender to logback to send the log to springboot. If the Kafka link fails or the metadata update fails, it will block the main application from starting, as shown below
Screenshot 2020-06-27 PM 3.42.55.png

Kafka producer will update metadata before sending a message. About the update mechanism of metadata, I think it is more detailed in this blog .
If the metadata update fails, kafka producer will block max.block.ms and then continue to try to obtain the metadata. It is during this blocking process that the main springboot application is also blocked. The default value of Kafka max.block.ms is 600000.

The solution can reduce the value of max.block.ms, but there is a better solution in here , introducing `
logback-kafka-appender

<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.2.0</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
    <scope>runtime</scope>
</dependency>

Define appenders that send logs to Kafka as follows:

<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<!-- appender link Kafka configuration -->
</appender>

Use ch.qos.logback.classic.AsyncAppender to send Kafka appenders

<appender name="async" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="kafkaAppender" />
</appender>

Use asynchronous appender directly when logging

<root level="INFO">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="async" />
</root>

Have a look at the AsyncAppender class, which inherits an AsyncAppenderBase class, this class defines a blocking queue and a separate thread worker

@Override
    public void start() {
        if(isStarted())
            return;
        if(appenderCount == 0) {
            addError("No attached appenders found.");
            return;
        }
        if(queueSize <1) {
            addError("Invalid queue size [" + queueSize + "]");
            return;
        }
        //AsyncAppender will initialize a blocking queue when it starts, and the log will be temporarily placed in the queue
        blockingQueue = new ArrayBlockingQueue<E>(queueSize);

        if(discardingThreshold == UNDEFINED)
            discardingThreshold = queueSize/5;
        addInfo("Setting discardingThreshold to "+ discardingThreshold);
        //worker is a single thread used to send logs to each sub appender
        worker.setDaemon(true);
        worker.setName("AsyncAppender-Worker-" + getName());
        //The AsyncAppenderBase class inherits UnsynchronizedAppenderBase, super.start() marks this instance of appender as started
        super.start();
        //The worker thread starts and sends messages to each sub appender
        worker.start();
    }

The appender accepts the message and puts it in the blocking queue:

    //The appender will call the append method if there is a message
    @Override
    protected void append(E eventObject) {
        if(isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
            return;
        }
        preprocess(eventObject);
        put(eventObject);
    }

    private void put(E eventObject) {
        if(neverBlock) {
            blockingQueue.offer(eventObject);
        } else {
            //Equivalent to blockingQueue.put(eventObject)
            putUninterruptibly(eventObject);
        }
    }

Worker is a thread internal class, responsible for sending messages to each child appender(in the above example, it is sent to kafkaAppender, and then kafkaAppender is responsible for linking kafka), so that the thread that updates Kafka metadata is handed over to the worker to operate, and the main thread Isolated, the main service will not block.

class Worker extends Thread {

        public void run() {
            AsyncAppenderBase<E> parent = AsyncAppenderBase.this;
            AppenderAttachableImpl<E> aai = parent.aai;

            //loop while the parent is started
            while(parent.isStarted()) {
                try {
                    E e = parent.blockingQueue.take();
                    //Traverse all sub-appenders and pass the message to all sub-appenders
                    aai.appendLoopOnAppenders(e);
                } catch(InterruptedException ie) {
                    break;
                }
            }

            addInfo("Worker thread will flush remaining events before exiting. ");
            //If the appender is closed, after sending the remaining messages, close the sub appender
            for(E e:parent.blockingQueue) {
                aai.appendLoopOnAppenders(e);
                parent.blockingQueue.remove(e);
            }

            aai.detachAndStopAllAppenders();
        }
    }