Canal source code analysis series - communication data structure of canal

The protocol module mainly defines the communication protocol between the client and the server. Data transmission in canal involves two parts: first, during binlog subscription, binlog events are converted into canal's own Message structure; second, a TCP protocol carries data between the client and the server. Both parts use the protobuf format.

First, let's clarify some basic concepts so that the core flow is easier to analyze later.

Let's start with the Message class:

/**
 * @author zebin.xuzb @ 2012-6-19
 * @version 1.0.0
 *
 * A Message is one batch of data fetched from the server;
 * it contains one or more CanalEntry.Entry objects
 */
public class Message implements Serializable {

    private static final long      serialVersionUID = 1234034768477580009L;
    private long                   id;
    private List<CanalEntry.Entry> entries          = new ArrayList<>(); // used when raw == false
    // row data for performance, see:
    // https://github.com/alibaba/canal/issues/726
    private boolean                raw              = true;
    private List<ByteString>       rawEntries       = new ArrayList<>(); // used when raw == true
    ...

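Message is the data structure exchanged between the canal server and the client under this protocol. The actual data is stored either in entries or in rawEntries, depending on the raw flag. When raw is false, entries holds parsed CanalEntry.Entry objects; when raw is true, rawEntries holds each entry as still-serialized protobuf bytes (ByteString), so the entries do not have to be converted between bytes and objects along the way. As the comment referencing issue 726 indicates, the raw form exists for performance.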
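To make the entries/rawEntries split concrete, here is a minimal sketch of the idea. SketchMessage is a hypothetical class, not canal's Message: entries are modeled as plain Strings, and UTF-8 bytes stand in for serialized protobuf ByteString values. The point is that a raw message carries bytes that are only decoded when someone actually asks for them.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical, simplified stand-in for canal's Message (not the real class).
 * Entries are plain Strings here; UTF-8 bytes stand in for serialized protobuf.
 */
final class SketchMessage {
    private final long id;
    private final boolean raw;
    private final List<String> entries;    // used when raw == false
    private final List<byte[]> rawEntries; // used when raw == true

    private SketchMessage(long id, boolean raw, List<String> entries, List<byte[]> rawEntries) {
        this.id = id;
        this.raw = raw;
        this.entries = entries;
        this.rawEntries = rawEntries;
    }

    /** Builds a message whose entries are already parsed. */
    static SketchMessage parsed(long id, List<String> entries) {
        return new SketchMessage(id, false, new ArrayList<>(entries), Collections.<byte[]>emptyList());
    }

    /** Builds a message that only carries serialized entries. */
    static SketchMessage raw(long id, List<byte[]> rawEntries) {
        return new SketchMessage(id, true, Collections.<String>emptyList(), new ArrayList<>(rawEntries));
    }

    long getId() {
        return id;
    }

    boolean isRaw() {
        return raw;
    }

    /** Returns only the parsed list, which stays empty for a raw message. */
    List<String> getEntries() {
        return Collections.unmodifiableList(entries);
    }

    /** Number of entries, whichever list holds them. */
    int size() {
        return raw ? rawEntries.size() : entries.size();
    }

    /** Decodes raw entries on demand; parsed entries are returned as-is. */
    List<String> decodedEntries() {
        if (!raw) {
            return getEntries();
        }
        List<String> out = new ArrayList<>(rawEntries.size());
        for (byte[] bytes : rawEntries) {
            out.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return out;
    }
}
```

In this sketch a raw message keeps its parsed list empty, so a consumer that only looked at getEntries() would see zero entries; checking the raw flag, or using a helper such as size(), avoids that.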

A typical client loop that consumes Messages looks like this:

while (running) {
    try {
        connector.connect();
        connector.subscribe();
        while (running) {
            try {
                List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // Get messages
                if (messages == null) {
                    continue;
                }
                for (Message message : messages) {
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        // try {
                        // Thread.sleep(1000);
                        // } catch (InterruptedException e) {
                        // }
                    } else {
                        // printSummary(message, batchId, size);
                        // printEntry(message.getEntries());
                        logger.info(message.toString());
                    }
                }
...
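The excerpt stops before the batch is acknowledged. The loop follows a "fetch without ack, then ack or roll back" pattern: a batch is handed out, it only counts as consumed once the client confirms it, and on failure it can be rolled back and fetched again. canal's connector interfaces expose ack and rollback operations for this, but the classes below (Batch, InMemoryBatchSource, BatchConsumer) are hypothetical in-memory stand-ins written only to illustrate the pattern; the id -1 for an empty batch mirrors the batchId == -1 check in the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** A fetched batch; id -1 means "nothing available" (hypothetical type). */
final class Batch {
    final long id;
    final List<String> events;

    Batch(long id, List<String> events) {
        this.id = id;
        this.events = events;
    }
}

/** Hypothetical in-memory source; not canal's CanalConnector. */
final class InMemoryBatchSource {
    private final Deque<String> pending = new ArrayDeque<>();
    private final Map<Long, List<String>> inFlight = new LinkedHashMap<>();
    private long nextBatchId = 1;

    InMemoryBatchSource(List<String> events) {
        pending.addAll(events);
    }

    /** Hands out up to batchSize events and keeps them until ack or rollback. */
    Batch getWithoutAck(int batchSize) {
        if (pending.isEmpty()) {
            return new Batch(-1L, Collections.<String>emptyList());
        }
        List<String> events = new ArrayList<>();
        while (!pending.isEmpty() && events.size() < batchSize) {
            events.add(pending.pollFirst());
        }
        long id = nextBatchId++;
        inFlight.put(id, events);
        return new Batch(id, Collections.unmodifiableList(events));
    }

    /** Confirms the batch: its events will not be delivered again. */
    void ack(long batchId) {
        inFlight.remove(batchId);
    }

    /** Puts the batch's events back at the head of the queue, in order. */
    void rollback(long batchId) {
        List<String> events = inFlight.remove(batchId);
        if (events == null) {
            return;
        }
        for (int i = events.size() - 1; i >= 0; i--) {
            pending.addFirst(events.get(i));
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }
}

final class BatchConsumer {
    /**
     * Fetches until the source is empty. A batch is acked when the handler returns true
     * and rolled back (then fetched again) when it returns false; retries are unbounded.
     */
    static List<String> consumeAll(InMemoryBatchSource source, int batchSize,
                                   Predicate<List<String>> handler) {
        List<String> processed = new ArrayList<>();
        while (true) {
            Batch batch = source.getWithoutAck(batchSize);
            if (batch.id == -1) {
                return processed; // nothing left to fetch
            }
            if (handler.test(batch.events)) {
                processed.addAll(batch.events);
                source.ack(batch.id);
            } else {
                source.rollback(batch.id);
            }
        }
    }
}
```

A real consumer would bound the number of retries rather than retrying forever, as this loop does when the handler keeps failing.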

What is CanalEntry? Looking at the code, CanalEntry contains several nested types:

  • CanalEntry.Column - the data structure of a single field (column)
  • CanalEntry.Entry - corresponds to one binlog event
  • CanalEntry.EntryType - the type of an entry after the event is split; mainly used to mark the beginning of a transaction, row change data, and the end of a transaction
  • CanalEntry.EventType - the event type: insert, update, delete, etc.
  • CanalEntry.RowChange - the change data carried by one entry; it holds one or more RowData
  • CanalEntry.RowData - one changed row of a MySQL table, with its column values before and after the change
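The nesting of these types (a RowChange holding RowData, each RowData holding before and after Column lists) can be sketched with plain Java classes. These are simplified, hypothetical stand-ins for the protobuf-generated CanalEntry classes, with only a few fields, and the RowChange is assumed to be already decoded from an entry. The formatter roughly mirrors what canal's client examples print: a DELETE is described by its before-image, an INSERT by its after-image, and an UPDATE by both.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-ins for the protobuf-generated CanalEntry types.
enum EventType { INSERT, UPDATE, DELETE }

final class Column {
    final String name;
    final String value;
    final boolean updated; // true if an UPDATE changed this column

    Column(String name, String value, boolean updated) {
        this.name = name;
        this.value = value;
        this.updated = updated;
    }
}

final class RowData {
    final List<Column> beforeColumns; // row image before the change
    final List<Column> afterColumns;  // row image after the change

    RowData(List<Column> beforeColumns, List<Column> afterColumns) {
        this.beforeColumns = beforeColumns;
        this.afterColumns = afterColumns;
    }
}

final class RowChange {
    final EventType eventType;
    final List<RowData> rowDatas; // one binlog event may carry several rows

    RowChange(EventType eventType, List<RowData> rowDatas) {
        this.eventType = eventType;
        this.rowDatas = rowDatas;
    }
}

final class RowChangeFormatter {
    /** One line per row: DELETE shows the before-image, INSERT the after-image, UPDATE both. */
    static List<String> describe(RowChange change) {
        List<String> lines = new ArrayList<>();
        for (RowData row : change.rowDatas) {
            switch (change.eventType) {
                case DELETE:
                    lines.add("DELETE " + render(row.beforeColumns));
                    break;
                case INSERT:
                    lines.add("INSERT " + render(row.afterColumns));
                    break;
                default:
                    lines.add("UPDATE " + render(row.beforeColumns) + " -> " + render(row.afterColumns));
                    break;
            }
        }
        return lines;
    }

    /** Renders columns as {name=value, ...}, marking updated columns with '*'. */
    private static String render(List<Column> columns) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                sb.append(", ");
            }
            Column c = columns.get(i);
            sb.append(c.name).append('=').append(c.value);
            if (c.updated) {
                sb.append('*');
            }
        }
        return sb.append('}').toString();
    }
}
```

For an UPDATE that changes name from a to b on row id=1, describe returns the single line UPDATE {id=1, name=a} -> {id=1, name=b*}.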

CanalEntry is defined with protobuf. Protocol Buffers (protobuf), developed by Google, is a language-neutral mechanism for serializing structured data; it fills a role similar to XML but produces a more compact binary encoding, and it is used for data storage, communication protocols, and similar purposes. We won't go into protobuf itself here.

Entry consists of three parts:

  • header
  • entryType
  • storeValue

message Entry {
    /**Protocol header information**/
    Header header = 1;
    ///**Event type after splitting**/ [default = ROWDATA]
    oneof entryType_present {
        EntryType entryType = 2;
    }

    /**Transferred binary data**/
    bytes storeValue = 3;
}
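To see what storeValue being "binary data" means on the wire, here is a hand-rolled sketch of the protobuf wire format for a message shaped like Entry: each field is written as a tag, (field_number << 3) | wire_type, followed either by a varint value or by a length and the raw bytes. Field numbers 1 to 3 come from the definition above; the header is treated as opaque bytes. EntryWire is a hypothetical helper, and real code would use the protobuf-generated classes. The decoder reports entryType as -1 when field 2 is absent; wrapping a single field in a oneof, as entryType_present does, is a common proto3 technique for telling "not set" apart from a default value.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/**
 * Hypothetical helper showing the protobuf wire format of an Entry-shaped message:
 * field 1 = header (length-delimited), 2 = entryType (varint), 3 = storeValue (bytes).
 * Real code would use protobuf-generated classes instead.
 */
final class EntryWire {
    static final int WIRE_VARINT = 0;
    static final int WIRE_LEN = 2;

    /** Decoded fields; entryType stays -1 when field 2 is absent. */
    static final class Decoded {
        byte[] header = new byte[0];
        int entryType = -1;
        byte[] storeValue = new byte[0];
    }

    static byte[] encode(byte[] header, int entryType, byte[] storeValue) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLengthDelimited(out, 1, header);
        writeVarint(out, (2 << 3) | WIRE_VARINT); // tag for field 2
        writeVarint(out, entryType);
        writeLengthDelimited(out, 3, storeValue);
        return out.toByteArray();
    }

    static Decoded decode(byte[] buf) {
        Decoded d = new Decoded();
        int[] pos = {0}; // read cursor shared with readVarint
        while (pos[0] < buf.length) {
            long tag = readVarint(buf, pos);
            int field = (int) (tag >>> 3);
            int wireType = (int) (tag & 0x7);
            if (wireType == WIRE_VARINT) {
                long value = readVarint(buf, pos);
                if (field == 2) {
                    d.entryType = (int) value;
                }
            } else if (wireType == WIRE_LEN) {
                int len = (int) readVarint(buf, pos);
                byte[] data = Arrays.copyOfRange(buf, pos[0], pos[0] + len);
                pos[0] += len;
                if (field == 1) {
                    d.header = data;
                } else if (field == 3) {
                    d.storeValue = data;
                }
            } else {
                throw new IllegalArgumentException("unsupported wire type " + wireType);
            }
        }
        return d;
    }

    private static void writeLengthDelimited(ByteArrayOutputStream out, int field, byte[] data) {
        writeVarint(out, (field << 3) | WIRE_LEN);
        writeVarint(out, data.length);
        out.write(data, 0, data.length);
    }

    /** Base-128 varint: 7 bits per byte, high bit set on all but the last byte. */
    private static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    private static long readVarint(byte[] buf, int[] pos) {
        long result = 0;
        int shift = 0;
        while (true) {
            byte b = buf[pos[0]++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }
}
```

For example, encoding header bytes {0x08, 0x01}, entryType 2 and storeValue {0x0A} yields the nine bytes 0A 02 08 01 10 02 1A 01 0A: tag 0x0A is field 1 with wire type 2, 0x10 is field 2 with wire type 0, and 0x1A is field 3 with wire type 2. Which EntryType the value 2 stands for depends on the enum numbering in the canal version you use.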

The fields of each part are:

  • Header

    • version [protocol version number, default = 1]
    • logfileName [binlog file name]
    • logfileOffset [binlog position]
    • serverId [serverId of the MySQL server that produced the change]
    • serverenCode [character encoding of the changed data]
    • executeTime [execution time of the change]
    • sourceType [source of the change, default = MYSQL]
    • schemaName [schema name of the changed data]
    • tableName [table name of the changed data]
    • eventLength [length of the event]
    • eventType [insert/update/delete type, default = UPDATE]
    • props [reserved for extension]
    • gtid [GTID of the current transaction]
  • entryType [transaction begin TRANSACTIONBEGIN / transaction end TRANSACTIONEND / row data ROWDATA / HEARTBEAT / GTIDLOG]

  • storeValue [serialized bytes, extensible; for ROWDATA entries they decode to a RowChange, which describes the changed rows]

    • tableId [table id, generated by the database]
    • eventType [data change type, default = UPDATE]
    • isDdl [whether the event is a DDL statement, such as CREATE TABLE / DROP TABLE]
    • sql [SQL text of a DDL / QUERY event]
    • props [reserved for extension]
    • ddlSchemaName [schema name for the DDL / QUERY; a DDL can span databases, so the schema that was current when it executed is recorded]
    • rowDatas [the concrete insert/update/delete row changes; there can be several, because one binlog event may carry multiple row changes, e.g. from a batch operation]
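The header fields and entry types above are what a consumer typically relies on to rebuild transaction boundaries and track its binlog position. The sketch below groups ROWDATA entries between TRANSACTIONBEGIN and TRANSACTIONEND and remembers the logfileName:logfileOffset of the last TRANSACTIONEND it saw. EntryType, Header, Entry and TransactionGrouper are hypothetical, heavily simplified stand-ins (no storeValue, only four header fields), not canal's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, heavily simplified stand-ins; only the fields used here are modeled.
enum EntryType { TRANSACTIONBEGIN, ROWDATA, TRANSACTIONEND, HEARTBEAT, GTIDLOG }

final class Header {
    final String logfileName;
    final long logfileOffset;
    final String schemaName;
    final String tableName;

    Header(String logfileName, long logfileOffset, String schemaName, String tableName) {
        this.logfileName = logfileName;
        this.logfileOffset = logfileOffset;
        this.schemaName = schemaName;
        this.tableName = tableName;
    }
}

final class Entry {
    final Header header;
    final EntryType entryType;

    Entry(Header header, EntryType entryType) {
        this.header = header;
        this.entryType = entryType;
    }
}

final class TransactionGrouper {
    /** Tables touched by each completed transaction, in binlog order. */
    final List<List<String>> transactions = new ArrayList<>();
    /** "logfileName:logfileOffset" of the last TRANSACTIONEND seen, or null. */
    String committedPosition;
    private List<String> current; // open transaction, or null

    void accept(Entry entry) {
        Header h = entry.header;
        switch (entry.entryType) {
            case TRANSACTIONBEGIN:
                current = new ArrayList<>();
                break;
            case ROWDATA: {
                String table = h.schemaName + "." + h.tableName;
                if (current != null) {
                    current.add(table);
                } else {
                    // row data outside BEGIN/END is recorded as its own group in this sketch
                    transactions.add(Collections.singletonList(table));
                }
                break;
            }
            case TRANSACTIONEND:
                if (current != null) {
                    transactions.add(current);
                    current = null;
                }
                committedPosition = h.logfileName + ":" + h.logfileOffset;
                break;
            default:
                // HEARTBEAT and GTIDLOG carry no row data and are ignored in this sketch
                break;
        }
    }
}
```

Persisting committedPosition after each completed transaction is one way such a consumer could record how far it got; how that interacts with canal's own position management is outside what this sketch models.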

Added by adavis on Sat, 18 Dec 2021 00:16:21 +0200