Source code analysis of database middleware MyCAT: [single library form] query

> Original address: MyCAT Source Analysis: [Single Library Form] Query
> MyCat-Server with annotated address: https://github.com/YunaiV/Mycat-Server
> (vii) This series is updated every 1-2 weeks. You are welcome to subscribe, follow and collect GitHub: https://github.com/YunaiV/Blog

1. Overview

> The main content form is sequence diagram + core code.
> If there is any misrepresentation or ambiguity, please leave a message.
> As for content form, it is very entangled. If you have any suggestions, you are especially welcome to make them.
> Microsignal: wangwenbin-server.

This article explains the code involved in the query.

(vii) Content and MyCAT Source Code Analysis: [Single Library List] Insert Super similar, on the one hand, the process itself is basically the same, on the other hand, the structure of the article is not well split. We use the logic of (vi) marking differences.

Interactions are shown below:

The whole process of MyCAT Server is as follows:

  1. Receive the MySQL Client request and parse the SQL.
  2. Get the result of routing and route.
  3. Get the MySQL connection and execute the SQL.
  4. In response to the execution result, the result is sent to MySQL Client.

Let's take a step-by-step analysis and look at the source code.

2. Receiving requests and parsing SQL

[1 - 2]

Receive a MySQL command. Before [1], there were requests for data to be read and split into a single MySQL SQL.

[3]

  1: // ⬇️⬇️⬇️[FrontendCommandHandler.java]
  2: public class FrontendCommandHandler implements NIOHandler {
  3: 
  4:     @Override
  5:     public void handle(byte[] data) {
  6:     
  7:         // ... omit part of the code
  8:         switch (data[4]) // 
  9:         {
 10:             case MySQLPacket.COM_INIT_DB:
 11:                 commands.doInitDB();
 12:                 source.initDB(data);
 13:                 break;
 14:             case MySQLPacket.COM_QUERY: // Query command
 15:                 // Count Query Command
 16:                 commands.doQuery();
 17:                 // Execute query commands
 18:                 source.query(data);
 19:                 break;
 20:             case MySQLPacket.COM_PING:
 21:                 commands.doPing();
 22:                 source.ping();
 23:                 break;
 24:             // ... omit some case s
 25:         }
 26:     }
 27: 
 28: }

SQL such as INSERT/SELECT/UPDATE/DELETE belongs to MySQL Packet.COM_QUERY. MySQL Protocol Analysis #4.2 Client Command Request Message (Client - > Server).

[4]

Resolve binary arrays into SQL. The core code is as follows:

  1: // ⬇️⬇️⬇️[FrontendConnection.java]
  2: public void query(byte[] data) {
  3:    // Get statement
  4:    String sql = null;      
  5:    try {
  6:        MySQLMessage mm = new MySQLMessage(data);
  7:        mm.position(5);
  8:        sql = mm.readString(charset);
  9:    } catch (UnsupportedEncodingException e) {
 10:        writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
 11:        return;
 12:    }       
 13:    // Execution statement
 14:    this.query( sql );
 15: }

[5]

Parse the SQL type. The core code is as follows:

  1: // ⬇️⬇️⬇️[ServerQueryHandler.java]
  2: @Override
  3: public void query(String sql) {
  4:    // Resolving SQL Types
  5:    int rs = ServerParse.parse(sql);
  6:    int sqlType = rs & 0xff;
  7:    
  8:    switch (sqlType) {
  9:    //explain sql
 10:    case ServerParse.EXPLAIN:
 11:        ExplainHandler.handle(sql, c, rs >>> 8);
 12:        break;
 13:    // ... omit some case s
 14:        break;
 15:    case ServerParse.SELECT:
 16:        SelectHandler.handle(sql, c, rs >>> 8);
 17:        break;
 18:    // ... omit some case s
 19:    default:
 20:        if(readOnly){
 21:            LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
 22:            c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
 23:            break;
 24:        }
 25:        c.execute(sql, rs & 0xff);
 26:    }
 27: }
 28: 
 29:
 30: // ⬇️⬇️⬇️[ServerParse.java]
 31: public static int parse(String stmt) {
 32:    int length = stmt.length();
 33:    //FIX BUG FOR SQL SUCH AS /XXXX/SQL
 34:    int rt = -1;
 35:    for (int i = 0; i < length; ++i) {
 36:        switch (stmt.charAt(i)) {
 37:        // Omit part of case'I':
 38:        case 'i':
 39:            rt = insertCheck(stmt, i);
 40:            if (rt != OTHER) {
 41:                return rt;
 42:            }
 43:            continue;
 44:            // ... omit some case s
 45:        case 'S':
 46:        case 's':
 47:            rt = sCheck(stmt, i);
 48:            if (rt != OTHER) {
 49:                return rt;
 50:            }
 51:            continue;
 52:            // ... omit some case s
 53:        default:
 54:            continue;
 55:        }
 56:    }
 57:    return OTHER;
 58: }

