Spring boot uses RocketMQ

Posted May 25, 202010 min read

This example implements RocketMQ producer, consumer, and message monitoring

Use docker-compose to build RocketMQ environment

The following configuration files are all sourced online, I just use them.
Generate the following directory in the same directory as the docker-compose.yml file

mkdir -p ./data/logs
mkdir -p ./data/store
mkdir -p ./data/brokerconf

Edit broker.conf in the ./data/brokerconf directory

brokerClusterName = DefaultCluster

# broker name, note that different configuration files are filled in differently here, if used in broker-a.properties:broker-a,
# Use in broker-b.properties:broker-b
brokerName = broker-a

# 0 means Master,> 0 means Slave
brokerId = 0

# nameServer address, separated by semicolon
# namesrvAddr = rocketmq-nameserver1:9876; rocketmq-nameserver2:9876
# namesrvAddr = 192.168.6.139:9876

# brokerIP1 Set the host IP, do not use docker internal IP
brokerIP1 = 192.168.6.139

# When sending a message, automatically create a topic that does not exist on the server, the number of queues created by default
defaultTopicQueueNums = 4

# Whether to allow Broker to automatically create Topic, suggest to open offline, close online
autoCreateTopicEnable = true

# Whether to allow Broker to automatically create subscription groups, it is recommended to open offline, close online
autoCreateSubscriptionGroup = true

# Broker External service listening port
listenPort = 10911

# Delete the file time, the default is 4 am
deleteWhen = 04

# File retention time, default 48 hours
fileReservedTime = 120

# commitLog The default size of each file is 1G
mapedFileSizeCommitLog = 1073741824

# ConsumeQueue Each file stores 30W by default, adjusted according to business conditions
mapedFileSizeConsumeQueue = 300000

# destroyMapedFileIntervalForcibly = 120000
# redeleteHangedFileInterval = 120000
# Detect physical file disk space
diskMaxUsedSpaceRatio = 88
# Storage path
# storePathRootDir =/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog storage path
# storePathCommitLog =/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# Consumer queue storage
# storePathConsumeQueue =/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# Message index storage path
# storePathIndex =/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint file storage path
# storeCheckpoint =/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort file storage path
# abortFile =/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# Limit message size
maxMessageSize = 65536

# flushCommitLogLeastPages = 4
# flushConsumeQueueLeastPages = 2
# flushCommitLogThoroughInterval = 10000
# flushConsumeQueueThoroughInterval = 60000

# The role of Broker
#-ASYNC_MASTER asynchronous replication Master
#-SYNC_MASTER Sync Double Write Master
#-SLAVE
brokerRole = ASYNC_MASTER

# Brush mode
#-ASYNC_FLUSH asynchronous flash
#-SYNC_FLUSH Synchronous flashing
flushDiskType = ASYNC_FLUSH

# Number of message thread pools
sendMessageThreadPoolNums = 10
# Pull message thread pool number
pullMessageThreadPoolNums = 10

The contents of docker-compose.yml are as follows

version:'3.5'
services:
  rmqnamesrv:
    image:foxiswho/rocketmq:server
    container_name:rmqnamesrv
    ports:
      -9876:9876
    volumes:
      -./data/logs:/opt/logs
      -./data/store:/opt/store
    networks:
        rmq:
          aliases:
            -rmqnamesrv

  rmqbroker:
    image:foxiswho/rocketmq:broker
    container_name:rmqbroker
    ports:
      -10909:10909
      -10911:10911
    volumes:
      -./data/logs:/opt/logs
      -./data/store:/opt/store
      -./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
    environment:
        NAMESRV_ADDR:"rmqnamesrv:9876"
        JAVA_OPTS:"-Duser.home =/opt"
        JAVA_OPT_EXT:"-server -Xms128m -Xmx128m -Xmn128m"
    command:mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      -rmqnamesrv
    networks:
      rmq:
        aliases:
          -rmqbroker

  rmqconsole:
    image:styletang/rocketmq-console-ng
    container_name:rmqconsole
    ports:
      -18080:8080
    environment:
        JAVA_OPTS:"-Drocketmq.namesrv.addr = rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel = false"
    depends_on:
      -rmqnamesrv
    networks:
      rmq:
        aliases:
          -rmqconsole

networks:
  rmq:
    name:rmq
    driver:bridge

start up

docker-compose up

Browser input http://localhost :18080/to display the console interface

RocketMQ Essentials

