Install Zookeeper and Kafka cluster

Posted May 27, 2020 · 15 min read

Configure /etc/hosts file

vi /etc/hosts

# Add to
192.168.200.110 master
192.168.200.111 slave1
192.168.200.112 slave2
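Editing /etc/hosts by hand on three machines is easy to get wrong, so the entries above can also be appended with a small script. This is a sketch: the `add_host` helper is a hypothetical name, and `HOSTS_FILE` is parameterized so you can try it against a scratch file first.

```shell
#!/bin/sh
# Append the cluster's host entries to a hosts file, skipping names that
# already have an entry. HOSTS_FILE defaults to /etc/hosts; point it at a
# scratch file to try the function out safely.
HOSTS_FILE="${HOSTS_FILE:-/etc/hosts}"

add_host() {
    ip="$1"; name="$2"
    # only append when the hostname is not present yet (idempotent)
    if ! grep -qw "$name" "$HOSTS_FILE" 2>/dev/null; then
        echo "$ip $name" >> "$HOSTS_FILE"
    fi
}

add_host 192.168.200.110 master
add_host 192.168.200.111 slave1
add_host 192.168.200.112 slave2
```

Because of the `grep -qw` guard, running the script twice leaves only one entry per host.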

Install Zookeeper cluster

Download Zookeeper installation package

Download address: [https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz](https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz)

Create corresponding ZK data and log directories

# Create the ZK data directory; myid must also be created here to give this node its id
mkdir -p /software/zookeeper/zkdata
vi /software/zookeeper/zkdata/myid

# Create the ZK log directory
mkdir -p /software/zookeeper/zklogs
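Each node needs a different number in its myid file, matching the `server.N` entries in zoo.cfg. A sketch that derives the id from the hostname, assuming the master/slave1/slave2 names used in this guide (`zk_id_for` is a hypothetical helper):

```shell
#!/bin/sh
# Write this node's myid from its hostname. The ids must agree with the
# server.1/server.2/server.3 lines configured in zoo.cfg.
ZKDATA="${ZKDATA:-/software/zookeeper/zkdata}"

zk_id_for() {
    case "$1" in
        master) echo 1 ;;
        slave1) echo 2 ;;
        slave2) echo 3 ;;
        *) return 1 ;;
    esac
}

# Only write myid when this hostname is one of the cluster's nodes
if id=$(zk_id_for "$(hostname -s)"); then
    mkdir -p "$ZKDATA"
    echo "$id" > "$ZKDATA/myid"
fi
```

The same script can then be run unchanged on all three machines.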

Modify the zoo.cfg file

First make a copy:

cp zoo_sample.cfg zoo.cfg

Then edit it so that it ends up as follows:

vi zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
# maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
# autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
# autopurge.purgeInterval=1

# ZK data directory (myid lives here as well)
dataDir=/software/zookeeper/zkdata
# ZK log directory
dataLogDir=/software/zookeeper/zklogs
# In cluster mode, each node must list ITSELF as 0.0.0.0 rather than its
# hostname/IP, otherwise the ZK nodes may be unable to reach each other.
# (With a public IP the listener cannot bind the port, so the current node
# must be given as 0.0.0.0.)
server.1=0.0.0.0:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
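Since the 0.0.0.0 substitution differs on every node, the three `server.N` lines can be generated per host. A sketch under the host list from this guide; `zk_server_lines` is a hypothetical helper that takes the current node's name:

```shell
#!/bin/sh
# Emit the server.N lines for zoo.cfg, substituting 0.0.0.0 for the node
# whose name is given as $1 (each node binds its own entry to 0.0.0.0).
zk_server_lines() {
    self="$1"
    i=1
    for host in master slave1 slave2; do
        if [ "$host" = "$self" ]; then
            echo "server.$i=0.0.0.0:2888:3888"
        else
            echo "server.$i=$host:2888:3888"
        fi
        i=$((i + 1))
    done
}

# On master, append its view of the ensemble:
zk_server_lines master
```

On master this prints `server.1=0.0.0.0:2888:3888` followed by the slave1/slave2 entries; run with `slave1` or `slave2` on the other nodes and redirect into zoo.cfg.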

Start/stop ZK/view status/restart

zkServer.sh start

zkServer.sh stop

zkServer.sh status

zkServer.sh restart
# Connect ZK
[root@master conf]# zkCli.sh -server 127.0.0.1:2181

# Display files in the root directory
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]

# Show the root directory together with znode metadata (update counts, etc.)
[zk: 127.0.0.1:2181(CONNECTED) 1] ls2 /
'ls2' has been deprecated. Please use 'ls [-s] path' instead.
[cluster, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x400000003
cversion = 10
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 10

# Create a znode and set its initial content
[zk: 127.0.0.1:2181(CONNECTED) 3] create /lzhpo-test1 "hello, lzhpo-test1 ~"
Created /lzhpo-test1

# View the znode's content
[zk: 127.0.0.1:2181(CONNECTED) 6] get /lzhpo-test1
hello, lzhpo-test1 ~

# Modify the znode's content
[zk: 127.0.0.1:2181(CONNECTED) 7] set /lzhpo-test1 "test1"

[zk: 127.0.0.1:2181(CONNECTED) 8] get /lzhpo-test1
test1

# Delete the znode
[zk: 127.0.0.1:2181(CONNECTED) 9] delete /lzhpo-test1

[zk: 127.0.0.1:2181(CONNECTED) 10] ls /
[admin, brokers, cluster, config, consumers, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]

For the rest, please see the official documentation: https://zookeeper.apache.org/...

Install Kafka cluster

Download Kafka installation package

Download address: [https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.2/kafka_2.12-2.2.2.tgz](https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.2/kafka_2.12-2.2.2.tgz)

Turn on JMX monitoring (optional)

Method 1

If you do not want to monitor Kafka, you can choose not to enable JMX monitoring.

But if you write your own Kafka monitoring platform, some Kafka metrics cannot be collected unless JMX monitoring is enabled.

Set KAFKA_HEAP_OPTS to half of the host's memory;

vi /software/kafka/kafka_2.12-2.2.0/bin/kafka-server-start.sh

Add export JMX_PORT="9999", which enables JMX monitoring of the Kafka cluster.

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
    export JMX_PORT="9999"
fi
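The "half of the host's memory" rule above can be computed instead of guessed. A sketch: `half_mem_gb` is a hypothetical helper that takes the total memory in kB (the unit `/proc/meminfo` reports), so the arithmetic is easy to check in isolation.

```shell
#!/bin/sh
# Compute a -Xmx/-Xms value equal to half of total RAM, per the 50% guideline.
# Takes total memory in kB (as reported by /proc/meminfo).
half_mem_gb() {
    total_kb="$1"
    # kB -> MB -> GB, then halve (integer division)
    echo $(( total_kb / 1024 / 1024 / 2 ))
}

total_kb=$(awk '/^MemTotal:/ {print $2}' /proc/meminfo 2>/dev/null || echo 0)
heap_gb=$(half_mem_gb "$total_kb")
echo "export KAFKA_HEAP_OPTS=\"-Xmx${heap_gb}G -Xms${heap_gb}G\""
```

On a 16 GiB host this prints `export KAFKA_HEAP_OPTS="-Xmx8G -Xms8G"`, which is the line to put into kafka-server-start.sh.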

~~Method 2 (deprecated)~~

This method will report an error when creating a topic:

Error: Exception thrown by the agent: java.rmi.server.ExportException: Port already in use: 9988; nested exception is:
        java.net.BindException: Address in use (Bind failed)
sun.management.AgentConfigurationError: java.rmi.server.ExportException: Port already in use: 9988; nested exception is:

# Modify kafka-run-class.sh
vi /software/kafka/kafka_2.12-2.2.0/bin/kafka-run-class.sh

# Add on the first line (this turns on JMX monitoring and sets the JMX port)
JMX_PORT=9988

# Some servers may not bind the right IP; in that case the host must be bound explicitly.
# Add the following to the KAFKA_JMX_OPTS parameter to specify the service IP.
# Each host must use its own IP address here.
-Djava.rmi.server.hostname=192.168.10.110

Modify bin/kafka-server-start.sh

This is where I do some optimizing, tuning the JVM.

vi /software/kafka/kafka_2.12-2.2.0/bin/kafka-server-start.sh

# Adjust the value of KAFKA_HEAP_OPTS="-Xmx16G -Xms16G"
# Recommended configuration: HEAP SIZE should generally not exceed 50% of the host memory.

Modify config/server.properties

vi /software/kafka/kafka_2.12-2.2.0/config/server.properties

# Modification 1: broker.id, must be unique in the cluster; no two hosts may share it.
broker.id=0

# Modification 2: the address the broker listens on. Each host uses its own address.
listeners=PLAINTEXT://192.168.200.110:9092

# Modification 3: the address advertised to clients. Each host uses its own address.
advertised.listeners=PLAINTEXT://192.168.200.110:9092

# Modification 4: maximum socket request size, an int value; it cannot exceed the int maximum.
socket.request.max.bytes=2147483600

# Modification 5: the default number of partitions
num.partitions=6

# Modification 6: whether to enable log cleaning; usually unnecessary, but enabling it can improve performance
log.cleaner.enable=false

# Modification 7: the zookeeper cluster address
zookeeper.connect=192.168.200.110:2181,192.168.200.111:2181,192.168.200.112:2181

# Modification 8: whether topics may be deleted
delete.topic.enable=true

# Modification 9: the directory where messages are stored. Multiple directories may be
# given, separated by commas; num.io.threads above should then be at least the number of
# directories. With multiple directories, a newly created partition is persisted in the
# directory that currently holds the fewest partitions.
log.dirs=/software/kafka/kafka-logs

# Create the directory for storing messages
mkdir -p /software/kafka/kafka-logs

For Kafka tuning, please see my article: https://www.lzhpo.com/article...

Detailed description of configuration file tuning (annotated example)

# The unique identifier of this machine in the cluster, the same idea as ZooKeeper's myid.
# It is suggested to use the last three digits of the host's IP; every broker (host) must differ.
broker.id

# The port on which Kafka serves clients, 9092 by default; producers use this port [before kafka-0.10.x]
# port

# Off by default; in 0.8.1 it has a bug with DNS resolution and failure rates.
# (Just fill in this machine's address.) [before kafka-0.10.x]
# host.name

# Kafka's advertised port [before kafka-0.10.x]
# advertised.port=9092
# Kafka's local IP address [before kafka-0.10.x]
# advertised.host.name=192.168.10.130

# Listener address
listeners=PLAINTEXT://192.168.10.130:9092

# Address advertised to clients. From kafka-0.10.x onwards this replaces the port/host.name style above.
advertised.listeners=PLAINTEXT://192.168.10.130:9092

# Number of broker threads for network processing.
# [Optimization: num.network.threads mainly handles network I/O, reading and writing buffer
# data; there is basically no I/O wait, so configure it as the number of CPU cores plus 1.]
num.network.threads=3

# Number of threads the broker uses for I/O processing.
# [Optimization: num.io.threads mainly performs disk I/O. There may be some I/O wait during
# peak hours, so configure it larger: 2x the number of CPU cores, at most 3x.]
num.io.threads=8

# The directory where messages are stored. Multiple directories may be given, separated by
# commas; num.io.threads above should be at least the number of directories. With multiple
# directories, a newly created partition is persisted in the directory that currently holds
# the fewest partitions.
log.dirs=/software/kafka/kafka-logs

# Whether topics may be deleted
delete.topic.enable=true

# Send buffer size. Data is not sent immediately; it is first collected in the buffer and
# sent once a certain size is reached, which improves performance.
socket.send.buffer.bytes=102400

# Kafka receive buffer size; data is serialized to disk when it reaches a certain size
socket.receive.buffer.bytes=102400

# The maximum request size Kafka accepts. This value cannot exceed the Java heap size.
# [Optimization: this is an int value; the int range is -2147483648 to 2147483647 and cannot
# be exceeded. Exceeding it gives: org.apache.kafka.common.config.ConfigException: Invalid
# value 8589934592 for configuration socket.request.max.bytes: Not a number of type INT.
# If in doubt, set it to 2147483600 as I recommend.]
socket.request.max.bytes=104857600

# The default number of partitions: a topic gets 1 partition by default.
# [Optimization: if no partition count is given when a topic is created, this value is used.
# The partition count directly affects the throughput of the Kafka cluster; too small a
# value hurts consumption performance. It is recommended to change it to 6.]
num.partitions=1

# Maximum message retention: 168 hours, i.e. 7 days
log.retention.hours=168

# Maximum message size: 5 MB
message.max.bytes=5242880

# Number of replicas Kafka keeps of each message; if one replica fails, the others keep serving
default.replication.factor=2

# Maximum number of bytes per replica fetch
replica.fetch.max.bytes=5242880

# Kafka appends messages to segment files; when a file exceeds this value, Kafka rolls a new one
log.segment.bytes=1073741824

# Every 300,000 ms, check against the retention configured above (log.retention.hours=168)
# and delete any expired messages found in the directory
log.retention.check.interval.ms=300000

# Whether to enable log cleaning; usually unnecessary, but enabling it can improve performance
log.cleaner.enable=false

# ZooKeeper connection string; consumers use this as well
zookeeper.connect=192.168.10.130:2181,192.168.10.128:2181,192.168.10.129:2181
# ZooKeeper connection timeout
zookeeper.connection.timeout.ms=6000

# [Optimization: to raise producer write throughput, files must be written in batches
# periodically. Usually there is no need to change this. If a topic carries little data,
# consider lowering log.flush.interval.ms and log.flush.interval.messages to force flushes,
# reducing the inconsistency that unwritten cached data could cause. Recommended:
# messages=10000 and interval=1s.]
# Flush data to disk after the producer has written 10,000 messages
log.flush.interval.messages=10000
# Flush data to disk every 1 second
log.flush.interval.ms=1000
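Several of these settings interact when sizing disks: retained data is roughly ingest rate times retention time times replication factor. A back-of-envelope sketch using the retention (168 h) and replication factor (2) from above; the 10 MB/s ingest rate is a made-up example figure, and `retained_gb` is a hypothetical helper.

```shell
#!/bin/sh
# Rough disk-sizing arithmetic:
# retained bytes ~= ingest rate x retention time x replication factor
retained_gb() {
    mb_per_sec="$1"; hours="$2"; replicas="$3"
    # MB/s * seconds/hour * hours * replicas, converted to GB (integer division)
    echo $(( mb_per_sec * 3600 * hours * replicas / 1024 ))
}

# 10 MB/s ingest, 168 h retention, replication factor 2:
retained_gb 10 168 2   # prints 11812 (GB across the whole cluster)
```

That total is spread across the brokers' log.dirs, so divide by the broker count for a per-host estimate.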

The default config/server.properties configuration file:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

After modification, distribute it to the other machines:

scp -r /software/kafka/ slave1:/software/kafka/

scp -r /software/kafka/ slave2:/software/kafka/

Modify the parameters of other machines

config/server.properties

These parameters need to be changed to each host's own values:

broker.id, listeners, advertised.listeners

bin/kafka-run-class.sh

What needs to be modified:

-Djava.rmi.server.hostname
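The per-host edits to server.properties can be done with sed rather than by hand. A sketch: `patch_broker` is a hypothetical helper that patches broker.id, listeners, and advertised.listeners in one pass; the paths and addresses are the ones used in this guide.

```shell
#!/bin/sh
# Patch broker.id / listeners / advertised.listeners in a server.properties
# copy for one host. Usage: patch_broker <file> <broker.id> <ip>
patch_broker() {
    file="$1"; id="$2"; ip="$3"
    sed -i \
        -e "s/^broker\.id=.*/broker.id=$id/" \
        -e "s|^listeners=.*|listeners=PLAINTEXT://$ip:9092|" \
        -e "s|^advertised\.listeners=.*|advertised.listeners=PLAINTEXT://$ip:9092|" \
        "$file"
}

# e.g. on slave1 (broker.id 1, 192.168.200.111):
# patch_broker /software/kafka/kafka_2.12-2.2.0/config/server.properties 1 192.168.200.111
```

Note that `sed -i` as written assumes GNU sed; on BSD sed the flag needs an argument (`-i ''`).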

# Start kafka
[root@master /]# kafka-server-start.sh -daemon /software/kafka/kafka_2.12-2.2.0/config/server.properties

# Stop kafka
[root@master /]# kafka-server-stop.sh

# Create a topic with replication factor 3 and 6 partitions
[root@master kafka-logs]# kafka-topics.sh --create --zookeeper 192.168.200.110:2181,192.168.200.111:2181,192.168.200.112:2181 --replication-factor 3 --partitions 6 --topic lzhpo-topic-test06
Created topic lzhpo-topic-test06.

# View all topics in the cluster
[root@master kafka-logs]# kafka-topics.sh --list --zookeeper 192.168.200.110:2181,192.168.200.111:2181,192.168.200.112:2181
__consumer_offsets
lzhpo-topic-test01
lzhpo-topic-test02
lzhpo-topic-test03
lzhpo-topic-test04
lzhpo-topic-test05
lzhpo-topic-test06
lzhpo-topic-test3
lzhpo-topic-test4
topic-test-01

# Start a consumer on the slave2 node, listening on topic lzhpo-topic-test05
[root@slave2 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.200.112:9092 --topic lzhpo-topic-test05 --from-beginning


# Start a producer on slave1 to send messages
[root@slave1 ~]# kafka-console-producer.sh --broker-list 192.168.200.111:9092 --topic lzhpo-topic-test05

(Screenshots: the consumer started on slave2 receives the messages sent by the producer started on slave1.)

# View topic information
[root@master kafka-logs]# kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic lzhpo-topic-test05
Topic:lzhpo-topic-test05 PartitionCount:6 ReplicationFactor:3 Configs:
    Topic:lzhpo-topic-test05 Partition:0 Leader:2 Replicas:2,0,1 Isr:2,0,1
    Topic:lzhpo-topic-test05 Partition:1 Leader:0 Replicas:0,1,2 Isr:0,1,2
    Topic:lzhpo-topic-test05 Partition:2 Leader:1 Replicas:1,2,0 Isr:1,2,0
    Topic:lzhpo-topic-test05 Partition:3 Leader:2 Replicas:2,1,0 Isr:2,1,0
    Topic:lzhpo-topic-test05 Partition:4 Leader:0 Replicas:0,2,1 Isr:0,2,1
    Topic:lzhpo-topic-test05 Partition:5 Leader:1 Replicas:1,0,2 Isr:1,0,2
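The describe output above can also be checked mechanically: a partition is fully replicated when its Isr list has as many broker ids as its Replicas list. A sketch that counts under-replicated partitions; `under_replicated` is a hypothetical helper, and the field layout is assumed from the output shown above.

```shell
#!/bin/sh
# Count under-replicated partitions in `kafka-topics.sh --describe` output:
# healthy partitions have an Isr list as long as their Replicas list.
under_replicated() {
    awk '
        /Partition:/ {
            r = 0; s = 0
            for (i = 1; i <= NF; i++) {
                # "Replicas:" is 9 chars, "Isr:" is 4; split the id lists on commas
                if ($i ~ /^Replicas:/) r = split(substr($i, 10), a, ",")
                if ($i ~ /^Isr:/)      s = split(substr($i, 5), b, ",")
            }
            if (s < r) n++
        }
        END { print n + 0 }
    '
}

# kafka-topics.sh --describe ... | under_replicated   # 0 means every ISR is full
```

Anything other than 0 means some follower has fallen out of sync and is worth investigating.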

Kafka tuning

Tuning parameters:

  • replication.factor: This is a per-topic parameter: the number of replicas of each partition. The value must be greater than 1 (the default is 1), meaning every partition must have at least 2 copies; otherwise, even though the Kafka cluster is highly available, the topic may become unavailable when a broker goes down.

    • Method 1: Set it when the topic is created.

        bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic lzhpo01

      View modification:

        bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic lzhpo01
    • Method 2: Dynamically add a replication factor to an already created topic

      a. First write out the topic's desired replica assignment and save it as a json file.
      For example, to set the replication factor of lzhpo01 to 3 (my kafka cluster has 3 brokers with ids 0,1,2), with the json file named increase-replication-factor.json:

        {"version":1,
        "partitions":[
        {"topic":"lzhpo01", "partition":0, "replicas":[0,1,2]},
        {"topic":"lzhpo01", "partition":1, "replicas":[0,1,2]},
        {"topic":"lzhpo01", "partition":2, "replicas":[0,1,2]}
       ]}

      b. Execute the script:

        bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file increase-replication-factor.json --execute

      c. View the modified topic's replication factor:

        bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic lzhpo01

      Refer to the official documentation: https://kafka.apache.org/docu...

  • min.insync.replicas: This is a parameter set on the Kafka server side, and its value must also be greater than 1. It means a leader must have at least one follower staying in sync with it (its data keeping up with the leader's), so that if the leader goes down there is at least one in-sync follower and no data is lost.

    Set min.insync.replicas (passed as a topic config to kafka-topics.sh):

      bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --config min.insync.replicas=2 --topic lzhpo01
  • acks=all: This is set on the producer side; it requires that every record be written to all in-sync replicas before the write is considered successful.

    For example, if Spring Boot integrates Kafka, acks can be set to all directly in application.yml.

  • ~~retries=MAX~~: Once a write fails, this retries it without limit, blocking until it succeeds.

    I haven't studied this one, so I'll skip it ...
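The increase-replication-factor.json shown in Method 2 above only covers three partitions; for topics with more, the same shape can be generated. A sketch: `reassign_json` is a hypothetical helper, and the fixed replica list [0,1,2] matches the 3-broker example from this guide.

```shell
#!/bin/sh
# Generate a kafka-reassign-partitions.sh JSON file assigning replicas [0,1,2]
# to every partition of a topic. Usage: reassign_json <topic> <partitions>
reassign_json() {
    topic="$1"; parts="$2"
    printf '{"version":1,\n"partitions":[\n'
    p=0
    while [ "$p" -lt "$parts" ]; do
        # no trailing comma after the last partition entry
        if [ "$p" -eq $((parts - 1)) ]; then sep=""; else sep=","; fi
        printf '{"topic":"%s","partition":%d,"replicas":[0,1,2]}%s\n' "$topic" "$p" "$sep"
        p=$((p + 1))
    done
    printf ']}\n'
}

reassign_json lzhpo01 3   # redirect to increase-replication-factor.json
```

The output for `lzhpo01 3` matches the hand-written file above, and the partition count is the only thing to change for bigger topics.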

Welcome to follow me

A programmer who loves technology and basketball.

My blog: https://www.lzhpo.com

My WeChat official account: a programmer who plays basketball