This article describes how to use Canal to implement an asynchronous, decoupled architecture. Follow-up articles will analyze Canal's principles and source code.
Introduction to Canal
Canal is middleware for capturing database changes.
It disguises itself as a MySQL slave, pulls the binlog from the master, and parses and processes it. The results can be delivered to MQ, making it easy for other services to consume database change messages. Some typical uses are described below.
Viewed from the outside, Canal plus MQ together form a data pipeline service, as shown in the following figure.
Typical uses of Canal
Heterogeneous data synchronization (e.g. ES, HBase, DBs sharded by different routing keys)
Canal's official adapter lets you synchronize data to heterogeneous stores such as ES and HBase without writing the tedious conversion and synchronization code yourself. The adapter here is a textbook example of the adapter pattern: it converts the data into the appropriate format and writes it to the heterogeneous storage system.
You can also synchronize data to another DB, or even build tables sharded by a different field.
For example, when an order is placed it is written to a table sharded by user id; with the Canal data pipeline, a copy sharded by merchant id can then be built for B-side business (such as merchants querying the orders they have received), as sketched below.
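As a rough illustration of that pipeline, the sketch below replays an order INSERT (parsed from a Canal row change) into a table sharded by merchant id. The table name prefix `order_by_merchant_`, the shard count, and the column names are all hypothetical, and error handling and idempotency are omitted for brevity:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import com.alibaba.otter.canal.protocol.CanalEntry;

public class MerchantShardReplayer {
    private static final int MERCHANT_SHARDS = 16; // assumed shard count

    private final Connection conn; // connection to the merchant-sharded DB

    public MerchantShardReplayer(Connection conn) {
        this.conn = conn;
    }

    // Replay one inserted order row into the merchant-keyed shard
    public void replay(CanalEntry.RowData rowData) throws SQLException {
        Map<String, String> row = new HashMap<>();
        for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
            row.put(column.getName(), column.getValue());
        }
        long merchantId = Long.parseLong(row.get("merchant_id"));
        // Route by merchant id instead of the original user id
        String table = "order_by_merchant_" + (merchantId % MERCHANT_SHARDS);
        String sql = "INSERT INTO " + table
                + " (order_id, user_id, merchant_id, amount) VALUES (?, ?, ?, ?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, row.get("order_id"));
            ps.setString(2, row.get("user_id"));
            ps.setLong(3, merchantId);
            ps.setString(4, row.get("amount"));
            ps.executeUpdate();
        }
    }
}
```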
Cache Refresh
A common practice for cache refresh is to update the DB first, then delete the cache, and then delete it again after a delay (the cache-aside pattern plus delayed double deletion). This can still fail and is relatively complex to implement. Refreshing the cache through Canal instead means the main service and process no longer need to care about consistency concerns such as cache updates, while eventual consistency is still guaranteed.
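A minimal sketch of this pattern, assuming a Redis cache accessed through Jedis, a primary key column `id`, and an illustrative cache key of the form `product:{id}` (none of which come from the original post):

```java
import java.util.List;

import com.alibaba.otter.canal.protocol.CanalEntry;
import redis.clients.jedis.Jedis;

public class CacheInvalidator {
    private final Jedis jedis = new Jedis("127.0.0.1", 6379);

    public void onRowChange(CanalEntry.RowChange rowChange) {
        CanalEntry.EventType type = rowChange.getEventType();
        if (type != CanalEntry.EventType.INSERT
                && type != CanalEntry.EventType.UPDATE
                && type != CanalEntry.EventType.DELETE) {
            return; // ignore queries, DDL, etc.
        }
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            // DELETE events only carry "before" column values
            List<CanalEntry.Column> columns =
                    type == CanalEntry.EventType.DELETE
                            ? rowData.getBeforeColumnsList()
                            : rowData.getAfterColumnsList();
            for (CanalEntry.Column column : columns) {
                if ("id".equals(column.getName())) {
                    // Delete rather than rewrite the entry: the next read
                    // misses and repopulates from the DB, avoiding stale writes
                    jedis.del("product:" + column.getValue());
                }
            }
        }
    }
}
```

Deleting rather than rewriting the cache entry keeps the logic simple: the next read misses and repopulates the cache from the DB.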
Important business messages, such as price changes
Downstream services become aware of price changes immediately.
The usual practice is to modify the price and then send a message; the difficulty is guaranteeing that the message is sent successfully and deciding what to do when it is not. With Canal, you no longer have to worry about lost messages at the business level.
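A sketch of what such a producer might look like, assuming the change events are forwarded to Kafka; the topic name `price-change` and the `sku_id`/`price` column names are made up for illustration:

```java
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PriceChangePublisher {
    private final KafkaProducer<String, String> producer; // configured elsewhere

    public PriceChangePublisher(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    public void onRowChange(CanalEntry.RowChange rowChange) {
        if (rowChange.getEventType() != CanalEntry.EventType.UPDATE) {
            return;
        }
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            String sku = null;
            String newPrice = null;
            boolean priceUpdated = false;
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if ("sku_id".equals(column.getName())) {
                    sku = column.getValue();
                } else if ("price".equals(column.getName())) {
                    newPrice = column.getValue();
                    // getUpdated() is true only for columns the UPDATE touched
                    priceUpdated = column.getUpdated();
                }
            }
            if (priceUpdated) {
                // The binlog event is emitted only after the transaction
                // commits, so a rolled-back price change produces no message
                producer.send(new ProducerRecord<>("price-change", sku, newPrice));
            }
        }
    }
}
```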
Database Migration
- Multi-datacenter data synchronization
- Database splitting
You could instead implement double-write logic in code and then backfill historical data, but the historical data may itself be updated along the way, forcing repeated rounds of iteration, comparison, and re-synchronization; all in all, it is complex. With Canal, the binlog stream can simply be replayed against the target library, as sketched below.
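For the replay step, here is a sketch under the assumptions that the target is another MySQL library and that every table has an `id` primary key; `REPLACE INTO` is used so re-applying the same event is idempotent:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

import com.alibaba.otter.canal.protocol.CanalEntry;

public class MigrationReplayer {
    private final Connection target; // connection to the new library

    public MigrationReplayer(Connection target) {
        this.target = target;
    }

    public void apply(String table, CanalEntry.RowChange rowChange) throws SQLException {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            switch (rowChange.getEventType()) {
                case INSERT:
                case UPDATE:
                    upsert(table, rowData.getAfterColumnsList());
                    break;
                case DELETE:
                    delete(table, rowData.getBeforeColumnsList());
                    break;
                default:
                    break; // ignore DDL and other event types here
            }
        }
    }

    // REPLACE INTO makes the replay idempotent: applying the same binlog
    // event twice converges to the same row in the target library
    private void upsert(String table, List<CanalEntry.Column> cols) throws SQLException {
        StringBuilder names = new StringBuilder();
        StringBuilder marks = new StringBuilder();
        for (CanalEntry.Column c : cols) {
            if (names.length() > 0) {
                names.append(", ");
                marks.append(", ");
            }
            names.append(c.getName());
            marks.append("?");
        }
        String sql = "REPLACE INTO " + table + " (" + names + ") VALUES (" + marks + ")";
        try (PreparedStatement ps = target.prepareStatement(sql)) {
            for (int i = 0; i < cols.size(); i++) {
                ps.setString(i + 1, cols.get(i).getValue());
            }
            ps.executeUpdate();
        }
    }

    private void delete(String table, List<CanalEntry.Column> before) throws SQLException {
        for (CanalEntry.Column c : before) {
            if ("id".equals(c.getName())) { // assumed primary key column
                try (PreparedStatement ps =
                        target.prepareStatement("DELETE FROM " + table + " WHERE id = ?")) {
                    ps.setString(1, c.getValue());
                    ps.executeUpdate();
                }
            }
        }
    }
}
```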
Real-time reconciliation
The usual practice is to run reconciliation logic in scheduled tasks, which is time-inefficient and slow to surface inconsistencies. With Canal, the reconciliation logic can be triggered in real time.
The general process is as follows (a sketch follows the list):
- Receive data change messages
- Write the change to HBase as a pipeline record
- After a period of time, trigger a comparison against the data on the other end
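A sketch of that flow, assuming the pipeline records go to an HBase table named `recon_pipeline`, a five-minute grace period, and a business-specific comparison against the counterpart system (all assumptions for illustration):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class Reconciler {
    private final Connection hbase; // HBase connection, configured elsewhere
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);

    public Reconciler(Connection hbase) {
        this.hbase = hbase;
    }

    public void onChange(String orderId, String amount) throws Exception {
        // 1. Persist the change as a pipeline record keyed by order id
        try (Table table = hbase.getTable(TableName.valueOf("recon_pipeline"))) {
            Put put = new Put(Bytes.toBytes(orderId));
            put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("amount"), Bytes.toBytes(amount));
            table.put(put);
        }
        // 2. After a grace period, compare against the counterpart system
        scheduler.schedule(() -> compare(orderId, amount), 5, TimeUnit.MINUTES);
    }

    private void compare(String orderId, String expectedAmount) {
        // Hypothetical: fetch the same order from the counterpart system
        // and alert on mismatch; the details depend on the business
    }
}
```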
Canal Client Demo Code Analysis
The following example shows a client connecting to Canal. It is adapted from the official GitHub example, with some optimizations by the author and comments added on the key lines. If Canal instead delivers the data change messages to MQ, the code is written slightly differently: one subscribes to Canal directly and the other subscribes to MQ, but the parsing and processing logic is essentially the same.
```java
public void process() {
    // Number of entries to fetch per batch
    int batchSize = 1024;
    while (running) {
        try {
            // Connect to the Canal server
            connector.connect();
            // Subscribe to data changes (e.g. a table filter)
            connector.subscribe("table_xxx");
            while (running) {
                // Fetch a batch of change records without auto-ack
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // No data fetched in this batch; nothing to process
                } else {
                    // Print the change details
                    printEntry(message.getEntries());
                }
                if (batchId != -1) {
                    // Ack the batch: marks it processed and advances
                    // the consumption position on the Canal side
                    connector.ack(batchId);
                }
            }
        } catch (Throwable e) {
            logger.error("process error!", e);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e1) {
                // ignore
            }
            // Processing failed: roll back the consumption position
            connector.rollback();
        } finally {
            // Disconnect from the Canal server
            connector.disconnect();
        }
    }
}

private void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        long executeTime = entry.getHeader().getExecuteTime();
        long delayTime = new Date().getTime() - executeTime;
        Date date = new Date(executeTime);
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Only row-data entries carry actual data changes
        if (entry.getEntryType() == EntryType.ROWDATA) {
            RowChange rowChange = null;
            try {
                // Parse the change object from the stored value
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
            }
            EventType eventType = rowChange.getEventType();
            logger.info(row_format,
                    new Object[] { entry.getHeader().getLogfileName(),
                            String.valueOf(entry.getHeader().getLogfileOffset()),
                            entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(),
                            eventType,
                            String.valueOf(entry.getHeader().getExecuteTime()),
                            simpleDateFormat.format(date),
                            entry.getHeader().getGtid(),
                            String.valueOf(delayTime) });
            // Skip queries and DDL changes
            if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
                logger.info("ddl : " + rowChange.getIsDdl() + " , sql ----> " + rowChange.getSql() + SEP);
                continue;
            }
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // DELETE: print the column values before the change
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // INSERT: print the column values after the change
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // Otherwise (i.e. UPDATE): print values before and after
                    printColumn(rowData.getBeforeColumnsList());
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
}

// Print column values
private void printColumn(List<Column> columns) {
    for (Column column : columns) {
        StringBuilder builder = new StringBuilder();
        try {
            if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
                    || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
                // Binary types: re-decode the value bytes as UTF-8
                builder.append(column.getName() + " : "
                        + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
            } else {
                builder.append(column.getName() + " : " + column.getValue());
            }
        } catch (UnsupportedEncodingException e) {
            // ignore
        }
        builder.append(" type=" + column.getMysqlType());
        if (column.getUpdated()) {
            builder.append(" update=" + column.getUpdated());
        }
        builder.append(SEP);
        logger.info(builder.toString());
    }
}
```
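The demo refers to a `connector` and a `running` flag that are created elsewhere. A minimal setup might look like the following sketch, assuming a local single-node Canal server with the default `example` destination and authentication disabled:

```java
import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;

public class SimpleCanalClient {
    // Single-node connector; cluster deployments would use newClusterConnector
    private final CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111), // default Canal server port
            "example", // destination (instance) name
            "",        // username (empty when auth is disabled)
            "");       // password

    private volatile boolean running = true; // set to false to stop the loops

    // process(), printEntry(), and printColumn() from the demo above go here
}
```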