Use canal to incrementally subscribe to MySQL binlog

Posted May 28, 2020 · 16 min read

[Please indicate the source when reprinting] https://segmentfault.com/a/1190000022767293

Canal provides incremental data subscription & consumption based on parsing a database's incremental logs; currently it mainly supports MySQL.
In the early days, Alibaba's B2B business needed to synchronize data between its Hangzhou and US data centers. Early database synchronization relied mainly on triggers to capture incremental changes, but starting around 2010 Alibaba gradually switched to parsing database logs to obtain incremental changes for synchronization, which opened a new era for incremental subscription & consumption.

ps. The internal version already supports log parsing for some versions of MySQL and Oracle. The current open source version of canal supports MySQL 5.7 and below (Alibaba internally runs MySQL 5.7.13, 5.6.10, 5.5.18 and 5.1.40/48).

Businesses supported by log-based incremental subscription & consumption:

  • Database mirroring
  • Real-time database backup
  • Multi-level indexes (separate index databases for sellers and buyers)
  • Search index building
  • Business cache refresh
  • Important business notifications such as price changes

1. How Canal works

mysql master-slave replication implementation

(figure: MySQL master-slave replication)

At a high level, replication consists of three steps:

  1. The master records changes in its binary log (these records are called binary log events and can be viewed with show binlog events);
  2. The slave copies the master's binary log events to its relay log;
  3. The slave replays the events in the relay log, applying the changes to its own data.
How canal works:

(figure: how canal works)

The principle is relatively simple:

  1. Canal simulates the MySQL slave interaction protocol, pretends to be a MySQL slave, and sends a dump request to the MySQL master
  2. The MySQL master receives the dump request and starts pushing the binary log to the slave (i.e. canal)
  3. Canal parses the binary log objects (originally a byte stream)
Architecture

(figure: canal architecture)
Description:

  • server represents one running canal instance and corresponds to one JVM
  • instance corresponds to one data queue (1 server corresponds to 1..n instances)

instance modules (a conceptual sketch of how they hand off data follows this list):

  • eventParser (data source access; simulates the slave protocol to interact with the master; protocol parsing)
  • eventSink (linker between Parser and Store; data filtering, processing and distribution)
  • eventStore (data storage)
  • metaManager (incremental subscription & consumption information manager)
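
A conceptual sketch of the hand-off between these modules; canal's real interfaces (CanalEventParser, CanalEventSink, CanalEventStore) are much richer, and the simplified method names below are invented for illustration:

import java.util.List;

// Conceptual sketch only: how the instance modules pass data along.
public class InstancePipelineSketch {

    interface EventParser { List<Object> parseNextBatch(); }                    // reads and parses binlog
    interface EventSink   { List<Object> filterAndRoute(List<Object> events); } // filter, process, distribute
    interface EventStore  { void put(List<Object> events); }                    // buffers data for subscribers

    public static void run(EventParser parser, EventSink sink, EventStore store) {
        while (true) {
            List<Object> events = parser.parseNextBatch();        // parser: pull events from the master
            List<Object> accepted = sink.filterAndRoute(events);  // sink: drop filtered tables, route/merge
            store.put(accepted);                                  // store: blocks until the data is buffered
        }
    }
}
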
EventParser design

The general process:
(figure: EventParser flow)
The entire parser process can be roughly divided into several steps:

  1. Connection obtains the position where parsing last succeeded (on first startup, the initially specified position or the current binlog position of the database)
  2. Connection establishes a link and sends the BINLOG_DUMP command (a packet sketch follows these steps)

//0. write command number
//1. write 4 bytes bin-log position to start at
//2. write 2 bytes bin-log flags
//3. write 4 bytes server id of the slave
//4. write bin-log file name

  3. MySQL starts pushing the Binary Log
  4. The received Binary Log is parsed by the Binlog parser, which supplements it with some specific information

//Add field name, field type, primary key information, unsigned type processing

  5. The parsed data is passed to the EventSink module for storage; this is a blocking operation until the storage succeeds
  6. After storage succeeds, the Binary Log position is recorded periodically
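
The comment block under step 2 lists the fields of the BINLOG_DUMP request. Below is a minimal sketch of how such a packet body could be assembled; the field layout follows that comment and the MySQL client/server protocol (little-endian integers), while the class and helper names are invented and are not canal's actual implementation:

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hedged sketch: builds the body of a COM_BINLOG_DUMP request as described
// by the numbered comments above. Not canal's real packet class.
public class BinlogDumpPacketSketch {

    // MySQL integers on the wire are little-endian.
    private static void writeIntLE(ByteArrayOutputStream out, int value, int bytes) {
        for (int i = 0; i < bytes; i++) {
            out.write((value >>> (8 * i)) & 0xFF);
        }
    }

    public static byte[] build(long binlogPosition, int flags, long slaveServerId, String binlogFileName) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x12);                                    // 0. command number (COM_BINLOG_DUMP)
        writeIntLE(out, (int) binlogPosition, 4);           // 1. 4-byte binlog position to start at
        writeIntLE(out, flags, 2);                          // 2. 2-byte binlog flags
        writeIntLE(out, (int) slaveServerId, 4);            // 3. 4-byte server id of the "slave" (canal)
        byte[] name = binlogFileName.getBytes(StandardCharsets.UTF_8);
        out.write(name, 0, name.length);                    // 4. binlog file name
        return out.toByteArray();
    }
}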

MySQL's Binary Log network protocol:
(figure: MySQL binlog network protocol)
Description:

https://dev.mysql.com/doc/internals/en/event-structure.html
https://dev.mysql.com/doc/internals/en/binlog-event.html

EventSink Design

(figure: EventSink design)

Description:

  • Data filtering: supports wildcard filtering by table name, field content, etc. (a rough filter sketch follows this list)
  • Data routing/distribution: solves 1:n (one parser corresponds to multiple stores)
  • Data merging: solves n:1 (multiple parsers correspond to one store)
  • Data processing: additional processing such as joins before data enters the store
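
As an illustration of the filtering step, here is a hedged sketch of a wildcard-style filter over schema.table names; the class is invented for this example, and canal's real sink uses its own filter implementation:

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical illustration only: patterns such as "order\\..*" keep every
// table in the `order` schema; ".*\\..*" keeps everything.
public class TableNameFilterSketch {

    private final List<Pattern> patterns;

    public TableNameFilterSketch(String commaSeparatedRegex) {
        this.patterns = Arrays.stream(commaSeparatedRegex.split(","))
                .map(String::trim)
                .map(Pattern::compile)
                .collect(Collectors.toList());
    }

    // Returns true if "schema.table" matches any configured pattern.
    public boolean accept(String schema, String table) {
        String qualified = schema + "." + table;
        return patterns.stream().anyMatch(p -> p.matcher(qualified).matches());
    }
}

For example, new TableNameFilterSketch(".*\\..*").accept("test", "user") returns true, which matches the default filter regex used later in the deployment section.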

Data 1:n business
To make reasonable use of database resources, common businesses are generally isolated by schema, and a data-source routing layer above MySQL or the DAO shields development from the physical location of the database; Alibaba mainly uses cobar/tddl to solve data-source routing.
Therefore, multiple schemas are deployed on one database instance, and each schema is consumed by one or more business parties.

Data n:1 business
Similarly, once a business's data reaches a certain scale, it inevitably involves horizontal or vertical splitting. When the split data needs to be processed, multiple stores have to be linked, the binlog positions become multiple copies, and ordered progress of data consumption can no longer be guaranteed.
Therefore, in some business scenarios the split incremental data needs to be merged back together, for example sorted and merged by timestamp or a global id, as sketched below.
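
A hedged sketch of that merge idea: events coming from several parsers are combined into one stream ordered by timestamp using a priority queue. The SourcedEvent wrapper and its fields are invented for illustration, and a real implementation would merge the streams incrementally rather than loading them whole:

import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustration only: merge per-shard event streams into one stream ordered
// by timestamp, as described for the n:1 case above.
public class ShardMergeSketch {

    static class SourcedEvent {            // hypothetical event wrapper
        final long timestamp;              // execute time of the binlog event
        final int shard;                   // which split database it came from
        SourcedEvent(long timestamp, int shard) {
            this.timestamp = timestamp;
            this.shard = shard;
        }
    }

    public static void mergeByTimestamp(List<List<SourcedEvent>> perShardStreams) {
        PriorityQueue<SourcedEvent> heap =
                new PriorityQueue<>(Comparator.comparingLong((SourcedEvent e) -> e.timestamp));
        perShardStreams.forEach(heap::addAll);
        while (!heap.isEmpty()) {
            SourcedEvent next = heap.poll();               // globally smallest timestamp first
            System.out.println("shard=" + next.shard + " ts=" + next.timestamp);
        }
    }
}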

EventStore Design
  1. Currently only the Memory mode is implemented; local file storage and a mixed mode are planned
  2. Borrows the implementation ideas of Disruptor's RingBuffer

RingBuffer design:
(figure: RingBuffer design)

Three cursors are defined:

  • Put: the last position written into the store by the Sink module
  • Get: the last position retrieved by a data subscription
  • Ack: the last position successfully consumed

Borrowing Disruptor's RingBuffer implementation, the RingBuffer is straightened out:
(figure: straightened RingBuffer)

Implementation notes:

  • The Put/Get/Ack cursors only ever increase and are stored as longs
  • Buffer access uses a remainder or AND operation (AND: cursor & (size - 1); size must be a power of 2, which is more efficient); a rough sketch follows
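
A minimal sketch of the three-cursor idea under those rules: Put/Get/Ack are ever-increasing longs and the slot index is cursor & (size - 1). The class and method names are illustrative, not the API of canal's memory event store:

// Illustrative only: a straightened ring buffer with Put/Get/Ack cursors.
// size must be a power of two so that (cursor & (size - 1)) == cursor % size.
public class RingBufferSketch<T> {

    private final Object[] slots;
    private final int mask;
    private long put = -1;   // last position written by the sink
    private long get = -1;   // last position handed out to a subscriber
    private long ack = -1;   // last position confirmed as consumed

    public RingBufferSketch(int size) {
        if (Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        this.slots = new Object[size];
        this.mask = size - 1;
    }

    public boolean tryPut(T event) {
        if (put - ack >= slots.length) {    // full: unacked data must not be overwritten
            return false;
        }
        slots[(int) ((put + 1) & mask)] = event;
        put++;
        return true;
    }

    @SuppressWarnings("unchecked")
    public T tryGet() {
        if (get >= put) {
            return null;                    // nothing new to hand out
        }
        get++;
        return (T) slots[(int) (get & mask)];
    }

    public void ackUpTo(long position) {
        ack = Math.max(ack, Math.min(position, get));  // cannot ack beyond what was handed out
    }
}
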
Instance design

(figure: Instance design)

instance represents an actual running data queue, including the EventParser, EventSink, EventStore and other components.

A CanalInstanceGenerator is abstracted, mainly to accommodate different configuration management methods:

  • Manager mode: connects to your own internal web console/manager system (currently mainly used inside Alibaba)
  • Spring mode: defined via spring xml + properties; builds the spring configuration
Server design

(figure: Server design)

The server represents one running canal instance. To make the components easy to reuse, two implementations are abstracted: Embedded and Netty (network access).

  • Embedded: for relatively high latency and availability requirements, when you can handle the distributed concerns (such as failover) yourself
  • Netty: wraps a network protocol layer on top of netty, with availability guaranteed by the canal server. The pull model it uses is of course a slight compromise, but that depends on the situation. (Ali's notify and metaq, typical push/pull models, are gradually moving toward pull; push runs into problems when the data volume is large)
Incremental subscription/consumption design

(figure: incremental subscription/consumption design)
For the specific protocol format, see: [CanalProtocol.proto](https://github.com/alibaba/canal/blob/master/protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalProtocol.proto)

Introduction to get/ack/rollback protocol:

  • Message getWithoutAck(int batchSize): lets you specify a batchSize and fetch multiple entries at once. Each call returns a Message containing:
    a. batch id: unique identifier
    b. entries: the concrete data objects, in the format defined by EntryProtocol.proto
  • void rollback(long batchId): as the name implies, rolls back the last get request so the data can be fetched again; submitted with the batchId obtained from get to avoid misoperation
  • void ack(long batchId): as the name implies, confirms that consumption succeeded and notifies the server that the data can be deleted; submitted with the batchId obtained from get to avoid misoperation

Canal's get/ack/rollback protocol differs from the conventional JMS protocol in that get and ack are processed asynchronously: you can call get several times in a row and then submit ack/rollback asynchronously, in order. In the project this is called the streaming API.

Benefits of streaming API design:

  • Get and ack are asynchronous, which reduces the network latency and operation cost caused by ack (99% of the time the state is normal; abnormal rollbacks are the exception, and there is no need to sacrifice overall performance for them)
  • After get returns data, if business consumption hits a bottleneck or needs multi-process/multi-thread consumption, you can keep polling get and keep dispatching tasks to increase parallelism. (A real case from the author: business data consumption had to cross the China-US network, so a single operation took 200ms or more, and parallelization was needed to reduce latency.)

Streaming API design:
(figure: streaming API design)

  • Each get operation generates a mark in the meta; marks increase monotonically to guarantee their uniqueness during operation
  • Each get operation continues from the cursor recorded by the previous mark; if no mark exists, it continues from the last ack cursor
  • Acks must be performed in mark order and cannot skip ahead; an ack deletes the current mark and updates its recorded position to become the last ack cursor
  • If an exception occurs, the client can initiate a rollback to reset everything: all marks are deleted, the positions of the get requests are cleaned up, and the next request starts again from the last ack cursor (see the sketch below)
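
A hedged sketch of those rules from the client side: several getWithoutAck calls are pipelined, the returned batch ids are remembered in order, and ack is issued strictly in that order. It uses the real CanalConnector methods shown in the full client example later; the queueing around them is illustrative:

import java.util.ArrayDeque;
import java.util.Deque;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;

// Illustration of the streaming API: pipeline several gets, then ack the
// batch ids in the same order; on failure, rollback and start over.
public class StreamingAckSketch {

    public static void pipeline(CanalConnector connector, int batchSize) {
        Deque<Long> pendingBatchIds = new ArrayDeque<>();
        try {
            for (int i = 0; i < 3; i++) {                  // several gets without waiting for ack
                Message message = connector.getWithoutAck(batchSize);
                if (message.getId() != -1) {
                    pendingBatchIds.addLast(message.getId());
                    // hand the message off to worker threads here
                }
            }
            while (!pendingBatchIds.isEmpty()) {           // ack strictly in get order, no skipping
                connector.ack(pendingBatchIds.pollFirst());
            }
        } catch (Exception e) {
            connector.rollback();                          // reset: the next get resumes from the last ack cursor
            throw new RuntimeException(e);
        }
    }
}
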
HA mechanism design

Canal's HA is divided into two parts; both the canal server and the canal client have corresponding HA implementations:

  • canal server: to reduce requests for MySQL dumps, only one of the instances on different servers may be running at a time; the others stay in standby
  • canal client: to guarantee ordering, only one canal client may perform get/ack/rollback against an instance at a time, otherwise ordering cannot be guaranteed

The whole HA mechanism mainly relies on two ZooKeeper features, watchers and EPHEMERAL nodes (bound to the session lifecycle); see my earlier ZooKeeper-related articles.

Canal Server:
(figure: canal server HA)

Approximate steps:

  1. When a canal server wants to start a canal instance, it first makes a startup attempt via ZooKeeper (implementation: create an EPHEMERAL node; whoever creates it successfully is allowed to start)
  2. After the ZooKeeper node is created successfully, the corresponding canal server starts the corresponding canal instance; the servers that failed to create the node stay in standby
  3. Once ZooKeeper detects that the node created by canal server A has disappeared, it immediately notifies the other canal servers to repeat step 1 and re-elect a canal server to start the instance
  4. Each time the canal client connects, it first asks ZooKeeper which server currently runs the canal instance and then establishes a link to it; if the link becomes unavailable, it retries

The canal client works in a similar way to the canal server: it also uses ZooKeeper to claim an EPHEMERAL node for control, as sketched below.
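
A hedged sketch of that election step using the plain ZooKeeper client API: whichever server manages to create the EPHEMERAL node runs the instance, while the others watch the node and retry when it disappears. The path layout and payload are assumptions for this example; canal's real implementation wraps this in its own running-monitor classes:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Illustration of the "whoever creates the EPHEMERAL node starts the instance" rule.
public class InstanceElectionSketch {

    public static boolean tryBecomeRunning(ZooKeeper zk, String destination, String serverAddress)
            throws KeeperException, InterruptedException {
        String path = "/otter/canal/destinations/" + destination + "/running"; // assumed path layout
        try {
            zk.create(path, serverAddress.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;                       // we won the election: start the canal instance
        } catch (KeeperException.NodeExistsException alreadyRunning) {
            // another server runs this instance: watch the node and stay in standby
            zk.exists(path, event -> {
                // node disappeared (server crashed / session expired): attempt the election again
            });
            return false;
        }
    }
}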

2. Environmental requirements

  • JDK 1.6.25 or later is recommended

  • The current canal open source version supports MySQL 5.7 and below
    ps. MySQL 4.x has not been rigorously tested but is theoretically compatible

  • Enable MySQL's binlog writing and configure the binlog format as ROW

      [mysqld]
      log-bin = mysql-bin
      binlog-format = ROW #select ROW mode
      server_id = 1 #required by MySQL replication; must not duplicate canal's slaveId

    Check if the configuration is valid

      #View the open status and file name of binlog
      mysql> show variables like '%log_bin%';
      #View the current format of binlog
      mysql> show variables like '%format%';
      #View binlog file list
      mysql> show binary logs;
      #View the status of binlog
      mysql> show master status;
  • Canal works by simulating a MySQL slave, so the account it uses must have the relevant slave permissions

      mysql> CREATE USER canal IDENTIFIED BY 'canal';
      mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
      #-mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
      mysql> FLUSH PRIVILEGES;

    For existing accounts, you can query permissions via grants:

      mysql> show grants for 'canal';

3. Deployment

Get the release package

Method 1:(Direct download)

Visit: https://github.com/alibaba/canal/releases , which will list all historical release packages
The current latest version is 1.1.3

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

Method 2:(Compile yourself)

git clone git@github.com:alibaba/canal.git
git checkout canal-1.1.3 #switch to the corresponding version
mvn clean install -Denv=release

After the execution is completed, a target directory will be generated under the root directory of the canal project, which will contain a canal.deployer-1.1.3.tar.gz

Configuration Introduction

Before introducing the configuration, first understand the configuration loading method of canal:
(figure: canal configuration loading)

There are two ways to configure canal:

  1. ManagerCanalInstanceGenerator: manager-based configuration management, currently used for Alibaba's internal configuration. You can implement CanalConfigClient and connect it to your own management system to complete the integration.
  2. SpringCanalInstanceGenerator: based on local spring xml configuration; the current open source version already ships all the code for this, and it is the recommended approach
Introduction to Spring Configuration

The principle of spring configuration is to abstract the entire configuration into two parts:

  • xxxx-instance.xml (canal component configuration definitions, which can be shared by multiple instance configurations)
  • xxxx.properties (each instance channel has its own definitions, because each MySQL's ip, account, password and other information differ)

Through Spring's PropertyPlaceholderConfigurer the two are merged to generate an instance object; the components of each instance are independent of one another and do not interfere.

properties configuration file
The properties configuration is divided into two parts:

  • canal.properties(system root configuration file)
  • instance.properties(instance level configuration file, one for each instance)

canal.properties introduction:
The canal configuration is mainly divided into two parts:

  1. Instance list definition(list how many instances on the current server, each instance is loaded by spring/manager, etc.)
  2. Common parameter definitions; for example, the common parameters from instance.properties can be extracted and placed here so that every instance shares them at startup. [instance.properties definitions take priority over canal.properties]

Instance.properties introduction:

  1. After canal.destinations is defined in canal.properties, a directory with the same name needs to be created under the directory specified by canal.conf.dir

such as:

 canal.destinations = example1, example2

At this time, you need to create two directories, example1 and example2, each of which has an instance.properties.
ps. canal comes with an instance.properties demo, which can be copied directly to the conf/example directory for configuration modification

  2. If the instance list is not defined in canal.properties but canal.auto.scan is enabled
  • When the server is started for the first time, it will automatically scan the conf directory and use the file name as the instance name to start the corresponding instance

  • When the server is running, it scans at the frequency defined by canal.auto.scan.interval (a rough sketch of this scan loop follows the list)

    1. Found a new directory, start a new instance
    2. Found that the directory is deleted, close the old instance
    3. Found that the instance.properties of the corresponding directory have changed, restart the instance
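
A hedged sketch of that scan loop; the directory layout matches the description above, while the class itself is invented for illustration and is not canal's real instance-scanning code:

import java.io.File;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustration of canal.auto.scan: periodically diff the conf directory
// against the set of running instances.
public class AutoScanSketch {

    private final File confDir;
    private final Set<String> running = new HashSet<>();

    public AutoScanSketch(String confDirPath) {
        this.confDir = new File(confDirPath);
    }

    public void start(long intervalSeconds) {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(this::scanOnce, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    private void scanOnce() {
        Set<String> onDisk = new HashSet<>();
        File[] children = confDir.listFiles(File::isDirectory);
        if (children != null) {
            for (File dir : children) {
                onDisk.add(dir.getName());            // each sub-directory is an instance name
            }
        }
        for (String name : onDisk) {
            if (running.add(name)) {
                // new directory found: start a new instance (start logic omitted)
            }
        }
        running.removeIf(name -> !onDisk.contains(name)); // directory removed: stop the old instance
        // a real scanner would also watch instance.properties for changes and restart that instance
    }
}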

instance.xml configuration file
There are currently the following instance.xml supported by default:

  • spring/memory-instance.xml
  • spring/default-instance.xml
  • spring/group-instance.xml

Before introducing the instance configuration, first understand how canal maintains an incremental subscription & consumption relationship information:

  • Parse position (the parse module records how far the binlog has been parsed; the corresponding component is CanalLogPositionManager)
  • Consume position (after receiving an ack from the client, the canal server records the position the client last committed; the corresponding component is CanalMetaManager)

There are currently several implementations of these two position components:

  • memory(used in memory-instance.xml)
  • zookeeper
  • mixed
  • period (used in default-instance.xml; a combination of the zookeeper + memory modes: write to memory first and flush the data to zookeeper periodically, as sketched below)
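
A hedged sketch of the period idea: the position is updated in memory on every ack and flushed to ZooKeeper on a fixed schedule. The class and field names are invented; canal's actual components are the CanalMetaManager/CanalLogPositionManager implementations mentioned above:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Illustration of the "period" mode: memory first, periodic flush to ZooKeeper.
public class PeriodPositionManagerSketch {

    private final AtomicLong latestAckedPosition = new AtomicLong(-1);
    private final ScheduledExecutorService flusher = Executors.newSingleThreadScheduledExecutor();

    public PeriodPositionManagerSketch(long flushIntervalMillis) {
        flusher.scheduleAtFixedRate(this::flushToZookeeper,
                flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);
    }

    // Called on every ack: cheap, in-memory only.
    public void recordAck(long position) {
        latestAckedPosition.set(position);
    }

    // Called on the schedule: persist the latest position so other servers can resume from it.
    private void flushToZookeeper() {
        long position = latestAckedPosition.get();
        if (position >= 0) {
            // write `position` to the destination's ZooKeeper node here (omitted)
        }
    }

    public void shutdown() {
        flushToZookeeper();   // final flush so at most one interval of progress is lost
        flusher.shutdown();
    }
}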

memory-instance.xml:
All components (parser, sink, store) use the memory mode, and the position recording also uses the memory mode, so after a restart parsing resumes from the initial position.

**Features:** The fastest, with the fewest dependencies (no zookeeper required)

**Scenario:** Generally used for quickstart, or for ad-hoc data analysis after a problem occurs; it should not be used in a production environment

default-instance.xml:
The store uses the memory mode, while the position management that the parser/sink depend on uses a persistent mode; currently persistence mainly means writing to zookeeper to ensure the data is shared across the cluster.
**Features:** Supports HA. **Scenario:** Production environment, cluster deployment.

group-instance.xml:
Mainly aimed at the need to merge multiple databases: multiple physical instances can be merged into one logical instance for client access.

**Scenario:** Sharded (sub-library) businesses. For example, product data is split across 4 databases, each with its own instance. Without a group, consuming this data on the business side requires starting 4 clients linked to 4 instances. With a group, they are merged into one logical instance on the canal server; only one client needs to be started and linked to that logical instance.

Design intention of instance.xml:
It allows custom extensions. For example, after implementing database-based position management, you can customize your own instance.xml; the greatest flexibility in the whole canal design lies here.

HA mode configuration

Modify canal.properties

canal.zkServers = 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
canal.destinations = example #instance list deployed on the current server
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

Modify instance.properties

canal.instance.mysql.slaveId = 1234 #the serverId concept from MySQL cluster configuration; must be unique within the current MySQL cluster (generated automatically since canal v1.1.x, no need to specify it manually)
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.filter.regex = .*\\..* #tables of interest for MySQL data parsing, as Perl regular expressions. Multiple regexes are separated by commas (,); escaping requires double backslashes (\\)

Note: the instance directory names on the other machines must be exactly the same; HA mode relies on the instance name for management, the default-instance.xml configuration must be selected, and canal.instance.mysql.slaveId must be unique.
Run the startup script startup.sh. After startup, check logs/example/example.log: only one machine will show a successful-startup log; the other stays in standby.

Client consumption data

Create mvn project, modify pom.xml, add dependencies:

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.3</version>
</dependency>

CanalClientTest code

package com.stepper.canalclient;

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class CanalClientTest {

    public static void main(String[] args) {
        String zkServers = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        String destination = "example";
        CanalConnector connector = CanalConnectors.newClusterConnector(zkServers, destination, "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            // connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // get the specified amount of data
                // System.out.println(message.toString());
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count: " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s, size=%s]%n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // submit confirmation
                // connector.rollback(batchId); // processing failed, roll back the data
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error, data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================ binlog[%s:%s], name[%s,%s], eventType: %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

After starting the Canal Client, you can see the message from the console by operating the database to change the data.
For more parameters and details, please refer to the official wiki documentation.

note:

  • Use HA as much as possible in the production environment.
  • Regarding the order in which Canal consumes the binlog: to keep binlog events strictly ordered, try not to use multithreading.
  • If the data Canal consumes from the binlog must be sent to Kafka and must stay ordered, the Kafka topic can be set to a single partition, as sketched below.
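
For that last point, here is a hedged sketch of forwarding parsed rows with the standard Kafka producer pinned to a single partition; the topic name, broker address and payload are placeholders, and serialization of the canal entries is left out:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustration of the ordering note: with a 1-partition topic (or an explicit
// partition of 0), Kafka preserves the order in which binlog entries are sent.
public class BinlogToKafkaSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");   // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // In a real client this would sit inside the getWithoutAck loop above,
            // sending one record per parsed entry before calling ack(batchId).
            String payload = "{\"table\":\"example\",\"eventType\":\"INSERT\"}"; // placeholder payload
            producer.send(new ProducerRecord<>("binlog-topic", 0, null, payload)); // partition 0 only
        }
    }
}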

[Please indicate the source of the reprint]: https://segmentfault.com/a/1190000022767293