Talk about cannal's cannaladapterworker

order

This paper mainly studies the CanalAdapterWorker of canal

CanalAdapterWorker

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

public class CanalAdapterWorker extends AbstractCanalAdapterWorker {

    private static final int BATCH_SIZE = 50;
    private static final int SO_TIMEOUT = 0;

    private CanalConnector   connector;

    /**
     * The construction method of a single client adapter worker
     *
     * @param canalDestination canal Instance name
     * @param address canal-server address
     * @param canalOuterAdapters External adapter group
     */
    public CanalAdapterWorker(CanalClientConfig canalClientConfig, String canalDestination, SocketAddress address,
                              List<List<OuterAdapter>> canalOuterAdapters){
        super(canalOuterAdapters);
        this.canalClientConfig = canalClientConfig;
        this.canalDestination = canalDestination;
        connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");
    }

    /**
     * HA Construction method of client adapter worker under Mode
     *
     * @param canalDestination canal Instance name
     * @param zookeeperHosts zookeeper address
     * @param canalOuterAdapters External adapter group
     */
    public CanalAdapterWorker(CanalClientConfig canalClientConfig, String canalDestination, String zookeeperHosts,
                              List<List<OuterAdapter>> canalOuterAdapters){
        super(canalOuterAdapters);
        this.canalDestination = canalDestination;
        this.canalClientConfig = canalClientConfig;
        connector = CanalConnectors.newClusterConnector(zookeeperHosts, canalDestination, "", "");
        ((ClusterCanalConnector) connector).setSoTimeout(SO_TIMEOUT);
    }

    @Override
    protected void process() {
        while (!running) { // waiting until running == true
            while (!running) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            }
        }

        int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
        if (retry == -1) {
            // Number of retries - 1 indicates that retries are blocked all the time in case of exception
            retry = Integer.MAX_VALUE;
        }
        // long timeout = canalClientConfig.getTimeout() == null ? 300000 :
        // canalClientConfig.getTimeout(); / / the default timeout is 5 minutes
        Integer batchSize = canalClientConfig.getBatchSize();
        if (batchSize == null) {
            batchSize = BATCH_SIZE;
        }

        while (running) {
            try {
                syncSwitch.get(canalDestination);

                logger.info("=============> Start to connect destination: {} <=============", this.canalDestination);
                connector.connect();
                logger.info("=============> Start to subscribe destination: {} <=============", this.canalDestination);
                connector.subscribe();
                logger.info("=============> Subscribe destination: {} succeed <=============", this.canalDestination);
                while (running) {
                    try {
                        syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES);
                    } catch (TimeoutException e) {
                        break;
                    }
                    if (!running) {
                        break;
                    }

                    for (int i = 0; i < retry; i++) {
                        if (!running) {
                            break;
                        }
                        Message message = connector.getWithoutAck(batchSize); // Get the specified amount of data
                        long batchId = message.getId();
                        try {
                            int size = message.getEntries().size();
                            if (batchId == -1 || size == 0) {
                                Thread.sleep(500);
                            } else {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("destination: {} batchId: {} batchSize: {} ",
                                        canalDestination,
                                        batchId,
                                        size);
                                }
                                long begin = System.currentTimeMillis();
                                writeOut(message);
                                if (logger.isDebugEnabled()) {
                                    logger.debug("destination: {} batchId: {} elapsed time: {} ms",
                                        canalDestination,
                                        batchId,
                                        System.currentTimeMillis() - begin);
                                }
                            }
                            connector.ack(batchId); // Submit confirmation
                            break;
                        } catch (Exception e) {
                            if (i != retry - 1) {
                                connector.rollback(batchId); // Processing failed, rollback data
                                logger.error(e.getMessage() + " Error sync and rollback, execute times: " + (i + 1));
                            } else {
                                connector.ack(batchId);
                                logger.error(e.getMessage() + " Error sync but ACK!");
                            }
                            Thread.sleep(500);
                        }
                    }
                }

            } catch (Throwable e) {
                logger.error("process error!", e);
            } finally {
                connector.disconnect();
                logger.info("=============> Disconnect destination: {} <=============", this.canalDestination);
            }

            if (running) { // is reconnect
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // ignore
                }
            }
        }
    }

    @Override
    public void stop() {
        try {
            if (!running) {
                return;
            }

            if (connector instanceof ClusterCanalConnector) {
                ((ClusterCanalConnector) connector).stopRunning();
            } else if (connector instanceof SimpleCanalConnector) {
                ((SimpleCanalConnector) connector).stopRunning();
            }

            running = false;

            syncSwitch.release(canalDestination);

            logger.info("destination {} is waiting for adapters' worker thread die!", canalDestination);
            if (thread != null) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    // ignore
                }
            }
            groupInnerExecutorService.shutdown();
            logger.info("destination {} adapters worker thread dead!", canalDestination);
            canalOuterAdapters.forEach(outerAdapters -> outerAdapters.forEach(OuterAdapter::destroy));
            logger.info("destination {} all adapters destroyed!", canalDestination);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
  • The process method of CanalAdapterWorker uses two layers of while loop. The first layer executes syncSwitch.get(canalDestination). If there is an exception, execute connector.disconnect() in finally. The second layer executes syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES), if there is TimeoutException, continue the cycle, then use the cycle of retry times to execute connector.getWithoutAck(batchSize), then call writeOut(message), finally execute connector.ack(batchId), then jump out of the loop; if there is an exception, execute connector.rollback (connector.rollback) without exceeding the number of retry, and go beyond the number of times. P method executes connector.stopRunning, then syncSwitch.release(canalDestination), groupInnerExecutorService.shutdown(), outerdadapter's destroy method

