DataX tutorial (04) - complete interpretation of configuration

01 introduction

Through the previous blog posts, we already know the concepts and principles of DataX.

This article explains the configuration of DataX after compiling and packaging, namely the files in the bin directory, the configuration files in the conf directory, and the configuration file in the job directory.

02 bin directory file parsing

Analyzing the files in the bin directory requires a certain Python foundation. I have written a related column before; interested readers can refer to it: Python column.

In the bin directory, you can see several files, namely:

  • datax.py
  • dxprof.py
  • perftrace.py

2.1 datax.py

2.1.1 Running datax.py in the command window

To execute .py files, you need a Python environment. macOS generally comes with one, so no installation is needed; on Windows, you can search for installation instructions yourself. To upgrade to Python 3 on macOS, please refer to my blog post Installing Python 3 on Mac.

datax.py is mainly used to submit tasks and is effectively the entry point of DataX. Let's execute it from the command window: enter the bin directory of the compiled DataX and run:

python datax.py ../job/job.json

You can see that DataX executes successfully, and the results are as follows (consistent with running DataX under IDEA; see: DataX tutorial (02) - complete process of running DataX in IDEA (filling all the pits)):

2.1.2 Running datax.py with PyCharm

The best way to interpret datax.py is breakpoint debugging. I use PyCharm here for that.

First import the compiled project via File -> Open, and open datax.py:

Click Run and an error is reported:

The error is because no task configuration file was specified. Click "Edit Configurations...":

Configure the path of the task file:

Set a breakpoint and right-click to Debug:

Next, you can happily follow the breakpoints to interpret the datax.py source code:

2.1.3 datax.py interpretation

The main thing datax.py does is to generate a command and execute it in a child process.

The entry of the program is the __main__ block; let's look at the entry code:

if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader, options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    # print startCommand

    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()

    sys.exit(child_process.returncode)

There is not much code above; the key is line 12:

startCommand = buildStartCommand(options, args)

Line 12 assembles the startCommand. Through breakpoint debugging, the assembled command looks like this:

From the above figure, you can see that it is actually a java command. After line 12, the main task is to create a process to execute the startCommand:

# Create a child process and execute the assembled command through the shell
child_process = subprocess.Popen(startCommand, shell=True)

# Register signal handlers so that the child process is killed
# when datax.py itself receives an interrupt/termination signal
register_signal()

# Wait for the child process to finish and collect its stdout and stderr
(stdout, stderr) = child_process.communicate()

# Exit with the child process's return code
sys.exit(child_process.returncode)
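The register_signal() and suicide() functions mentioned above (and listed in the table below) are responsible for cleaning up the child Java process when datax.py itself is interrupted. Here is a simplified sketch of that mechanism; it is an illustration only, not the verbatim DataX source:

import signal
import sys

child_process = None  # set by the __main__ block after subprocess.Popen

def suicide(signum, e):
    # On an interrupt/termination signal, kill the child Java process first,
    # then exit with the KILL return code (143)
    global child_process
    if child_process is not None:
        child_process.kill()
    sys.exit(143)

def register_signal():
    # Route SIGINT (Ctrl+C) and SIGTERM (kill) to the suicide handler
    signal.signal(signal.SIGINT, suicide)
    signal.signal(signal.SIGTERM, suicide)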

Well, the main entry has been interpreted. What other methods are there? Press Control+O to see:

After sorting them out, they are as follows:

| Classification | Logical name | Analysis |
| --- | --- | --- |
| Variable | DATAX_HOME | Root path of the compiled DataX directory currently running, e.g. /Users/username/Desktop/datax |
| Variable | DATAX_VERSION | Version number of DataX, e.g. DATAX-OPENSOURCE-3.0 |
| Variable | DEFAULT_JVM | JVM settings and heap-dump path, e.g. -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/Users/username/Desktop/datax/log |
| Variable | DEFAULT_PROPERTY_CONF | Default system properties, e.g. -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=/Users/username/Desktop/datax -Dlogback.configurationFile=/Users/username/Desktop/datax/conf/logback.xml |
| Variable | ENGINE_COMMAND | Complete engine startup command format, e.g. java -server ${jvm} -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=/Users/username/Desktop/datax -Dlogback.configurationFile=/Users/username/Desktop/datax/conf/logback.xml -classpath /Users/username/Desktop/datax/lib/*:${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job} |
| Variable | ispy2 | Whether the current Python is version 2.x |
| Variable | LOGBACK_FILE | Path of logback.xml, e.g. /Users/yanglinwei/Desktop/datax/conf/logback.xml |
| Variable | REMOTE_DEBUG_CONFIG | Remote debugging configuration, e.g. -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999 |
| Variable | RET_STATE | Return code definitions, e.g. {"KILL": 143, "FAIL": -1, "OK": 0, "RUN": 1, "RETRY": 2} |
| Method | buildStartCommand(options, args) | Builds the startup command through a series of if/else branches; the command consists of JVM parameters plus environment variables |
| Method | generateJobConfigTemplate(reader, writer) | Pulls the corresponding templates from GitHub according to the reader and writer names, and finally creates the JSON task template |
| Method | getLocalIp() | Gets the local IP address |
| Method | getOptionParser() | Gets the command-line option parser |
| Method | isUrl(path) | Determines whether the input parameter is a URL |
| Method | isWindows() | Whether the current environment is Windows |
| Method | printCopyright() | Prints copyright information |
| Method | readPluginTemplate(plugin) | Reads the plug-in template according to the plug-in name |
| Method | register_signal() | Registers signal handlers so that the child process is terminated when datax.py receives an interrupt/termination signal |
| Method | suicide(signum, e) | Ends the child process according to the signal value |

2.2 dxprof.py

In the DataX project, I could not find where this file is called. From reading the source code, it appears to be a utility class used to query database host information, table information, and task execution result information.

Press Control+O to view the variables and methods in the dxprof.py file. They are not explained here; their meaning can be inferred from the method names:

2.3 perftrace.py

2.3.1 Usage of perftrace.py

From the perftrace.py source code, you can see that its main function is to let users pass in JSON strings, from which it generates a task JSON and then executes it.

The getUsage() method in perftrace.py describes its usage:

perftrace.py --channel=10 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "where":"", "splitPk":"", "writer-print":"false"}'

perftrace.py --channel=10 --writer='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'

perftrace.py --file=/tmp/datax.job.json --type=reader --reader='{"writer-print": "false"}'

perftrace.py --file=/tmp/datax.job.json --type=writer --writer='{"reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'

Note that some of the parameters are required (marked with *). For the reader:

The following params are available for -r --reader:
    [these params is for rdbms reader, used to trace rdbms read performance, it's like datax's key]
    *datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for read data
    column:          column to be read, the default value is ['*']
    splitPk:         the splitPk column of rdbms table
    where:           limit the scope of the performance data set
    fetchSize:       how many rows to be fetched at each communicate

    [these params is for stream reader, used to trace rdbms write performance]
    reader-sliceRecordCount:  how man test data to mock(each channel), the default value is 10000
    reader-column          :  stream reader while generate test data(type supports: string|long|date|double|bool|bytes; support constant value and random function),demo: [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]

For writer:

The following params are available for -w --writer:
    [these params is for rdbms writer, used to trace rdbms write performance, it's like datax's key]
    datasourceType:  datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2|ads etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for write data
    column:          column to be writed, the default value is ['*']
    batchSize:       how many rows to be storeed at each communicate, the default value is 512
    preSql:          prepare sql to be executed before write data, the default value is ''
    postSql:         post sql to be executed end of write data, the default value is ''
    url:             required for ads, pattern is ip:port
    schme:           required for ads, ads database name

    [these params is for stream writer, used to trace rdbms read performance]
    writer-print:           true means print data read from source datasource, the default value is false

For global configuration:

The following params are available global control:
    -c --channel:    the number of concurrent tasks, the default value is 1
    -f --file:       existing completely dataX configuration file path
    -t --type:       test read or write performance for a datasource, couble be reader or writer, in collaboration with -f --file
    -h --help:       print help message
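Putting the above together with the run log in the next section, the overall flow of perftrace.py is roughly: parse the --reader/--writer JSON string, merge it into a job configuration, write that configuration to a temporary perftrace-<uuid> file next to datax.py, and finally call datax.py with it. Below is a simplified sketch of this flow, illustrative only; the real script handles many more options:

import json
import os
import subprocess
import uuid

def run_perftrace(reader_json, channel=1):
    # Parse the user-supplied reader configuration; the writer-print flag
    # is moved to a streamwriter so that read performance can be traced
    reader_conf = json.loads(reader_json)
    writer_print = reader_conf.pop("writer-print", "false")

    job = {
        "content": [{
            "reader": {"name": "mysqlreader", "parameter": reader_conf},
            "writer": {"name": "streamwriter", "parameter": {"print": writer_print}},
        }],
        "setting": {"speed": {"channel": str(channel)}},
    }

    # Write the generated job next to datax.py, e.g. bin/perftrace-<uuid>,
    # then hand it over to datax.py for execution
    job_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                            "perftrace-%s" % uuid.uuid1())
    with open(job_path, "w") as f:
        json.dump(job, f, indent=4)
    return subprocess.call(["python", "datax.py", job_path])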

2.3.2 perftrace.py example

For example, suppose you want to print the data of a table in the local database:

The following command is used directly here:

perftrace.py --channel=1 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false", "username":"root", "password":"123456", "table": "t_sync_log", "where":"", "splitPk":"id", "writer-print":"true"}'

The printing results are as follows:

yanglineideMBP2:bin yanglinwei$ perftrace.py --channel=1 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false", "username":"root", "password":"123456", "table": "t_sync_log", "where":"", "splitPk":"id", "writer-print":"true"}'

DataX Util Tools (UNKNOWN_DATAX_VERSION), From Alibaba !
Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.
trace environments:
dataxJobPath:  /Users/yanglinwei/Desktop/datax/bin/perftrace-c336d159-8a57-11ec-b78e-f45c89ba8565
dataxHomePath: /Users/yanglinwei/Desktop/datax
dataxCommand:  /Users/yanglinwei/Desktop/datax/bin/datax.py /Users/yanglinwei/Desktop/datax/bin/perftrace-c336d159-8a57-11ec-b78e-f45c89ba8565

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2022-02-10 17:56:49.547 [main] INFO  VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2022-02-10 17:56:49.560 [main] INFO  Engine - the machine info  => 

        osInfo: Oracle Corporation 11 11.0.2+9-LTS
        jvmInfo:        Mac OS X x86_64 10.15.1
        cpu num:        4

        totalPhysicalMemory:    -0.00G
        freePhysicalMemory:     -0.00G
        maxFileDescriptorCount: -1
        currentOpenFileDescriptorCount: -1

        GC Names        [G1 Young Generation, G1 Old Generation]

        MEMORY_NAME                    | allocation_size                | init_size                      
        CodeHeap 'profiled nmethods'   | 117.22MB                       | 2.44MB                         
        G1 Old Gen                     | 1,024.00MB                     | 970.00MB                       
        G1 Survivor Space              | -0.00MB                        | 0.00MB                         
        CodeHeap 'non-profiled nmethods' | 117.22MB                       | 2.44MB                         
        Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
        Metaspace                      | -0.00MB                        | 0.00MB                         
        G1 Eden Space                  | -0.00MB                        | 54.00MB                        
        CodeHeap 'non-nmethods'        | 5.56MB                         | 2.44MB                         


2022-02-10 17:56:49.614 [main] INFO  Engine - 
{
        "content":[
                {
                        "reader":{
                                "name":"mysqlreader",
                                "parameter":{
                                        "column":[
                                                "*"
                                        ],
                                        "connection":[
                                                {
                                                        "jdbcUrl":[
                                                                "jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false"
                                                        ],
                                                        "table":[
                                                                "t_sync_log"
                                                        ]
                                                }
                                        ],
                                        "jdbcUrl":"jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false",
                                        "password":"******",
                                        "sliceRecordCount":"10000",
                                        "splitPk":"id",
                                        "table":"t_sync_log",
                                        "username":"root",
                                        "where":""
                                }
                        },
                        "writer":{
                                "name":"streamwriter",
                                "parameter":{
                                        "print":"true"
                                }
                        }
                }
        ],
        "setting":{
                "speed":{
                        "channel":"1"
                }
        }
}

2022-02-10 17:56:49.665 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2022-02-10 17:56:49.668 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2022-02-10 17:56:49.668 [main] INFO  JobContainer - DataX jobContainer starts job.
2022-02-10 17:56:49.671 [main] INFO  JobContainer - Set jobId = 0
2022-02-10 17:56:50.356 [job-0] INFO  OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2022-02-10 17:56:50.358 [job-0] WARN  OriginalConfPretreatmentUtil - The column configuration in your profile is at risk. Because you are not configured to read the columns of the database table, when the number and type of fields in your table change, the correctness of the task may be affected or even an error may occur. Please check your configuration and make changes.
2022-02-10 17:56:50.369 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2022-02-10 17:56:50.369 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2022-02-10 17:56:50.371 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do prepare work .
2022-02-10 17:56:50.372 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2022-02-10 17:56:50.372 [job-0] INFO  JobContainer - Job set Channel-Number to 1 channels.
2022-02-10 17:56:50.377 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2022-02-10 17:56:50.378 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] splits to [1] tasks.
2022-02-10 17:56:50.423 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2022-02-10 17:56:50.429 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2022-02-10 17:56:50.432 [job-0] INFO  JobContainer - Running by standalone Mode.
2022-02-10 17:56:50.440 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2022-02-10 17:56:50.465 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to 2000000.
2022-02-10 17:56:50.467 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2022-02-10 17:56:50.498 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2022-02-10 17:56:50.505 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Begin to read record by Sql: [select * from t_sync_log 
] jdbcUrl:[jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2022-02-10 17:56:50.572 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select * from t_sync_log 
] jdbcUrl:[jdbc:mysql://127.0.0.1:3306/bm_wxcp?useSSL=false&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
5941    cc01b87e-abd9-4c9d-9da7-2103987db5ae    8888    0       1       SYNC_ORG        2021-10-28 09:57:45     2021-10-28 09:57:46     true    1       0       false  Yang Linwei    Yang Linwei 2021-10-28 01:57:44     2021-10-28 01:57:46
5942    cc01b87e-abd9-4c9d-9da7-2103987db5ae    8888    1026081 1       SYNC_DEPT       2021-10-28 09:57:46     null    false   1       0       false   Yang Linwei Yang Linwei 2021-10-28 01:57:46      2021-10-28 01:57:46
2022-02-10 17:56:50.803 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[309]ms
2022-02-10 17:56:50.803 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2022-02-10 17:57:00.482 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 191 bytes | Speed 19B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.076s | Percentage 100.00%
2022-02-10 17:57:00.482 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2022-02-10 17:57:00.483 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do post work.
2022-02-10 17:57:00.483 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do post work.
2022-02-10 17:57:00.484 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2022-02-10 17:57:00.492 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%
                        

         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 G1 Young Generation  | 1                  | 1                  | 1                  | 0.009s             | 0.009s             | 0.009s             
                 G1 Old Generation    | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             

2022-02-10 17:57:00.492 [job-0] INFO  JobContainer - PerfTrace not enable!
2022-02-10 17:57:00.493 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 191 bytes | Speed 19B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.076s | Percentage 100.00%
2022-02-10 17:57:00.495 [job-0] INFO  JobContainer - 
Task start time                           : 2022-02-10 17:56:49
 Task end time                            : 2022-02-10 17:57:00
 Total task elapsed time                  :                 10s
 Average task throughput                  :               19B/s
 Record write speed                       :              0rec/s
 Total records read                       :                   2
 Total read/write failures                :                   0

03 conf directory file parsing

The conf directory mainly contains the following two files:

  • core.json
  • logback.xml

For those who do Java development, logback.xml is already very familiar. It mainly controls the log level and log output format, so it will not be described here.

This section mainly covers core.json.

3.1 core.json

The contents of the core.json file are as follows:

{
    "entry": {
        "jvm": "-Xms1G -Xmx1G",
        "environment": {}
    },
    "common": {
        "column": {
            "datetimeFormat": "yyyy-MM-dd HH:mm:ss",
            "timeFormat": "HH:mm:ss",
            "dateFormat": "yyyy-MM-dd",
            "extraFormats":["yyyyMMdd"],
            "timeZone": "GMT+8",
            "encoding": "utf-8"
        }
    },
    "core": {
        "dataXServer": {
            "address": "http://localhost:7001/api",
            "timeout": 10000,
            "reportDataxLog": false,
            "reportPerfLog": false
        },
        "transport": {
            "channel": {
                "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
                "speed": {
                    "byte": 2000000,
                    "record": -1
                },
                "flowControlInterval": 20,
                "capacity": 512,
                "byteCapacity": 67108864
            },
            "exchanger": {
                "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
                "bufferSize": 32
            }
        },
        "container": {
            "job": {
                "reportInterval": 10000
            },
            "taskGroup": {
                "channel": 5
            },
            "trace": {
                "enable": "false"
            }

        },
        "statistics": {
            "collector": {
                "plugin": {
                    "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
                    "maxDirtyNumber": 10
                }
            }
        }
    }
}

The main contents of this file are the core.* configuration variables. For example, the transport.channel.speed.byte value of 2000000 is what produces the "Channel set byte_speed_limit to 2000000" line in the perftrace log above.

Due to space and time constraints, the meaning of each configuration item will be supplemented later.

04 job directory file parsing

The job directory contains only one file, job.json, which is an example job. Its contents are as follows:

{
    "job": {
        "setting": {
            "speed": {
                "byte":10485760
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column" : [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 19890604,
                                "type": "long"
                            },
                            {
                                "value": "1989-06-04 00:00:00",
                                "type": "date"
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "test",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 100000
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false,
                        "encoding": "UTF-8"
                    }
                }
            }
        ]
    }
}
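Incidentally, such job files do not have to be written by hand. As noted in section 2.1.3, datax.py's generateJobConfigTemplate(reader, writer) can produce one; assuming the standard -r/--reader and -w/--writer options of datax.py, a template for this stream-to-stream job could be generated with something like:

python datax.py -r streamreader -w streamwriter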

In fact, in the source code, each reader and writer plug-in already comes with its own doc documentation. What is written here is certainly not as complete as the official docs, so you can clone the source code and have a look:

05 end

This article was put together after reading the source code. Readers with questions are welcome to leave a comment. That's all for this article!
