Apache Doris DataX DorisWriter Extension Usage

DataX is the open-source version of Alibaba Cloud DataWorks Data Integration. It is an offline data synchronization tool/platform widely used within Alibaba Group. DataX implements efficient data synchronization between various heterogeneous data sources, including MySQL, Oracle, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, DRDS, and more.

Apache Doris is a modern MPP analytical database product. It delivers query results with sub-second response times, effectively supporting real-time data analysis. Apache Doris's distributed architecture is simple and easy to operate, and it can support data sets of more than 10 PB.

Apache Doris can meet a variety of data analysis needs, such as fixed historical reports, real-time data analysis, interactive data analysis, and exploratory data analysis, making your data analysis simpler and more efficient!

To better grow the Apache Doris ecosystem and give Doris users more convenient ways to import data, the community has developed the DataX DorisWriter extension, making it easier to ingest data with DataX.

1. Scenario

This article demonstrates how to use Doris's DataX extension DorisWriter to extract data from a MySQL database and import it into a Doris data warehouse table.

2. Compile DorisWriter

This extension can be compiled without Doris's Docker compilation environment. This article compiles it under WSL on Windows.

First, pull the source code from GitHub:

git clone https://github.com/apache/incubator-doris.git

Enter incubator-doris/extension/DataX/ to perform the compilation.

First, run:

sh init_env.sh

This script is mainly used to build the DataX development environment. It mainly performs the following operations:

  1. Clone the DataX code base locally.

  2. Soft-link the doriswriter/ directory to the DataX/doriswriter directory.

  3. Add the <module>doriswriter</module> module to the DataX/pom.xml file.

  4. Change the httpclient version in the DataX/core/pom.xml file from 4.5 to 4.5.13.

    httpclient v4.5 has a bug when handling 307 redirects.

After the script is executed, the developer can enter the DataX/ directory to start development or compilation. Because of the soft link, any modification to files in the DataX/doriswriter directory is reflected in the doriswriter/ directory, making it convenient for developers to commit code.
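The soft-link behavior in step 2 can be reproduced in isolation. A minimal sketch using throwaway directories under /tmp (the real directories are doriswriter/ and DataX/doriswriter):

```shell
# Start clean, then recreate the layout with throwaway directories
rm -rf /tmp/demo
mkdir -p /tmp/demo/doriswriter /tmp/demo/DataX

# Soft-link doriswriter/ into the DataX tree, as init_env.sh does
ln -s /tmp/demo/doriswriter /tmp/demo/DataX/doriswriter

# An edit made through the DataX side of the link...
echo "edit" > /tmp/demo/DataX/doriswriter/file.txt

# ...is visible in the original doriswriter/ directory
cat /tmp/demo/doriswriter/file.txt   # prints "edit"
```

This is why changes made while developing inside DataX/ can be committed from the doriswriter/ checkout.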

2.1 Start compilation

Here, to speed up the compilation, I removed many plug-ins I don't need, simply by commenting them out of the pom.xml file under the DataX directory:

hbase11xreader
hbase094xreader
tsdbreader
oceanbasev10reader
odpswriter
hdfswriter
adswriter
ocswriter
oscarwriter
oceanbasev10writer
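Commenting the modules out can also be scripted. A sketch against a sample pom fragment (the real file is the pom.xml in the DataX directory, and the module names are the ones listed above):

```shell
# Sample pom.xml fragment standing in for DataX's module list
cat > /tmp/pom-modules.xml <<'EOF'
<module>mysqlreader</module>
<module>hbase11xreader</module>
<module>hdfswriter</module>
<module>doriswriter</module>
EOF

# Wrap each unwanted module in an XML comment
for m in hbase11xreader hdfswriter; do
  sed -i "s|<module>$m</module>|<!-- <module>$m</module> -->|" /tmp/pom-modules.xml
done

# mysqlreader and doriswriter stay active; the other two are commented out
grep -c '<!--' /tmp/pom-modules.xml   # prints 2
```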

Then enter the DataX directory under incubator-doris/extension/DataX/ and execute the compilation.

Here I compile DataX into a tar package, which differs from the official build command:

mvn -U clean package assembly:assembly -Dmaven.test.skip=true

After the compilation completes, the tar package is in the DataX/target directory. You can copy the tar package wherever you need it. Here I run the test directly in datax. Because my Python version is 3.x, the three files in the bin directory need to be replaced with their Python 3 versions, which you can download from the following address:

https://github.com/WeiYe-Jing/datax-web/tree/master/doc/datax-web/datax-python3

After replacing the files in the bin directory with the three downloaded ones, the whole compilation and installation are complete.

If the compilation doesn't succeed, you can also download the pre-built package from my Baidu Netdisk. Note that it excludes the plug-ins I removed from the build above.

Link: https://pan.baidu.com/s/1hXYkpkrUE2qW4j98k2Wu7A
Extraction code: 3azi

3. Data access

Now we can start to use the DorisWriter extension of DataX to extract data directly from MySQL (or other data sources) and import it into a Doris table.

3.1 MySQL database preparation

The following is my table creation script (MySQL 8):

CREATE TABLE `order_analysis` (
  `date` varchar(19) DEFAULT NULL,
  `user_src` varchar(9) DEFAULT NULL,
  `order_src` varchar(11) DEFAULT NULL,
  `order_location` varchar(2) DEFAULT NULL,
  `new_order` int DEFAULT NULL,
  `payed_order` int DEFAULT NULL,
  `pending_order` int DEFAULT NULL,
  `cancel_order` int DEFAULT NULL,
  `reject_order` int DEFAULT NULL,
  `good_order` int DEFAULT NULL,
  `report_order` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;

Sample data:

INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-12 00:00:00', 'Advertising QR code', 'Android APP', 'Shanghai', 15253, 13210, 684, 1247, 1000, 10824, 862);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-14 00:00:00', 'Wechat circle of friends H5 page', 'iOS APP', 'Guangzhou', 17134, 11270, 549, 204, 224, 10234, 773);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-17 00:00:00', 'Push QR code scanning', 'iOS APP', 'Beijing', 16061, 9418, 1220, 1247, 458, 13877, 749);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-17 00:00:00', 'Wechat circle of friends H5 page', 'WeChat official account', 'Wuhan', 12749, 11127, 1773, 6, 5, 9874, 678);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-18 00:00:00', 'Push QR code scanning', 'iOS APP', 'Shanghai', 13086, 15882, 1727, 1764, 1429, 12501, 625);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-18 00:00:00', 'Wechat circle of friends H5 page', 'iOS APP', 'Wuhan', 15129, 15598, 1204, 1295, 1831, 11500, 320);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-19 00:00:00', 'Push QR code scanning', 'Android APP', 'Hangzhou', 20687, 18526, 1398, 550, 213, 12911, 185);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-19 00:00:00', 'App store', 'WeChat official account', 'Wuhan', 12388, 11422, 702, 106, 158, 5820, 474);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-20 00:00:00', 'Wechat circle of friends H5 page', 'WeChat official account', 'Shanghai', 14298, 11682, 1880, 582, 154, 7348, 354);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-21 00:00:00', 'Push QR code scanning', 'Android APP', 'Shenzhen', 22079, 14333, 5565, 1742, 439, 8246, 211);
INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-22 00:00:00', 'UC Browser drainage', 'iOS APP', 'Shanghai', 28968, 18151, 7212, 2373, 1232, 10739, 578);

3.2 Doris database preparation

The following is the table creation script corresponding to the above data table in Doris:

CREATE TABLE `order_analysis` (
  `date` datetime DEFAULT NULL,
  `user_src` varchar(30) DEFAULT NULL,
  `order_src` varchar(50) DEFAULT NULL,
  `order_location` varchar(10) DEFAULT NULL,
  `new_order` int DEFAULT NULL,
  `payed_order` int DEFAULT NULL,
  `pending_order` int DEFAULT NULL,
  `cancel_order` int DEFAULT NULL,
  `reject_order` int DEFAULT NULL,
  `good_order` int DEFAULT NULL,
  `report_order` int DEFAULT NULL
) ENGINE=OLAP
DUPLICATE KEY(`date`,`user_src`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_src`) BUCKETS 1
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);

3.3 DataX job JSON file

Create and edit the DataX job JSON file and save it to a directory of your choice:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "zhangfeng",
                        "column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order" ],
                        "connection": [ { "table": [ "order_analysis" ], "jdbcUrl": [ "jdbc:mysql://localhost:3306/demo" ] } ] }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "feLoadUrl": ["fe:8030"],
                        "beLoadUrl": ["be1:8040","be1:8040","be1:8040","be1:8040","be1:8040","be1:8040"],
                        "jdbcUrl": "jdbc:mysql://fe:9030/",
                        "database": "test_2",
                        "table": "order_analysis",
                        "column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order"],
                        "username": "root",
                        "password": "",
                        "postSql": [],
                        "preSql": [],
                        "loadProps": {
                        },
                        "maxBatchRows" : 10000,
                        "maxBatchByteSize" : 104857600,
                        "labelPrefix": "datax_doris_writer_demo_",
                        "lineDelimiter": "\n"
                    }
                }
            }
        ]
    }
}
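Two of the writer parameters above control batching: maxBatchRows caps the number of rows per Stream Load batch, and maxBatchByteSize caps the bytes; a batch is flushed when either threshold is hit. The byte value in the job file works out to 100 MB:

```shell
# maxBatchByteSize from the job file above, converted to MB
echo $((104857600 / 1024 / 1024))   # prints 100
```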

For the usage of the MySQL reader (mysqlreader), please refer to:

https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

For the usage and parameter description of doriswriter, see:

https://github.com/apache/incubator-doris/blob/master/extension/DataX/doriswriter/doc/doriswriter.md

4. Execute the DataX data import task

python bin/datax.py doris.json

Then you can see the execution result in the console output.

Go to the Doris database and check your table; the data has been imported and the task execution is complete.

Because DataX tasks can only be triggered externally, you can use Linux crontab or DolphinScheduler to schedule and run them.
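For example, a crontab entry that runs the job nightly could look like the fragment below. The paths are placeholders for your own installation, not values from this article:

```shell
# Hypothetical crontab entry: run the DataX job every day at 02:00
# and append the console output to a log file
0 2 * * * cd /opt/datax && python bin/datax.py job/doris.json >> /var/log/datax-doris.log 2>&1
```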


Added by Salis on Wed, 15 Dec 2021 05:59:35 +0200