The first experience of flink streaming sql

Article catalog


sql,Structured Query Language: as a general and popular query language, Structured Query Language is becoming more and more popular not only in the traditional database, but also in the field of big data. hive, spark, kafka, flink and other big data components support sql query. Using sql can make some people who do not understand the principles of these components operate easily and greatly reduce the threshold of use. Today Let's talk about how to use sql in the flow processing of flink

Example explanation

Construct the StreamTableEnvironment object

To use sql in the flow processing of flink, we need to construct a StreamTableEnvironment object first, and the method is relatively simple.

The catalog, table, function, etc. used in sql need to be registered with the StreamTableEnvironment to be used.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Register table

Next, to register the information of the corresponding table into the StreamTableEnvironment object, there are several ways to choose
The following code is explained based on Flink version 1.10.0. Each version is slightly different.

Using Tuple

		//Use the binary of flink, and you need to customize the field name at this time
		Tuple2<String,Integer> tuple2 = Tuple2.of("jack", 10);
		//Construct a Tuple DataStream
		DataStream<Tuple2<String,Integer>> tupleStream = env.fromElements(tuple2);
//		Register to StreamTableEnvironment and specify the corresponding field name
		tableEnv.createTemporaryView("usersTuple", tupleStream, "name,age");
		//Execute an sql query and return a table object
		Table table = tableEnv.sqlQuery("select name,age from usersTuple");
//		Turn the table object into the DataStream of the flink for subsequent operations, and we will output it here
		tableEnv.toAppendStream(table, Row.class).print();

For details, please refer to the notes in the code

Using Row

Tuple provided in flink is limited to tuple 25 at most, so if we have more fields, we can choose to use the Row object in flink

	//Using Row
		Row row = new Row(2);
		row.setField(0, "zhangsan");
		row.setField(1, 20);
		DataStream<Row> rowDataStream = env.fromElements(row);
		tableEnv.createTemporaryView("usersRow", rowDataStream, "name,age");
		Table tableRow = tableEnv.sqlQuery("select name,age from usersRow");
		tableEnv.toAppendStream(tableRow, Row.class).print();

Pojo class using java

First, define a pojo class

	public static class User{
		private String name;
		private int age;

		public String getName(){
			return name;

		public void setName(String name){ = name;

		public int getAge(){
			return age;

		public void setAge(int age){
			this.age = age;

To define this pojo class, it is required to conform to the serialization rules of flink. For details, please refer to [1]:

  1. This class is of public type and has no non static inner class
  2. This class has a public nonparametric constructor
  3. All non static and non transient fields in the class (and all superclasses) are public (non final); or follow the Java bean rule, the fields are private, but have getter and setter methods of public type
User user = new User();
		DataStream<User> userDataStream = env.fromElements(user);
		tableEnv.createTemporaryView("usersPojo", userDataStream);
		Table tablePojo = tableEnv.sqlQuery("select name,age from usersPojo");
		tableEnv.toAppendStream(tablePojo, Row.class).print();

If you use DataStream of java pojo type, you don't need to declare the field name. flink will automatically resolve the field name and type in the pojo class as the field and type of table.

Use external storage

		//Connect external systems, such as files, kafka, etc
		Schema schema = new Schema()
				.field("name", DataTypes.STRING())
				.field("age", DataTypes.INT());
		tableEnv.connect(new FileSystem().path("...."))
		        .withFormat(new Csv())
		Table tableFile = tableEnv.sqlQuery("select name,age from usersFile");
		tableEnv.toAppendStream(tableFile, Row.class).print();

When using external storage, you need to specify the following objects:

  1. tableEnv.connect(ConnectorDescriptor ...)
    Specify the connector. Currently, flink supports Elasticsearch, hbase, kafka and filesystem
  2. withFormat(FormatDescriptor format)
    This is to specify the format of the data we read from the above data sources, such as json, csv, parquet, etc
  3. .withSchema(Schema schema)
    Define a schema for our table, that is, the name and type of the field, which is used for sql query
  4. . createTemporaryTable("usersFile") names the table and registers it with the StreamTableEnvironment

In fact, there are some other registration methods, but they have been marked as expired. We will not explain them here.

reference material:

Refer to for complete code

Welcome to my WeChat official account, big data technology and application, to get updated details.

Keywords: SQL Java Big Data kafka

Added by TodManPlaa on Sun, 21 Jun 2020 07:16:31 +0300