[canal series] [1] Canal + MySQL + canal.admin + RocketMQ deployment

Posted May 27, 2020 · 8 min read

1. Prepare the installation packages

1. Download canal.deployer-1.1.4
# wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
2. Download canal.admin-1.1.4
# wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
3. Download rocketmq-all-4.7.0 and rocketmq-console.jar
# wget http://mirror.bit.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip
# wget https://github.com/eacdy/rocketmq-externals/releases/download/RocketMQ-Console-for-4.5.1/rocketmq-console-ng-1.0.1.jar
4. Download zookeeper-3.4.14
# wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
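The archives need to be unpacked into the directories referenced later in this post; a sketch of doing so (the target paths are assumptions inferred from the cd commands in the sections below, and the canal tarballs extract without a top-level directory):

# mkdir -p /usr/local/zookeeper /usr/rocketmq /usr/canal/canal-1.1.4 /usr/canal/canal.admin-1.1.4
# tar -zxvf zookeeper-3.4.14.tar.gz -C /usr/local/zookeeper --strip-components 1
# unzip rocketmq-all-4.7.0-bin-release.zip -d /usr/rocketmq
# tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/canal/canal-1.1.4
# tar -zxvf canal.admin-1.1.4.tar.gz -C /usr/canal/canal.admin-1.1.4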

2. Deploy ZooKeeper and start it in standalone mode

1. Modify environment variables

Edit the /etc/profile file and append the following environment variables at the end of the file:

# ZooKeeper Env
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

Run the following command to make the environment variables take effect: source /etc/profile

2. Rename the configuration file

When using ZooKeeper for the first time, rename zoo_sample.cfg in the $ZOOKEEPER_HOME/conf directory to zoo.cfg:

# mv /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
3. Standalone mode: modify the configuration file

Create the directories /usr/local/zookeeper/data and /usr/local/zookeeper/logs (see the mkdir command after the snippet), and modify the zoo.cfg configuration file:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
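The data and log directories referenced by dataDir and dataLogDir can be created with:

# mkdir -p /usr/local/zookeeper/data /usr/local/zookeeper/logs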
4. Start ZooKeeper service
# cd /usr/local/zookeeper/bin
# ./zkServer.sh start
# zkServer.sh status
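An optional sanity check that the standalone server is serving, assuming the default clientPort 2181 and that nc is installed:

# echo stat | nc 127.0.0.1 2181 ## prints the ZooKeeper version and connection stats
# zkCli.sh -server 127.0.0.1:2181 ls / ## should list the root znodes, e.g. [zookeeper]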

3. Deploy RocketMQ and start rocketmq-console to monitor consumption

1. Deploy rocketmq
① Modify the configuration file
# cd /usr/rocketmq/rocketmq-all-4.7.0-bin-release
# vim conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# Fill in the local IP here; if the broker is exposed to the public network, fill in the public IP
brokerIP1 = Public IP
② mqnamesrv must be started first
# cd ..
# nohup sh bin/mqnamesrv & ## Startup command
# tail -f ~/logs/rocketmqlogs/namesrv.log ## View log
③ Then start mqbroker
# nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf autoCreateTopicEnable=true &
# tail -f ~/logs/rocketmqlogs/broker.log
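To confirm the broker has registered with the name server, the mqadmin tool bundled in the same release can be used (a minimal check):

# sh bin/mqadmin clusterList -n localhost:9876 ## should list DefaultCluster with broker-a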
2. Start the RocketMQ console (web UI)
# nohup java -jar rocketmq-console-ng-1.0.1.jar --server.port=17890 --rocketmq.config.namesrvAddr=rocketmq address:9876 &

(Screenshot: rocketmq-console web UI)

4. Deploy canal and canal.admin

1. Deploy canal.admin
① Modify the content of /usr/canal/canal.admin-1.1.4/conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: database password
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: 123456
② Execute the /usr/canal/canal.admin-1.1.4/conf/canal_manager.sql script against the target database. At the same time, update MySQL's /etc/my.cnf as below and restart MySQL (see the commands after the snippet):
port=3306
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock

# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
log-bin=/var/lib/mysql/mysql-bin
binlog-format=ROW
server-id=1
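A sketch of the commands for this step (the systemd unit name mysqld is an assumption; adjust to how MySQL is managed on your host):

# mysql -uroot -p < /usr/canal/canal.admin-1.1.4/conf/canal_manager.sql
# systemctl restart mysqld
# mysql -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format'" ## expect ON and ROW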
③ Enter /usr/canal/canal.admin-1.1.4 and execute the startup script
./bin/startup.sh
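To verify that canal.admin came up, check that it is serving on port 8089 (the port configured in application.yml above):

# curl -I http://127.0.0.1:8089 ## any HTTP response means the admin console is reachable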
④ Open http://ip:8089 in the browser. After logging in, click Cluster management and add a new cluster (name: canal_cluster, cluster address: zk service ip:2181)

(Screenshot: canal.admin cluster management with the new canal_cluster)

⑤ Edit the cluster configuration: load the template and modify it as follows
##############################################
######### common argument #############
##############################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
canal.admin.manager = 172.18.209.186:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

# canal zk cluster; if it needs to be reachable from the external network, fill in the external/public IP here
canal.zkServers = canal zk cluster ip:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2, n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size, default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
# canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = true
canal.instance.filter.query.ddl = true
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW, STATEMENT, MIXED
canal.instance.binlog.image = FULL, MINIMAL, NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
# canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire, default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk, support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

##############################################
######### destinations #############
##############################################
# Fill in the instance name(s) here (the corresponding instance.properties will be created later); separate multiple names with commas
canal.destinations = test1
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
# canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
# canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
# canal.instance.global.spring.xml = classpath:spring/default-instance.xml

################################################
######### MQ #############
################################################
# MQ address: for a cluster, fill in the cluster address; for a standalone deployment, the single-node address
canal.mq.servers = rocketmq ip address:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# canal.mq.properties. =
canal.mq.producerGroup = canal_coupon_group
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
# canal.mq.namespace =

################################################
######### Kafka Kerberos Info #############
################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
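A note on the password fields above: canal.passwd and canal.admin.passwd are MySQL PASSWORD()-style double-SHA1 hashes with the leading '*' removed (4ACFE3202A5FF5CF467898FC58AAB1D615029441 corresponds to "admin"). A sketch for generating the hash of your own password with a MySQL client:

# mysql -uroot -p -e "SELECT UPPER(SHA1(UNHEX(SHA1('123456'))))" ## 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9, the value used for canal.admin.passwd in the next step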
2. Deploy the canal cluster
① Modify the canal server's local configuration so that it registers with canal.admin:
# register ip
canal.register.ip = IP exposed by the host where canal is deployed
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# admin auto register
canal.admin.register.auto = true
# canal cluster name; fill in the cluster name created in canal.admin cluster management
canal.admin.register.cluster = canal_cluster
② Enter the /usr/canal/canal-1.1.4 directory and start the canal service
# cd /usr/canal/canal-1.1.4
# ./bin/startup.sh
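To check the startup, tail the server log (assuming the deployer's default log layout):

# tail -f logs/canal/canal.log ## startup and registration messages are written here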

At this point, the started canal server can be seen under Server management in the canal.admin console.
(Screenshot: canal.admin server management showing the registered canal server)

3. Create instance.properties and modify it for the actual environment
##############################################
## mysql serverId, v1.0.26 + will autoGen
# canal.instance.mysql.slaveId = 0

# enable gtid use true/false
canal.instance.gtidon = false

# position info
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.master.gtid =

# rds oss binlog
canal.instance.rds.accesskey =
canal.instance.rds.secretkey =
canal.instance.rds.instanceId =

# table meta tsdb info
canal.instance.tsdb.enable = true
# canal.instance.tsdb.url = jdbc:mysql://127.0.0.1:3306/canal_tsdb
# canal.instance.tsdb.dbUsername = canal
# canal.instance.tsdb.dbPassword = canal

# canal.instance.standby.address =
# canal.instance.standby.journal.name =
# canal.instance.standby.position =
# canal.instance.standby.timestamp =
# canal.instance.standby.gtid =

# username/password
canal.instance.dbUsername = root
canal.instance.dbPassword = Database password
canal.instance.defaultDatabaseName = Database name
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid = false
# canal.instance.pwdPublicKey = MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCPUKUQU

# table regex
# canal.instance.filter.regex = .*\\..*
# Fill in the tables to be captured here; separate multiple entries with commas
canal.instance.filter.regex = yunjicoupon\\.t_fullcoupon_user.*
# table black regex
canal.instance.filter.black.regex =
# table field filter(format:schema1.tableName1:field1/field2, schema2.tableName2:field1/field2)
# canal.instance.filter.field = test1.t_product:id/subject/keywords, test2.t_company:id/name/contact/ch
# table field black filter(format:schema1.tableName1:field1/field2, schema2.tableName2:field1/field2)
# canal.instance.filter.black.field = test1.t_product:subject/product_image, test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic = canal_coupon_topic
# dynamic topic route by schema or table regex
# canal.mq.dynamicTopic = mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition = 0
# hash partition config
# canal.mq.partitionsNum = 3
# canal.mq.partitionHash = test.table:id^name,.*\\..*
##############################################
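A quick end-to-end check once the instance is running: change a row in a table matched by canal.instance.filter.regex above, then confirm that the offsets of canal_coupon_topic advance (the UPDATE below is a placeholder; use a real column and key):

# mysql -uroot -p -e "UPDATE yunjicoupon.t_fullcoupon_user SET some_column = 'x' WHERE id = 1"
# sh bin/mqadmin topicStatus -n localhost:9876 -t canal_coupon_topic ## run from the RocketMQ directory; the max offset should grow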

At this point, update the canal.destinations value in the canal cluster configuration to include the new instance, and the whole deployment is complete. The next post covers consuming the MQ messages for DB data synchronization.