AbstractCanalAdapterWorker

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

public abstract class AbstractCanalAdapterWorker {

    protected final Logger                    logger  = LoggerFactory.getLogger(this.getClass());

    protected String                          canalDestination;                                                // canal example
    protected String                          groupId = null;                                                  // groupId
    protected List<List<OuterAdapter>>        canalOuterAdapters;                                              // External adapter
    protected CanalClientConfig               canalClientConfig;                                               // To configure
    protected ExecutorService                 groupInnerExecutorService;                                       // In group worker pool
    protected volatile boolean                running = false;                                                 // Running or not
    protected Thread                          thread  = null;
    protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);

    protected SyncSwitch                      syncSwitch;

    public AbstractCanalAdapterWorker(List<List<OuterAdapter>> canalOuterAdapters){
        this.canalOuterAdapters = canalOuterAdapters;
        this.groupInnerExecutorService = Util.newFixedThreadPool(canalOuterAdapters.size(), 5000L);
        syncSwitch = (SyncSwitch) SpringContext.getBean(SyncSwitch.class);
    }

    protected void writeOut(final Message message) {
        List<Future<Boolean>> futures = new ArrayList<>();
        // Inter group adapter runs in parallel
        canalOuterAdapters.forEach(outerAdapters -> {
            final List<OuterAdapter> adapters = outerAdapters;
            futures.add(groupInnerExecutorService.submit(() -> {
                try {
                    // Try not to configure the adapter in the group
                    adapters.forEach(adapter -> {
                        long begin = System.currentTimeMillis();
                        List<Dml> dmls = MessageUtil.parse4Dml(canalDestination, groupId, message);
                        if (dmls != null) {
                            batchSync(dmls, adapter);

                            if (logger.isDebugEnabled()) {
                                logger.debug("{} elapsed time: {}",
                                    adapter.getClass().getName(),
                                    (System.currentTimeMillis() - begin));
                            }
                        }
                    });
                    return true;
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return false;
                }
            }));

            // Wait for all adapter writes to complete
            // Because it is a concurrent operation between groups, it blocks until the most time-consuming group operation is completed
            RuntimeException exception = null;
            for (Future<Boolean> future : futures) {
                try {
                    if (!future.get()) {
                        exception = new RuntimeException("Outer adapter sync failed! ");
                    }
                } catch (Exception e) {
                    exception = new RuntimeException(e);
                }
            }
            if (exception != null) {
                throw exception;
            }
        });
    }

    private void batchSync(List<Dml> dmls, OuterAdapter adapter) {
        // Batch synchronization
        if (dmls.size() <= canalClientConfig.getSyncBatchSize()) {
            adapter.sync(dmls);
        } else {
            int len = 0;
            List<Dml> dmlsBatch = new ArrayList<>();
            for (Dml dml : dmls) {
                dmlsBatch.add(dml);
                if (dml.getData() == null || dml.getData().isEmpty()) {
                    len += 1;
                } else {
                    len += dml.getData().size();
                }
                if (len >= canalClientConfig.getSyncBatchSize()) {
                    adapter.sync(dmlsBatch);
                    dmlsBatch.clear();
                    len = 0;
                }
            }
            if (!dmlsBatch.isEmpty()) {
                adapter.sync(dmlsBatch);
            }
        }
    }

    //......

}
  • AbstractThe writeOut method of the canaladapterworker traverses the canalauteradapters and submits batchSync tasks to the groupInnerExecutorService one by one; the batchSync method executes adapter.sync(dmls) in batches according to the configuration of canalClientConfig.getSyncBatchSize()