Consumption patterns

  • Push Consumer(Push Consumer)

    After receiving the data, Broker will actively push it to the consumer

  • Pull Consumer

    Actively call Consumer's pull message method to pull messages from the Broker server, and the initiative is controlled by the application. Once the batch messages are obtained, the application will start the consumption process.

Timed message

  • Timed message(delay queue) means that after the message is sent to the broker, it will not be consumed immediately, waiting for a specific time for delivery to the real topic.

  • delayLevel has 18 levels, and the corresponding times are:"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",

  • messageDelayLevel is an attribute of broker and does not belong to a topic.

  • When sending a message, set the delayLevel level:msg.setDelayLevel(level)

    • level == 0, the message is non-delayed
    • 1 <= level <= maxLevel, the message is delayed by a specific time, for example, level == 1, delayed by 1s
    • level> maxLevel, then level == maxLevel, for example level == 20, delay 2h

Message mode

  • Cluster consumption(Cluster)
    Each Consumer instance of the same Consumer Group divides the message equally
  • Broadcasting
    Each Consumer instance of the same Consumer Group receives the full amount of messages

Message sequence

  • Normal Ordered Message
    Messages received by consumers through the same consumption queue are in order, and messages received in different message queues may be out of order
  • Strictly Ordered Message
    All messages received by consumers are in order

Message retry

  • Retry queue

    1. RocketMQ will set up a retry queue with the topic name "%RETRY%+ consumerGroup" for each consumer group. This topic's retry queue is for the consumer group, not for each topic
    2. Because it takes some time to recover from the exception, multiple retry levels will be set for the retry queue. Each retry level has a corresponding re-delivery delay. The more the number of retries, the greater the delivery delay.
    3. RocketMQ's handling of retry messages is first saved to the delay queue with the topic name "SCHEDULE \ _TOPIC \ _XXXX", and the background scheduled task is delayed according to the corresponding time and then saved again to "%RETRY%+ consumerGroup" Trial queue.
  • Dead Letter Queue

    1. Used to process messages that cannot be consumed normally. When the first consumption of a message fails, the message queue will automatically retry the message; after the maximum number of retries, if the consumption still fails, at this time, the message queue will not immediately discard the message, but will send it to the consumer In a special queue.
    2. RocketMQ refers to this kind of message that cannot be consumed under normal circumstances as Dead-Letter Message, and the special queue for storing dead-letter messages as Dead-Letter Queue. In RocketMQ, you can use the console console to resend the messages in the dead letter queue to make the consumer instance consume again.

Test code

Send messages, out of order

Screenshot 2020-05-22 21.26.15.png

public class ProducerNoOrder {
    public static void main(String []args) {

        DefaultMQProducer producer;

        Random r = new Random();
        producer = new DefaultMQProducer("BinProducerGroup");
        producer.setInstanceName("BinTestInstance_" + r.nextInt(1000));
        producer.setNamesrvAddr("106.14.186.226:7060");
        producer.setSendMsgTimeout(30000);
        producer.setMaxMessageSize(500000);
        producer.setRetryTimesWhenSendFailed(3);

        try {
            producer.start();
            Thread.sleep(8000);
        } catch(Exception e) {
            e.printStackTrace();
        }


        for(int i = 0; i <100; i ++) {
            try {
                String msg = "message" + i;
                Message sendMsg = new Message("BinTestTopic02",
                        "rocketTag01",
                        "key" + i,
                        msg.getBytes());
                //mqProducer.sendOneway(sendMsg); //Only send message, don't care about send result(such as send log).
                //sendResult = mqProducer.send(sendMsg); //Sync send

                producer.send(sendMsg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("Sent ok:" + sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        System.out.println("Sent failed:" + throwable.getMessage());
                    }
                });
                Thread.sleep(100);
            } catch(Exception e) {
                e.printStackTrace();
            }
        }

    }
}

You can see that they are randomly placed in different queues
Screenshot 2020-05-22 21.37.13.png

Send messages sequentially

Screenshot 2020-05-22 21.31.04.png

public class ProducerInOrder {