🚀[6][7]

Parse the Select SQL type and distribute it to the corresponding logic. The core code is as follows:

  1: // ⬇️⬇️⬇️[SelectHandler.java]
  2: public static void handle(String stmt, ServerConnection c, int offs) {
  3:    int offset = offs;
  4:    switch (ServerParseSelect.parse(stmt, offs)) { // Resolving Select SQL Types
  5:    case ServerParseSelect.VERSION_COMMENT: // select @@VERSION_COMMENT;
  6:        SelectVersionComment.response(c);
  7:        break;
  8:    case ServerParseSelect.DATABASE: // select DATABASE();
  9:        SelectDatabase.response(c);
 10:        break;
 11:    case ServerParseSelect.USER: // select CURRENT_USER();
 12:         SelectUser.response(c);
 13:        break;
 14:    case ServerParseSelect.VERSION: // select VERSION();
 15:        SelectVersion.response(c);
 16:        break;
 17:    case ServerParseSelect.SESSION_INCREMENT: // select @@session.auto_increment_increment;
 18:        SessionIncrement.response(c);
 19:        break;
 20:    case ServerParseSelect.SESSION_ISOLATION: // select @@session.tx_isolation;
 21:        SessionIsolation.response(c);
 22:        break;
 23:    case ServerParseSelect.LAST_INSERT_ID: // select LAST_INSERT_ID();
 24:        // .... Eliminate code
 25:        break;
 26:    case ServerParseSelect.IDENTITY: // select @@identity
 27:        // .... Eliminate code
 28:        break;
 29:     case ServerParseSelect.SELECT_VAR_ALL: //
 30:         SelectVariables.execute(c,stmt);
 31:             break;
 32:     case ServerParseSelect.SESSION_TX_READ_ONLY: //
 33:         SelectTxReadOnly.response(c);
 34:            break;
 35:    default: // Others, such as select * from table
 36:        c.execute(stmt, ServerParse.SELECT);
 37:    }
 38: }
 39: // ⬇️⬇️⬇️[ServerParseSelect.java]
 40: public static int parse(String stmt, int offset) {
 41:    int i = offset;
 42:    for (; i < stmt.length(); ++i) {
 43:        switch (stmt.charAt(i)) {
 44:        case ' ':
 45:            continue;
 46:        case '/':
 47:        case '#':
 48:            i = ParseUtil.comment(stmt, i);
 49:            continue;
 50:        case '@':
 51:            return select2Check(stmt, i);
 52:        case 'D':
 53:        case 'd':
 54:            return databaseCheck(stmt, i);
 55:        case 'L':
 56:        case 'l':
 57:            return lastInsertCheck(stmt, i);
 58:        case 'U':
 59:        case 'u':
 60:            return userCheck(stmt, i);
 61:        case 'C':
 62:        case 'c':
 63:            return currentUserCheck(stmt, i);
 64:        case 'V':
 65:        case 'v':
 66:            return versionCheck(stmt, i);
 67:        default:
 68:            return OTHER;
 69:        }
 70:    }
 71:    return OTHER;
 72: }

[8]

Execute SQL, detailed analysis see below, the core code is as follows:

  1: // ⬇️⬇️⬇️[ServerConnection.java]
  2: public class ServerConnection extends FrontendConnection {
  3:    public void execute(String sql, int type) {
  4:        // .... Eliminate code
  5:        SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);
  6:        if (schema == null) {
  7:            writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,
  8:                    "Unknown MyCAT Database '" + db + "'");
  9:            return;
 10:        }
 11: 
 12:        // .... Eliminate code
 13: 
 14:        // Route to the back-end database to execute SQL
 15:        routeEndExecuteSQL(sql, type, schema);
 16:    }
 17:    
 18:     public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
 19:        // Routing Computing
 20:        RouteResultset rrs = null;
 21:        try {
 22:            rrs = MycatServer
 23:                    .getInstance()
 24:                    .getRouterservice()
 25:                    .route(MycatServer.getInstance().getConfig().getSystem(),
 26:                            schema, type, sql, this.charset, this);
 27: 
 28:        } catch (Exception e) {
 29:            StringBuilder s = new StringBuilder();
 30:            LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);
 31:            String msg = e.getMessage();
 32:            writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
 33:            return;
 34:        }
 35: 
 36:        // Executing SQL
 37:        if (rrs != null) {
 38:            // session execution
 39:            session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
 40:        }
 41:        
 42:    }
 43: 
 44: }

3. Obtain routing results

[ 1 - 5 ]

