DataX is the open source version of Alibaba Cloud DataWorks Data Integration. It is an offline data synchronization tool/platform widely used within Alibaba Group. DataX implements efficient data synchronization between various heterogeneous data sources, including MySQL, Oracle, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), Hologres, DRDS, etc.
Apache Doris is a modern MPP analytical database. Query results are returned with sub-second response times, which effectively supports real-time data analysis. The distributed architecture of Apache Doris is simple and easy to operate, and it can support large data sets of more than 10 PB.
Apache Doris can meet a variety of data analysis needs, such as fixed historical reports, real-time data analysis, interactive data analysis and exploratory data analysis. It makes your data analysis simpler and more efficient!
To grow the Apache Doris ecosystem and give Doris users a more convenient way to import data, the community developed the DataX DorisWriter extension, which makes it easier to load data into Doris with DataX.
1. Scenario
This is a demonstration of how to use DorisWriter, the DataX extension for Doris, to extract data from a MySQL table and import it into a Doris data warehouse table.
2. Compile DorisWriter
This extension can be compiled without the Doris Docker compilation environment. In this article it is compiled under WSL on Windows.
First, pull the source code from GitHub:
git clone https://github.com/apache/incubator-doris.git
Enter the incubator-doris/extension/DataX/ directory to perform the compilation.
First, run the initialization script:
sh init_env.sh
This script is mainly used to build the DataX development environment. It mainly performs the following operations:
- Clone the DataX code base locally.
- Soft link the doriswriter/ directory to the DataX/doriswriter directory.
- Add the <module>doriswriter</module> module to the DataX/pom.xml file.
- Change the version of httpclient in the DataX/core/pom.xml file from 4.5 to 4.5.13 (httpclient v4.5 has a bug in handling 307 redirects).
After the script is executed, the developer can enter the DataX/ directory to start development or compilation. Because of the soft link, any modification to the files in the DataX/doriswriter directory is reflected in the doriswriter/ directory, which makes it easier for developers to submit code.
2.1 Start compilation
Here, to speed up the compilation, I removed many plug-ins I do not need by commenting them out in pom.xml under the DataX directory:
hbase11xreader hbase094xreader tsdbreader oceanbasev10reader odpswriter hdfswriter adswriter ocswriter oscarwriter oceanbasev10writer
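If you would rather not edit pom.xml by hand, the step above can be sketched in a few lines of Python. This is only a rough sketch: it assumes every plug-in appears in pom.xml as a plain <module>name</module> entry, and the names SKIPPED_MODULES and comment_out_modules are my own, not part of DataX or Doris.

```python
import re

# Plug-ins listed above that are commented out to speed up the build.
SKIPPED_MODULES = [
    "hbase11xreader", "hbase094xreader", "tsdbreader", "oceanbasev10reader",
    "odpswriter", "hdfswriter", "adswriter", "ocswriter", "oscarwriter",
    "oceanbasev10writer",
]


def comment_out_modules(pom_text, modules=SKIPPED_MODULES):
    """Wrap each matching <module>name</module> entry in an XML comment.

    Entries already wrapped as '<!-- <module>...</module> -->' are left alone,
    so the function can be applied more than once.
    """
    for name in modules:
        pattern = re.compile(
            r"(?<!<!-- )<module>\s*" + re.escape(name) + r"\s*</module>"
        )
        pom_text = pattern.sub(lambda m: "<!-- " + m.group(0) + " -->", pom_text)
    return pom_text


def patch_pom_file(path):
    """Read a pom.xml file, comment out the skipped modules, write it back."""
    with open(path, encoding="utf-8") as handle:
        original = handle.read()
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(comment_out_modules(original))
```

Calling patch_pom_file("pom.xml") from inside the DataX directory would apply the change in place; keep a copy of the original file in case you need one of these plug-ins later.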
Then enter the DataX directory under incubator-doris/extension/DataX/ and run the compilation.
Here, I compile DataX into a tar package, which is different from the official compilation command.
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
After the compilation is completed, the tar package is in the DataX/target directory. You can copy the tar package wherever you need it. Here, I run the test directly in datax. Because the Python version here is 3.x, you need to replace the three files in the bin directory with their Python 3 versions, which you can download from the following address:
https://github.com/WeiYe-Jing/datax-web/tree/master/doc/datax-web/datax-python3
After replacing the files in the bin directory with the three downloaded files, the whole compilation and installation is complete.
If your compilation does not succeed, you can also download the compiled package from my Baidu Netdisk. Note that it does not include the plug-ins I removed above.
Link: https://pan.baidu.com/s/1hXYkpkrUE2qW4j98k2Wu7A Extraction code: 3 azi
3. Data access
At this point, we can start to use the doriswriter extension of DataX to extract data directly from MySQL (or other data sources) and import it into the Doris table.
3.1 MySQL database preparation
The following is my database table creation script (MySQL 8):
CREATE TABLE `order_analysis` (
  `date` varchar(19) DEFAULT NULL,
  `user_src` varchar(64) DEFAULT NULL,
  `order_src` varchar(64) DEFAULT NULL,
  `order_location` varchar(16) DEFAULT NULL,
  `new_order` int DEFAULT NULL,
  `payed_order` int DEFAULT NULL,
  `pending_order` int DEFAULT NULL,
  `cancel_order` int DEFAULT NULL,
  `reject_order` int DEFAULT NULL,
  `good_order` int DEFAULT NULL,
  `report_order` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;
Sample data
INSERT INTO `order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES
  ('2015-10-12 00:00:00', 'Advertising QR code', 'Android APP', 'Shanghai', 15253, 13210, 684, 1247, 1000, 10824, 862),
  ('2015-10-14 00:00:00', 'Wechat circle of friends H5 page', 'iOS APP', 'Guangzhou', 17134, 11270, 549, 204, 224, 10234, 773),
  ('2015-10-17 00:00:00', 'Push QR code scanning', 'iOS APP', 'Beijing', 16061, 9418, 1220, 1247, 458, 13877, 749),
  ('2015-10-17 00:00:00', 'Wechat circle of friends H5 page', 'WeChat official account', 'Wuhan', 12749, 11127, 1773, 6, 5, 9874, 678),
  ('2015-10-18 00:00:00', 'Push QR code scanning', 'iOS APP', 'Shanghai', 13086, 15882, 1727, 1764, 1429, 12501, 625),
  ('2015-10-18 00:00:00', 'Wechat circle of friends H5 page', 'iOS APP', 'Wuhan', 15129, 15598, 1204, 1295, 1831, 11500, 320),
  ('2015-10-19 00:00:00', 'Push QR code scanning', 'Android APP', 'Hangzhou', 20687, 18526, 1398, 550, 213, 12911, 185),
  ('2015-10-19 00:00:00', 'App store', 'WeChat official account', 'Wuhan', 12388, 11422, 702, 106, 158, 5820, 474),
  ('2015-10-20 00:00:00', 'Wechat circle of friends H5 page', 'WeChat official account', 'Shanghai', 14298, 11682, 1880, 582, 154, 7348, 354),
  ('2015-10-21 00:00:00', 'Push QR code scanning', 'Android APP', 'Shenzhen', 22079, 14333, 5565, 1742, 439, 8246, 211),
  ('2015-10-22 00:00:00', 'UC Browser drainage', 'iOS APP', 'Shanghai', 28968, 18151, 7212, 2373, 1232, 10739, 578);
3.2 Doris database preparation
The following is the table creation script in Doris that corresponds to the MySQL table above:
CREATE TABLE `order_analysis` (
  `date` datetime DEFAULT NULL,
  `user_src` varchar(64) DEFAULT NULL,
  `order_src` varchar(50) DEFAULT NULL,
  `order_location` varchar(10) DEFAULT NULL,
  `new_order` int DEFAULT NULL,
  `payed_order` int DEFAULT NULL,
  `pending_order` int DEFAULT NULL,
  `cancel_order` int DEFAULT NULL,
  `reject_order` int DEFAULT NULL,
  `good_order` int DEFAULT NULL,
  `report_order` int DEFAULT NULL
) ENGINE=OLAP
DUPLICATE KEY(`date`, `user_src`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_src`) BUCKETS 1
PROPERTIES (
  "replication_num" = "3",
  "in_memory" = "false",
  "storage_format" = "V2"
);
3.3 DataX job JSON file
Create the DataX job JSON file (doris.json in this example) and save it to a directory of your choice:
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "zhangfeng",
            "column": ["date", "user_src", "order_src", "order_location", "new_order", "payed_order", "pending_order", "cancel_order", "reject_order", "good_order", "report_order"],
            "connection": [
              {
                "table": ["order_analysis"],
                "jdbcUrl": ["jdbc:mysql://localhost:3306/demo"]
              }
            ]
          }
        },
        "writer": {
          "name": "doriswriter",
          "parameter": {
            "feLoadUrl": ["fe:8030"],
            "beLoadUrl": ["be1:8040", "be1:8040", "be1:8040", "be1:8040", "be1:8040", "be1:8040"],
            "jdbcUrl": "jdbc:mysql://fe:9030/",
            "database": "test_2",
            "table": "order_analysis",
            "column": ["date", "user_src", "order_src", "order_location", "new_order", "payed_order", "pending_order", "cancel_order", "reject_order", "good_order", "report_order"],
            "username": "root",
            "password": "",
            "postSql": [],
            "preSql": [],
            "loadProps": {},
            "maxBatchRows": 10000,
            "maxBatchByteSize": 104857600,
            "labelPrefix": "datax_doris_writer_demo_",
            "lineDelimiter": "\n"
          }
        }
      }
    ]
  }
}
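A stray space in a column name or a mismatch between the reader and writer column lists is easy to miss in a file like this. Below is a minimal sketch of a check you could run before starting the job. It assumes that DataX hands columns from the reader to the writer by position, so both lists should have the same length; the function names check_job_columns and check_job_file are hypothetical helpers of my own, not part of DataX.

```python
import json


def check_job_columns(job):
    """Return a list of problems found in the reader/writer column lists."""
    problems = []
    for index, content in enumerate(job["job"]["content"]):
        reader_cols = content["reader"]["parameter"]["column"]
        writer_cols = content["writer"]["parameter"]["column"]
        # Column names with leading or trailing whitespace will not match
        # the real column names in the source or target table.
        for side, cols in (("reader", reader_cols), ("writer", writer_cols)):
            for col in cols:
                if col != col.strip():
                    problems.append(
                        "content[%d] %s column %r has surrounding whitespace"
                        % (index, side, col)
                    )
        # Assumption: columns are matched by position, so counts must agree.
        if len(reader_cols) != len(writer_cols):
            problems.append(
                "content[%d] reader has %d columns but writer has %d"
                % (index, len(reader_cols), len(writer_cols))
            )
    return problems


def check_job_file(path):
    """Load a DataX job JSON file and check its column lists."""
    with open(path, encoding="utf-8") as handle:
        return check_job_columns(json.load(handle))
```

For example, check_job_file("doris.json") returns an empty list when nothing suspicious is found, and a list of messages otherwise.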
For the usage of mysqlreader, please refer to:
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
Usage and parameter description of doriswriter:
https://github.com/apache/incubator-doris/blob/master/extension/DataX/doriswriter/doc/doriswriter.md
4. Execute the DataX data import task
python bin/datax.py doris.json
When the job finishes, DataX prints the execution result to the console.
Then go to the Doris database and check your table: the data has been imported, and the task execution is over.
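If you want to start the import from another script, the command above can be wrapped as follows. This is a small sketch under my own assumptions: build_datax_command and run_datax are hypothetical helper names, the directory /opt/datax in the comment is just an example path, and I assume datax.py exits with a non-zero code when the job fails.

```python
import os
import subprocess
import sys


def build_datax_command(job_file, datax_home="."):
    """Build the argument list for 'python bin/datax.py <job_file>'."""
    script = os.path.join(os.path.abspath(datax_home), "bin", "datax.py")
    # Use the interpreter running this script, so the Python 3 versions of
    # the bin files are executed with the same Python version.
    return [sys.executable, script, os.path.abspath(job_file)]


def run_datax(job_file, datax_home="."):
    """Run one DataX job and return its exit code."""
    command = build_datax_command(job_file, datax_home)
    result = subprocess.run(command)
    return result.returncode


# Example (hypothetical paths):
#     code = run_datax("doris.json", datax_home="/opt/datax")
#     if code != 0:
#         print("DataX job failed with exit code", code)
```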
Because DataX tasks can only be started by an external trigger, you can use Linux crontab or Apache DolphinScheduler to schedule them.
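For crontab, each job needs one line that combines a schedule with the command from section 4. The sketch below builds such a line. The helper name crontab_entry, the paths /opt/datax and /tmp/datax_doris.log, and the 02:00 schedule are all made up for illustration; adjust them to your own installation.

```python
import shlex


def crontab_entry(schedule, datax_home, job_file, log_file, python_exe="python"):
    """Build one crontab line that runs a DataX job and appends output to a log."""
    command = "cd {home} && {py} bin/datax.py {job} >> {log} 2>&1".format(
        home=shlex.quote(datax_home),
        py=shlex.quote(python_exe),
        job=shlex.quote(job_file),
        log=shlex.quote(log_file),
    )
    return "{} {}".format(schedule, command)


# Run the import every day at 02:00 (hypothetical paths).
print(crontab_entry("0 2 * * *", "/opt/datax", "doris.json", "/tmp/datax_doris.log"))
```

The printed line can be pasted into the editor opened by crontab -e.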