order
This paper focuses on maxwell's Recovery
Recovery
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/recovery/Recovery.java
public class Recovery { static final Logger LOGGER = LoggerFactory.getLogger(Recovery.class); private final ConnectionPool replicationConnectionPool; private final RecoveryInfo recoveryInfo; private final MaxwellMysqlConfig replicationConfig; private final String maxwellDatabaseName; private final RecoverySchemaStore schemaStore; public Recovery(MaxwellMysqlConfig replicationConfig, String maxwellDatabaseName, ConnectionPool replicationConnectionPool, CaseSensitivity caseSensitivity, RecoveryInfo recoveryInfo) { this.replicationConfig = replicationConfig; this.replicationConnectionPool = replicationConnectionPool; this.recoveryInfo = recoveryInfo; this.schemaStore = new RecoverySchemaStore(replicationConnectionPool, maxwellDatabaseName, caseSensitivity); this.maxwellDatabaseName = maxwellDatabaseName; } public HeartbeatRowMap recover() throws Exception { String recoveryMsg = String.format( "old-server-id: %d, position: %s", recoveryInfo.serverID, recoveryInfo.position ); LOGGER.warn("attempting to recover from master-change: " + recoveryMsg); List<BinlogPosition> list = getBinlogInfo(); for ( int i = list.size() - 1; i >= 0 ; i-- ) { BinlogPosition binlogPosition = list.get(i); Position position = Position.valueOf(binlogPosition, recoveryInfo.getHeartbeat()); Metrics metrics = new NoOpMetrics(); LOGGER.debug("scanning binlog: " + binlogPosition); Replicator replicator = new BinlogConnectorReplicator( this.schemaStore, null, null, replicationConfig, 0L, // server-id of 0 activates "mysqlbinlog" behavior where the server will stop after each binlog maxwellDatabaseName, metrics, position, true, recoveryInfo.clientID, new HeartbeatNotifier(), null, new RecoveryFilter(this.maxwellDatabaseName), new MaxwellOutputConfig(), 0.25f // Default memory usage size, not used ); HeartbeatRowMap h = findHeartbeat(replicator); if ( h != null ) { LOGGER.warn("recovered new master position: " + h.getNextPosition()); return h; } } LOGGER.error("Could not recover from master-change: " + recoveryMsg); return null; } /** * try to find a given heartbeat value from the replicator. * @return A BinlogPosition where the heartbeat was found, or null if none was found. */ private HeartbeatRowMap findHeartbeat(Replicator r) throws Exception { r.startReplicator(); for (RowMap row = r.getRow(); row != null ; row = r.getRow()) { if (!(row instanceof HeartbeatRowMap)) { continue; } HeartbeatRowMap heartbeatRow = (HeartbeatRowMap) row; if (heartbeatRow.getPosition().getLastHeartbeatRead() == recoveryInfo.getHeartbeat()) return heartbeatRow; } return null; } /** * fetch a list of binlog positions representing the start of each binlog file * * @return a list of binlog positions to attempt recovery at * */ private List<BinlogPosition> getBinlogInfo() throws SQLException { ArrayList<BinlogPosition> list = new ArrayList<>(); try ( Connection c = replicationConnectionPool.getConnection() ) { ResultSet rs = c.createStatement().executeQuery("SHOW BINARY LOGS"); while ( rs.next() ) { list.add(BinlogPosition.at(4, rs.getString("Log_name"))); } } return list; } }
- Recovery provides a recovery method, which first obtains the BinlogPosition list through the getBinlogInfo method, then traverses the BinlogPosition from the back to the front to build the BinlogConnectorReplicator, and finally finds the heartbeatRow.getPosition().getLastHeartbeatRead() as the HeartbeatRowMap of recoveryInfo.getHeartbeat(). If it is not null, it returns directly
Maxwell
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/Maxwell.java
public class Maxwell implements Runnable { protected MaxwellConfig config; protected MaxwellContext context; protected Replicator replicator; static final Logger LOGGER = LoggerFactory.getLogger(Maxwell.class); public Maxwell(MaxwellConfig config) throws SQLException, URISyntaxException { this(new MaxwellContext(config)); } protected Maxwell(MaxwellContext context) throws SQLException, URISyntaxException { this.config = context.getConfig(); this.context = context; this.context.probeConnections(); } //...... private Position attemptMasterRecovery() throws Exception { HeartbeatRowMap recoveredHeartbeat = null; MysqlPositionStore positionStore = this.context.getPositionStore(); RecoveryInfo recoveryInfo = positionStore.getRecoveryInfo(config); if ( recoveryInfo != null ) { Recovery masterRecovery = new Recovery( config.replicationMysql, config.databaseName, this.context.getReplicationConnectionPool(), this.context.getCaseSensitivity(), recoveryInfo ); recoveredHeartbeat = masterRecovery.recover(); if (recoveredHeartbeat != null) { // load up the schema from the recovery position and chain it into the // new server_id MysqlSchemaStore oldServerSchemaStore = new MysqlSchemaStore( context.getMaxwellConnectionPool(), context.getReplicationConnectionPool(), context.getSchemaConnectionPool(), recoveryInfo.serverID, recoveryInfo.position, context.getCaseSensitivity(), config.filter, false ); // Note we associate this schema to the start position of the heartbeat event, so that // we pick it up when resuming at the event after the heartbeat. oldServerSchemaStore.clone(context.getServerID(), recoveredHeartbeat.getPosition()); return recoveredHeartbeat.getNextPosition(); } } return null; } //...... }
- Maxwell's attemptMasterRecovery method gets recoveryInfo through positionStore.getRecoveryInfo(config). If recoveryInfo is not null, create masterRecovery, execute masterRecovery.recover() to get recoveredHeartbeat, and if recoveredHeartbeat is not null, return recoveredHeartbeat.getNextPosition()
Summary
Recovery provides a recovery method, which first obtains the BinlogPosition list through the getBinlogInfo method, then traverses the BinlogPosition from the back to the front to build the BinlogConnectorReplicator, and finally finds the heartbeatRow.getPosition().getLastHeartbeatRead() as the HeartbeatRowMap of recoveryInfo.getHeartbeat(). If it is not null, it returns directly