rocketMQ (4.6.1) series of tutorials-refining articles
Posted May 26, 2020 • 8 min read
1.namrsrv and broker
1.1, namesrv and broker architecture
From the perspective of the architecture diagram, the role of namesrv is the registration center. It is only a little special that namesrv does not communicate with each other.
1.2, Namesrv and broker communication
The broker registers its own information with namesrv every 30s. Namesrv checks non-responsive brokers within 120s every 10s and removes them.
Then there will be a problem with the communication model between namesrv and broker. That is, it takes at least 120s for namesrv to perceive the death of the broker.
1.3, rocketmq message model
In the rocketMQ message model, there are several main roles:producer, consumer, and queue. The communication method is as follows.
The figure mainly expresses several information:
- Topic is actually a logical structure, and queue is a physical structure, that is, rocketmq is consumed based on a queue
- Consumers consume in groups.
- In the figure, I specifically made groupA subscribe to topicA, but not topicB. The reason is that when rocketMQ was designed, it did not want a consumer to process multiple types of messages at the same time. Therefore, consumer responsibilities under the same consumerGroup should be the same, and do not do different things(ie, consume multiple topics).
2. Message sender
2.1, Three ways to send messages
- Synchronous sending
When sending synchronously, you need to wait for the broker to store the message in the commitlog file before returning, and the producer thread is blocked.
- Send asynchronously
When sending asynchronously, the task is submitted using the thread pool. Core thread, maximum number of threads = cpu core number. See DefaultMQProducerImpl. Send a message without waiting for the server to return the result.
- One-way sending
Just send it, regardless of success
2.2 Message sending process
The general process:verify the message => find the broker => select the queue => send the message.
2.3, the choice of broker
The broker information will be obtained from the local cache first. If it does not exist, the broker information will be obtained from namesrv.
When choosing a broker, it will avoid the broker that failed to send the last time.
Failure delay mechanism
DefaultMQProducer # setSendLatencyFaultEnable
After the failure to send the message, if the failure delay mechanism is enabled, the broker will be set to be unavailable for a certain period of time. And when selecting the queue, skip the broker.
2.4, Message Queue Selection
Code entry:TopicPublishInfo # selectOneMessageQueue.
Polling selection queue.(Each consumer maintains a counter internally. After each selection, the counter value will be + 1. The algorithm is, the value is%of the number of queues.)
2.5 Message sending
Sending a message means sending the message to the broker. If the message fails to be sent, it will retry according to the configured number of retries.
2.6 Batch sending
The so-called bulk sending is just to turn the message into a List and then transmit it at once.
3. Message storage
3.1. The message is sent out. After the broker receives the message, how to deal with it? How to store messages?
Before that, you need to talk about 3 important storage files of rocketMQ
commitlog A file that actually stores messages.
In order to speed up storage, rocketmq uses sequential writing. All messages, regardless of topic, are stored in the commitlog. Therefore, if you want to delete the news of a topic, you can hardly do it. You can only delete the commitlog file of rocketmq directly, or enable more consumers to consume the message.
There are a few things to know about commitlog.
When the disk usage exceeds 0.9(default), it will be forbidden to continue writing
To prevent too many commitlog files and excessive storage space. rocketMQ will clean up the file.
- rocketMQ scans expired files every 10s(default), and regularly cleans expired files at 4am(default) every day.
Here is a concept, what is an expired commitlog?
The concept of expiration refers to:Current time-the last modification time of commitlog> = 3d(default) becomes an expired file. This will cause a problem, and expired messages will be automatically cleared. If you plan to use rocketMQ to implement long delay messages, then this is not recommended.
- If the disk usage exceeds 0.85(default), the operation of deleting expired files will also be performed
- rocketMQ scans expired files every 10s(default), and regularly cleans expired files at 4am(default) every day.
We know that all topic messages are placed in the commit file, so that although the performance of writing is very high(sequential writing), the efficiency of reading is very low(random reading). So how does rocketMQ solve such a problem? Then this time the consumequeue comes on stage.
All messages are stored on the commitlog, so when you want to retrieve the messages, the efficiency is very slow. The consumequeue solves this problem, and it can be considered as the index file of commitlog.
So how does consumequeue speed up message retrieval?
The general process is as follows. Find the MappedFile and obtain the corresponding consumequeue according to the offset in the MappedFile. Then according to the commitlog offset in the consumequeue to find in the commitlog
The index file is created to improve the retrieval speed of commitlog and consumequeue.
The writing process of the index file is as follows:
Put index into the index entry first, then use the hashcode of the key as k, index subscript as v, put it in the hash slot, and finally update the IndexHead information.
When searching, first determine the index file to be searched, and then go to the hash slot to get the index position, and then go to the index entry to get the corresponding information.
After talking about 3 storage files, I also have a general understanding of how brokers store messages. Then, when brokers store messages, the main job is to put messages in these 3 files.
Store commitlog first. After the storage is completed, it is forwarded to the thread that exclusively stores consumequeue and index
4. Message consumption
4.1. After the broker receives the message, is the consumer able to pull the message immediately? Or does it have to wait for the broker to store the message before it can pull the message?
From the perspective of rocketMQ's message model, consumers pull messages from the queue. That is, you need to wait until rocketMQ flashes the message into the consumequeue before you can pull the message.
4.2. There is a problem here. When will rocketMQ flash the message into the consumequeue, if it is too slow, will it cause consumers to be unable to pull the message in time?
After flashing the commitlog every 10 milliseconds, the request will be forwarded to comsumequeue, indexFile. Finally, consume consumequeue and indexFile to disk. The consumer side pulls messages from the broker side in near real time.
4.3. If you want to consume messages, then you must pull the messages first. When pulling messages, there is a problem. How does rocketMQ know which queue to pull messages from?
rocketMQ will initialize the pull queue for each consumer. The default algorithm is the average distribution algorithm. For example, there are now 8 c1, c2, c3, c4, c5, c6, c7, c8. Suppose there are 3 a1, a2, a3. Then the queue allocated by each consumer is as follows.
a1:c1, c2, c3
a2:c4, c5, c6
Another recommended algorithm is the polling algorithm.
a1:c1, c4, c7
a2:c2, c5, c8
The load balancing of the queue is performed every 20s(default), and when a new consumer comes in, the load balancing of the queue is also performed.
After the consumer knows which queue to pull the message from, the next step is to pull the message.
When rocketMQ pulls messages, it will pull 32 messages at once. Put it in ProcessQueue, and then submit it to Thread Pool for processing.
4.4. If the consumer pulls too many messages, and the consumption speed cannot keep up, will the consumer pull messages endlessly? Cause the message to pile up even more?
Consumers will not pull messages endlessly. RocketMQ has a flow control function when pulling messages. Every time flow control will delay 50ms before continuing to pull messages.
So, under what circumstances will flow control be triggered?
- The number of messages in processQueue is greater than 1000, the size of messages in processQueue is greater than 100 MB, and the message will be pulled after a delay of 50 ms
- If the span of the message with the largest offset and the message with the smallest offset in the processQueue exceeds 2000, the message will be delayed by 50 ms before the message is pulled
The accumulation of rocketMQ messages will not only accumulate on the broker side, but also on the consumer side. Moreover, once a large amount of accumulation occurs on the consumer side, it must cause frequent gc and cpu soaring.
The accumulation on the consumer side means that the consumer consumption speed is too slow, but a lot of news is pulled and placed locally.
4.5. Next, if our consumption fails, what will RocketMQ do for us? Did it fail all the time?
Consumers, as long as they consume this message, regardless of failure or success, rocketMQ will think that the message has been consumed. If the message consumption fails, rocketMQ will resend a message with the content of the message and the message failed to Retry topic%RETRY%+ topic. And the delay level + 1.
There are 2 issues involved here.
What if the news continues to fail? What about rocketMQ?
The maximum retry of rocketMQ is 16 times(default). When the retry is completed, consumption still fails, so rocketMQ will put the message into the DLQ queue, which is what we call the dead letter queue.
Where does the delay level come from?
The delay level of rocketMQ needs to be configured in broker.conf. The default delay level is as follows.
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
4.6. The general message consumption process has been made clear. In the development process, such problems may be encountered. If a consumer is added in the middle, then at what point will the consumer start spending, is there a way for us to specify where the consumer will start spending?
You can call DefaultMQPushConsumer # setConsumeFromWhere to set
- ConsumeFromWhere.CONSUME \ _FROM \ _LAST \ _OFFSET:start consumption from the current maximum offset of the queue(default)
- ConsumeFromWhere.CONSUME \ _FROM \ _FIRST \ _OFFSET consumes from the earliest available message
- ConsumeFromWhere.CONSUME \ _FROM \ _TIMESTAMP starts consumption from the specified timestamp
But this is limited, that is, ConsumeFromWhere consumption progress verification will only be invalid when the consumption progress obtained from the disk returns -1. That is, the consumption group just created. If the consumption progress of the consumption group has already been recorded in the broker, the setting of this value is invalid.
5. Message filtering
Message filtering supports expression filtering. Expression filtering is divided into TAG and SQL92. Here mainly talk about commonly used TAG expressions.
A consumer can subscribe to multiple TAGs,
5.1. A question arises here, is the TAG expression filtered on the broker side or the consumer side?
This question arises because if you filter on the broker side, the broker will be under great pressure. If you filter on the consumer side, the consumer side will receive many useless messages. How does rocketMQ weigh?
rocketMQ's approach is more interesting. First, use the hash code of the tag to perform a quick filter on the broker side. When the consumer pulls, then filter based on the actual tag value.
5.2. There is also a problem. If two consumers are in the same group, subscribe to the same topic, but the tags are different. Will it work normally at this time?
Will not work properly.
Reason:These are 2 identical consumers, they form a cluster, although the tags are different. Then in cluster mode(default), there is absolutely only one consumer that will take effect. Therefore, the correct approach is to define different consumer groups and subscribe to the same topic. Use tag to distinguish