Canal introduction
See the official Canal website for details.
Canal means waterway / pipeline / ditch. It parses the incremental logs (binlog) of a MySQL database to provide incremental data subscription and consumption.
Use cases based on incremental log subscription and consumption include:
- Database mirroring
- Real-time database backup
- Index construction and real-time maintenance (splitting heterogeneous indexes, inverted indexes, etc.)
- Service cache refresh
- Incremental data processing with business logic
Canal currently supports MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x as sources.
Canal principle
Project address: https://gitee.com/bwaylon/canal
The principle is explained clearly on the official site. In short, Canal disguises itself as a slave node in a MySQL replication setup: it sends the dump command to the master node, receives the binary log the master pushes back, and parses the binlog to obtain incremental data in real time.
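Once Canal is running and connected, you can observe the slave disguise from the MySQL side; a quick check, assuming a client session on the master (output details vary by MySQL version):

```sql
-- Canal's connection holds a binlog dump thread, like a real replica
SHOW PROCESSLIST;   -- look for a connection whose Command is 'Binlog Dump'
SHOW SLAVE HOSTS;   -- Canal's slaveId may also appear here
```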
Canal installation
Download
https://github.com/alibaba/canal/releases
Download the latest release from the project's releases page, then extract it and edit the configuration file conf/example/instance.properties:
```properties
canal.instance.mysql.slaveId = 1234
# position info: change this to your own database information
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
```
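If the three position fields are left blank, Canal starts from the binlog's current position. To replay from a specific point instead, you can fill them in with values taken from SHOW MASTER STATUS; a hypothetical example (the file name and offset are placeholders):

```properties
# start parsing from a fixed binlog position (values from SHOW MASTER STATUS)
canal.instance.master.journal.name = mysql-bin.000003
canal.instance.master.position = 4
canal.instance.master.timestamp =
```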
Enable the binary log in the database: edit my.cnf as follows and restart the MySQL service.
```ini
[mysqld]
log-bin=mysql-bin    # enable binlog
binlog-format=ROW    # select ROW mode
server_id=1          # required for MySQL replication; must not clash with Canal's slaveId
```
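After restarting MySQL, it is worth confirming that the binlog settings took effect:

```sql
SHOW VARIABLES LIKE 'log_bin';        -- should be ON
SHOW VARIABLES LIKE 'binlog_format';  -- should be ROW
SHOW MASTER STATUS;                   -- current binlog file name and offset
```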
Grant the MySQL account that Canal connects with the privileges required to act as a MySQL slave. If a suitable account already exists, you can simply grant the privileges. Run the following statements:
```sql
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```
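To double-check the account, inspect its privileges:

```sql
SHOW GRANTS FOR 'canal'@'%';
-- the output should include SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.*
```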
Startup
Run startup.cmd on Windows.
Run startup.sh on Linux.
If startup succeeds, logs should appear under the logs/ directory (e.g. logs/canal/canal.log).
Integrating Canal with Spring Boot
Code
The idea is to use ApplicationRunner to perform an initialization task once the Spring Boot application has started successfully.
Add the dependency. It may conflict with your application's own logging framework; if it does, exclude the conflicting artifacts:
```xml
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.12</version>
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```
Configuration properties
```java
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "custom.canal")
public class CanalProperties {
    private String ip;          // Canal server IP
    private Integer port;       // Canal server port
    // After configuring canal.destinations in canal.properties, you need to create a
    // folder of the same name holding a copy of instance.properties.
    private String destination; // destination = the folder name of the target instance.properties
    private String username;    // user name
    private String password;    // password
    private String subscribe;   // filter expression passed to connector.subscribe(...)
}
```
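The matching entries in application.properties would look roughly like the sketch below; 11111 is the Canal server's default client port, example is the default destination shipped in conf/example, and the credentials are empty unless you configured Canal-level authentication:

```properties
custom.canal.ip=127.0.0.1
custom.canal.port=11111
custom.canal.destination=example
custom.canal.username=
custom.canal.password=
# backslashes are escaped in .properties files: this resolves to .*\..* at runtime
custom.canal.subscribe=.*\\..*
```

Note the caveat discussed later: passing a filter to subscribe() overrides the filter configured in instance.properties.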
```java
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;
import java.util.List;

@Slf4j
@Configuration
@EnableConfigurationProperties(CanalProperties.class) // register CanalProperties for binding
public class CanalListener implements ApplicationRunner {

    private CanalProperties properties;

    @Autowired
    public void setProperties(CanalProperties properties) {
        this.properties = properties;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // get to work
        doWork();
    }

    // Official sample code
    public void doWork() {
        // create the connection
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(properties.getIp(), properties.getPort()),
                properties.getDestination(), properties.getUsername(), properties.getPassword());
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(properties.getSubscribe());
            connector.rollback();
            while (true) {
                // fetch up to batchSize entries without acknowledging them yet
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        log.error("canal wait error");
                    }
                } else {
                    handleEntries(message.getEntries());
                }
                connector.ack(batchId);          // confirm the batch was processed successfully
                // connector.rollback(batchId); // on failure: roll back and receive this batch again
            }
        } finally {
            connector.disconnect();
        }
    }

    public void handleEntries(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // skip transaction begin/end markers
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error, data:" + entry, e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
```
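Because the loop above never returns, run() blocks the thread that executes the ApplicationRunner. A minimal variation, assuming a single dedicated worker is acceptable, is to push the loop onto its own thread so startup completes normally:

```java
@Override
public void run(ApplicationArguments args) {
    // run the endless fetch loop on a dedicated thread so the runner returns immediately
    Thread worker = new Thread(this::doWork, "canal-worker");
    worker.setDaemon(true);
    worker.start();
}
```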
After starting the application, operations on the configured database will be printed to the console.
Common configurations and possible problems
canal.instance.mysql.slaveId
The ID Canal uses when posing as a slave. It must not conflict with the server_id configured in the database's my.cnf.
canal.instance.filter.regex filter settings
The canal.instance.filter.regex setting in instance.properties controls which tables Canal listens to (a combined example follows this list). The formats are:
- `.*\\..*` — all tables in all databases
- `dbname\\..*` — all tables in the dbname database
- `dbname\\.aa.*` — tables in the dbname database whose names begin with aa
- `dbname.table1` — a specific database and table
- `dbname\\.aa.*,dbname.table1` — multiple rules, separated by commas
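A sketch of a combined rule, using hypothetical orders and users databases:

```properties
# every table in orders, plus the single table users.account
canal.instance.filter.regex = orders\\..*,users.account
```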
Pay attention to the following points when configuring filters:
- Do not configure CanalConnector.subscribe("...") with a filter in the code, otherwise the filter in the configuration file will be overwritten
- subscribe is not configured in the code, but the filter still does not take effect
I have run into this problem myself without finding the cause. You can track down the relevant configuration as follows:
1. In conf/canal.properties, look at the canal.instance.global.spring.xml property. The default is usually classpath:spring/file-instance.xml.
2. Open the spring/file-instance.xml found above and modify the filter bean:
```xml
<!-- parsing filter -->
<property name="eventFilter">
    <bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter">
        <!-- change this: -->
        <constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
        <!-- to this (removing the default value): -->
        <constructor-arg index="0" value="${canal.instance.filter.regex}" />
    </bean>
</property>
```
Delete the default value and restart Canal, and the filter takes effect. Oddly, after I later restored the original configuration and restarted again, it still took effect... confusing.
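If you want the filter in instance.properties to remain authoritative, one option is to subscribe without a client-side filter (CanalConnector also offers a no-argument subscribe()):

```java
connector.connect();
// no filter argument: leave canal.instance.filter.regex from instance.properties in effect
connector.subscribe();
connector.rollback();
```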
canal.instance.filter.black.regex blacklist settings
Tables matching the blacklist regex will not be monitored. Note that if a table matches both the filter regex and the blacklist regex, it will not be monitored.
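For example, assuming a hypothetical mydb schema, the following listens to all of mydb except its log_ tables:

```properties
canal.instance.filter.regex = mydb\\..*
# the blacklist wins whenever both regexes match a table
canal.instance.filter.black.regex = mydb\\.log_.*
```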