Kafka study notes
Posted May 27, 2020 • 7 min read
What is Kafka?
Apache Kafka is an open-source distributed message queue written in Scala, developed by the Apache Software Foundation. The purpose of the project is to provide a high-throughput, low-latency platform for processing real-time data.
Kafka architecture
1) Producer: message producer, a client that sends messages to a Kafka broker
2) Consumer: message consumer, a client that fetches messages from a Kafka broker
3) Topic: a category under which messages are classified
4) Consumer Group (CG): the mechanism Kafka uses to implement both broadcast (to all consumers) and unicast (to any one consumer) of a topic's messages. A topic can have multiple CGs. A topic's messages are conceptually copied (not physically copied) to every CG, but each partition delivers its messages to only one consumer within a CG. To implement broadcasting, give each consumer its own CG; to implement unicast, put all consumers in the same CG. Consumers can also be grouped freely via CGs, without having to send the same messages to multiple topics.
5) Broker: a single Kafka server is a broker, and a cluster consists of multiple brokers. One broker can host multiple topics.
6) Partition: for scalability, a very large topic can be spread across multiple brokers (i.e. servers); a topic is divided into multiple partitions, each of which is an ordered queue. Each message in a partition is assigned an ordered id (the offset). Kafka only guarantees that messages are delivered to consumers in order within a single partition; it does not guarantee ordering across the partitions of a topic.
7) Offset: the position of a message within its partition, used to locate the message.
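The consumer-group semantics above can be sketched in plain Python (no Kafka client involved): every group sees every message, but within one group each partition goes to exactly one consumer. The partition-to-consumer mapping here is a simple round-robin stand-in for Kafka's real assignment logic.

```python
def deliver(partitions, groups):
    """partitions: {partition_id: [messages]}; groups: {group_name: [consumers]}.
    Returns {group_name: {consumer: [messages]}}."""
    out = {}
    for group, consumers in groups.items():
        assigned = {c: [] for c in consumers}
        for i, (pid, msgs) in enumerate(sorted(partitions.items())):
            # Within a group, each partition is owned by exactly one consumer.
            owner = consumers[i % len(consumers)]
            assigned[owner].extend(msgs)
        out[group] = assigned
    return out
```

With two groups, both receive all messages (broadcast); inside each group, the partitions are split among its consumers (unicast per partition).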
Producer writes a message to Kafka
1) The producer first finds the leader of the partition from the "/brokers/.../state" node of zookeeper
2) The producer sends a message to the leader
3) The leader writes the message to the local log
4) Followers pull messages from the leader, write to the local log and send ACK to the leader
5) After the leader has received ACKs from all replicas in the ISR, it advances the HW (high watermark, the offset of the last committed message) and sends an ACK to the producer
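The steps above can be sketched as a toy simulation (not real Kafka code): the leader appends to its log, followers report what they have replicated, and the high watermark advances only to the offset every ISR member has confirmed.

```python
class Leader:
    """Toy model of a partition leader tracking follower ACKs and the HW."""

    def __init__(self, isr):
        self.log = []
        self.acked = {f: 0 for f in isr}  # next offset each follower has replicated
        self.hw = 0                       # high watermark: committed up to here

    def append(self, msg):
        self.log.append(msg)
        return len(self.log) - 1          # offset assigned to the new message

    def follower_ack(self, follower, next_offset):
        self.acked[follower] = next_offset
        # The HW can only advance as far as the slowest ISR member.
        self.hw = min(self.acked.values())
```

Only once every follower has ACKed offset 1 does the HW move to 1 and the message become committed.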
The relationship between the number of Kafka partitions and the number of consumers
A partition of a topic can be consumed by only one consumer thread within the same consumer group, but the reverse is not true: one consumer thread can consume data from multiple partitions. For example, the ConsoleConsumer that ships with Kafka uses just one thread by default to consume all partitions. In other words, the number of partitions determines the upper limit on the number of consumers in a group, so if you have N partitions, the best number of threads is also N; this usually achieves maximum throughput. Configuring more than N threads just wastes system resources, because the extra threads will never be assigned a partition.
Kafka provides two strategies for assigning partitions to consumers: range and roundrobin, selected via the parameter partition.assignment.strategy; the default is the range strategy. For details see: https://www.jianshu.com/p/dbbca800f607
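The two strategies can be sketched for a single topic as follows (a simplified model of the assignment logic, assuming a sorted consumer list as Kafka uses): range gives each consumer a contiguous block of partitions, while round-robin deals them out one at a time.

```python
def range_assign(partitions, consumers):
    """Range: contiguous blocks; the first consumers absorb any remainder."""
    n, k = len(partitions), len(consumers)
    per, extra = divmod(n, k)
    out, start = {}, 0
    for i, c in enumerate(sorted(consumers)):
        count = per + (1 if i < extra else 0)
        out[c] = partitions[start:start + count]
        start += count
    return out

def roundrobin_assign(partitions, consumers):
    """Round-robin: partitions dealt out to consumers one at a time."""
    consumers = sorted(consumers)
    out = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        out[consumers[i % len(consumers)]].append(p)
    return out
```

With 5 partitions and 2 consumers, range yields {c1: [0, 1, 2], c2: [3, 4]} while round-robin yields {c1: [0, 2, 4], c2: [1, 3]} — note how range can leave the first consumers with more partitions.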
How does Kafka achieve high availability?
Since Kafka 0.8 there has been an HA mechanism: replication. Each partition's data is synchronized to other machines, forming multiple replicas. The replicas elect one leader; producers and consumers deal only with this leader, and the other replicas act as followers. On writes, the leader is responsible for synchronizing the data to all followers; on reads, data is read directly from the leader.
So if a broker goes down, the partitions on that broker have replicas on other machines. If that broker hosted the leader of some partition, a new leader is re-elected from the followers, and clients can continue reading and writing through the new leader. This is what high availability means.
When writing data, the producer writes to the leader, the leader writes the data to its local disk, and the followers actively pull the data from the leader. Once all followers have synchronized the data, they send an ACK to the leader; after the leader has received ACKs from all followers, it returns a write-success response to the producer. (Of course, this is just one of the modes; the behavior can be tuned.)
When consuming, reads go only to the leader, and a message becomes visible to consumers only after it has been acknowledged by all followers, i.e. committed.
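The failover described above can be sketched as a tiny election helper (a toy model, not Kafka's actual controller logic): when the broker hosting the leader dies, pick a new leader from the surviving in-sync replicas.

```python
def elect_leader(replicas, isr, dead_brokers):
    """Pick the first surviving in-sync replica as the new leader.

    replicas: ordered replica list (election preference order)
    isr: set of brokers currently in sync
    dead_brokers: set of brokers that have gone down
    """
    for broker in replicas:
        if broker in isr and broker not in dead_brokers:
            return broker
    return None  # no eligible replica left: the partition goes offline
```

If broker 1 (the preferred leader) dies, broker 2 takes over; if no in-sync replica survives, the partition is unavailable rather than electing an out-of-sync follower.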
How does Kafka ensure that messages are not consumed repeatedly? (Idempotence of message consumption)
This has to be analyzed in the context of the specific business logic:
- For example, if you take a piece of data and write it to a database, first query by primary key; if the row already exists, update it instead of inserting.
- For example, if you are writing to Redis, there is no problem: every write is a SET, which is naturally idempotent.
- If you are in neither of the above scenarios, it is a bit more complicated. Have the producer attach a globally unique id (something like an order id) to each message. On the consumer side, look the id up (in Redis, for example) to check whether it has been consumed before. If not, process the message and then record the id; if it has, skip it, so that the same message is never processed twice.
- For example, rely on a unique index in the database to prevent duplicate inserts. Because of the unique-key constraint, inserting a duplicate only raises an error and never produces dirty data in the database.
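The unique-id scheme above can be sketched as follows, using an in-memory set where the text uses Redis (in production, the set membership check and insert would be Redis calls instead).

```python
seen_ids = set()  # stand-in for a Redis set of already-consumed message ids

def consume(message):
    """Process a message at most once, keyed by its globally unique id."""
    if message["id"] in seen_ids:
        return False              # already consumed: skip it
    # ... real business processing would happen here ...
    seen_ids.add(message["id"])   # record the id only after processing succeeds
    return True
```

The first delivery of a given id is processed; a redelivery of the same id is detected and skipped, which is exactly the idempotency guarantee being described.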
How does Kafka ensure reliable transmission of messages?
Consumer lost data
The only situation in which a consumer can lose data is this: you fetch a message, the consumer automatically commits the offset, and Kafka now believes you have consumed it, when in fact you were only about to process it. If the consumer crashes before processing finishes, that message is lost.
Kafka commits offsets automatically by default, so as long as you turn off automatic offset commits and commit the offset manually after processing, you can ensure that data is not lost. But at that point duplicate consumption is still possible: for example, you have just finished processing but not yet committed the offset when the consumer crashes. The message will certainly be consumed again on restart, so you have to guarantee idempotency yourself.
A problem we hit in production: our Kafka consumers read data and write it into an in-memory queue for buffering. Sometimes a message had only just been written to the in-memory queue when the consumer auto-committed its offset; if we then restarted the system, the data still sitting in the in-memory queue was lost before it could be processed.
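The two commit orderings discussed above can be contrasted with a small simulation (a toy model, not real consumer code): committing before processing risks losing the in-flight message on a crash, while committing after processing trades that for possible redelivery.

```python
def run(messages, commit_after_processing, crash_at=None):
    """Simulate a consumer; returns (committed_offset, processed_messages)."""
    committed, processed = 0, []
    for i, msg in enumerate(messages):
        if not commit_after_processing:
            committed = i + 1            # auto-commit style: commit first
        if i == crash_at:
            return committed, processed  # consumer dies before processing msg
        processed.append(msg)
        if commit_after_processing:
            committed = i + 1            # manual style: commit after processing
    return committed, processed
```

With commit-first, a crash at message index 1 leaves the offset at 2 but only one message processed: "b" is lost forever. With commit-after, the offset stays at 1, so "b" is redelivered on restart (a duplicate to be handled idempotently, not a loss).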
Kafka lost data
A common scenario: a broker in Kafka goes down and the partition leader is re-elected. Think about it: if some followers happened to be not fully in sync when the leader died, then after one of those followers is elected leader, some data is missing.
At this time, it is generally required to set at least the following 4 parameters:
- Set the `replication.factor` parameter for the topic: this value must be greater than 1, so that each partition has at least 2 replicas.
- Set the `min.insync.replicas` parameter on the Kafka broker: this value must be greater than 1. It requires the leader to perceive at least one follower that is still in sync with it, so that if the leader dies there is still a follower to take over.
- Set `acks=all` on the producer side: this requires every piece of data to be written to all in-sync replicas before the write is considered successful.
- Set `retries=MAX` on the producer side (a very large value, meaning effectively unlimited retries): this makes the producer retry indefinitely once a write fails, blocking until it succeeds.
With this configuration, it can at least be guaranteed on the Kafka broker side that data is not lost when the broker hosting the leader fails and the leader is switched.
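The four settings above, expressed as plain config maps. The keys are standard Kafka configuration names; how you actually pass them (topic creation command, broker config file, producer client properties) depends on your tooling, which is omitted here.

```python
# Topic-level setting (passed at topic creation time).
topic_config = {
    "replication.factor": 3,    # > 1: each partition has multiple replicas
}

# Broker- or topic-level setting.
broker_config = {
    "min.insync.replicas": 2,   # > 1: leader must keep >= 1 in-sync follower
}

# Producer-side settings.
producer_config = {
    "acks": "all",              # success only when all in-sync replicas have the write
    "retries": 2147483647,      # effectively infinite retries on failure
}
```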
Producer lost data
If you set `acks=all` as described above, data will not be lost. The requirement is that the leader has received the message and all followers have synchronized it before the write is considered successful. If this condition is not met, the producer automatically retries, indefinitely.
How to deal with message backlog?
A large backlog of messages has sat in MQ for several hours without being consumed
Tens of millions of messages backlogged in MQ for seven or eight hours, from after 4 pm until after 11 pm. This is a scenario we actually ran into; it really was a production incident. The naive answer is to fix the consumer's problem, let it recover its consumption speed, and then wait a few hours for the backlog to drain. Do not give that answer in an interview.
One consumer does 1,000 messages per second, so three consumers do 3,000 per second, or 180,000 per minute. With a backlog of millions to tens of millions of messages, even after the consumers recover it will take about an hour or more to catch up.
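The recovery estimate above checks out with a quick calculation (assuming, for concreteness, a ten-million-message backlog):

```python
rate = 3 * 1000                      # 3 consumers x 1,000 messages/second
per_minute = rate * 60               # 180,000 messages per minute
backlog = 10_000_000                 # e.g. ten million backlogged messages
hours = backlog / rate / 3600        # roughly 0.9 hours at this rate
```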
Generally, at this time, only temporary emergency capacity expansion is possible. The specific operation steps and ideas are as follows:
- First fix the consumer's bug so it can actually resume consumption, then stop all existing consumers.
- Create a new topic with 10 times the original number of partitions, and temporarily create 10 times the original number of queues.
- Then write a temporary consumer program that distributes the data. Deploy it to consume the backlog; it does no time-consuming processing, it just polls the messages and writes them round-robin, evenly, into the temporary queues created at 10x capacity.
- Then temporarily requisition 10 times the machines to deploy consumers, with each batch of consumers consuming one temporary queue's data. This approach temporarily scales both queue resources and consumer resources by 10x, consuming data at 10 times the normal speed.
- Once the backlog has been rapidly consumed, restore the originally deployed architecture and use the original consumer machines to consume messages again.
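The temporary distributor program in the steps above can be sketched like this (a simplified model; in reality the backlog would be read from the old topic and the queues would be partitions of the new 10x topic):

```python
def redistribute(backlog, num_queues):
    """Fan backlogged messages out evenly across temporary queues, round-robin,
    with no per-message processing."""
    queues = [[] for _ in range(num_queues)]
    for i, msg in enumerate(backlog):
        queues[i % num_queues].append(msg)
    return queues
```

Each of the 10x queues ends up with roughly one-tenth of the backlog, so 10x the consumer machines can drain it in parallel at 10x the normal speed.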