> Original article: MyCAT Source Code Analysis: [Single Database, Single Table] Query
> Annotated MyCat-Server source: https://github.com/YunaiV/Mycat-Server
> This series is updated every 1-2 weeks. You are welcome to subscribe, follow, and star it on GitHub: https://github.com/YunaiV/Blog
- 1. Overview
- 2. Receiving requests and parsing SQL
- 3. Obtain routing results
- 4. Get MySQL connection and execute SQL
- 5. Response to execution of SQL results
- 6. Others: Update/Delete
1. Overview
> The content is presented mainly as sequence diagrams plus core code.
> If anything is stated incorrectly or unclearly, please leave a comment.
> I am still undecided about the best way to present this content, so suggestions are especially welcome.
> WeChat: wangwenbin-server.
This article explains the code involved in the query.
The content is very similar to "MyCAT Source Code Analysis: [Single Database, Single Table] Insert": on the one hand, the process itself is basically the same; on the other hand, the article structure was not split well. We use 🚀 to mark the logic that differs.
Interactions are shown below:
The whole process of MyCAT Server is as follows:
- Receive the MySQL Client request and parse the SQL.
- Obtain the routing result.
- Get the MySQL connection and execute the SQL.
- Respond with the execution result, sending it back to the MySQL Client.
Let's take a step-by-step analysis and look at the source code.
2. Receiving requests and parsing SQL
[1 - 2]
Receive one MySQL command. Before [1], the request data has already been read and split into individual MySQL commands.
[3]
```java
// ⬇️⬇️⬇️[FrontendCommandHandler.java]
public class FrontendCommandHandler implements NIOHandler {

    @Override
    public void handle(byte[] data) {

        // ... omit part of the code
        switch (data[4]) {
            case MySQLPacket.COM_INIT_DB:
                commands.doInitDB();
                source.initDB(data);
                break;
            case MySQLPacket.COM_QUERY: // Query command
                // Count the query command
                commands.doQuery();
                // Execute the query command
                source.query(data);
                break;
            case MySQLPacket.COM_PING:
                commands.doPing();
                source.ping();
                break;
            // ... omit some cases
        }
    }

}
```
SQL statements such as INSERT/SELECT/UPDATE/DELETE all belong to MySQLPacket.COM_QUERY. See "MySQL Protocol Analysis", #4.2 Client Command Request Message (Client -> Server).
[4]
Parse the byte array into SQL. The core code is as follows:
```java
// ⬇️⬇️⬇️[FrontendConnection.java]
public void query(byte[] data) {
    // Get the statement
    String sql = null;
    try {
        MySQLMessage mm = new MySQLMessage(data);
        mm.position(5);
        sql = mm.readString(charset);
    } catch (UnsupportedEncodingException e) {
        writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
        return;
    }
    // Execute the statement
    this.query(sql);
}
```
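To make the `data[4]` and `mm.position(5)` offsets concrete, here is a minimal sketch of the packet layout, assuming the standard MySQL client/server framing (a 3-byte little-endian payload length, a 1-byte sequence id, then the payload, whose first byte is the command code). The class `ComQueryPacketSketch` and its methods are hypothetical helpers for illustration and are not part of MyCAT; UTF-8 is assumed as the charset.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not MyCAT code.
class ComQueryPacketSketch {
    // Command code for COM_QUERY in the MySQL protocol.
    static final byte COM_QUERY = 0x03;

    // Build a packet: 3-byte little-endian payload length, 1-byte sequence id, payload.
    static byte[] encode(String sql) {
        byte[] text = sql.getBytes(StandardCharsets.UTF_8);
        int payloadLength = 1 + text.length; // command byte + SQL text
        byte[] packet = new byte[4 + payloadLength];
        packet[0] = (byte) (payloadLength & 0xff);
        packet[1] = (byte) ((payloadLength >>> 8) & 0xff);
        packet[2] = (byte) ((payloadLength >>> 16) & 0xff);
        packet[3] = 0;         // sequence id of the first packet of a command
        packet[4] = COM_QUERY; // the byte that the command handler switches on
        System.arraycopy(text, 0, packet, 5, text.length);
        return packet;
    }

    // The command byte sits at index 4, right after the 4-byte header.
    static int commandOf(byte[] packet) {
        return packet[4] & 0xff;
    }

    // The SQL text starts at index 5 and runs to the end of the packet.
    static String sqlOf(byte[] packet) {
        return new String(packet, 5, packet.length - 5, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] packet = encode("select * from t_user");
        System.out.println("command = " + commandOf(packet));
        System.out.println("sql     = " + sqlOf(packet));
    }
}
```

This is why the handler inspects index 4 for the command type and why the SQL string is read starting from position 5.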
[5]
Parse the SQL type. The core code is as follows:
```java
// ⬇️⬇️⬇️[ServerQueryHandler.java]
@Override
public void query(String sql) {
    // Parse the SQL type
    int rs = ServerParse.parse(sql);
    int sqlType = rs & 0xff;

    switch (sqlType) {
        // explain sql
        case ServerParse.EXPLAIN:
            ExplainHandler.handle(sql, c, rs >>> 8);
            break;
        // ... omit some cases
            break;
        case ServerParse.SELECT:
            SelectHandler.handle(sql, c, rs >>> 8);
            break;
        // ... omit some cases
        default:
            if (readOnly) {
                LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
                c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
                break;
            }
            c.execute(sql, rs & 0xff);
    }
}


// ⬇️⬇️⬇️[ServerParse.java]
public static int parse(String stmt) {
    int length = stmt.length();
    // FIX BUG FOR SQL SUCH AS /XXXX/SQL
    int rt = -1;
    for (int i = 0; i < length; ++i) {
        switch (stmt.charAt(i)) {
            // ... omit some cases
            case 'I':
            case 'i':
                rt = insertCheck(stmt, i);
                if (rt != OTHER) {
                    return rt;
                }
                continue;
            // ... omit some cases
            case 'S':
            case 's':
                rt = sCheck(stmt, i);
                if (rt != OTHER) {
                    return rt;
                }
                continue;
            // ... omit some cases
            default:
                continue;
        }
    }
    return OTHER;
}
```
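The handler above reads the SQL type with `rs & 0xff` and passes `rs >>> 8` on as an offset, which suggests that the parse result packs two values into one `int`. A minimal sketch of that packing, assuming the low 8 bits hold the type and the remaining bits hold the offset just past the keyword; the class name, the `SELECT` constant value, and the simplified `parse` below are hypothetical and much cruder than the real `ServerParse`:

```java
// Hypothetical illustration class; not MyCAT code.
class ParseResultSketch {
    static final int OTHER = -1;
    static final int SELECT = 7; // illustrative value only

    // Pack the keyword end offset and the SQL type into a single int.
    static int pack(int offset, int type) {
        return (offset << 8) | type;
    }

    static int typeOf(int rs) {
        return rs & 0xff;
    }

    static int offsetOf(int rs) {
        return rs >>> 8;
    }

    // Simplified classifier: skip leading whitespace, then look for "select"
    // (case-insensitive) followed by a whitespace character.
    static int parse(String stmt) {
        int i = 0;
        while (i < stmt.length() && Character.isWhitespace(stmt.charAt(i))) {
            i++;
        }
        String keyword = "select";
        int end = i + keyword.length();
        if (stmt.regionMatches(true, i, keyword, 0, keyword.length())
                && end < stmt.length()
                && Character.isWhitespace(stmt.charAt(end))) {
            return pack(end, SELECT);
        }
        return OTHER;
    }

    public static void main(String[] args) {
        String sql = "  SELECT version()";
        int rs = parse(sql);
        System.out.println("type   = " + typeOf(rs));
        System.out.println("offset = " + offsetOf(rs));
        System.out.println("rest   = " + sql.substring(offsetOf(rs)));
    }
}
```

Returning one packed `int` lets the second-level parser continue scanning from the offset instead of re-reading the keyword.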
🚀[6][7]
Parse the Select SQL type and dispatch it to the corresponding logic. The core code is as follows:
```java
// ⬇️⬇️⬇️[SelectHandler.java]
public static void handle(String stmt, ServerConnection c, int offs) {
    int offset = offs;
    switch (ServerParseSelect.parse(stmt, offs)) { // Parse the Select SQL type
        case ServerParseSelect.VERSION_COMMENT: // select @@VERSION_COMMENT;
            SelectVersionComment.response(c);
            break;
        case ServerParseSelect.DATABASE: // select DATABASE();
            SelectDatabase.response(c);
            break;
        case ServerParseSelect.USER: // select CURRENT_USER();
            SelectUser.response(c);
            break;
        case ServerParseSelect.VERSION: // select VERSION();
            SelectVersion.response(c);
            break;
        case ServerParseSelect.SESSION_INCREMENT: // select @@session.auto_increment_increment;
            SessionIncrement.response(c);
            break;
        case ServerParseSelect.SESSION_ISOLATION: // select @@session.tx_isolation;
            SessionIsolation.response(c);
            break;
        case ServerParseSelect.LAST_INSERT_ID: // select LAST_INSERT_ID();
            // ... omit some code
            break;
        case ServerParseSelect.IDENTITY: // select @@identity
            // ... omit some code
            break;
        case ServerParseSelect.SELECT_VAR_ALL:
            SelectVariables.execute(c, stmt);
            break;
        case ServerParseSelect.SESSION_TX_READ_ONLY:
            SelectTxReadOnly.response(c);
            break;
        default: // Others, such as select * from table
            c.execute(stmt, ServerParse.SELECT);
    }
}

// ⬇️⬇️⬇️[ServerParseSelect.java]
public static int parse(String stmt, int offset) {
    int i = offset;
    for (; i < stmt.length(); ++i) {
        switch (stmt.charAt(i)) {
            case ' ':
                continue;
            case '/':
            case '#':
                i = ParseUtil.comment(stmt, i);
                continue;
            case '@':
                return select2Check(stmt, i);
            case 'D':
            case 'd':
                return databaseCheck(stmt, i);
            case 'L':
            case 'l':
                return lastInsertCheck(stmt, i);
            case 'U':
            case 'u':
                return userCheck(stmt, i);
            case 'C':
            case 'c':
                return currentUserCheck(stmt, i);
            case 'V':
            case 'v':
                return versionCheck(stmt, i);
            default:
                return OTHER;
        }
    }
    return OTHER;
}
```
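The dispatch above separates selects that the server can answer by itself (for example `select VERSION()`) from ordinary selects that must be routed to a backend. The following is a rough sketch of that idea only. It compares the whole remainder of the statement with a few fixed expressions, which is a simplification swapped in for MyCAT's character-by-character scanning; the class `SelectKindSketch` and its `Kind` enum are hypothetical.

```java
// Hypothetical illustration class; not MyCAT code.
class SelectKindSketch {
    enum Kind { VERSION_COMMENT, DATABASE, VERSION, OTHER }

    // Classify what follows the SELECT keyword; offset points just past "select".
    static Kind classify(String stmt, int offset) {
        String rest = stmt.substring(offset).trim();
        if (rest.endsWith(";")) {
            rest = rest.substring(0, rest.length() - 1).trim();
        }
        if (rest.equalsIgnoreCase("@@version_comment")) {
            return Kind.VERSION_COMMENT;
        }
        if (rest.equalsIgnoreCase("database()")) {
            return Kind.DATABASE;
        }
        if (rest.equalsIgnoreCase("version()")) {
            return Kind.VERSION;
        }
        // Anything else, such as "* from table", goes through normal routing.
        return Kind.OTHER;
    }

    public static void main(String[] args) {
        System.out.println(classify("select VERSION()", 6));
        System.out.println(classify("select * from t_user", 6));
    }
}
```

Only the `OTHER` kind reaches routing and backend execution; the remaining kinds are answered directly by the server.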
[8]
Execute the SQL; a detailed analysis follows in the sections below. The core code is as follows:
```java
// ⬇️⬇️⬇️[ServerConnection.java]
public class ServerConnection extends FrontendConnection {
    public void execute(String sql, int type) {
        // ... omit some code
        SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);
        if (schema == null) {
            writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,
                    "Unknown MyCAT Database '" + db + "'");
            return;
        }

        // ... omit some code

        // Route to the back-end database and execute the SQL
        routeEndExecuteSQL(sql, type, schema);
    }

    public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
        // Compute the route
        RouteResultset rrs = null;
        try {
            rrs = MycatServer
                    .getInstance()
                    .getRouterservice()
                    .route(MycatServer.getInstance().getConfig().getSystem(),
                            schema, type, sql, this.charset, this);

        } catch (Exception e) {
            StringBuilder s = new StringBuilder();
            LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(), e);
            String msg = e.getMessage();
            writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
            return;
        }

        // Execute the SQL
        if (rrs != null) {
            // Execute through the session
            session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
        }

    }

}
```
3. Obtain routing results
[ 1 - 5 ]
The main flow for obtaining the routing result. The core code is as follows:
```java
  1: // ⬇️⬇️⬇️[RouteService.java]
  2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
  3:         int sqlType, String stmt, String charset, ServerConnection sc)
  4:         throws SQLNonTransientException {
  5:     RouteResultset rrs = null;
  6:
  7:     // For SELECT-type SQL, check whether a cached routing result exists
  8:     if (sqlType == ServerParse.SELECT) {
  9:         cacheKey = schema.getName() + stmt;
 10:         rrs = (RouteResultset) sqlRouteCache.get(cacheKey);
 11:         if (rrs != null) {
 12:             checkMigrateRule(schema.getName(),rrs,sqlType);
 13:             return rrs;
 14:         }
 15:     }
 16: }
 17:
 18:     // ... omit some code
 19:     int hintLength = RouteService.isHintSql(stmt);
 20:     if(hintLength != -1){ // TODO to read: hint
 21:         // ... omit some code
 22:         }
 23:     } else {
 24:         stmt = stmt.trim();
 25:         rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,
 26:                 charset, sc, tableId2DataNodeCache);
 27:     }
 28:
 29:     // Record the routing result of the query command in the cache
 30:     if (rrs != null && sqlType == ServerParse.SELECT && rrs.isCacheAble()) {
 31:         sqlRouteCache.putIfAbsent(cacheKey, rrs);
 32:     }
 33:     // ... the `return rrs` code is omitted
 34: }
 35: // ⬇️⬇️⬇️[AbstractRouteStrategy.java]
 36: @Override
 37: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
 38:         String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
 39:
 40:     // ... omit some code
 41:
 42:     // Pre-routing logic: global sequence numbers, parent-child table insertion
 43:     if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {
 44:         return null;
 45:     }
 46:
 47:     // ... omit some code
 48:
 49:     // Check whether the schema is sharded
 50:     if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
 51:         rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
 52:     } else {
 53:         RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
 54:         if (returnedSet == null) {
 55:             rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);
 56:         }
 57:     }
 58:
 59:     return rrs;
 60: }
```
- Lines 7 to 16: when a cached routing result exists for the Select SQL, the cached result is returned directly.
- Lines 29 to 32: record the Select SQL routing result in the cache.
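The lookup-then-record pattern described in these two points can be sketched as follows. This is only an illustration under stated assumptions: a `ConcurrentHashMap` stands in for MyCAT's `sqlRouteCache`, a plain `String` stands in for `RouteResultset`, and `computeRoute` is a hypothetical placeholder for the real routing strategy. As in the source, the cache key is the schema name concatenated with the statement.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not MyCAT code.
class RouteCacheSketch {
    // Stand-in for sqlRouteCache: cache key -> routing result.
    private final ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
    // Counts how often the (expensive) route computation actually runs.
    final AtomicInteger computed = new AtomicInteger();

    String route(String schemaName, String stmt, boolean isSelect) {
        String cacheKey = schemaName + stmt;
        if (isSelect) {
            // Only SELECT statements consult the cache.
            String cached = cache.get(cacheKey);
            if (cached != null) {
                return cached;
            }
        }
        String result = computeRoute(schemaName, stmt);
        if (isSelect) {
            // Record the result; putIfAbsent keeps the first entry under concurrency.
            cache.putIfAbsent(cacheKey, result);
        }
        return result;
    }

    // Placeholder for the real routing computation.
    private String computeRoute(String schemaName, String stmt) {
        computed.incrementAndGet();
        return schemaName + "_dn1";
    }

    public static void main(String[] args) {
        RouteCacheSketch router = new RouteCacheSketch();
        router.route("dbtest", "select * from t_user", true);
        router.route("dbtest", "select * from t_user", true);
        System.out.println("route computations: " + router.computed.get());
    }
}
```

Because the key includes the raw statement text, two selects that differ only in whitespace or letter case are cached as separate entries in this sketch.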
A detailed analysis of routing will get its own article, so that this one does not grow too long and obscure the overall process and logic.
4. Get MySQL connection and execute SQL
[ 1 - 8 ]
Get a MySQL connection.
- PhysicalDBNode: physical database node.
- PhysicalDatasource: physical database data source.
[ 9 - 13 ]
Send the SQL to MySQL Server for execution.
5. Response to execution of SQL results
The core code is as follows:
```java
// ⬇️⬇️⬇️[MySQLConnectionHandler.java]
@Override
protected void handleData(byte[] data) {
    switch (resultStatus) {
        case RESULT_STATUS_INIT:
            switch (data[4]) {
                case OkPacket.FIELD_COUNT:
                    handleOkPacket(data);
                    break;
                case ErrorPacket.FIELD_COUNT:
                    handleErrorPacket(data);
                    break;
                case RequestFilePacket.FIELD_COUNT:
                    handleRequestPacket(data);
                    break;
                default: // Initialize the header and fields
                    resultStatus = RESULT_STATUS_HEADER;
                    header = data;
                    fields = new ArrayList<byte[]>((int) ByteUtil.readLength(data, 4));
            }
            break;
        case RESULT_STATUS_HEADER:
            switch (data[4]) {
                case ErrorPacket.FIELD_COUNT:
                    resultStatus = RESULT_STATUS_INIT;
                    handleErrorPacket(data);
                    break;
                case EOFPacket.FIELD_COUNT: // End of the field packets
                    resultStatus = RESULT_STATUS_FIELD_EOF;
                    handleFieldEofPacket(data);
                    break;
                default: // A field packet
                    fields.add(data);
            }
            break;
        case RESULT_STATUS_FIELD_EOF:
            switch (data[4]) {
                case ErrorPacket.FIELD_COUNT:
                    resultStatus = RESULT_STATUS_INIT;
                    handleErrorPacket(data);
                    break;
                case EOFPacket.FIELD_COUNT: // End of the row packets
                    resultStatus = RESULT_STATUS_INIT;
                    handleRowEofPacket(data);
                    break;
                default: // A row packet
                    handleRowPacket(data);
            }
            break;
        default:
            throw new RuntimeException("unknown status!");
    }
}
```
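The handler above is a small state machine over the packets of a result set: a header packet, then the field packets, an EOF, the row packets, and a final EOF. The sketch below replays such a sequence, assuming the usual MySQL marker bytes at index 4 (`0x00` for OK, `0xff` for ERR, `0xfe` for EOF). The class `ResultSetStateSketch`, its counters, and the `packet` helper are hypothetical; real packets carry full payloads, and the file-request case is left out.

```java
// Hypothetical illustration class; not MyCAT code.
class ResultSetStateSketch {
    enum State { INIT, HEADER, FIELD_EOF }

    static final int OK = 0x00;
    static final int ERR = 0xff;
    static final int EOF = 0xfe;

    State state = State.INIT;
    int fields;
    int rows;
    boolean finished;

    // Feed one backend packet; index 4 is the first payload byte.
    void handle(byte[] packet) {
        int first = packet[4] & 0xff;
        switch (state) {
            case INIT:
                if (first == OK || first == ERR) {
                    finished = true;      // no result set follows
                } else {
                    state = State.HEADER; // header packet carrying the column count
                }
                break;
            case HEADER:
                if (first == ERR) {
                    state = State.INIT;
                    finished = true;
                } else if (first == EOF) {
                    state = State.FIELD_EOF; // all field packets received
                } else {
                    fields++;
                }
                break;
            case FIELD_EOF:
                if (first == ERR || first == EOF) {
                    state = State.INIT;      // result set complete, ready for the next one
                    finished = true;
                } else {
                    rows++;
                }
                break;
        }
    }

    // Build a minimal packet: 4-byte header plus a single payload byte.
    static byte[] packet(int firstPayloadByte) {
        return new byte[] {1, 0, 0, 0, (byte) firstPayloadByte};
    }

    public static void main(String[] args) {
        ResultSetStateSketch handler = new ResultSetStateSketch();
        int[] stream = {2, 3, 3, EOF, 1, 1, 1, EOF}; // header, 2 fields, EOF, 3 rows, EOF
        for (int b : stream) {
            handler.handle(packet(b));
        }
        System.out.println("fields=" + handler.fields + " rows=" + handler.rows);
    }
}
```

Resetting the state to `INIT` after the final EOF or an error is what lets the same backend connection handle the next statement.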
6. Others: Update/Delete
The process is basically the same as in "MyCAT Source Code Analysis: [Single Database, Single Table] Insert", so we will not write a separate article to analyze it.