Spring Boot: integrating Canal for real-time monitoring of database changes

Canal introduction

See the official Canal website.
Canal means waterway, pipeline, or ditch. It parses the incremental log (binlog) of a MySQL database and offers incremental data subscription and consumption on top of it.
Typical use cases built on incremental log subscription and consumption include:

  • Database mirroring
  • Real-time database backup
  • Index construction and real-time maintenance (splitting heterogeneous indexes, inverted indexes, etc.)
  • Service cache refresh
  • Incremental data processing with business logic

Canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

Canal principle

Official website address https://gitee.com/bwaylon/canal

The project page explains the principle clearly. In short, Canal poses as a slave node in a MySQL cluster, sends the dump request to the master node, receives the binary log that the master pushes back, and parses that log to obtain incremental data in real time.

Canal installation

Download

https://github.com/alibaba/canal/releases

Download the latest release package from the releases page above

Extract and modify the configuration file conf/example/instance.properties

canal.instance.mysql.slaveId = 1234
# position info; change these values to match your own database
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .*\\..*

Enable the binary log in the database: edit my.cnf and restart the MySQL service

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # select ROW mode
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
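
One way to confirm that these settings took effect after the restart is to read them back over JDBC. The following is only a sketch under assumptions: the class name BinlogCheck and both of its methods are hypothetical (they are not part of canal), and readVariables expects an already-open connection from a MySQL JDBC driver on the classpath.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of canal.
class BinlogCheck {

    // Reads the three server variables that matter for canal.
    static Map<String, String> readVariables(Connection conn) throws SQLException {
        Map<String, String> vars = new HashMap<>();
        String sql = "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id')";
        try (Statement st = conn.createStatement(); ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                vars.put(rs.getString(1).toLowerCase(), rs.getString(2));
            }
        }
        return vars;
    }

    // Returns the problems found; an empty list means the settings look usable.
    static List<String> validate(Map<String, String> vars, long canalSlaveId) {
        List<String> problems = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            problems.add("binlog is not enabled (log_bin is not ON)");
        }
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            problems.add("binlog_format is not ROW");
        }
        if (String.valueOf(canalSlaveId).equals(vars.get("server_id"))) {
            problems.add("server_id is the same as canal's slaveId");
        }
        return problems;
    }
}
```

Calling validate(readVariables(conn), 1234) with the slaveId from instance.properties should return an empty list when the three settings above are in place.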

Grant the MySQL account that canal connects with the privileges it needs to act as a MySQL slave. If the account already exists, you can run the GRANT directly

Run the following commands
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

Startup

On Windows, run bin/startup.bat
On Linux, run bin/startup.sh
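
Before wiring up the client, it can help to confirm that the canal server is actually listening. The sketch below simply attempts a TCP connection; the class name CanalPortCheck is hypothetical, and 11111 is assumed to be the port (canal's default canal.port, which may have been changed in conf/canal.properties).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of canal.
class CanalPortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 11111 is assumed here; adjust it if canal.port was changed.
        System.out.println("canal reachable: " + isListening("127.0.0.1", 11111, 1000));
    }
}
```

A successful connection only shows that something is listening on the port; the canal logs remain the place to check whether the instance itself started cleanly.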

SpringBoot integrated Canal

Code

The approach is to implement ApplicationRunner so that the listener is started once the Spring Boot application has started successfully

The canal client dependency may conflict with the logging framework your project already uses. If it does, exclude the conflicting artifacts

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.0.12</version>
            <exclusions>
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-classic</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

Configuration properties

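import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component // registers the class as a bean so it can be injected into the listener
@ConfigurationProperties(prefix = "custom.canal")
public class CanalProperties {

    private String  ip;          // canal server IP
    private Integer port;        // canal server port
    // Each name listed in canal.destinations (canal.properties) needs a folder of the
    // same name that contains its own copy of instance.properties
    private String  destination; // name of the folder holding the target instance.properties
    private String  username;    // user name
    private String  password;    // password
    private String  subscribe;   // client-side table filter, read by the listener via getSubscribe()
}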
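
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;
import java.util.List;

@Slf4j
@Configuration
public class CanalListener implements ApplicationRunner {

    // Injected through the setter below
    private CanalProperties properties;

    @Autowired
    public void setProperties(CanalProperties properties) {
        this.properties = properties;
    }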

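    @Override
    public void run(ApplicationArguments args) throws Exception {
        // dowork() loops forever, so run it on its own thread and let application startup finish
        new Thread(this::dowork, "canal-listener").start();
    }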

    // Based on the official sample code
    public void dowork() {
        // Create the connection to the canal server
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(properties.getIp(),
                properties.getPort()), properties.getDestination(), properties.getUsername(), properties.getPassword());
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(properties.getSubscribe());
            connector.rollback(); // Go back to the last acknowledged position
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // Fetch up to batchSize entries without acknowledging them
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // Nothing new: wait briefly before polling again
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        log.error("canal wait interrupted", e);
                        Thread.currentThread().interrupt(); // Restore the interrupt flag
                        return;                             // and stop polling
                    }
                } else {
                    run(message.getEntries());
                }
                connector.ack(batchId); // Confirm that the batch was consumed successfully
//                    connector.rollback(batchId); // On failure, roll back so that this batch is delivered again next time
            }
        } finally {
            connector.disconnect();
        }
    }
    
    public void run(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes are of interest
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // Decode the serialized row change carried by the entry
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // A deleted row only has a "before" image
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // An inserted row only has an "after" image
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // Other events (such as UPDATE) carry both images
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}  
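
The polling loop above acknowledges every batch, and the commented-out rollback line hints at the alternative when processing fails. The sketch below shows that ack-on-success, rollback-on-failure decision in isolation. It deliberately uses no canal types so that it runs on its own: BatchSource, InMemorySource and AckOrRollback are hypothetical names, with BatchSource standing in for the getWithoutAck, ack and rollback calls on CanalConnector.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the three CanalConnector calls used by the listener.
interface BatchSource {
    long fetch();                    // id of the next unacknowledged batch, or -1 if none
    List<String> entriesOf(long id); // entries of that batch
    void ack(long id);               // mark the batch as consumed
    void rollback(long id);          // ask for the batch to be delivered again
}

// In-memory source so the sketch runs without a canal server.
class InMemorySource implements BatchSource {
    private final List<List<String>> batches;
    private int cursor = 0; // index of the next unacknowledged batch
    int acks = 0;
    int rollbacks = 0;

    InMemorySource(List<List<String>> batches) { this.batches = batches; }

    public long fetch() { return cursor < batches.size() ? cursor : -1; }
    public List<String> entriesOf(long id) { return batches.get((int) id); }
    public void ack(long id) { acks++; cursor = (int) id + 1; }
    public void rollback(long id) { rollbacks++; } // cursor stays put, so the batch comes back
}

class AckOrRollback {
    // Handles one batch: ack on success, rollback when the handler throws.
    static boolean consumeOnce(BatchSource source, Consumer<List<String>> handler) {
        long batchId = source.fetch();
        if (batchId == -1) {
            return false; // nothing pending
        }
        try {
            handler.accept(source.entriesOf(batchId));
            source.ack(batchId);
            return true;
        } catch (RuntimeException e) {
            source.rollback(batchId);
            return false;
        }
    }
}
```

In the real listener the same shape would wrap the call that processes message.getEntries(), with connector.ack(batchId) on success and connector.rollback(batchId) in the catch branch. Whether redelivery is desirable depends on whether the handler is safe to run twice on the same batch.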

After starting the application, any change to data in the monitored database is printed to the console.
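
For reference, CanalProperties binds keys under the custom.canal prefix, and the listener additionally reads a subscribe value. A possible application.properties for a local default install might look like the fragment below. All values are assumptions: 11111 is canal's default port, example is the instance folder configured earlier, and the empty user name and password assume a canal server without authentication.

```properties
# application.properties (all values are assumptions for a local default install)
custom.canal.ip=127.0.0.1
custom.canal.port=11111
custom.canal.destination=example
custom.canal.username=
custom.canal.password=
# Regex passed to connector.subscribe(); the doubled backslash yields \. after parsing
custom.canal.subscribe=.*\\..*
```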

Common configurations and possible problems

canal.instance.mysql.slaveId

The ID canal uses when posing as a slave. It must not conflict with the server_id configured in the database's my.cnf (my.ini on Windows)

canal.instance.filter.regex filter settings

This setting in instance.properties controls which tables canal listens to. The format is as follows:

  • .*\\..* listens to all databases and all tables
  • dbname\\..* all tables in the dbname database
  • dbname\\.aa.* tables beginning with aa in the dbname database
  • dbname.table1 a specific database and table
  • dbname\\.aa.*,dbname.table1 multiple rules are separated by commas
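
To get a feel for how such rules behave, they can be tried against schema.table names with plain java.util.regex. This is only an approximation under assumptions: canal applies the filter through its own filter class, which may differ in details such as case handling, and the names FilterRegexDemo, shop, aa_orders and table1 are made up for the illustration.

```java
import java.util.regex.Pattern;

// Hypothetical demo class; approximates the filter with java.util.regex.
class FilterRegexDemo {

    // filter: comma-separated regexes as canal sees them after the properties
    // file has been parsed (a doubled backslash in the file becomes a single one).
    static boolean matches(String filter, String schema, String table) {
        String fullName = schema + "." + table;
        for (String rule : filter.split(",")) {
            if (Pattern.matches(rule.trim(), fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Java string literal for the regex list: shop\.aa.*,shop.table1
        String filter = "shop\\.aa.*,shop.table1";
        System.out.println(matches(filter, "shop", "aa_orders")); // true
        System.out.println(matches(filter, "shop", "table1"));    // true
        System.out.println(matches(filter, "shop", "orders"));    // false
        System.out.println(matches(filter, "other", "aa_orders")); // false
    }
}
```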

Pay attention to the following points when configuring filters:

  • Do not pass a filter to CanalConnector.subscribe("...") in the code; otherwise it overrides the filter set in the configuration file
  • No filter is passed to subscribe in the code, but the configured filter still does not take effect
    I ran into this problem and never found the root cause. The following workaround got the filter working:
    1. In conf/canal.properties, look up the canal.instance.global.spring.xml property
       The default is usually classpath:spring/file-instance.xml
    2. Open the file found above (spring/file-instance.xml) and edit this section
       <!-- Analytical filtering processing -->
    	<property name="eventFilter">
    		<bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
    			<constructor-arg index="0" value="${canal.instance.filter.regex:.*\..*}" />
    			Change to
    			<constructor-arg index="0" value="${canal.instance.filter.regex}" />
    		</bean>
    	</property>
    After removing the default value and restarting canal, the filter worked. Later I restored the default and restarted again, and the filter still worked, so the real cause remains unclear.
    

canal.instance.filter.black.regex blacklist settings

Tables that match the blacklist are not monitored. Note that a table matching both the filter regex and the blacklist regex is not monitored
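
The precedence described above (the blacklist wins when both match) can be written down as a small predicate. As before, this is an approximation with java.util.regex rather than canal's own filter implementation, and BlacklistDemo plus the shop.* names are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical demo class; models "filter AND NOT blacklist".
class BlacklistDemo {

    // True if fullName (schema.table) matches any comma-separated rule.
    static boolean matchesAny(String rules, String fullName) {
        for (String rule : rules.split(",")) {
            String trimmed = rule.trim();
            if (!trimmed.isEmpty() && Pattern.matches(trimmed, fullName)) {
                return true;
            }
        }
        return false;
    }

    // A table is monitored only if the filter matches and the blacklist does not.
    static boolean isMonitored(String filter, String blacklist, String fullName) {
        return matchesAny(filter, fullName) && !matchesAny(blacklist, fullName);
    }
}
```

For example, with the filter shop\..* and the blacklist shop\.audit_.*, shop.orders is monitored while shop.audit_log is not.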

Keywords: Java Database MySQL Spring Boot IDE

Added by capetonian on Wed, 08 Dec 2021 11:35:26 +0200