Flink Tutorial (25) - Flink Advanced Features (Flink SQL Integration with Hive)

01 Introduction

In the previous blog post, we learned about Flink's File Sink.

This article mainly explains how Flink SQL integrates Hive.

02 Flink SQL Integration with Hive

2.1 Introduction

Using Hive to build a data warehouse has become a common solution, and virtually every mainstream big data processing engine is compatible with Hive. Flink has supported Hive integration since version 1.9, but 1.9 was a beta release and is not recommended for production use. Flink 1.10 marked the completion of the Blink integration, and its Hive integration also reached production readiness. It is worth noting that different versions of Flink integrate with Hive differently; in what follows we use the recent Flink 1.12 as an example to implement Flink's integration with Hive.

2.2 Basic Ways to Integrate Hive

The integration of Flink and Hive is mainly reflected in the following two aspects:

  • Persistent metadata: Flink uses Hive's Metastore as a persistent catalog. Through HiveCatalog, Flink metadata from different sessions can be stored in the Hive Metastore. For example, we can use HiveCatalog to store the metadata of a Kafka source table in the Hive Metastore; the table's metadata is then persisted in the database backing the Hive Metastore and can be reused in subsequent SQL queries.
  • Reading and writing Hive tables with Flink: Flink also integrates with Hive at the data level. Just as with Spark SQL or Impala, we can use Flink to read from and write to Hive tables directly.

The design of HiveCatalog provides good compatibility with Hive: users can access their existing Hive tables "out of the box", without modifying the existing Hive Metastore or changing the data location or partitioning of the tables.
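As a small illustration of metadata persistence, the DDL below registers a Kafka source table while the HiveCatalog is active. The table name, topic, and broker address are hypothetical placeholders; once the statement is executed, the table definition lives in the Hive Metastore and survives session restarts:

-- hypothetical Kafka-backed table; topic and broker address are placeholders
CREATE TABLE user_log (
    user_id STRING,
    action  STRING,
    ts      TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_log',
    'properties.bootstrap.servers' = 'node1:9092',
    'properties.group.id' = 'flink-demo',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);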

2.3 Preparation

1. Add HADOOP_CLASSPATH

vim /etc/profile

Add the following configuration

export HADOOP_CLASSPATH=`hadoop classpath`

Refresh configuration

source /etc/profile
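To confirm that the variable was picked up, print it; the output should be the long classpath string produced by the hadoop classpath command:

# should print the Hadoop jars and config directories
echo $HADOOP_CLASSPATH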

2. Download the Hive connector jar and upload it to the flink/lib directory. The jar must match your Hive version; an example download command is shown below.
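For example, assuming Flink 1.12.0 with Scala 2.11 and Hive 2.3.x (adjust the versions to your environment), the bundled connector can be fetched from Maven Central:

# versions below are examples; match them to your Flink/Scala/Hive versions
cd /export/server/flink/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.11/1.12.0/flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar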

3. Modify the Hive configuration

vim /export/server/hive/conf/hive-site.xml
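The important setting is the Metastore address that Flink will connect to. A minimal sketch, with node3 as a placeholder for your Metastore host:

<!-- point clients at the Hive Metastore; node3 is a placeholder host -->
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://node3:9083</value>
</property>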

4. Start the Hive Metastore service

nohup /export/server/hive/bin/hive --service metastore &
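The Metastore listens on port 9083 by default. A quick way to check that it is up (assuming lsof is installed):

# the metastore process should appear listening on 9083
lsof -i:9083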


2.4 Using the SQL Client

1. Modify the Flink SQL client configuration

vim /export/server/flink/conf/sql-client-defaults.yaml 

Add the following entry under the catalogs section:

   - name: myhive
     type: hive
     hive-conf-dir: /export/server/hive/conf
     default-database: default

2. Start the Flink cluster
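For a standalone cluster installed under /export/server/flink, as the paths in this article assume, the standard start script is:

# start a standalone Flink cluster (JobManager + TaskManagers)
/export/server/flink/bin/start-cluster.sh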


3. Start the Flink SQL client

/export/server/flink/bin/sql-client.sh embedded

4. Execute SQL:

show catalogs;
use catalog myhive;
show tables;
select * from person;
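If the catalog was registered successfully, show catalogs should list both the built-in default_catalog and myhive. After switching to myhive, the existing Hive tables become visible; the final query assumes a person table already exists in Hive's default database.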

2.5 Code Demonstration

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * Flink SQL integrates Hive
 *
 * @author : YangLinWei
 * @createTime: 2022/3/9 9:22 AM
 */
public class HiveDemo {

    public static void main(String[] args) throws Exception {
        // Create a table environment using the Blink planner
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "./conf"; // directory containing hive-site.xml

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        // Register the catalog
        tableEnv.registerCatalog("myhive", hive);
        // Use the registered catalog
        tableEnv.useCatalog("myhive");

        // Write data to the Hive table (executeSql submits the INSERT job asynchronously)
        String insertSQL = "insert into person select * from person";
        TableResult result = tableEnv.executeSql(insertSQL);
        // Wait for the job to finish so the JVM does not exit early
        result.await();
    }
}

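To compile and run this class, the project needs the Hive connector, the Blink planner, and the Table API bridge on its classpath. A sketch of the Maven coordinates, assuming Flink 1.12.0 with Scala 2.11 and Hive 2.3.4 (align the versions with your cluster):

<!-- versions are examples; align them with your Flink and Hive versions -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.3.4</version>
</dependency>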

03 Conclusion

This article mainly explained how Flink SQL integrates with Hive. Thank you for reading, and that's the end of this article!
