Preface
This article looks at the CanalAdapterWorker class in canal's client adapter.
CanalAdapterWorker
canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
public class CanalAdapterWorker extends AbstractCanalAdapterWorker { private static final int BATCH_SIZE = 50; private static final int SO_TIMEOUT = 0; private CanalConnector connector; /** * The construction method of a single client adapter worker * * @param canalDestination canal Instance name * @param address canal-server address * @param canalOuterAdapters External adapter group */ public CanalAdapterWorker(CanalClientConfig canalClientConfig, String canalDestination, SocketAddress address, List<List<OuterAdapter>> canalOuterAdapters){ super(canalOuterAdapters); this.canalClientConfig = canalClientConfig; this.canalDestination = canalDestination; connector = CanalConnectors.newSingleConnector(address, canalDestination, "", ""); } /** * HA Construction method of client adapter worker under Mode * * @param canalDestination canal Instance name * @param zookeeperHosts zookeeper address * @param canalOuterAdapters External adapter group */ public CanalAdapterWorker(CanalClientConfig canalClientConfig, String canalDestination, String zookeeperHosts, List<List<OuterAdapter>> canalOuterAdapters){ super(canalOuterAdapters); this.canalDestination = canalDestination; this.canalClientConfig = canalClientConfig; connector = CanalConnectors.newClusterConnector(zookeeperHosts, canalDestination, "", ""); ((ClusterCanalConnector) connector).setSoTimeout(SO_TIMEOUT); } @Override protected void process() { while (!running) { // waiting until running == true while (!running) { try { Thread.sleep(1000); } catch (InterruptedException e) { } } } int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries(); if (retry == -1) { // Number of retries - 1 indicates that retries are blocked all the time in case of exception retry = Integer.MAX_VALUE; } // long timeout = canalClientConfig.getTimeout() == null ? 
300000 : // canalClientConfig.getTimeout(); / / the default timeout is 5 minutes Integer batchSize = canalClientConfig.getBatchSize(); if (batchSize == null) { batchSize = BATCH_SIZE; } while (running) { try { syncSwitch.get(canalDestination); logger.info("=============> Start to connect destination: {} <=============", this.canalDestination); connector.connect(); logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination); connector.subscribe(); logger.info("=============> Subscribe destination: {} succeed <=============", this.canalDestination); while (running) { try { syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES); } catch (TimeoutException e) { break; } if (!running) { break; } for (int i = 0; i < retry; i++) { if (!running) { break; } Message message = connector.getWithoutAck(batchSize); // Get the specified amount of data long batchId = message.getId(); try { int size = message.getEntries().size(); if (batchId == -1 || size == 0) { Thread.sleep(500); } else { if (logger.isDebugEnabled()) { logger.debug("destination: {} batchId: {} batchSize: {} ", canalDestination, batchId, size); } long begin = System.currentTimeMillis(); writeOut(message); if (logger.isDebugEnabled()) { logger.debug("destination: {} batchId: {} elapsed time: {} ms", canalDestination, batchId, System.currentTimeMillis() - begin); } } connector.ack(batchId); // Submit confirmation break; } catch (Exception e) { if (i != retry - 1) { connector.rollback(batchId); // Processing failed, rollback data logger.error(e.getMessage() + " Error sync and rollback, execute times: " + (i + 1)); } else { connector.ack(batchId); logger.error(e.getMessage() + " Error sync but ACK!"); } Thread.sleep(500); } } } } catch (Throwable e) { logger.error("process error!", e); } finally { connector.disconnect(); logger.info("=============> Disconnect destination: {} <=============", this.canalDestination); } if (running) { // is reconnect try { Thread.sleep(1000); } 
catch (InterruptedException e) { // ignore } } } } @Override public void stop() { try { if (!running) { return; } if (connector instanceof ClusterCanalConnector) { ((ClusterCanalConnector) connector).stopRunning(); } else if (connector instanceof SimpleCanalConnector) { ((SimpleCanalConnector) connector).stopRunning(); } running = false; syncSwitch.release(canalDestination); logger.info("destination {} is waiting for adapters' worker thread die!", canalDestination); if (thread != null) { try { thread.join(); } catch (InterruptedException e) { // ignore } } groupInnerExecutorService.shutdown(); logger.info("destination {} adapters worker thread dead!", canalDestination); canalOuterAdapters.forEach(outerAdapters -> outerAdapters.forEach(OuterAdapter::destroy)); logger.info("destination {} all adapters destroyed!", canalDestination); } catch (Exception e) { logger.error(e.getMessage(), e); } } }
- CanalAdapterWorker's process method uses two nested while loops. Each pass of the outer loop waits on syncSwitch.get(canalDestination), then connects and subscribes the connector; its finally block always calls connector.disconnect(), whether or not an exception occurred. The inner loop waits on syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES). On a TimeoutException it breaks out, so the worker disconnects and, if still running, reconnects on the next outer pass. Otherwise it makes up to `retry` attempts: call connector.getWithoutAck(batchSize), then writeOut(message), then connector.ack(batchId), and break. If an attempt throws, it calls connector.rollback(batchId) while attempts remain. On the last attempt it calls connector.ack(batchId) anyway and logs "Error sync but ACK!", so that batch is skipped. The stop method calls connector.stopRunning(), sets running to false, calls syncSwitch.release(canalDestination), waits for the worker thread, shuts down groupInnerExecutorService, and finally calls every OuterAdapter's destroy method.
AbstractCanalAdapterWorker
canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
public abstract class AbstractCanalAdapterWorker { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); protected String canalDestination; // canal example protected String groupId = null; // groupId protected List<List<OuterAdapter>> canalOuterAdapters; // External adapter protected CanalClientConfig canalClientConfig; // To configure protected ExecutorService groupInnerExecutorService; // In group worker pool protected volatile boolean running = false; // Running or not protected Thread thread = null; protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e); protected SyncSwitch syncSwitch; public AbstractCanalAdapterWorker(List<List<OuterAdapter>> canalOuterAdapters){ this.canalOuterAdapters = canalOuterAdapters; this.groupInnerExecutorService = Util.newFixedThreadPool(canalOuterAdapters.size(), 5000L); syncSwitch = (SyncSwitch) SpringContext.getBean(SyncSwitch.class); } protected void writeOut(final Message message) { List<Future<Boolean>> futures = new ArrayList<>(); // Inter group adapter runs in parallel canalOuterAdapters.forEach(outerAdapters -> { final List<OuterAdapter> adapters = outerAdapters; futures.add(groupInnerExecutorService.submit(() -> { try { // Try not to configure the adapter in the group adapters.forEach(adapter -> { long begin = System.currentTimeMillis(); List<Dml> dmls = MessageUtil.parse4Dml(canalDestination, groupId, message); if (dmls != null) { batchSync(dmls, adapter); if (logger.isDebugEnabled()) { logger.debug("{} elapsed time: {}", adapter.getClass().getName(), (System.currentTimeMillis() - begin)); } } }); return true; } catch (Exception e) { logger.error(e.getMessage(), e); return false; } })); // Wait for all adapter writes to complete // Because it is a concurrent operation between groups, it blocks until the most time-consuming group operation is completed RuntimeException exception = null; for (Future<Boolean> future : futures) { try { if (!future.get()) 
{ exception = new RuntimeException("Outer adapter sync failed! "); } } catch (Exception e) { exception = new RuntimeException(e); } } if (exception != null) { throw exception; } }); } private void batchSync(List<Dml> dmls, OuterAdapter adapter) { // Batch synchronization if (dmls.size() <= canalClientConfig.getSyncBatchSize()) { adapter.sync(dmls); } else { int len = 0; List<Dml> dmlsBatch = new ArrayList<>(); for (Dml dml : dmls) { dmlsBatch.add(dml); if (dml.getData() == null || dml.getData().isEmpty()) { len += 1; } else { len += dml.getData().size(); } if (len >= canalClientConfig.getSyncBatchSize()) { adapter.sync(dmlsBatch); dmlsBatch.clear(); len = 0; } } if (!dmlsBatch.isEmpty()) { adapter.sync(dmlsBatch); } } } //...... }
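- The writeOut method of AbstractCanalAdapterWorker iterates over canalOuterAdapters and submits one task per adapter group to groupInnerExecutorService; each task runs batchSync for that group's adapters in turn. It then waits on the resulting futures and throws a RuntimeException if any group failed. The batchSync method calls adapter.sync(dmls) in batches sized by canalClientConfig.getSyncBatchSize(): a batch is flushed as soon as its accumulated row count reaches that value.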
SyncSwitch
canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java
@Component public class SyncSwitch { private static final String SYN_SWITCH_ZK_NODE = "/sync-switch/"; private static final Map<String, BooleanMutex> LOCAL_LOCK = new ConcurrentHashMap<>(); private static final Map<String, BooleanMutex> DISTRIBUTED_LOCK = new ConcurrentHashMap<>(); private Mode mode = Mode.LOCAL; @Resource private AdapterCanalConfig adapterCanalConfig; @Resource private CuratorClient curatorClient; @PostConstruct public void init() { CuratorFramework curator = curatorClient.getCurator(); if (curator != null) { mode = Mode.DISTRIBUTED; DISTRIBUTED_LOCK.clear(); for (String destination : adapterCanalConfig.DESTINATIONS) { // Corresponding to each destination registration lock BooleanMutex mutex = new BooleanMutex(true); initMutex(curator, destination, mutex); DISTRIBUTED_LOCK.put(destination, mutex); startListen(destination, mutex); } } else { mode = Mode.LOCAL; LOCAL_LOCK.clear(); for (String destination : adapterCanalConfig.DESTINATIONS) { // Corresponding to each destination registration lock LOCAL_LOCK.put(destination, new BooleanMutex(true)); } } } public void get(String destination) throws InterruptedException { if (mode == Mode.LOCAL) { BooleanMutex mutex = LOCAL_LOCK.get(destination); if (mutex != null) { mutex.get(); } } else { BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination); if (mutex != null) { mutex.get(); } } } public void get(String destination, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { if (mode == Mode.LOCAL) { BooleanMutex mutex = LOCAL_LOCK.get(destination); if (mutex != null) { mutex.get(timeout, unit); } } else { BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination); if (mutex != null) { mutex.get(timeout, unit); } } } public synchronized void release(String destination) { if (mode == Mode.LOCAL) { BooleanMutex mutex = LOCAL_LOCK.get(destination); if (mutex != null && !mutex.state()) { mutex.set(true); } } if (mode == Mode.DISTRIBUTED) { BooleanMutex mutex = 
DISTRIBUTED_LOCK.get(destination); if (mutex != null && !mutex.state()) { mutex.set(true); } } } //...... }
- SyncSwitch supports LOCAL and DISTRIBUTED modes. Its get methods call mutex.get() or mutex.get(timeout, unit) on the destination's BooleanMutex. Its release method calls mutex.set(true) when the mutex is currently false.
Summary
The process method of CanalAdapterWorker runs two nested while loops. The outer loop waits on syncSwitch.get(canalDestination), connects and subscribes, and always calls connector.disconnect() in its finally block. The inner loop waits on syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES); a TimeoutException breaks back out to the outer loop, which reconnects. It then makes up to `retry` attempts of connector.getWithoutAck(batchSize), writeOut(message) and connector.ack(batchId). A failed attempt is rolled back with connector.rollback(batchId) while attempts remain; once they are exhausted, the batch is acked anyway and skipped. The stop method calls connector.stopRunning(), then syncSwitch.release(canalDestination) and groupInnerExecutorService.shutdown(), and finally each OuterAdapter's destroy method.