Background
As a general-purpose and widely adopted query language, SQL (Structured Query Language) is becoming more and more popular not only in traditional databases but also in the big data field: Hive, Spark, Kafka, Flink, and other big data components all support SQL queries. SQL lets people who do not understand the internals of these components work with them easily and greatly lowers the barrier to entry. Today let's talk about how to use SQL in Flink stream processing.
Examples
Construct the StreamTableEnvironment object
To use SQL in Flink stream processing, we first need to construct a StreamTableEnvironment object, which is straightforward.
Any catalog, table, or function referenced in a SQL query must be registered with the StreamTableEnvironment before it can be used.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
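If you want to choose the planner explicitly (Flink 1.10 ships with both the old planner and the Blink planner), you can pass an EnvironmentSettings object when creating the environment. A minimal sketch:

// Create a StreamTableEnvironment that uses the Blink planner in streaming mode
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env, settings);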
Register tables
Next, we register the corresponding table with the StreamTableEnvironment object. There are several ways to do this.
The following code is based on Flink 1.10.0; other versions differ slightly.
Using Tuple
// Use Flink's Tuple2 (a 2-tuple); in this case you need to supply the field names yourself
Tuple2<String, Integer> tuple2 = Tuple2.of("jack", 10);
// Construct a DataStream of tuples
DataStream<Tuple2<String, Integer>> tupleStream = env.fromElements(tuple2);
// Register it with the StreamTableEnvironment and specify the corresponding field names
tableEnv.createTemporaryView("usersTuple", tupleStream, "name,age");
// Execute a SQL query, which returns a Table object
Table table = tableEnv.sqlQuery("select name,age from usersTuple");
// Convert the Table into a Flink DataStream for subsequent operations; here we just print it
tableEnv.toAppendStream(table, Row.class).print();
For details, see the comments in the code.
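One thing the snippets in this article leave out: like any Flink DataStream program, the pipeline only runs once you call execute on the environment. A minimal sketch (the job name is arbitrary):

// Submit the job to the Flink runtime; without this call nothing above is executed
env.execute("flink sql example");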
Using Row
The Tuple classes provided by Flink only go up to Tuple25, so if we have more fields we can use Flink's Row object instead.
// Using Row
Row row = new Row(2);
row.setField(0, "zhangsan");
row.setField(1, 20);
DataStream<Row> rowDataStream = env.fromElements(row);
tableEnv.createTemporaryView("usersRow", rowDataStream, "name,age");
Table tableRow = tableEnv.sqlQuery("select name,age from usersRow");
tableEnv.toAppendStream(tableRow, Row.class).print();
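One caveat with Row, worth checking on your Flink version: type extraction cannot infer the column types of a Row from the elements alone, so the stream may fall back to a generic type. In that case, supply the type information explicitly (RowTypeInfo lives in org.apache.flink.api.java.typeutils, Types in org.apache.flink.api.common.typeinfo). A sketch:

// Declare the Row's column types explicitly so the table gets proper name/age columns
DataStream<Row> typedRowStream = env
        .fromElements(row)
        .returns(new RowTypeInfo(Types.STRING, Types.INT));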
Using a Java POJO class
First, define a POJO class:
public static class User {
    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
The POJO class must conform to Flink's serialization rules; for details, see [1]:
- The class is public and standalone (not a non-static inner class)
- The class has a public no-argument constructor
- All non-static, non-transient fields in the class (and all superclasses) are public (and non-final), or follow the JavaBean convention: the fields are private but have public getter and setter methods
User user = new User();
user.setName("Tom");
user.setAge(20);
DataStream<User> userDataStream = env.fromElements(user);
tableEnv.createTemporaryView("usersPojo", userDataStream);
Table tablePojo = tableEnv.sqlQuery("select name,age from usersPojo");
tableEnv.toAppendStream(tablePojo, Row.class).print();
When using a DataStream of a Java POJO type, you do not need to declare the field names; Flink automatically resolves the field names and types of the POJO class into the fields and types of the table.
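If you want the table columns to have different names from the POJO fields, you can still pass a field expression and rename with as. A sketch based on the User class above (the view name and aliases are illustrative):

// Rename the POJO fields while registering the view; the columns become userName and userAge
tableEnv.createTemporaryView("usersPojoRenamed", userDataStream, "name as userName, age as userAge");
Table renamed = tableEnv.sqlQuery("select userName, userAge from usersPojoRenamed");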
Using external storage
// Connect to an external system, such as a file system or Kafka
Schema schema = new Schema()
        .field("name", DataTypes.STRING())
        .field("age", DataTypes.INT());
tableEnv.connect(new FileSystem().path("...."))
        .withFormat(new Csv())
        .withSchema(schema)
        .createTemporaryTable("usersFile");
Table tableFile = tableEnv.sqlQuery("select name,age from usersFile");
tableEnv.toAppendStream(tableFile, Row.class).print();
When using external storage, you need to specify the following:
- tableEnv.connect(ConnectorDescriptor ...): specifies the connector. Flink currently supports Elasticsearch, HBase, Kafka, and the file system (see the Kafka sketch after this list).
- .withFormat(FormatDescriptor format): specifies the format of the data we read from the data source, such as JSON, CSV, or Parquet.
- .withSchema(Schema schema): defines the schema of our table, that is, the field names and types used in the SQL query.
- .createTemporaryTable("usersFile"): names the table and registers it with the StreamTableEnvironment.
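For example, switching from the file system to Kafka only changes the connector descriptor. Below is a minimal sketch assuming a Kafka topic named users containing JSON-encoded records; the topic name, broker address, and group id are placeholders:

// Illustrative only: read the same schema from a Kafka topic in JSON format
tableEnv.connect(
        new Kafka()
                .version("universal")                        // for Kafka 0.11+ clients
                .topic("users")                              // hypothetical topic name
                .property("bootstrap.servers", "localhost:9092")
                .property("group.id", "flink-sql-demo")
                .startFromLatest())
        .withFormat(new Json())
        .withSchema(schema)
        .createTemporaryTable("usersKafka");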
There are a few other registration methods as well, but they have been marked as deprecated, so we will not cover them here.
References:
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html
For the complete code, see:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/sql/SqlFirst.java
You are welcome to follow my WeChat official account, "big data technology and application", for updates.