Get the main routing process. The core code is as follows:

  1: // ⬇️⬇️⬇️[SelectHandler.java]
  2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
  3:        int sqlType, String stmt, String charset, ServerConnection sc)
  4:        throws SQLNonTransientException {
  5:    RouteResultset rrs = null;
  6: 
  7:    // SELECT-type SQL to detect the presence of caches
  8:    if (sqlType == ServerParse.SELECT) {
  9:        cacheKey = schema.getName() + stmt;         
 10:        rrs = (RouteResultset) sqlRouteCache.get(cacheKey);
 11:        if (rrs != null) {
 12:            checkMigrateRule(schema.getName(),rrs,sqlType);
 13:            return rrs;
 14:            }
 15:        }
 16:    }
 17: 
 18:    // .... Eliminate code
 19:    int hintLength = RouteService.isHintSql(stmt);
 20:    if(hintLength != -1){ // TODO Read: hint
 21:        // .... Eliminate code
 22:        }
 23:    } else {
 24:        stmt = stmt.trim();
 25:        rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,
 26:                charset, sc, tableId2DataNodeCache);
 27:    }
 28: 
 29:    // Record Query Command Routing Result Cache
 30:    if (rrs != null && sqlType == ServerParse.SELECT && rrs.isCacheAble()) {
 31:        sqlRouteCache.putIfAbsent(cacheKey, rrs);
 32:    }
 33:    // The code return rrs is omitted.
 34: }
 35: // ⬇️⬇️⬇️[AbstractRouteStrategy.java]
 36: @Override
 37: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
 38:        String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
 39: 
 40:    // .... Eliminate code
 41: 
 42:    // Processing some logic before routing; global serial number, parent-child table insertion
 43:    if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {
 44:        return null;
 45:    }
 46: 
 47:    // .... Eliminate code
 48: 
 49:    // Check for fragmentation
 50:    if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
 51:        rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
 52:    } else {
 53:        RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
 54:        if (returnedSet == null) {
 55:            rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);
 56:        }
 57:    }
 58: 
 59:    return rrs;
 60: }

(3) Lines 7 to 16: When a routing result cache exists in Select SQL, the cache is returned directly. (6) Lines 29 to 32: Record Select SQL routing results to the cache.

Route detailed analysis, we will open a separate article to avoid excessive content, affecting everyone's understanding of the process and logic of [insertion].

4. Get MySQL connection and execute SQL

[ 1 - 8 ]

Get a MySQL connection.

  • Physical DBNode: Physical database node.
  • Physical Data source: Physical database data source.

[ 9 - 13 ]

Send SQL to MySQL Server to execute SQL.

(5) Response execution of SQL results

The core code is as follows:

  1: // ⬇️⬇️⬇️[MySQLConnectionHandler.java]
  2: @Override
  3: protected void handleData(byte[] data) {
  4:    switch (resultStatus) {
  5:    case RESULT_STATUS_INIT:
  6:        switch (data[4]) {
  7:        case OkPacket.FIELD_COUNT:
  8:            handleOkPacket(data);
  9:            break;
 10:        case ErrorPacket.FIELD_COUNT:
 11:            handleErrorPacket(data);
 12:            break;
 13:        case RequestFilePacket.FIELD_COUNT:
 14:            handleRequestPacket(data);
 15:            break;
 16:        default: // Initialize header fields
 17:            resultStatus = RESULT_STATUS_HEADER;
 18:            header = data;
 19:            fields = new ArrayList<byte[]>((int) ByteUtil.readLength(data,
 20:                    4));
 21:        }
 22:        break;
 23:    case RESULT_STATUS_HEADER:
 24:        switch (data[4]) {
 25:        case ErrorPacket.FIELD_COUNT:
 26:            resultStatus = RESULT_STATUS_INIT;
 27:            handleErrorPacket(data);
 28:            break;
 29:        case EOFPacket.FIELD_COUNT: // Resolution field end
 30:            resultStatus = RESULT_STATUS_FIELD_EOF;
 31:            handleFieldEofPacket(data);
 32:            break;
 33:        default: // Analyzing fields
 34:            fields.add(data);
 35:        }
 36:        break;
 37:    case RESULT_STATUS_FIELD_EOF:
 38:        switch (data[4]) {
 39:        case ErrorPacket.FIELD_COUNT:
 40:            resultStatus = RESULT_STATUS_INIT;
 41:            handleErrorPacket(data);
 42:            break;
 43:        case EOFPacket.FIELD_COUNT: // End of parsing each line of records
 44:            resultStatus = RESULT_STATUS_INIT;
 45:            handleRowEofPacket(data);
 46:            break;
 47:        default: // Each row of records
 48:            handleRowPacket(data);
 49:        }
 50:        break;
 51:    default:
 52:        throw new RuntimeException("unknown status!");
 53:    }
 54: }

6. Others: Update/Delete

Process Basics and MyCAT Source Code Analysis: [Single Library List] Insert Same. We will not analyze other articles.

Keywords: SQL MySQL Java mycat

Added by TeddyKiller on Wed, 26 Jun 2019 23:23:57 +0300