Let’s take a look at how to increase the consumption speed of RabbitMQ in the Magic Box (Big Data Development Collaboration Platform)

Posted Jun 15, 2020 · 9 min read

  1. This article introduces optimizations that increase the consumption speed of a RabbitMQ consumer in a real production project where the consumer cluster's resources are limited.
  2. To make the problem easier to understand, the article compares the time taken before and after optimization, and the demo download address is given at the end of the article.

One. Introduction to the Magic Box

  1. The Magic Box is a development collaboration platform within the Jubilee Digital Core big data development platform;
  2. Through the Magic Box, data developers can easily package, test, and launch both offline and real-time tasks;
  3. It supports serial and parallel workflow settings for offline tasks;
  4. It provides a complete monitoring and alerting system for running tasks.

Two. How the Magic Box packages offline tasks

Flowchart

[Figure: Magic Box packaging process]

Process walkthrough

  1. After developing the code for a requirement offline, a data developer can create a Spark task in the test environment through the Magic Box;
  2. The data developer then clicks the project build button on the page to send a request to package the Spark task;

[Figure: packaging screenshot 1]

  3. On receiving the packaging request, the server updates the record's status to "to be built";
  4. A scheduled task scans the database every second; whenever it finds a task waiting to be built, it puts the task into the queue;
  5. The message consumer receives the message (the data to be packaged), starts packaging the project on the server (packaging may take about 1-5 minutes), and sends the packaging log to the front end through the WebSocket service;

[Figure: packaging screenshot 3]

  6. After receiving the packaging log, the front end (WebSocket client) displays it in real time in the interface window until packaging is complete.

[Figure: packaging screenshot 2]
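The scan-and-enqueue steps above can be sketched in plain Java. This is a minimal, self-contained sketch under stated assumptions, not the Magic Box code: an in-memory ConcurrentSkipListMap stands in for the database table, a LinkedBlockingQueue stands in for the RabbitMQ queue, and the names BuildScanner, Status, requestBuild and scanOnce are hypothetical. The article does not say how a record is kept from being queued twice; flipping its status atomically while enqueuing is an assumption made here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory stand-in for the build-task table and the RabbitMQ queue. */
final class BuildScanner {
    enum Status { TO_BE_BUILT, QUEUED }

    // versionId -> status; plays the role of the database table (assumption)
    final ConcurrentMap<Integer, Status> table = new ConcurrentSkipListMap<>();
    // Plays the role of the RabbitMQ queue (assumption)
    final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

    /** The "build" click: mark the record as waiting to be built. */
    void requestBuild(int versionId) {
        table.put(versionId, Status.TO_BE_BUILT);
    }

    /** One scan: enqueue every waiting record exactly once; returns how many were enqueued. */
    int scanOnce() {
        int enqueued = 0;
        for (Integer versionId : table.keySet()) {
            // replace(key, expected, next) is atomic, so a record cannot be enqueued twice
            if (table.replace(versionId, Status.TO_BE_BUILT, Status.QUEUED)) {
                queue.add(versionId);
                enqueued++;
            }
        }
        return enqueued;
    }

    /** Run scanOnce every second, like the scheduled task described above. */
    ScheduledFuture<?> start(ScheduledExecutorService scheduler) {
        return scheduler.scheduleWithFixedDelay(this::scanOnce, 0, 1, TimeUnit.SECONDS);
    }
}
```

For example, after requestBuild(386) and requestBuild(387), the first scanOnce() enqueues both records and the next scan enqueues nothing, because both records are already marked QUEUED.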

Three. Where RabbitMQ is used

  1. Whenever the server's scheduled task finds a task to be built in the database, it puts the task into the queue without caring when the message will be consumed; this is the role of the RabbitMQ producer;
  2. The RabbitMQ consumer listens on the queue; after receiving a message, it executes the business logic and runs the mvn command in the background to package the project;
  3. To keep the packaging application robust, we deploy the message producer and the message consumer as two separate services.
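The consumer's packaging step (run mvn in the background and stream its log to the front end) might look roughly like the following. This is a sketch under assumptions: PackagingRunner and run are hypothetical names, the exact Maven command and working directory are not given in the article, and the logSink callback stands in for the WebSocket push.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: run a build command and forward its output line by line. */
final class PackagingRunner {
    /**
     * Runs command (e.g. a Maven build) in workDir, passes every output line to logSink
     * (which would push it to the front end in the real service) and returns the exit code.
     */
    static int run(List<String> command, Path workDir, Consumer<String> logSink)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .directory(workDir.toFile())
                .redirectErrorStream(true) // merge stderr into stdout so log order is kept
                .start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                logSink.accept(line); // forward each line as soon as it is produced
            }
        }
        return process.waitFor();
    }
}
```

In the consumer this could be called with something like List.of("mvn", "clean", "package") and the project's checkout directory; both values are placeholders, not the Magic Box's actual settings. Reading the output while the process runs matters: an unread output pipe can fill up and stall a long Maven build.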

Four. Problems

  1. Because mvn packaging is time-consuming (each build may take about 1-5 minutes), the consumer works through messages slowly, the number of unacked messages grows, and messages back up in the queue;
  2. When the front end sends many packaging requests, more and more messages wait in the queue and throughput drops sharply. In practice, a packaging request may wait a long time before the server consumes it, so the user waits a long time before the packaging log appears in the interface.
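To see why the wait grows with the backlog, consider a single consumer that builds queued requests one after another. Writing t_i for the build time of the i-th queued request (notation introduced here), the k-th request waits for every build ahead of it. With the 1 to 5 minutes per build quoted above, the fifth request in the queue waits 4 to 20 minutes before its own build even starts (ignoring the scan interval of at most one second):

```latex
W_k = \sum_{i=1}^{k-1} t_i,
\qquad 1\ \text{min} \le t_i \le 5\ \text{min}
\;\Longrightarrow\; 4\ \text{min} \le W_5 \le 20\ \text{min}
```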

Five. Optimization ideas

1. Enable consumer multithreading

In RabbitMQ, we can create multiple consumers to consume the same queue, thereby increasing the consumption speed.
[Figure: multiple consumers on one queue]

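Add a container factory configuration

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration
 * Turn on consumer multithreading while consuming messages
 *
 * @author lyf
 * Official account: full stack on the road
 * @GitHub https://github.com/liuyongfei1
 * @date 2020-06-02 06:10
 */
@Slf4j
@Configuration
public class RabbitConsumerConfig {

    @Bean("customContainerFactory")
    public SimpleRabbitListenerContainerFactory containerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Apply the spring.rabbitmq.listener.simple.* settings (ack mode, prefetch, ...) first,
        // so that the explicit values below are not overwritten by them
        configurer.configure(factory, connectionFactory);
        // Number of concurrent consumers (threads) to start
        factory.setConcurrentConsumers(10);
        // Maximum number of concurrent consumers
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}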
The consumer then uses this container factory:
@RabbitListener(queues = {QueueConstants.QUEUE_NAME}, containerFactory = "customContainerFactory")
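When a dedicated factory is not needed, Spring Boot can apply the same concurrency to its auto-configured listener container factory through properties. A minimal sketch, assuming Spring Boot 2.x; listeners that do not name a containerFactory would then pick these values up:

```properties
# Start 10 consumers per listener container and never scale beyond 10
spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.simple.max-concurrency=10
```

The Java factory above is still useful when only some listeners should run with 10 consumers, since the properties change the default for every listener.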
Check the effect

After restarting the service, open the RabbitMQ management interface and click the queue the messages are sent to. This opens the queue details page, whose Consumers section shows information about the consumers:
[Figure: Consumers section of the queue details page]

As the figure shows, there are now 10 consumers in total, which confirms that the configuration has taken effect.
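The effect of setConcurrentConsumers(10) can be illustrated without a broker. In this hypothetical simulation (ConcurrentDrain is not part of Spring AMQP), a fixed thread pool's internal work queue plays the role of the RabbitMQ queue and each pool thread plays one consumer; every job just sleeps to imitate a slow build.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical simulation: N worker threads draining one queue of slow jobs. */
final class ConcurrentDrain {
    /**
     * Runs one simulated job per entry in jobMillis (each job sleeps that long) on
     * `consumers` threads and returns the elapsed wall-clock time in milliseconds.
     */
    static long drain(int consumers, List<Long> jobMillis) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        long start = System.nanoTime();
        for (long ms : jobMillis) {
            pool.execute(() -> {
                try {
                    Thread.sleep(ms); // stand-in for a packaging job
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();                           // accept no new jobs
        pool.awaitTermination(1, TimeUnit.MINUTES); // wait for queued jobs to finish
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

With one consumer, five 200 ms jobs run back to back and take about a second; with ten consumers they overlap and finish in roughly the time of a single job. Real builds also compete for CPU and network, so the speedup in production is smaller than in this idealized model.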

2. Consumer-side rate limiting (prefetch)

Round-robin distribution

In RabbitMQ, the default message distribution mechanism is round-robin: multiple consumers take turns receiving messages from the queue.

For example, with 10 messages and 2 consumers, RabbitMQ Server does not consider which consumer is idle or busy; it simply hands the messages to the 2 consumers in turn, so each consumer ends up with 5 messages.
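The round-robin rule above can be written as a small, simplified model. The class RoundRobin is hypothetical; in a real broker, which consumer receives the first message depends on registration order, and delivery is limited by each consumer's prefetch window.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of round-robin dispatch: message m goes to the next consumer in turn. */
final class RoundRobin {
    static List<List<Integer>> dispatch(int messages, int consumers) {
        List<List<Integer>> assigned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assigned.add(new ArrayList<>());
        }
        for (int m = 1; m <= messages; m++) {
            // Busy or idle makes no difference: the next consumer in line gets the message
            assigned.get((m - 1) % consumers).add(m);
        }
        return assigned;
    }
}
```

RoundRobin.dispatch(10, 2) gives the first consumer messages 1, 3, 5, 7, 9 and the second consumer messages 2, 4, 6, 8, 10, the 5/5 split described above.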

Let's use a simple demo to see the drawbacks of this distribution mechanism.

Producer

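import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Message production side
 *
 * @author lyf
 * Official account: full stack on the road
 * @GitHub https://github.com/liuyongfei1
 * @date 2020-06-11 06:30
 */
@Slf4j
@Data
@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public String send() {
        // Send 10 messages, "NO. 1" to "NO. 10", 100 ms apart
        for (int i = 1; i < 11; i++) {
            String message = "NO. " + i;
            String msgId = UUID.randomUUID().toString();
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop sending
                Thread.currentThread().interrupt();
                break;
            }
            // Empty exchange name = default exchange, which routes by queue name
            rabbitTemplate.convertAndSend("", QueueConstants.QUEUE_NAME, message, new CorrelationData(msgId));
            log.info("The production side sent a message: [{}] successfully.", message);
        }
        return "Send message successfully";
    }
}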
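  1. Consumer A

    import com.rabbitmq.client.Channel;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.stereotype.Component;

    /**
     * Message consumer A
     *
     * @author Liuyongfei
     * Official account: full stack on the road
     * @GitHub https://github.com/liuyongfei1
     * @date 2020-06-11 13:30
     */
    @Slf4j
    @Data
    @Component
    public class Consumer1Controller {

        // Calling basicAck here assumes spring.rabbitmq.listener.simple.acknowledge-mode=manual
        // (see the fair distribution settings); in the default AUTO mode the container also acks,
        // and a second ack for the same delivery tag makes the broker close the channel.
        @RabbitListener(queues = {QueueConstants.QUEUE_NAME})
        public void work(Message<?> message, Channel channel) {
            // Get the message body
            String info = (String) message.getPayload();
            log.info("Consumer A gets the message: {}", info);
            try {
                // The delivery tag travels in the message headers
                MessageHeaders headers = message.getHeaders();
                Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
                // Simulate 200 ms of processing time
                TimeUnit.MILLISECONDS.sleep(200);
                // Acknowledge this single message (multiple = false)
                channel.basicAck(tag, false);
            } catch (InterruptedException | IOException e) {
                log.error("An exception occurred while consuming the message: " + e.getMessage());
            }
        }
    }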

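  2. Consumer B

    import com.rabbitmq.client.Channel;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.stereotype.Component;

    /**
     * Message consumer B
     *
     * @author Liuyongfei
     * Official account: full stack on the road
     * @GitHub https://github.com/liuyongfei1
     * @date 2020-06-11 13:50
     */
    @Slf4j
    @Data
    @Component
    public class Consumer2Controller {

        @RabbitListener(queues = {QueueConstants.QUEUE_NAME})
        public void work(Message<?> message, Channel channel) {
            // Get the message body
            String info = (String) message.getPayload();
            log.info("Consumer B gets the message: {}", info);
            try {
                // The delivery tag travels in the message headers
                MessageHeaders headers = message.getHeaders();
                Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
                // Simulate 200 ms of processing time
                TimeUnit.MILLISECONDS.sleep(200);
                // Acknowledge this single message (multiple = false)
                channel.basicAck(tag, false);
            } catch (InterruptedException | IOException e) {
                log.error("An exception occurred while consuming the message: " + e.getMessage());
            }
        }
    }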

  3. Check the consumption

[Figure: consumer console output]
The IntelliJ IDEA console shows how the two consumers consume: the messages in the queue are shared between them.

  4. Drawbacks

    • If messages take different amounts of time to process, some consumers may stay busy all the time while others finish their work quickly and then sit idle;
    • In our project specifically, some Spark tasks depend on few jar packages and others on many, and packaging is also affected by the network, so each consumer takes a different amount of time per packaging job. Distributing messages round-robin instead of according to each consumer's actual capacity therefore lowers consumer throughput.
Fair distribution

Given these problems with round-robin distribution, how can messages be distributed fairly according to each consumer's capacity?
We combine consumer-side rate limiting (prefetch) with manual ack to change RabbitMQ's default distribution behavior:
[Figure: fair message distribution]

This ensures that each consumer processes at most one message at a time. In other words, RabbitMQ Server does not deliver a new message to a consumer until it has received that consumer's ack for the message it is holding.
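The difference this makes can be estimated with a small, hypothetical discrete-time model (FairDispatch is not a RabbitMQ or Spring API). With prefetch = 1, a consumer only receives its next message after acking, so each message effectively goes to whichever consumer becomes free first; round-robin instead fixes each consumer's share in advance. The model assumes all messages are already queued at time 0 and ignores network and broker overhead.

```java
import java.util.PriorityQueue;

/** Hypothetical model comparing prefetch=1 ("fair") dispatch with round-robin dispatch. */
final class FairDispatch {
    /** Returns {makespanMs, messages handled by consumer 0, by consumer 1, ...}. */
    static long[] simulate(int messages, long[] costMs) {
        int n = costMs.length;
        long[] result = new long[n + 1];
        // Entries are {freeAtMs, consumerIndex}, ordered by free time, then by index on ties
        PriorityQueue<long[]> free = new PriorityQueue<>(
                (a, b) -> a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]));
        for (int c = 0; c < n; c++) {
            free.add(new long[] {0, c});
        }
        for (int m = 0; m < messages; m++) {
            long[] next = free.poll(); // the consumer that acks first gets the next message
            int c = (int) next[1];
            long done = next[0] + costMs[c];
            result[0] = Math.max(result[0], done);
            result[c + 1]++;
            free.add(new long[] {done, c});
        }
        return result;
    }

    /** Round-robin: consumer c gets every n-th message regardless of how slow it is. */
    static long roundRobinMakespan(int messages, long[] costMs) {
        int n = costMs.length;
        long makespan = 0;
        for (int c = 0; c < n; c++) {
            long count = messages / n + (c < messages % n ? 1 : 0);
            makespan = Math.max(makespan, count * costMs[c]);
        }
        return makespan;
    }
}
```

With consumer A at 200 ms and consumer B at 1000 ms per message, as in the demo that follows, FairDispatch.simulate(10, new long[] {200, 1000}) gives A 8 messages and B 2, and the batch finishes at 2000 ms in this model; round-robin's 5/5 split keeps the slow consumer busy until 5000 ms.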

  • application.properties
    Add the following two lines to application.properties:

    # Turn on manual ACK (the consumer acknowledges each message explicitly)
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    # Allow at most 1 unacknowledged (prefetched) message per consumer
    spring.rabbitmq.listener.simple.prefetch=1

Restart the service and check the RabbitMQ management interface:
[Figure: fair distribution settings in the RabbitMQ management interface]

  • Check the consumption

We increased consumer B's sleep time above to 1000 milliseconds to simulate a slow task, then restarted the service and checked the consumption again:
[Figure: consumer console output after the change]
The IDEA console shows that, with these settings, RabbitMQ now distributes messages according to each consumer's actual capacity.

Six. Packaging tasks

Assume that the Magic Box database holds 5 records that need to be packaged; these 5 records serve as the test data below:
[Figure: data to be packaged]

Remarks:

  1. The following tests were run on a local machine, so packaging times are affected by the machine's configuration and network environment;

  2. The times in the following tests cover these steps:

    • Pull the code of the branch in use from the git repository;
    • Package the project code;
    • Upload the jar package to HDFS automatically after packaging.

Seven. Before optimization

1. Put the data to be packaged into the queue

[Figure: packaging console]

The figure shows that the five records were put into the RabbitMQ queue at 15:06, in ascending order of version_id:
[Figure: queued messages]

A single consumer is used:
[Figure: consumers]

2. Time spent

The packaging log shows when the last message was consumed:

[Figure: packaging log, part 1]
[Figure: packaging log, part 2]

We can see that:

  • The last message started being consumed at 15:10;
  • Its packaging finished at 15:13;
  • From 15:06 until the last task finished packaging at 15:13, about 7 minutes elapsed in total.

Eight. After optimization

On the consumer side, 10 consumers are used, and the prefetch count of each consumer's channel is set to 1:

[Figure: optimized prefetch count]

1. Put the data to be packaged into the queue

[Figure: messages queued after optimization]
The figure shows that the messages were put into the queue at 18:17.

2. Time spent

The packaging log shows that multiple threads start consuming messages at the same time:

2020-06-12 18:17:50,931 INFO LOG_PACKAGING []-[TaskSparkVersionEntity(versionId=388...
...
2020-06-12 18:17:50,932 INFO LOG_PACKAGING []-[TaskSparkVersionEntity(versionId=390, ......
...
2020-06-12 18:17:50,932 INFO LOG_PACKAGING []-[TaskSparkVersionEntity(versionId=386, ......
...
2020-06-12 18:17:50,932 INFO LOG_PACKAGING []-[TaskSparkVersionEntity(versionId=387, ......
...
2020-06-12 18:17:50,932 INFO LOG_PACKAGING []-[TaskSparkVersionEntity(versionId=389, ......

The log also shows when the last message was consumed:
[Figure: packaging time after optimization]

We can see that:

  • Multiple threads were started, 10 consumers in total;
  • From 18:17:50 until the last task finished packaging at 18:21, about 3 minutes elapsed;
  • The total time dropped from 7 minutes before optimization to 3 minutes after, so consumer throughput improved substantially.
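The measured numbers fit a simple model. With n tasks queued at once and build times t_i (notation introduced here), one consumer finishes the batch only after the sum of all builds, while c ≥ n consumers with prefetch = 1 start every build immediately, so the batch finishes roughly when the slowest build does (assuming parallel builds on one machine do not slow each other much):

```latex
T_{\text{before}} = \sum_{i=1}^{n} t_i \approx 7\ \text{min},
\qquad
T_{\text{after}} \approx \max_{1 \le i \le n} t_i \approx 3\ \text{min},
\qquad
\frac{T_{\text{before}}}{T_{\text{after}}} \approx \frac{7}{3} \approx 2.3
```

Consistent with this, before optimization the last task alone ran from about 15:10 to 15:13, roughly 3 minutes, which is close to the time the whole optimized batch needed.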

Nine. Code download address

Follow the WeChat official account

You are welcome to follow my WeChat official account for more articles:
[Figure: WeChat official account QR code]