SyncSwitch

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/common/SyncSwitch.java

@Component
public class SyncSwitch {

    private static final String                    SYN_SWITCH_ZK_NODE = "/sync-switch/";

    private static final Map<String, BooleanMutex> LOCAL_LOCK         = new ConcurrentHashMap<>();

    private static final Map<String, BooleanMutex> DISTRIBUTED_LOCK   = new ConcurrentHashMap<>();

    private Mode                                   mode               = Mode.LOCAL;

    @Resource
    private AdapterCanalConfig                     adapterCanalConfig;
    @Resource
    private CuratorClient                          curatorClient;

    @PostConstruct
    public void init() {
        CuratorFramework curator = curatorClient.getCurator();
        if (curator != null) {
            mode = Mode.DISTRIBUTED;
            DISTRIBUTED_LOCK.clear();
            for (String destination : adapterCanalConfig.DESTINATIONS) {
                // Corresponding to each destination registration lock
                BooleanMutex mutex = new BooleanMutex(true);
                initMutex(curator, destination, mutex);
                DISTRIBUTED_LOCK.put(destination, mutex);
                startListen(destination, mutex);
            }
        } else {
            mode = Mode.LOCAL;
            LOCAL_LOCK.clear();
            for (String destination : adapterCanalConfig.DESTINATIONS) {
                // Corresponding to each destination registration lock
                LOCAL_LOCK.put(destination, new BooleanMutex(true));
            }
        }
    }

    public void get(String destination) throws InterruptedException {
        if (mode == Mode.LOCAL) {
            BooleanMutex mutex = LOCAL_LOCK.get(destination);
            if (mutex != null) {
                mutex.get();
            }
        } else {
            BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
            if (mutex != null) {
                mutex.get();
            }
        }
    }

    public void get(String destination, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (mode == Mode.LOCAL) {
            BooleanMutex mutex = LOCAL_LOCK.get(destination);
            if (mutex != null) {
                mutex.get(timeout, unit);
            }
        } else {
            BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
            if (mutex != null) {
                mutex.get(timeout, unit);
            }
        }
    }

    public synchronized void release(String destination) {
        if (mode == Mode.LOCAL) {
            BooleanMutex mutex = LOCAL_LOCK.get(destination);
            if (mutex != null && !mutex.state()) {
                mutex.set(true);
            }
        }
        if (mode == Mode.DISTRIBUTED) {
            BooleanMutex mutex = DISTRIBUTED_LOCK.get(destination);
            if (mutex != null && !mutex.state()) {
                mutex.set(true);
            }
        }
    }

    //......
}
  • SyncSwitch supports LOCAL and DISTRIBUTED modes. Its get method executes mutex.get() or mutex.get(timeout, unit) method, and its release method executes mutex.set(true)

Summary

The process method of CanalAdapterWorker uses two layers of while loop. The first layer executes syncSwitch.get(canalDestination). If there is an exception, execute connector.disconnect() in finally. The second layer executes syncSwitch.get(canalDestination, 1L, TimeUnit.MINUTES), if there is TimeoutException, continue the cycle, then use the cycle of retry times to execute connector.getWithoutAck(batchSize), then call writeOut(message), finally execute connector.ack(batchId), then jump out of the loop; if there is an exception, execute connector.rollback (connector.rollback) without exceeding the number of retry, and go beyond the number of times. P method executes connector.stop running, then syncSwitch.release(canalDestination), groupInnerExecutorService.shutdown(), outerdadapter's destroy method

doc

Keywords: Programming Java Zookeeper

Added by Saviola on Mon, 06 Apr 2020 03:42:57 +0300