Talk about puma's DefaultTaskExecutor

Preface

This article walks through puma's DefaultTaskExecutor.

TaskExecutor

puma/puma/src/main/java/com/dianping/puma/taskexecutor/TaskExecutor.java

/**
 * Contract of a puma task executor: a {@link LifeCycle} component that also exposes its
 * run state, its identity, the default binlog coordinates and its collaborators.
 *
 * <p>NOTE(review): only the signatures are visible here; the per-method remarks below are
 * deliberately limited to what the signatures show and should be confirmed against the
 * implementations.
 */
public interface TaskExecutor extends LifeCycle {

    /** @return the executor's stop flag */
    boolean isStop();

    /** @return the executor's merging flag (presumably set by {@link #stopUntil(long)} — TODO confirm) */
    boolean isMerging();

    /**
     * @param timestamp presumably the event timestamp at which the executor should stop itself —
     *                  TODO confirm unit and semantics against the implementation
     */
    void stopUntil(long timestamp);

    /** Presumably cancels a pending {@link #stopUntil(long)} request — TODO confirm. */
    void cancelStopUntil();

    /** @param context the context to use */
    void setContext(PumaContext context);

    void initContext();

    /** @return the context currently in use */
    PumaContext getContext();

    /** @return the task id */
    String getTaskId();

    /** @param taskId the task id to set */
    void setTaskId(String taskId);

    /** @return the task name */
    String getTaskName();

    /** @param taskName the task name to set */
    void setTaskName(String taskName);

    /** @return the default binlog file name */
    String getDefaultBinlogFileName();

    /** @param binlogFileName the default binlog file name to set */
    void setDefaultBinlogFileName(String binlogFileName);

    /** @return the default binlog position */
    Long getDefaultBinlogPosition();

    /**
     * @param defaultBinlogPosition the default binlog position to set (the parameter used to be
     *                              called {@code binlogFileName}, a copy-paste slip)
     */
    void setDefaultBinlogPosition(Long defaultBinlogPosition);

    /** @param holder the storage manager to use */
    void setInstanceStorageManager(InstanceStorageManager holder);

    /** @return the senders of this executor */
    List<Sender> getFileSender();

    /** @return the data handler of this executor */
    DataHandler getDataHandler();

    void resume() throws Exception;

    void pause() throws Exception;

    /** @return the task state */
    PumaTaskStateEntity getTaskState();

    /** @param taskState the task state to set */
    void setTaskState(PumaTaskStateEntity taskState);

    /** @param instanceTask the instance task to set */
    void setInstanceTask(InstanceTask instanceTask);

    /** @return the instance task */
    InstanceTask getInstanceTask();

    /** @return the table set */
    TableSet getTableSet();
}
  • TaskExecutor extends the LifeCycle interface and declares methods such as initContext and getContext

AbstractTaskExecutor

puma/puma/src/main/java/com/dianping/puma/taskexecutor/AbstractTaskExecutor.java

/**
 * Base implementation of {@link TaskExecutor}: holds the executor's configuration and
 * collaborators (parser, data handler, dispatcher) and drives their start/stop sequence around
 * the subclass hooks {@link #doStart()} and {@link #doStop()}.
 *
 * <p>The class is marked {@code @ThreadUnSafe}; only the {@code stop} flag is {@code volatile}.
 * Interface methods not implemented here are left to subclasses.
 */
@ThreadUnSafe
public abstract class AbstractTaskExecutor implements TaskExecutor {
	private PumaContext context;

	private String taskId;

	private long serverId;

	protected String taskName;

	protected Date beginTime;

	protected TableSet tableSet;

	private String defaultBinlogFileName;

	private Long defaultBinlogPosition;

	protected Parser parser;

	protected DataHandler dataHandler;

	protected Dispatcher dispatcher;

	/** Stop flag; starts out {@code true} and is flipped by start/stop/resume/pause. */
	private volatile boolean stop = true;

	protected InstanceStorageManager instanceStorageManager;

	protected PumaTaskStateEntity state;

	protected InstanceManager instanceManager;

	@Override
	public String getTaskId() {
		return taskId;
	}

	@Override
	public void setTaskId(String taskId) {
		this.taskId = taskId;
	}

	@Override
	public String getTaskName() {
		return taskName;
	}

	@Override
	public void setTaskName(String taskName) {
		this.taskName = taskName;
	}

	/**
	 * @param instanceStorageManager
	 *           the instanceStorageManager to set
	 */
	@Override
	public void setInstanceStorageManager(InstanceStorageManager instanceStorageManager) {
		this.instanceStorageManager = instanceStorageManager;
	}

	@Override
	public void setContext(PumaContext context) {
		this.context = context;
	}

	@Override
	public PumaContext getContext() {
		return context;
	}

	@Override
	public String getDefaultBinlogFileName() {
		return defaultBinlogFileName;
	}

	@Override
	public void setDefaultBinlogFileName(String binlogFileName) {
		this.defaultBinlogFileName = binlogFileName;
	}

	/**
	 * @return the defaultBinlogPosition
	 */
	@Override
	public Long getDefaultBinlogPosition() {
		return defaultBinlogPosition;
	}

	/**
	 * @param defaultBinlogPosition
	 *           the defaultBinlogPosition to set
	 */
	@Override
	public void setDefaultBinlogPosition(Long defaultBinlogPosition) {
		this.defaultBinlogPosition = defaultBinlogPosition;
	}

	/**
	 * @param parser
	 *           the parser to set
	 */
	public void setParser(Parser parser) {
		this.parser = parser;
	}

	/**
	 * @param dataHandler
	 *           the dataHandler to set
	 */
	public void setDataHandler(DataHandler dataHandler) {
		this.dataHandler = dataHandler;
	}

	/**
	 * @param dispatcher
	 *           the dispatcher to set
	 */
	public void setDispatcher(Dispatcher dispatcher) {
		this.dispatcher = dispatcher;
	}

	public long getServerId() {
		return serverId;
	}

	public void setServerId(long serverId) {
		this.serverId = serverId;
	}

	@Override
	public boolean isStop() {
		return stop;
	}

	/** Subclass hook invoked by {@link #stop()} after parser, data handler and dispatcher. */
	protected abstract void doStop() throws Exception;

	/** Subclass hook invoked by {@link #start()} after parser, data handler and dispatcher. */
	protected abstract void doStart() throws Exception;

	/**
	 * Clears the stop flag, starts parser, data handler and dispatcher in that order and then
	 * runs {@link #doStart()} on the calling thread.
	 *
	 * @throws RuntimeException wrapping whatever any of those steps throws
	 */
	@Override
	public void start() {
		try {
			stop = false;

			parser.start();
			dataHandler.start();
			dispatcher.start();
			doStart();
		} catch (Exception e) {
			if (e instanceof InterruptedException) {
				// Wrapping the exception would otherwise hide the interruption from the caller.
				Thread.currentThread().interrupt();
			}
			throw new RuntimeException(e);
		}
	}

	/**
	 * Sets the stop flag, then stops parser, data handler and dispatcher in that order and
	 * finally runs {@link #doStop()}.
	 *
	 * <p>Every step is attempted even if an earlier one throws, so a failing component can no
	 * longer prevent the remaining ones (and the subclass clean-up) from running. If several
	 * steps throw, the exception of the last failing step is the one reported.
	 *
	 * @throws RuntimeException wrapping the failure of a stop step
	 */
	@Override
	public void stop() {
		try {
			stop = true;

			try {
				parser.stop();
			} finally {
				try {
					dataHandler.stop();
				} finally {
					try {
						dispatcher.stop();
					} finally {
						doStop();
					}
				}
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	/** Clears the stop flag; no component is restarted here. */
	@Override
	public void resume() throws Exception {
		stop = false;
	}

	/** Sets the stop flag; no component is stopped here. */
	@Override
	public void pause() throws Exception {
		stop = true;
	}

	/** @return the senders held by the dispatcher */
	@Override
	public List<Sender> getFileSender() {
		return dispatcher.getSenders();
	}

	@Override
	public DataHandler getDataHandler() {
		return this.dataHandler;
	}

	@Override
	public PumaTaskStateEntity getTaskState() {
		return state;
	}

	@Override
	public void setTaskState(PumaTaskStateEntity state) {
		this.state = state;
	}

	public Date getBeginTime() {
		return beginTime;
	}

	public void setBeginTime(Date beginTime) {
		this.beginTime = beginTime;
	}

	@Override
	public TableSet getTableSet() {
		return tableSet;
	}

	public void setTableSet(TableSet tableSet) {
		this.tableSet = tableSet;
	}

	public InstanceManager getInstanceManager() {
		return instanceManager;
	}

	public void setInstanceManager(InstanceManager instanceManager) {
		this.instanceManager = instanceManager;
	}
}
  • AbstractTaskExecutor implements the TaskExecutor interface and defines fields such as context, defaultBinlogFileName, defaultBinlogPosition, parser, dataHandler and dispatcher; its start method calls start on the parser, dataHandler and dispatcher and then calls doStart; its stop method calls stop on the parser, dataHandler and dispatcher and then calls doStop

DefaultTaskExecutor

puma/puma/src/main/java/com/dianping/puma/taskexecutor/DefaultTaskExecutor.java

@ThreadUnSafe
public class DefaultTaskExecutor extends AbstractTaskExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultTaskExecutor.class);

    private SrcDbEntity currentSrcDbEntity;

    private DefaultTableMetaInfoFetcher tableMetaInfoFetcher;

    private String encoding = "utf-8";

    private Socket mysqlSocket;

    private InputStream is;

    private OutputStream os;

    private InstanceTask instanceTask;

    private boolean merging = false;

    private long runUntilTimestamp;

    @Override
    public void doStart() throws Exception {
        Thread.currentThread().setName("DefaultTaskExecutor-" + taskName);
        long failCount = 0;
        merging = false;
        SystemStatusManager.addServer(getTaskName(), "", 0, tableSet);

        do {
            try {
                loadServerId(instanceManager.getUrlByCluster(instanceTask.getInstance()));

                // Read position/file file file
                BinlogInfo binlogInfo = instanceStorageManager.getBinlogInfo(getContext().getPumaServerName());

                if (binlogInfo == null) {
                    this.currentSrcDbEntity = initSrcDbByServerId(-1);
                    if (beginTime != null) {
                        binlogInfo = getBinlogByTimestamp(beginTime.getTime() / 1000);
                    }
                } else {
                    this.currentSrcDbEntity = initSrcDbByServerId(binlogInfo.getServerId());

                    if (binlogInfo.getServerId() != currentSrcDbEntity.getServerId()) {
                        BinlogInfo oldBinlogInfo = binlogInfo;
                        binlogInfo = getBinlogByTimestamp(oldBinlogInfo.getTimestamp() - 60);
                        if (binlogInfo == null) {
                            throw new IOException("Switch Binlog Failed!");
                        } else {
                            Cat.logEvent("BinlogSwitch", taskName, Message.SUCCESS,
                                    oldBinlogInfo.toString() + " -> " + binlogInfo.toString());
                        }
                    }
                }

                updateTableMetaInfoFetcher();
                getContext().setMasterUrl(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());

                if (!connect()) {
                    throw new IOException("Connection failed.");
                }

                initConnect();

                initBinlogPosition(binlogInfo);

                if (dumpBinlog()) {
                    processBinlog();
                } else {
                    throw new IOException("Binlog dump failed.");
                }
            } catch (Exception e) {
                if (++failCount % 3 == 0) {
                    this.currentSrcDbEntity = chooseNextSrcDb();
                    updateTableMetaInfoFetcher();
                    failCount = 0;
                }
                String msg = "Exception occurs. taskName: " + getTaskName() + " dbServerId: " + (currentSrcDbEntity == null ? 0 : currentSrcDbEntity.getServerId())
                        + ". Reconnect...";
                LOG.error(msg, e);
                Cat.logError(msg, e);

                Thread.sleep(((failCount % 10) + 1) * 2000);
            }
        } while (!isStop() && !Thread.currentThread().isInterrupted());

    }

    protected void doStop() throws Exception {
        LOG.info("TaskName: " + getTaskName() + ", Stopped.");
        closeTransport();
        SystemStatusManager.deleteServer(getTaskName());
    }

    //......

}
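  • DefaultTaskExecutor extends AbstractTaskExecutor. Its doStart method loads the BinlogInfo through instanceStorageManager.getBinlogInfo (falling back to getBinlogByTimestamp when nothing is stored or the server id has changed), then calls connect, initConnect, initBinlogPosition and dumpBinlog and, if the dump succeeds, processBinlog, retrying in a loop until the task is stopped; its doStop method calls closeTransport and then SystemStatusManager.deleteServer(getTaskName())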

getBinlogByTimestamp

    /**
     * Searches the source database's binary logs, newest first, for a position to start from
     * for the given timestamp.
     *
     * <p>Each log is dumped from position 4. While reading, the latest {@code XID_EVENT} whose
     * timestamp is below {@code time} is remembered as a candidate (the candidate is shared
     * across files). The first event with a timestamp {@code >= time} ends the scan of that
     * stream; the candidate, if any, becomes the result. A {@code ROTATE_EVENT} ends the scan of
     * the stream only while no candidate exists, otherwise reading continues.
     *
     * <p>The whole lookup is recorded as a Cat transaction, which is now completed on every exit
     * path, including unchecked exceptions.
     *
     * @param time timestamp to search for, in the unit of the binlog event header timestamp
     *             ({@code doStart} passes {@code beginTime.getTime() / 1000})
     * @return the position found, or {@code null} if none was found
     * @throws IOException if connecting, authenticating or dumping fails, or a binlog packet
     *                     reports an error
     */
    protected BinlogInfo getBinlogByTimestamp(long time) throws IOException {
        BinlogInfo binlogResult = null;
        Transaction t = Cat.newTransaction("BinlogFindByTime", taskName);

        Cat.logEvent("BinlogFindByTime.Time", String.valueOf(time));

        try {
            if (!connect()) {
                throw new IOException("Connection failed.");
            }
            initConnect();
            List<BinlogInfo> binaryLogs = getBinaryLogs();

            Cat.logEvent("BinlogFindByTime.BinaryLogs", currentSrcDbEntity.toString(), Message.SUCCESS, Joiner.on(",").join(binaryLogs));

            BinlogInfo closestBinlogInfo = null;

            for (int k = binaryLogs.size() - 1; k >= 0; k--) {
                if (binlogResult != null) {
                    break;
                }

                BinlogInfo newBinlogInfo = binaryLogs.get(k);

                Cat.logEvent("BinlogFindByTime.Start", newBinlogInfo.toString());

                getContext().setDBServerId(currentSrcDbEntity.getServerId());
                getContext().setBinlogFileName(newBinlogInfo.getBinlogFile());
                getContext().setBinlogStartPos(4);
                getContext().setMasterUrl(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());

                // A fresh connection is opened for every file that is dumped.
                if (!connect()) {
                    throw new IOException("Connection failed.");
                }
                initConnect();

                if (dumpBinlog()) {
                    while (!isStop()) {
                        BinlogPacket binlogPacket = (BinlogPacket) PacketFactory.parsePacket(is,
                                PacketType.BINLOG_PACKET,
                                getContext());

                        if (!binlogPacket.isOk()) {
                            LOG.error("TaskName: " + getTaskName() + ", Binlog packet response error.");
                            throw new IOException("TaskName: " + getTaskName() + ", Binlog packet response error.");
                        } else {
                            BinlogEvent binlogEvent = parser.parse(binlogPacket.getBinlogBuf(), getContext());

                            try {
                                getContext().setNextBinlogPos(binlogEvent.getHeader().getNextPosition());

                                if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {
                                    if (closestBinlogInfo == null) {
                                        // No candidate yet: give up on this stream, try the older file.
                                        break;
                                    } else {
                                        continue;
                                    }
                                }

                                if (binlogEvent.getHeader().getTimestamp() >= time) {
                                    if (closestBinlogInfo != null) {
                                        binlogResult = closestBinlogInfo;
                                    }
                                    break;
                                }

                                if (binlogEvent.getHeader().getEventType() == BinlogConstants.XID_EVENT
                                        && binlogEvent.getHeader().getTimestamp() < time) {
                                    closestBinlogInfo = new BinlogInfo(
                                            currentSrcDbEntity.getServerId(),
                                            getContext().getBinlogFileName(),
                                            binlogEvent.getHeader().getNextPosition(),
                                            0, binlogEvent.getHeader().getTimestamp());
                                }
                            } finally {
                                // Keep the context's file/position in step with the stream,
                                // whichever way the block above was left.
                                if (binlogEvent.getHeader().getEventType() == BinlogConstants.ROTATE_EVENT) {
                                    RotateEvent rotateEvent = (RotateEvent) binlogEvent;
                                    getContext().setBinlogFileName(rotateEvent.getNextBinlogFileName());
                                    getContext().setBinlogStartPos(rotateEvent.getFirstEventPosition());
                                } else {
                                    getContext().setBinlogStartPos(binlogEvent.getHeader().getNextPosition());
                                }
                            }
                        }
                    }
                } else {
                    throw new IOException("Binlog dump failed.");
                }
            }

            Cat.logEvent("BinlogFindByTime.Success", taskName, Message.SUCCESS,
                    time + " -> " + (binlogResult == null ? "null" : binlogResult.toString()));
            t.setStatus(Message.SUCCESS);
            return binlogResult;
        } catch (IOException e) {
            t.setStatus(e);
            throw e;
        } catch (RuntimeException e) {
            // Previously an unchecked failure left the transaction without status and uncompleted.
            t.setStatus(e);
            throw e;
        } finally {
            t.complete();
        }
    }
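  • getBinlogByTimestamp fetches the list of binary logs and walks it from the newest file to the oldest; for each file it reconnects, dumps the binlog from position 4 and parses the events, using binlogEvent.getHeader().getTimestamp() to remember the last XID_EVENT before the requested time and returning that position once an event at or after the requested time is reached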

connect

    /**
     * Opens a new connection to the current source database.
     *
     * <p>Any previous transport is closed first; then a socket is connected to the host/port of
     * {@code currentSrcDbEntity}, buffered streams are attached and the server's connect packet
     * is read. On failure the partially opened transport is released again, so no half-open
     * socket or stream is left in the fields, and the cause is logged with its stack trace.
     *
     * @return {@code true} if the connection was established and the connect packet was read,
     *         {@code false} on any failure
     */
    private boolean connect() {
        try {
            closeTransport();
            this.mysqlSocket = new Socket();
            this.mysqlSocket.setTcpNoDelay(false);
            this.mysqlSocket.setKeepAlive(true);
            this.mysqlSocket.connect(new InetSocketAddress(currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort()));
            this.is = new BufferedInputStream(mysqlSocket.getInputStream());
            this.os = new BufferedOutputStream(mysqlSocket.getOutputStream());
            PacketFactory.parsePacket(is, PacketType.CONNECT_PACKET, getContext());

            LOG.info("TaskName: " + getTaskName() + ", Connection db success.");

            return true;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + ", Connect failed. Reason: " + e.getMessage(), e);

            // Do not keep a socket/stream that is in an unknown state.
            closeTransport();

            return false;
        }
    }
  • The connect method executes closeTransport first, and then creates mysqlSocket to connect

initConnect

    /**
     * Runs the post-connect handshake: login, optional setting update, binlog format check,
     * binlog row image check and server id check, in that order.
     *
     * @throws IOException as soon as one of the steps fails; the message names the failing step
     */
    protected void initConnect() throws IOException {
        boolean loggedIn = auth();
        if (!loggedIn) {
            throw new IOException("Login failed.");
        }

        // The setting update is only issued when the context's isCheckSum() is true.
        if (getContext().isCheckSum() && !updateSetting()) {
            throw new IOException("Update setting command failed.");
        }

        boolean formatAccepted = queryBinlogFormat();
        if (!formatAccepted) {
            throw new IOException("Query config binlogformat failed.");
        }

        boolean rowImageAccepted = queryBinlogImage();
        if (!rowImageAccepted) {
            throw new IOException("Query config binlog row image failed.");
        }

        // The server answering must still be the one currentSrcDbEntity describes.
        if (queryServerId() != currentSrcDbEntity.getServerId()) {
            throw new IOException("Server Id Changed.");
        }
    }

    /**
     * Logs in to the source database with the credentials of {@code currentSrcDbEntity}.
     *
     * <p>Builds and writes an authenticate packet, then reads the server's OK/error reply.
     *
     * @return {@code true} if the reply is OK; {@code false} if the server rejects the login or
     *         any exception occurs (the reason is logged)
     */
    private boolean auth() {
        try {
            LOG.info("server logining taskName: " + getTaskName() + " host: " + currentSrcDbEntity.getHost() + " port: " + currentSrcDbEntity.getPort() + " username: "
                    + currentSrcDbEntity.getUsername() + " dbServerId: " + currentSrcDbEntity.getServerId());

            AuthenticatePacket loginPacket = (AuthenticatePacket) PacketFactory.createCommandPacket(
                    PacketType.AUTHENTICATE_PACKET, getContext());
            loginPacket.setPassword(currentSrcDbEntity.getPassword());
            loginPacket.setUser(currentSrcDbEntity.getUsername());
            loginPacket.buildPacket(getContext());
            loginPacket.write(os, getContext());

            OKErrorPacket reply = (OKErrorPacket) PacketFactory.parsePacket(is, PacketType.OKERROR_PACKET,
                    getContext());

            if (!reply.isOk()) {
                LOG.error("TaskName: " + getTaskName() + ", Login failed. Reason: " + reply.getMessage());
                return false;
            }

            LOG.info("TaskName: " + getTaskName() + ", Server login success.");
            return true;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + ", Login failed. Reason: " + e.getMessage());

            return false;
        }
    }

    /**
     * Checks that the source database's global {@code binlog_format} is row based.
     *
     * <p>Runs {@code show global variables like 'binlog_format'}, expects a two-column result
     * and parses the second column into a {@link BinlogFormat}. The outcome is reported as a
     * {@code Slave.dbBinlogFormat} Cat event.
     *
     * <p>A malformed result or an unrecognised format is now reported as an ordinary failure;
     * before, both cases dereferenced a missing value, fell into the generic exception handler
     * and skipped the Cat event.
     *
     * @return {@code true} if the format is row based; {@code false} otherwise or on any error
     */
    private boolean queryBinlogFormat() throws IOException {
        try {
            QueryExecutor executor = new QueryExecutor(is, os);
            String cmd = "show global variables like 'binlog_format'";
            ResultSet rs = executor.query(cmd, getContext());
            List<String> columnValues = rs.getFiledValues();
            boolean isQuery = true;
            if (columnValues == null || columnValues.size() != 2 || columnValues.get(1) == null) {
                LOG.error("TaskName: " + getTaskName()
                        + ", QueryConfig failed Reason:unexcepted binlog format query result.");
                isQuery = false;
            } else {
                // Only parse the value once the result shape has been verified.
                String rawFormat = columnValues.get(1);
                BinlogFormat binlogFormat = BinlogFormat.valuesOf(rawFormat);
                if (binlogFormat == null || !binlogFormat.isRow()) {
                    isQuery = false;
                    // Fall back to the raw server value when it could not be mapped.
                    LOG.error("TaskName: " + getTaskName() + ", Unexcepted binlog format: "
                            + (binlogFormat == null ? rawFormat : binlogFormat.value));
                }
            }

            String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());
            Cat.logEvent("Slave.dbBinlogFormat", eventName, isQuery ? Message.SUCCESS : "1", "");
            if (isQuery) {
                LOG.info("TaskName: " + getTaskName() + ", Query config binlogformat is legal.");
            }
            return isQuery;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + ", QueryConfig failed Reason: " + e.getMessage());
            return false;
        }
    }

    /**
     * Checks that the source database's {@code binlog_row_image}, when present, is full.
     *
     * <p>Runs {@code show variables like 'binlog_row_image'}. An empty result is accepted (the
     * original code tags this case "5.1"); a two-column result (tagged "5.6") must parse to a
     * {@link BinlogRowImage} whose {@code isFull()} is true; any other shape is rejected. The
     * outcome is reported as a {@code Slave.dbBinlogRowImage} Cat event.
     *
     * <p>An unrecognised row image value is now reported as an ordinary failure instead of
     * triggering a {@code NullPointerException} while building the log message.
     *
     * @return {@code true} if the setting is absent or full; {@code false} otherwise or on any error
     */
    private boolean queryBinlogImage() throws IOException {
        try {
            QueryExecutor executor = new QueryExecutor(is, os);
            String cmd = "show variables like 'binlog_row_image'";
            ResultSet rs = executor.query(cmd, getContext());
            List<String> columnValues = rs.getFiledValues();
            boolean isQuery;
            if (columnValues == null || columnValues.size() == 0) {// 5.1
                isQuery = true;
            } else if (columnValues.size() == 2 && columnValues.get(1) != null) {// 5.6
                String rawRowImage = columnValues.get(1);
                BinlogRowImage binlogRowImage = BinlogRowImage.valuesOf(rawRowImage);
                isQuery = true;
                if (binlogRowImage == null || !binlogRowImage.isFull()) {
                    isQuery = false;
                    // Fall back to the raw server value when it could not be mapped.
                    LOG.error("TaskName: " + getTaskName() + ", Unexcepted binlog row image: "
                            + (binlogRowImage == null ? rawRowImage : binlogRowImage.value));
                }
            } else {
                LOG.error("TaskName: " + getTaskName()
                        + ", QueryConfig failed Reason:unexcepted binlog row image query result.");
                isQuery = false;
            }
            String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());
            Cat.logEvent("Slave.dbBinlogRowImage", eventName, isQuery ? Message.SUCCESS : "1", "");
            if (isQuery) {
                LOG.info("TaskName: " + getTaskName() + ", Query config binlog row image is legal.");
            }
            return isQuery;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + ", QueryConfig failed Reason: " + e.getMessage());
            return false;
        }
    }

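  • The initConnect method calls auth, then updateSetting (only when the context's isCheckSum flag is set), then queryBinlogFormat and queryBinlogImage, and finally checks that queryServerId still matches the current source database; auth sends the username and password for verification; queryBinlogFormat executes show global variables like 'binlog_format' and requires a row-based format; queryBinlogImage executes show variables like 'binlog_row_image' and requires a full row image when the variable exists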

initBinlogPosition

    /**
     * Positions the context at the given binlog coordinates and publishes them to the system
     * status registry.
     *
     * <p>When {@code binlogInfo} is {@code null}, a start position is derived from the server's
     * binary log list: position 4 of the last listed log if no {@code beginTime} is configured,
     * otherwise position 4 of the first listed log.
     *
     * @param binlogInfo coordinates to start from, or {@code null} to derive them
     * @throws IOException if the coordinates must be derived but the server lists no binary logs
     *                     (previously an {@code IndexOutOfBoundsException}), or if listing fails
     */
    protected void initBinlogPosition(BinlogInfo binlogInfo) throws IOException {
        if (binlogInfo == null) {
            List<BinlogInfo> binaryLogs = getBinaryLogs();
            if (binaryLogs == null || binaryLogs.isEmpty()) {
                throw new IOException("TaskName: " + getTaskName() + ", No binary logs available on source db.");
            }
            BinlogInfo begin = beginTime == null ? binaryLogs.get(binaryLogs.size() - 1) : binaryLogs.get(0);
            binlogInfo = new BinlogInfo(currentSrcDbEntity.getServerId(), begin.getBinlogFile(), 4L, 0, begin.getTimestamp());
        }

        getContext().setDBServerId(currentSrcDbEntity.getServerId());
        getContext().setBinlogFileName(binlogInfo.getBinlogFile());
        getContext().setBinlogStartPos(binlogInfo.getBinlogPosition());
        setBinlogInfo(binlogInfo);

        SystemStatusManager.addServer(getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort(), tableSet);
        SystemStatusManager.updateServerBinlog(getTaskName(), binlogInfo);
    }
  • initBinlogPosition is mainly used to set binlogInfo information to PumaContext

dumpBinlog

    /**
     * Sends a binlog dump command for the file name and start position held in the context and
     * reads the server's OK/error reply.
     *
     * @return {@code true} if the server acknowledges the command; {@code false} if it answers
     *         with an error or any exception occurs (the reason is logged)
     */
    private boolean dumpBinlog() {
        try {
            ComBinlogDumpPacket request = (ComBinlogDumpPacket) PacketFactory.createCommandPacket(
                    PacketType.COM_BINLOG_DUMP_PACKET, getContext());
            request.setBinlogFileName(getContext().getBinlogFileName());
            request.setBinlogFlag(0);
            request.setBinlogPosition(getContext().getBinlogStartPos());
            request.setServerId(getServerId());
            request.buildPacket(getContext());

            request.write(os, getContext());

            OKErrorPacket reply = (OKErrorPacket) PacketFactory.parsePacket(is,
                    PacketType.OKERROR_PACKET, getContext());

            if (!reply.isOk()) {
                LOG.error("TaskName: " + getTaskName() + ", Dump binlog failed. Reason: "
                        + reply.getMessage());
                return false;
            }

            LOG.info("TaskName: " + getTaskName() + ", Dump binlog command success.");
            return true;
        } catch (Exception e) {
            LOG.error("TaskName: " + getTaskName() + " Dump binlog failed. Reason: " + e.getMessage());

            return false;
        }

    }
  • dumpBinlog method mainly sends COM_BINLOG_DUMP_PACKET

processBinlog

    /**
     * Reads binlog packets from the input stream and hands each one to
     * {@link #processBinlogPacket(BinlogPacket)} until the stop flag is set.
     *
     * @throws IOException if a packet reports an error or reading/processing fails
     */
    private void processBinlog() throws IOException {
        while (!isStop()) {
            BinlogPacket packet = (BinlogPacket) PacketFactory.parsePacket(is, PacketType.BINLOG_PACKET,
                    getContext());

            if (packet.isOk()) {
                processBinlogPacket(packet);
                continue;
            }

            // Error packet: log it and abort the stream with the same message.
            String failure = "TaskName: " + getTaskName() + ", Binlog packet response error.";
            LOG.error(failure);
            throw new IOException(failure);
        }
    }

    /**
     * Parses one binlog packet into an event and routes it.
     *
     * <p>Steps, in order: stop the executor when merging and the event timestamp has reached
     * {@code runUntilTimestamp}; bump the parsed-event counter; report and call
     * {@code stopTask()} on INTVAR/RAND/USER_VAR events; record the next binlog position for
     * everything but format description events; finally dispatch to
     * {@link #processRotateEvent(BinlogEvent)} or {@link #processDataEvent(BinlogEvent)}.
     *
     * @param binlogPacket the packet to parse and process
     * @throws IOException declared for subclasses/callers; not thrown directly by the visible code
     */
    protected void processBinlogPacket(BinlogPacket binlogPacket) throws IOException {
        BinlogEvent event = parser.parse(binlogPacket.getBinlogBuf(), getContext());

        if (merging && event.getHeader().getTimestamp() >= runUntilTimestamp) {
            stop();
        }

        SystemStatusManager.incServerParsedCounter(getTaskName());

        boolean statementStyleEvent = event.getHeader().getEventType() == BinlogConstants.INTVAR_EVENT
                || event.getHeader().getEventType() == BinlogConstants.RAND_EVENT
                || event.getHeader().getEventType() == BinlogConstants.USER_VAR_EVENT;
        if (statementStyleEvent) {
            String unsupported = "TaskName: " + getTaskName() + ", Binlog_format is MIXED or STATEMENT ,System is not support.";
            LOG.error(unsupported);
            String eventName = String.format("slave(%s) -- db(%s:%d)", getTaskName(), currentSrcDbEntity.getHost(), currentSrcDbEntity.getPort());
            Cat.logEvent("Slave.dbBinlogFormat", eventName, "1", "");
            Cat.logError("Puma.server.mixedorstatement.format", new IllegalArgumentException(unsupported));
            stopTask();
        }

        if (event.getHeader().getEventType() != BinlogConstants.FORMAT_DESCRIPTION_EVENT) {
            getContext().setNextBinlogPos(event.getHeader().getNextPosition());
        }

        if (event.getHeader().getEventType() != BinlogConstants.ROTATE_EVENT) {
            processDataEvent(event);
            return;
        }
        processRotateEvent(event);
    }

    /**
     * Applies a rotate event: points the context at the next binlog file and at the position of
     * its first event.
     *
     * @param binlogEvent the event to apply; cast to {@link RotateEvent}
     */
    protected void processRotateEvent(BinlogEvent binlogEvent) {
        RotateEvent rotation = (RotateEvent) binlogEvent;
        PumaContext ctx = getContext();
        ctx.setBinlogFileName(rotation.getNextBinlogFileName());
        ctx.setBinlogStartPos(rotation.getFirstEventPosition());
    }

    /**
     * Turns one binlog event into changed events, dispatches them and advances the recorded
     * binlog position.
     *
     * <p>After the event has been fully handled, the in-memory position and the system status
     * are updated; the position is persisted through {@code instanceStorageManager} only when
     * the last handler result carries a {@code DdlEvent} or a {@code RowChangedEvent} marked as
     * transaction commit.
     *
     * @param binlogEvent the parsed binlog event to process
     */
    protected void processDataEvent(BinlogEvent binlogEvent) {
        DataHandlerResult dataHandlerResult = null;
        // One binlog event may yield several changed events. Each one is dispatched as soon as it
        // is produced, so a large event does not pile all of its changed events up in memory.
        int eventIndex = 0;
        do {
            dataHandlerResult = dataHandler.process(binlogEvent, getContext());
            if (dataHandlerResult != null && !dataHandlerResult.isEmpty()) {
                ChangedEvent changedEvent = dataHandlerResult.getData();

                // Number the changed events produced from this single binlog event.
                changedEvent.getBinlogInfo().setEventIndex(eventIndex++);

                updateOpsCounter(changedEvent);

                dispatch(changedEvent);
            }
        } while (dataHandlerResult != null && !dataHandlerResult.isFinished());

        // Format description events do not move the start position.
        if (binlogEvent.getHeader().getEventType() != BinlogConstants.FORMAT_DESCRIPTION_EVENT) {
            getContext().setBinlogStartPos(binlogEvent.getHeader().getNextPosition());
            setBinlogInfo(new BinlogInfo(getBinlogInfo().getServerId(), getBinlogInfo().getBinlogFile(), binlogEvent
                    .getHeader().getNextPosition(), 0, 0));
        }

        // Position reported to the status registry (and possibly persisted below).
        BinlogInfo binlogInfo = new BinlogInfo(getContext().getDBServerId(), getContext()
                .getBinlogFileName(), binlogEvent.getHeader().getNextPosition(), 0, binlogEvent.getHeader().getTimestamp());
        SystemStatusManager.updateServerBinlog(getTaskName(), binlogInfo);

        // Persist only at a DDL event or a transaction commit, and only with a usable
        // file name and a non-zero next position.
        if (binlogEvent.getHeader().getNextPosition() != 0
                && StringUtils.isNotBlank(getContext().getBinlogFileName())
                && dataHandlerResult != null
                && !dataHandlerResult.isEmpty()
                && (dataHandlerResult.getData() instanceof DdlEvent || (dataHandlerResult.getData() instanceof RowChangedEvent && ((RowChangedEvent) dataHandlerResult
                .getData()).isTransactionCommit()))) {


            instanceStorageManager.setBinlogInfo(getTaskName(), binlogInfo);
        }
    }

  • The processBinlog method loops to receive binlog packets and passes each one to processBinlogPacket; that method uses parser.parse to obtain the BinlogEvent, and for every event other than FORMAT_DESCRIPTION_EVENT stores binlogEvent.getHeader().getNextPosition() in the context as the next binlog position; a ROTATE_EVENT is handled by processRotateEvent, everything else by processDataEvent; processRotateEvent updates binlogFileName and binlogStartPos in the context; processDataEvent repeatedly calls dataHandler.process(binlogEvent, getContext()), executes dispatch(changedEvent) for each non-empty result, then updates binlogStartPos and, on a DDL event or a transaction commit, persists the position through instanceStorageManager.setBinlogInfo

closeTransport

puma/puma/src/main/java/com/dianping/puma/taskexecutor/DefaultTaskExecutor.java

    /**
     * Closes the input stream, the output stream and the socket, in that order, and clears the
     * corresponding fields.
     *
     * <p>Each resource is closed independently: a failure is logged as a warning (now with the
     * exception in all three cases, not just for the socket) and never prevents the remaining
     * resources from being closed. Fields that are already {@code null} are skipped.
     */
    private void closeTransport() {
        // Close in.
        try {
            if (this.is != null) {
                this.is.close();
            }
        } catch (IOException ioEx) {
            LOG.warn("Server " + this.getTaskName() + ", Failed to close the input stream.", ioEx);
        } finally {
            this.is = null;
        }

        // Close os.
        try {
            if (this.os != null) {
                this.os.close();
            }
        } catch (IOException ioEx) {
            LOG.warn("Server " + this.getTaskName() + ", Failed to close the output stream", ioEx);
        } finally {
            this.os = null;
        }

        // Close socket.
        try {
            if (this.mysqlSocket != null) {
                this.mysqlSocket.close();
            }
        } catch (IOException ioEx) {
            LOG.warn("Server " + this.getTaskName() + ", Failed to close the socket", ioEx);
        } finally {
            this.mysqlSocket = null;
        }
    }
  • closeTransport is mainly used to close InputStream, OutputStream and mysqlSocket

Summary

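DefaultTaskExecutor extends AbstractTaskExecutor. Its doStart method loads the BinlogInfo through instanceStorageManager.getBinlogInfo, then calls connect, initConnect, initBinlogPosition and dumpBinlog and, if the dump succeeds, processBinlog, retrying in a loop until the task is stopped; its doStop method calls closeTransport and then SystemStatusManager.deleteServer(getTaskName())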

doc

Keywords: Programming Java socket encoding

Added by tsg on Wed, 03 Jun 2020 19:14:37 +0300