    public static void main(String []args) {

        DefaultMQProducer producer;

        Random r = new Random();
        producer = new DefaultMQProducer("BinProducerGroup");
        producer.setInstanceName("BinTestInstance_" + r.nextInt(1000));
        producer.setNamesrvAddr("106.14.186.226:7060");
        producer.setSendMsgTimeout(30000);
        producer.setMaxMessageSize(500000);
        producer.setRetryTimesWhenSendFailed(3);

        try {
            producer.start();
            Thread.sleep(8000);
        } catch(Exception e) {
            e.printStackTrace();
        }

        String []tags = new String []{"TagA", "TagC", "TagD"};

        //Order List
        List <OrderStep> orderList = new ProducerInOrder(). BuildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        int orderSize = orderList.size();
        for(int i = 0; i <100; i ++) {
            //Add a time prefix
            String body = dateStr + "Hello RocketMQ" + orderList.get(i%orderSize);
            Message msg = new Message("BinTestTopic02", tags [i%tags.length], "KEY" + i, body.getBytes());
            SendResult sendResult = null; //Order id
            try {
                sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List <MessageQueue> mqs, Message msg, Object arg) {
                        Long id =(Long) arg; //Select to send queue based on order id
                        long index = id%mqs.size();
                        return mqs.get((int) index);
                    }
                }, orderList.get(i%orderSize) .getOrderId());
            } catch(MQClientException e) {
                e.printStackTrace();
            } catch(RemotingException e) {
                e.printStackTrace();
            } catch(MQBrokerException e) {
                e.printStackTrace();
            } catch(InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue(). getQueueId(),
                    body));

        }

        //producer.shutdown();
    }

   /**
     * Steps of order
     * /
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep {" +
                    "orderId =" + orderId +
                    ", desc = '" + desc +' \ '' +
                    '}';
        }
    }


   /**
     * Generate simulated order data
     * /
    private List <OrderStep> buildOrders() {
        List <OrderStep> orderList = new ArrayList <OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("Create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("Create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("Payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("Create");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("Payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("Payment");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("Done");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("Push");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("Done");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("Done");
        orderList.add(orderDemo);

        return orderList;
    }

Send results
Screenshot 2020-05-25 11.34.36.png

You can see that messages with the same orderId are sent to the same queueId

Sequential consumption
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BinConsumerGroup");
        Random r = new Random();
        String instanceName = "ConsumerInstance" + r.nextInt(100);
        consumer.setInstanceName(instanceName);
        consumer.setNamesrvAddr("106.14.186.226:7060");
        consumer.setConsumeTimeout(20000);

        consumer.registerMessageListener(new MessageListenerOrderly() {

            Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List <MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for(MessageExt msg:msgs) {
                    //You can see that each queue has a unique consume thread to consume, and the order is ordered for each queue(partition)
                    System.out.println("consumeThread =" + Thread.currentThread(). GetName() + "queueId =" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        try {
            consumer.subscribe("BinTestTopic02", "*");
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.setConsumeMessageBatchMaxSize(10);
            consumer.start();
        } catch(MQClientException e) {
            e.printStackTrace();
        }

Consumption result
Screenshot 2020-05-25 11.37.14.png

It can be seen that the messages consumed from the same queueId are consumed in the order sent to the queue

Delayed Message

The most typical application scenario for delayed messages is order status query. The user places an order, confirms whether the order is paid after 30 minutes, and cancels the current order if it is not paid.

        DefaultMQProducer producer = new DefaultMQProducer("BinProducerGroup");
        Random r = new Random();
        producer.setInstanceName("BinTestInstance_" + r.nextInt(1000));
        producer.setNamesrvAddr("106.14.186.226:7060");
        producer.setSendMsgTimeout(30000);
        producer.setMaxMessageSize(500000);
        producer.setRetryTimesWhenSendFailed(3);

        try {
            producer.start();
            Thread.sleep(8000);
        } catch(Exception e) {
            e.printStackTrace();
        }

        int totalMessagesToSend = 100;
        for(int i = 0; i <totalMessagesToSend; i ++) {
            Message message = new Message("BinTestTopic02",("Scheduled message_10s_" + i) .getBytes());
            //The duration of the delay level is as follows, from 1 to 18
            //private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            message.setDelayTimeLevel(3); //10s
            //Send a message
            SendResult result = producer.send(message);
            System.out.println("Send result:" + result);
        }
        //close producer
        producer.shutdown();
    }
Increase the speed of consumption

Improve the parallel processing power of consumption

  • Under the same ConsumerGroup, increase the parallelism by increasing the number of Consumer instances(increasing consumer instances). The number of Consumer instances should not exceed the configured number of subscription queues

Screenshot 2020-05-25 19.29.06.png

  • Improve the consumption of parallel threads of a single Consumer by modifying the parameters consumeThreadMin, consumeThreadMax.

Bulk consumption news

  • Use setConsumeMessageBatchMaxSize to set the maximum number of messages in batches. Consume multiple messages at once

Related Posts