1. Introduction
SqlClient is the SQL command-line interaction tool provided by Flink. The Flink binary distribution contains a sql-client.sh script in its bin directory, and running that script brings up the interactive shell. The source code of SqlClient can be found in the flink-sql-client sub-module of the flink-table module, with its entry point in org/apache/flink/table/client/SqlClient.java. After creating the interactive environment, the startup code calls the open function of CliClient, which enters an endless loop:
public void open() {
    isRunning = true;

    // print welcome
    terminal.writer().append(CliStrings.MESSAGE_WELCOME);

    // begin reading loop
    while (isRunning) {
        // make some space to previous command
        terminal.writer().append("\n");
        terminal.flush();

        final String line;
        try {
            // 1. Read user input (terminated by ';')
            line = lineReader.readLine(prompt, null, (MaskingCallback) null, null);
        } catch (UserInterruptException e) {
            // user cancelled line with Ctrl+C
            continue;
        } catch (EndOfFileException | IOError e) {
            // user cancelled application with Ctrl+D or kill
            break;
        } catch (Throwable t) {
            throw new SqlClientException("Could not read from command line.", t);
        }
        if (line == null) {
            continue;
        }

        // 2. Call parseCommand to parse user input and obtain the corresponding command
        final Optional<SqlCommandCall> cmdCall = parseCommand(line);
        // 3. Call callCommand to execute the command
        cmdCall.ifPresent(this::callCommand);
    }
}
2. Parse command - parseCommand
2.1 SqlCommandCall
SqlCommandCall is a nested class of SqlCommandParser, defined as follows:
public static class SqlCommandCall {
    public final SqlCommand command;
    public final String[] operands;
}
SqlCommand is an enum that declares every SQL operation type. It also serves a second purpose: regex matching of SQL commands. For each operation type, SqlCommand defines two fields:
- String pattern: the regular expression used to match the SQL text;
- Function<String[], Optional<String[]>> operandConverter: a functional interface that further parses and converts the groups captured by the regex match above; the converted result is eventually stored in operands.
As mentioned above, operands stores the parsed result of a SQL command.
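To make this design concrete, here is a minimal, self-contained sketch of how an enum constant can pair a pattern with an operandConverter. The constants, patterns and converters below are illustrative only; the real definitions live in Flink's SqlCommandParser.SqlCommand and differ in detail.

import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;

// Simplified sketch of the SqlCommand design: each constant carries a regex
// pattern plus a converter for the captured groups. Patterns and converters
// here are illustrative, not copied from Flink.
enum SqlCommandSketch {
    QUIT("(QUIT|EXIT)", groups -> Optional.of(new String[0])),
    HELP("HELP", groups -> Optional.of(new String[0])),
    // "SET" alone lists all properties; "SET key=value" captures key and value
    SET("SET(\\s+(\\S+)\\s*=(.*))?",
            groups -> groups[0] == null
                    ? Optional.of(new String[0])
                    : Optional.of(new String[] {groups[1], groups[2]}));

    final Pattern pattern; // regex used to match the SQL text
    final Function<String[], Optional<String[]>> operandConverter; // converts captured groups

    SqlCommandSketch(String regex, Function<String[], Optional<String[]>> converter) {
        this.pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
        this.operandConverter = converter;
    }
}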
2.2 Parsing entry
public static SqlCommandCall parse(Parser sqlParser, String stmt) {
    // normalize
    stmt = stmt.trim();
    // remove ';' at the end
    if (stmt.endsWith(";")) {
        stmt = stmt.substring(0, stmt.length() - 1).trim();
    }

    // parse statement via regex matching first
    Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
    if (callOpt.isPresent()) {
        return callOpt.get();
    } else {
        return parseBySqlParser(sqlParser, stmt);
    }
}
Flink first attempts to match the SQL with parseByRegexMatching; if that fails, it calls parseBySqlParser to parse the SQL (the underlying layer uses the Calcite framework). When I first read this, I did not quite understand why there are two parsing schemes. I later learned that regex matching was used originally, but since Flink SQL must in any case be parsed into a logical execution plan, and that already uses Calcite as the parser, the SQL-parser path was added to unify the design. Flink still keeps regex matching for commands unrelated to data manipulation, such as QUIT, EXIT, HELP, SET and so on.
Before FLINK-17893, the parsing order of SqlClient was the opposite of today's: the SQL parser was tried first, and regex matching was used only after that parsing failed.
2.3 parseByRegexMatching
The process of parseByRegexMatching is as follows (a simplified sketch follows the list):
- Traverse each member of the SqlCommand enum; if a pattern is specified, attempt a regex match;
- If the match succeeds, operandConverter is called to parse and convert the captured groups;
- Otherwise, Optional.empty() is returned directly.
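Below is a minimal sketch of this loop, reusing the SqlCommandSketch enum from section 2.1; SqlCommandCallSketch is a hypothetical holder that mirrors SqlCommandCall. The real implementation is SqlCommandParser#parseByRegexMatching, and details such as the exact group handling differ.

import java.util.Optional;
import java.util.regex.Matcher;

// Hypothetical holder mirroring SqlCommandCall from section 2.1.
class SqlCommandCallSketch {
    final SqlCommandSketch command;
    final String[] operands;

    SqlCommandCallSketch(SqlCommandSketch command, String[] operands) {
        this.command = command;
        this.operands = operands;
    }

    // Simplified sketch of parseByRegexMatching: try every command's pattern and hand
    // the captured groups (group 1..groupCount) to its operandConverter on a match.
    // (The real enum also skips constants that declare no pattern.)
    static Optional<SqlCommandCallSketch> parseByRegexMatching(String stmt) {
        for (SqlCommandSketch cmd : SqlCommandSketch.values()) {
            Matcher matcher = cmd.pattern.matcher(stmt);
            if (matcher.matches()) {
                String[] groups = new String[matcher.groupCount()];
                for (int i = 0; i < groups.length; i++) {
                    groups[i] = matcher.group(i + 1);
                }
                return cmd.operandConverter
                        .apply(groups)
                        .map(operands -> new SqlCommandCallSketch(cmd, operands));
            }
        }
        return Optional.empty();
    }
}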
2.4 parseBySqlParser
private static SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) {
    List<Operation> operations;
    try {
        operations = sqlParser.parse(stmt);
    } catch (Throwable e) {
        throw new SqlExecutionException("Invalidate SQL statement.", e);
    }
    if (operations.size() != 1) {
        throw new SqlExecutionException("Only single statement is supported now.");
    }

    final SqlCommand cmd;
    String[] operands = new String[] {stmt};
    Operation operation = operations.get(0);
    if (operation instanceof CatalogSinkModifyOperation) {
        boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
        cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
    } else if (operation instanceof CreateTableOperation) {
        cmd = SqlCommand.CREATE_TABLE;
    }
    // Omit the remaining branches

    return new SqlCommandCall(cmd, operands);
}
First, the Parser is called to parse the SQL text. Under the hood, this part uses the Calcite framework for lexical and syntactic analysis to convert the SQL text into a SqlNode, which is then converted into an Operation via SqlToOperationConverter.
After that, the concrete type of the Operation determines the corresponding SqlCommand, and the specific operand information is extracted from the Operation and wrapped into the returned SqlCommandCall.
This logic can be briefly summarized as four steps: String -> SqlNode -> Operation -> SqlCommand.
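As a quick, hypothetical illustration of this pipeline (assuming parser is the Parser obtained from the session's TableEnvironment, and sink_table/source_table are placeholder tables), an INSERT OVERWRITE statement would flow through the CatalogSinkModifyOperation branch shown above:

// Hypothetical example: the statement below is parsed by Calcite into a SqlNode,
// converted into a CatalogSinkModifyOperation with isOverwrite() == true, and
// finally mapped to SqlCommand.INSERT_OVERWRITE. Per the snippet above, the
// operands array simply carries the statement text (without the trailing ';').
SqlCommandCall call =
        SqlCommandParser.parse(parser, "INSERT OVERWRITE sink_table SELECT * FROM source_table;");
// call.command  == SqlCommand.INSERT_OVERWRITE
// call.operands == {"INSERT OVERWRITE sink_table SELECT * FROM source_table"}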
3. Execute command - callCommand
The essence of callCommand is to check the command type in the SqlCommandCall and branch into the corresponding execution logic.
private void callCommand(SqlCommandCall cmdCall) {
    switch (cmdCall.command) {
        case QUIT:
            callQuit();
            break;
        // ......
        case SELECT:
            callSelect(cmdCall);
            break;
        case INSERT_INTO:
        case INSERT_OVERWRITE:
            callInsert(cmdCall);
            break;
        // ......
        default:
            throw new SqlClientException("Unsupported command: " + cmdCall.command);
    }
}
Starting from callSelect and drilling down layer by layer, we get the call chain: CliClient.callSelect => LocalExecutor.executeQuery => LocalExecutor.executeQueryInternal. The source code of executeQueryInternal is as follows:
private <C> ResultDescriptor executeQueryInternal(
        String sessionId, ExecutionContext<C> context, String query) {
    // create table
    final Table table = createTable(context, context.getTableEnvironment(), query);
    // TODO refactor this after Table#execute support all kinds of changes

    // initialize result
    final DynamicResult<C> result =
            resultStore.createResult(
                    context.getEnvironment(),
                    removeTimeAttributes(table.getSchema()),
                    context.getExecutionConfig());
    final String jobName = sessionId + ": " + query;
    final String tableName = String.format("_tmp_table_%s", Math.abs(query.hashCode()));
    final Pipeline pipeline;
    try {
        // writing to a sink requires an optimization step that might reference UDFs during code
        // compilation
        context.wrapClassLoader(
                () -> {
                    ((TableEnvironmentInternal) context.getTableEnvironment())
                            .registerTableSinkInternal(tableName, result.getTableSink());
                    table.insertInto(tableName);
                });
        pipeline = context.createPipeline(jobName);
    } catch (Throwable t) {
        // the result needs to be closed as long as
        // it not stored in the result store
        result.close();
        // catch everything such that the query does not crash the executor
        throw new SqlExecutionException("Invalid SQL query.", t);
    } finally {
        // Remove the temporal table object.
        context.wrapClassLoader(
                () -> {
                    context.getTableEnvironment().dropTemporaryTable(tableName);
                });
    }

    // create a copy so that we can change settings without affecting the original config
    Configuration configuration = new Configuration(context.getFlinkConfig());
    // for queries we wait for the job result, so run in attached mode
    configuration.set(DeploymentOptions.ATTACHED, true);
    // shut down the cluster if the shell is closed
    configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);

    // create execution
    final ProgramDeployer deployer =
            new ProgramDeployer(configuration, jobName, pipeline, context.getClassLoader());

    JobClient jobClient;
    // wrap in classloader because CodeGenOperatorFactory#getStreamOperatorClass
    // requires to access UDF in deployer.deploy().
    jobClient =
            context.wrapClassLoader(
                    () -> {
                        try {
                            // blocking deployment
                            return deployer.deploy().get();
                        } catch (Exception e) {
                            throw new SqlExecutionException("Error while submitting job.", e);
                        }
                    });

    String jobId = jobClient.getJobID().toString();
    // store the result under the JobID
    resultStore.storeResult(jobId, result);

    // start result retrieval
    result.startRetrieval(jobClient);

    return new ResultDescriptor(
            jobId,
            removeTimeAttributes(table.getSchema()),
            result.isMaterialized(),
            context.getEnvironment().getExecution().isTableauMode());
}
In summary, executeQueryInternal does the following:
- Create a Table that holds the result schema;
- Create the Pipeline, the core step, which converts the SQL into a concrete StreamGraph;
- Create a JobClient and deploy the job;
- Start a ResultRetrievalThread that continuously fetches new results and updates the local queue;
- Return the ResultDescriptor identifying the result.
Take a closer look at the code that creates the Pipeline, pipeline = context.createPipeline(jobName):
public Pipeline getPipeline(String jobName) {
    return execEnv.createPipeline(translateAndClearBuffer(), tableConfig, jobName);
}
translateAndClearBuffer is mainly responsible for converting the List<ModifyOperation> buffered in TableEnvironmentImpl into a List<Transformation>, which is not covered in this section, while createPipeline assembles these Transformations into a StreamGraph:
public Pipeline createPipeline(
        List<Transformation<?>> transformations, TableConfig tableConfig, String jobName) {
    StreamGraph streamGraph =
            ExecutorUtils.generateStreamGraph(getExecutionEnvironment(), transformations);
    streamGraph.setJobName(getNonEmptyJobName(jobName));
    return streamGraph;
}
ExecutorUtils.generateStreamGraph is shown below. It creates a StreamGraphGenerator and calls its generate method. From here on, the logic is exactly the same as the StreamGraph generation described for the DataStream API; this is the point where SQL is wired to Transformations.
public static StreamGraph generateStreamGraph(
        StreamExecutionEnvironment execEnv, List<Transformation<?>> transformations) {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot generate StreamGraph.");
    }
    StreamGraphGenerator generator =
            new StreamGraphGenerator(
                            transformations, execEnv.getConfig(), execEnv.getCheckpointConfig())
                    .setStateBackend(execEnv.getStateBackend())
                    .setChaining(execEnv.isChainingEnabled())
                    .setUserArtifacts(execEnv.getCachedFiles())
                    .setTimeCharacteristic(execEnv.getStreamTimeCharacteristic())
                    .setDefaultBufferTimeout(execEnv.getBufferTimeout());
    return generator.generate();
}