Advanced table operations: skewed tables & transactional tables

Hive Skewed Tables

What is a skewed table?

For tables in which one or more columns contain heavily skewed values, you can create skewed tables to improve performance. For example, if 50% of the rows in a table have the string "1" in the key field, the data is clearly skewed, and processing the key field will spend a disproportionate amount of time on the skewed values.

In this case, you can create a skewed table that records the skewed values in the metadata, so that queries and joins can be optimized.

For example, create the skewed table skewed_single with two fields, key and value, where the values 1, 5 and 6 of the key field are skewed. The SQL is as follows:

CREATE TABLE skewed_single (key STRING, value STRING)
SKEWED BY (key) ON ('1','5','6')
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

--Insert data into a table
insert OVERWRITE table skewed_single values('1','1'),('2','2'),('3','3'),('4','4'),('5','5'),('6','6'),('1','1'),('5','5'),('6','6'),('1','1'),('5','5'),('6','6'),('1','1'),('5','5'),('6','6'),('1','1'),('5','5'),('6','6');
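
To see why 1, 5 and 6 are the values worth declaring, the key distribution of the rows inserted above can be counted. The following Python sketch is only an illustration: the function name skewed_values and the 20% threshold are assumptions, not something Hive computes for you.

```python
from collections import Counter

# Keys of the 18 rows inserted into skewed_single above.
keys = ["1", "2", "3", "4", "5", "6"] + ["1", "5", "6"] * 4

def skewed_values(keys, threshold=0.2):
    """Return the values whose share of all rows is at least `threshold`.

    The threshold is a hypothetical cut-off chosen for this example.
    """
    counts = Counter(keys)
    total = len(keys)
    return sorted(v for v, n in counts.items() if n / total >= threshold)

print(Counter(keys))        # row count per key value
print(skewed_values(keys))  # candidate values for SKEWED BY ... ON (...)
```

Each of the three skewed values accounts for 5 of the 18 rows, while every other value appears only once.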

Besides a single field, you can also declare skewed values across multiple fields:

CREATE TABLE skewed_multiple (key STRING, value STRING)
SKEWED BY (key, value) ON (('1','One'), ('3','Three'), ('5','Five'))
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

--insert data
insert OVERWRITE table skewed_multiple values('1','One'),('2','One'),('3','One'),('4','One'),('5','One'),('6','One'),('1','Two'),('5','Two'),('6','Two'),('1','Three'),('2','Three'),('3','Three'),('4','Three'),('5','Three'),('6','Three'),('1','Four'),('5','Four'),('6','Four'),('5','Five'),('6','Four');

Optimizing Skewed Joins

Once a skewed table is created, skewed joins on it can be optimized. Suppose that the id field of table A has the values 1, 2, 3 and 4, and table B also has an id column with the values 1, 2 and 3. We join them with the following statement:

-- For illustration only; tables A and B are not actually created
select A.id from A join B on A.id = B.id

A set of mappers reads the two tables and sends the rows to reducers based on the join key id. Assume that rows with id=1 go to reducer R1, rows with id=2 go to R2, and so on. Each reducer computes the cross product of its rows from A and B. R4 receives all rows with id=4 from A but produces no results, because B has no matching rows.

Now assume that A is skewed on id=1. R2 and R3 finish quickly, but R1 runs for a long time and becomes the bottleneck of the job. If the skew is known in advance, this bottleneck can be avoided manually as follows:

-- First join: exclude the skewed data A.id=1
select A.id from A join B on A.id = B.id where A.id <> 1;
-- Join the skewed data separately
select A.id from A join B on A.id = B.id where A.id = 1 and B.id = 1;
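
The two queries together return the same rows as the original join. A minimal Python sketch of that equivalence, using small in-memory lists as stand-ins for tables A and B (the data and the helper join_ids are hypothetical):

```python
def join_ids(a_ids, b_ids, keep=lambda i: True):
    """Inner join on id; returns A.id for every matching pair that passes `keep`."""
    return [a for a in a_ids for b in b_ids if a == b and keep(a)]

# Hypothetical contents: A is skewed on id=1, B has ids 1, 2 and 3.
A = [1, 1, 1, 1, 1, 2, 3, 4]
B = [1, 2, 3]

full = join_ids(A, B)                                # the original single join
non_skewed = join_ids(A, B, keep=lambda i: i != 1)   # first query
skewed = join_ids(A, B, keep=lambda i: i == 1)       # second query

# The union of the two partial results equals the full join.
print(sorted(non_skewed + skewed) == sorted(full))
```

The split does not change the result; it only lets the skewed part be processed on its own.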

However, this approach requires human intervention: you must know the skewed value in advance and run the join twice. If table A is instead a skewed table with A.id=1 declared as the skewed value, the following optimization is applied automatically when joining table A and table B:

  1. Load the rows with id=1 from table B into an in-memory hash table, distribute it to all mapper tasks that read table A, and join it directly with the rows where A.id=1.
  2. For the remaining non-skewed data, perform the normal reduce-side join.

This improves join efficiency on skewed data.
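
The two steps above can be sketched in Python as a simulation. This is not Hive's implementation: the function skew_join, the row format (id, payload) and the modulo-based reducer assignment are all assumptions made for illustration.

```python
from collections import defaultdict

def skew_join(a_rows, b_rows, skewed_keys, num_reducers=3):
    """Join (id, payload) rows of A and B, handling skewed ids on the map side.

    Returns the joined rows and the number of A rows shuffled to each reducer.
    """
    # Step 1: build an in-memory hash table from B's rows for the skewed ids.
    b_skew = defaultdict(list)
    b_rest = defaultdict(list)
    for k, v in b_rows:
        (b_skew if k in skewed_keys else b_rest)[k].append(v)

    joined = []
    shuffled = defaultdict(list)
    for k, v in a_rows:
        if k in skewed_keys:
            # Joined directly in the mapper, never sent to a reducer.
            joined.extend((k, v, bv) for bv in b_skew.get(k, []))
        else:
            # Step 2: non-skewed rows go through the normal shuffle.
            shuffled[k % num_reducers].append((k, v))

    for reducer_rows in shuffled.values():
        for k, v in reducer_rows:
            joined.extend((k, v, bv) for bv in b_rest.get(k, []))

    load = {r: len(rows) for r, rows in shuffled.items()}
    return joined, load

# Hypothetical data: five rows of A share id=1.
A = [(1, f"a{i}") for i in range(5)] + [(2, "x"), (3, "y"), (4, "z")]
B = [(1, "b1"), (2, "b2"), (3, "b3")]

rows, load = skew_join(A, B, skewed_keys={1})
print(len(rows), load)
```

In this run no reducer receives more than one row of A, because the five rows with id=1 never enter the shuffle.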

List Bucketing

List bucketing is a special kind of skewed table. It stores the skewed data in separate directories, which improves query and join efficiency further. Creating a list bucketing table is simple: you only need to add STORED AS DIRECTORIES to the skewed table creation syntax.

In testing, list bucketing still has many unresolved bugs in Hive 1.2.1, where the data is not split into separate directories, but it works normally in the newer version 2.3.7. If your current version has support problems, you don't need to run the following code; just read through it.

For example, create the list bucketing table list_bucket_single with two fields, key and value, where the values 1, 5 and 6 of the key field are skewed. The SQL is as follows:

-- The table is partitioned here. In version 2.3.7 partitioning is optional, but some earlier versions have problems without it
CREATE TABLE list_bucket_single (key STRING, value STRING)
partitioned by (dt string)
  SKEWED BY (key) ON (1,5,6) STORED AS DIRECTORIES
STORED AS ORCFile;

--Insert data into a table
insert OVERWRITE table list_bucket_single
partition (dt='2020-11-19')
select * from skewed_single;

This creates four directories: three correspond to the three skewed values of key, and the fourth holds the remaining values. The table data is thus split into four parts: 1, 5, 6 and others.

hadoop fs -ls /user/hive/warehouse/list_bucket_single/dt=2020-11-19

In this way, queries that filter on the skewed key values run more efficiently.
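
The routing of rows into those four directories can be pictured with a short Python sketch. The "key=<value>" directory naming is an assumption about the on-disk layout; only the default directory name HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME comes from this article. The helper bucket_dir is hypothetical.

```python
from collections import defaultdict

DEFAULT_DIR = "HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME"
SKEWED_VALUES = {"1", "5", "6"}

def bucket_dir(key):
    """Directory a row is written to (the "key=<value>" naming is an assumption)."""
    return f"key={key}" if key in SKEWED_VALUES else DEFAULT_DIR

# Keys of the rows copied from skewed_single.
keys = ["1", "2", "3", "4", "5", "6"] + ["1", "5", "6"] * 4

# Count how many rows land in each directory.
layout = defaultdict(int)
for k in keys:
    layout[bucket_dir(k)] += 1

for directory, rows in sorted(layout.items()):
    print(directory, rows)
```

A query such as "where key = '5'" then only needs the files under one directory instead of the whole partition.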

List bucketing tables also support declaring skewed values across multiple fields:

CREATE TABLE list_bucket_multiple (key STRING, value STRING)
partitioned by (dt string)
SKEWED BY (key, value) ON (('1','One'), ('3','Three'), ('5','Five')) STORED AS DIRECTORIES
STORED AS RCFile;

--Insert data into a table
insert OVERWRITE table list_bucket_multiple
partition (dt='2020-11-19')
select * from skewed_multiple;

In principle, this should create four directories: three for the combined skewed values ('1','One'), ('3','Three') and ('5','Five'), and a fourth for the rest. However, Hive's implementation of multi-field skewed values still has problems: in both version 1.2.1 and version 2.3.7, only a single HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME directory is created.

DDL operations on skewed tables

For skewed tables that have already been created, you can use the ALTER TABLE statement to modify the skew information, or to convert an ordinary table into a skewed table:

ALTER TABLE <T> (SCHEMA) SKEWED BY  (keys) ON ('c1', 'c2') [STORED AS DIRECTORIES];

You can also turn a skewed table back into an ordinary table:

ALTER TABLE <T> (SCHEMA) NOT SKEWED;

Or convert a list bucketing table into an ordinary skewed table:

ALTER TABLE <T> (SCHEMA) NOT STORED AS DIRECTORIES;

You can also change the storage directory of skewed values:

--Syntax format
ALTER TABLE <T> (SCHEMA) SET SKEWED LOCATION (key1="loc1", key2="loc2");
-- Change the storage directories of the combined skewed values ('1','One') and ('3','Three') of table list_bucket_multiple
ALTER TABLE list_bucket_multiple SET SKEWED LOCATION (('1','One')="hdfs://node01:9000/file/one_data" ,('3','Three')="hdfs://node01:9000/file/one_data");

These modifications do not affect existing data; they apply only to data written after the change.

Hive transaction table (ACID)

Hive has supported row-level ACID semantics since version 0.13, that is, Atomicity, Consistency, Isolation and Durability. Row-level ACID means that new data can be inserted into a partition while other programs are reading from it. Supported operations include INSERT/UPDATE/DELETE/MERGE statements, incremental data extraction, and so on.

In version 3.0 and later this feature was further optimized: the underlying file organization was improved, restrictions on table structure were reduced, and predicate pushdown and vectorized queries are supported.

However, transactions are only supported on ORC tables, and they depend on the bucketed storage layout, so a transactional table must be bucketed.

Hive configuration for enabling transactions

Transactions are disabled by default. Before using them, you need to configure the cluster (if Hive was installed with the script, this configuration has already been done automatically).

For Hive server (MetaStore), you need to add the following configuration in hive-site.xml:

  <property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value>
    <description>
      Required for transactions: whether to run the initiator and cleaner threads on this Metastore instance
    </description>
  </property>

  <property>
    <name>hive.compactor.worker.threads</name>
    <value>3</value>
    <description>
      Required for transactions: number of compaction worker threads to start
    </description>
  </property>

After the server is configured, transactional tables can be created. However, Hive does not allow non-ACID sessions to operate on a transactional table, so the ACID features must also be enabled on the client side, which here means configuring HiveServer2. The settings can be added directly to hive-site.xml, or set temporarily with the set command when executing SQL. For convenience, they are added directly to hive-site.xml here.

  <property>
    <name>hive.support.concurrency</name>
    <value>true</value>
    <description>
      Required for transactions: HiveServer2/client setting, whether concurrency is supported
    </description>
  </property>

  <property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
    <description>
      Required for transactions: HiveServer2/client setting, provides transaction behavior
    </description>
  </property>

  <property>
    <name>hive.enforce.bucketing</name>
    <value>true</value>
    <description>
      Required for transactions: HiveServer2/client setting, turns on automatic bucketing.
      Not needed in 2.x and later; added here for compatibility with earlier versions
    </description>
  </property>
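
Before restarting, it can help to double-check that the settings are actually present. The following Python sketch parses a hive-site.xml document and reports what is missing or wrong. It is a hypothetical helper, not a Hive tool: the function name missing_settings and the sample XML are made up for this example, and hive.enforce.bucketing is left out because it is not needed in 2.x and later.

```python
import xml.etree.ElementTree as ET

# Expected values, taken from the configuration shown above.
REQUIRED = {
    "hive.compactor.initiator.on": "true",
    "hive.support.concurrency": "true",
    "hive.txn.manager": "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager",
}

def missing_settings(xml_text):
    """Return {property name: actual value or None} for absent or wrong settings."""
    root = ET.fromstring(xml_text)
    found = {
        (p.findtext("name") or "").strip(): (p.findtext("value") or "").strip()
        for p in root.iter("property")
    }
    problems = {k: found.get(k) for k, v in REQUIRED.items() if found.get(k) != v}
    # The worker thread count only needs to be a positive integer.
    threads = found.get("hive.compactor.worker.threads", "")
    if not (threads.isdigit() and int(threads) > 0):
        problems["hive.compactor.worker.threads"] = found.get(
            "hive.compactor.worker.threads"
        )
    return problems

# Hypothetical, deliberately incomplete hive-site.xml used only for this demo.
sample = """<configuration>
  <property><name>hive.compactor.initiator.on</name><value>true</value></property>
  <property><name>hive.compactor.worker.threads</name><value>3</value></property>
  <property><name>hive.support.concurrency</name><value>false</value></property>
</configuration>"""

print(missing_settings(sample))
```

For the sample above, the check flags hive.support.concurrency (wrong value) and hive.txn.manager (absent).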

After configuration, restart Hive on the Hive installation node (Node03):

# Stop the hiveserver2 and metastore services
jps | grep RunJar | awk '{print $1}' | xargs kill -9
# Restart the hiveserver2 and metastore services
hive --service hiveserver2 &
hive --service metastore &

Start beeline client:

beeline -u jdbc:hive2://node03:10000 -n root

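The official recommendation is also to set the dynamic partition mode to nonstrict. Strict mode blocks some risky SQL (for example, it requires at least one static partition in a dynamic-partition insert), and nonstrict mode lifts that restriction.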

set hive.exec.dynamic.partition.mode=nonstrict;

Creation of transaction table

To create a transactional table, it must first be stored as ORC, then be bucketed, and finally have the property 'transactional' = 'true' set on the table.

CREATE TABLE employee (id int, name string, salary int)
CLUSTERED BY (id) INTO 2 BUCKETS
STORED AS ORC 
TBLPROPERTIES ('transactional' = 'true');

After the transaction table is created, insert some data:

INSERT INTO employee VALUES
(1, 'Jerry', 5000),
(2, 'Tom',   8000),
(3, 'Kate',  6000);

After inserting data, you can view the storage of transaction tables on HDFS:

hadoop fs -ls -R /user/hive/warehouse/employee

A transactional table on HDFS actually contains two types of files: base files and delta files.

Delta files store newly inserted, updated and deleted data. Each transaction gets its own delta directory for its results, and the data inside the directory is split by bucket. Base files store the consolidated data. Hive runs periodic tasks that merge delta files into base files.

At this point HDFS contains only delta files, because they have not yet been merged into a base.

For example, a table named t with only 2 buckets would have a file structure like the following:

hive> dfs -ls -R /user/hive/warehouse/t;
drwxr-xr-x   - ekoifman staff          0 2016-06-09 17:03 /user/hive/warehouse/t/base_0000022
-rw-r--r--   1 ekoifman staff        602 2016-06-09 17:03 /user/hive/warehouse/t/base_0000022/bucket_00000
-rw-r--r--   1 ekoifman staff        602 2016-06-09 17:03 /user/hive/warehouse/t/base_0000022/bucket_00001
drwxr-xr-x   - ekoifman staff          0 2016-06-09 17:06 /user/hive/warehouse/t/delta_0000023_0000023_0000
-rw-r--r--   1 ekoifman staff        611 2016-06-09 17:06 /user/hive/warehouse/t/delta_0000023_0000023_0000/bucket_00000
-rw-r--r--   1 ekoifman staff        611 2016-06-09 17:06 /user/hive/warehouse/t/delta_0000023_0000023_0000/bucket_00001
drwxr-xr-x   - ekoifman staff          0 2016-06-09 17:07 /user/hive/warehouse/t/delta_0000024_0000024_0000
-rw-r--r--   1 ekoifman staff        610 2016-06-09 17:07 /user/hive/warehouse/t/delta_0000024_0000024_0000/bucket_00000
-rw-r--r--   1 ekoifman staff        610 2016-06-09 17:07 /user/hive/warehouse/t/delta_0000024_0000024_0000/bucket_00001
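
The directory names in this listing encode transaction ranges. The Python sketch below parses them, assuming the convention that base_N holds everything up to transaction N and that delta_X_Y_S holds the changes of transactions X through Y, with S as a statement suffix. The function parse_acid_dir is hypothetical, and the exact meaning of the fields may differ between Hive versions.

```python
import re

def parse_acid_dir(name):
    """Split a base/delta directory name into its numeric parts, or return None."""
    m = re.fullmatch(r"base_(\d+)", name)
    if m:
        return {"kind": "base", "max_txn": int(m.group(1))}
    m = re.fullmatch(r"delta_(\d+)_(\d+)(?:_(\d+))?", name)
    if m:
        return {
            "kind": "delta",
            "min_txn": int(m.group(1)),
            "max_txn": int(m.group(2)),
            # The trailing statement suffix is optional.
            "statement": int(m.group(3)) if m.group(3) is not None else None,
        }
    return None

# Directory names taken from the listing above.
for d in ["base_0000022", "delta_0000023_0000023_0000", "delta_0000024_0000024_0000"]:
    print(d, parse_acid_dir(d))
```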

When reading data, the reader loads the base files and delta files into memory, determines which rows have been modified or deleted, and merges them into the latest view of the data.
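
This merge-on-read step can be sketched in Python. The model is heavily simplified and is not Hive's reader: rows are keyed by a plain row id, each delta event is a tuple (txn_id, op, row_id, row), and the function read_latest and the sample changes are hypothetical.

```python
def read_latest(base, deltas):
    """Apply delta events to the base rows in transaction order.

    base:   {row_id: row}
    deltas: list of (txn_id, op, row_id, row) with op in
            {"insert", "update", "delete"}; row is None for deletes.
    """
    rows = dict(base)  # leave the base untouched
    for txn_id, op, row_id, row in sorted(deltas, key=lambda e: e[0]):
        if op == "delete":
            rows.pop(row_id, None)
        else:
            rows[row_id] = row  # insert or update
    return rows

# Base content matching the employee rows inserted earlier.
base = {1: ("Jerry", 5000), 2: ("Tom", 8000), 3: ("Kate", 6000)}

# Hypothetical later changes, listed out of order on purpose.
deltas = [
    (3, "delete", 1, None),
    (2, "update", 2, ("Tom", 7000)),
]

print(read_latest(base, deltas))
```

The more delta events there are, the more work every read has to do.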

After the transaction table is created, you can update data at the row level.

UPDATE employee SET salary = 7000 WHERE id = 2;

You can also delete data at the row level:

delete from employee where id=1;

However, every append, update, or delete generates a new delta directory to store the delta files, so performance is not great.

For the transaction table, you can view all ongoing transaction operations:

SHOW TRANSACTIONS;

Compaction of transactional tables

As operations accumulate on a transactional table, the number of delta files keeps growing. Reads have to traverse and merge all of these files, so too many files hurts efficiency, and a large number of small files also puts pressure on HDFS. Hive therefore supports file compaction, which comes in two forms: minor and major.

Minor compaction merges all delta files into a single delta directory, stored by bucket. It is executed regularly by the Metastore and can also be triggered manually:

ALTER TABLE employee COMPACT 'minor';

Minor compaction is just a simple file merge: it does not delete any data, but it reduces the number of files.

Major compaction merges all files into a single base directory named base_N, which retains only the latest data. Major compaction is also executed regularly and consumes more cluster resources than minor compaction. Like minor compaction, it can be triggered manually, by running ALTER TABLE employee COMPACT 'major'.
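
The difference between the two kinds of compaction can be sketched in Python. This is a toy model under stated assumptions, not Hive's compactor: a delta is a dict with a transaction range and a list of (op, row_id, row) events, and the helpers dir_name, minor_compact and major_compact are hypothetical.

```python
def dir_name(kind, *txns):
    """Build a directory name such as delta_0000023_0000024 or base_0000024."""
    return "_".join([kind] + [f"{t:07d}" for t in txns])

def minor_compact(deltas):
    """Merge several deltas into one delta; every event is kept."""
    ordered = sorted(deltas, key=lambda d: d["min_txn"])
    return {
        "min_txn": ordered[0]["min_txn"],
        "max_txn": ordered[-1]["max_txn"],
        "events": [e for d in ordered for e in d["events"]],
    }

def major_compact(base, deltas):
    """Fold all deltas into a new base that keeps only the latest rows."""
    rows = dict(base["rows"])
    max_txn = base["max_txn"]
    for d in sorted(deltas, key=lambda d: d["min_txn"]):
        for op, row_id, row in d["events"]:
            if op == "delete":
                rows.pop(row_id, None)
            else:
                rows[row_id] = row
        max_txn = max(max_txn, d["max_txn"])
    return {"max_txn": max_txn, "rows": rows}

# Hypothetical state: one base and two single-transaction deltas.
base = {"max_txn": 22, "rows": {1: ("Jerry", 5000), 2: ("Tom", 8000), 3: ("Kate", 6000)}}
deltas = [
    {"min_txn": 23, "max_txn": 23, "events": [("update", 2, ("Tom", 7000))]},
    {"min_txn": 24, "max_txn": 24, "events": [("delete", 1, None)]},
]

merged = minor_compact(deltas)
new_base = major_compact(base, deltas)
print(dir_name("delta", merged["min_txn"], merged["max_txn"]), len(merged["events"]))
print(dir_name("base", new_base["max_txn"]), new_base["rows"])
```

In this model the minor step still carries both events, while the major step leaves only the two surviving rows.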

To view the compaction tasks that are currently in progress, use the following command:

SHOW COMPACTIONS;

Added by Timma on Thu, 23 Dec 2021 17:07:41 +0200