Run the DataX jobs: create the execution script below and schedule it in crontab to run at 1:00 AM every day.
The two marker lines, job_start and job_finish, are written by the script itself, so it is easy to tell which table each block of the log belongs to.
#!/bin/bash
source /etc/profile

user1="root"
pass1="pwd"
user2="root"
pass2="pwd"
job_path="/opt/datax/job/"

jobfile=(
    job_table_a.json
    job_table_b.json
)

for filename in ${jobfile[@]}
do
    echo "job_start: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
    python /opt/datax/bin/datax.py -p "-Duser1=${user1} -Dpass1=${pass1} -Duser2=${user2} -Dpass2=${pass2}" ${job_path}${filename}
    echo "job_finish: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
done

# crontab entry:
# 0 1 * * * /opt/datax/job/dc_to_ods_incr.sh >> /opt/datax/job/log/dc_to_ods_incr_$(date +\%Y\%m\%d_\%H\%M\%S).log 2>&1

# quick check of a run:
# egrep 'task|speed|total number|job_start|job_finish' /opt/datax/job/log/
DataX execution log:
job_start: 2018-08-08 01:13:28 job_table_a.json
Task start time              : 2018-08-08 01:13:28
Task end time                : 2018-08-08 01:14:49
Total task time              : 81s
Task average traffic         : 192.82KB/s
Record write speed           : 1998rec/s
Total number of read records : 159916
Total read and write failures: 0
job_finish: 2018-08-08 01:14:49 job_table_a.json

job_start: 2018-08-08 01:14:49 job_table_b.json
Task start time              : 2018-08-08 01:14:50
Task end time                : 2018-08-08 01:15:01
Total task time              : 11s
Task average traffic         : 0B/s
Record write speed           : 0rec/s
Total number of read records : 0
Total read and write failures: 0
job_finish: 2018-08-08 01:15:01 job_table_b.json
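Each labeled line of this log maps to one column of the result table created below. As a minimal sketch (assuming the translated English labels shown above; a real log may use the original Chinese labels, so the prefixes would need adjusting), a line can be matched by its label and split on the first colon:

# Minimal sketch: map one DataX log line to a (column, value) pair.
# The English labels are assumptions based on the translated log above.
line = "Total task time              : 81s"

label_to_column = {
    "Task start time": "start_time",
    "Task end time": "end_time",
    "Total task time": "seconds",
    "Task average traffic": "traffic",
    "Record write speed": "write_speed",
    "Total number of read records": "read_record",
    "Total read and write failures": "failed_record",
}

for label, column in label_to_column.items():
    if line.startswith(label):
        value = line.split(":", 1)[1].strip()   # -> "81s"
        print(column + " = " + value)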
Next, parse the log and save the information to the database. First, create a table for the results:
CREATE TABLE `datax_job_result` (
  `log_file`      varchar(200) DEFAULT NULL,
  `job_file`      varchar(200) DEFAULT NULL,
  `start_time`    datetime     DEFAULT NULL,
  `end_time`      datetime     DEFAULT NULL,
  `seconds`       int(11)      DEFAULT NULL,
  `traffic`       varchar(50)  DEFAULT NULL,
  `write_speed`   varchar(50)  DEFAULT NULL,
  `read_record`   int(11)      DEFAULT NULL,
  `failed_record` int(11)      DEFAULT NULL,
  `job_start`     varchar(200) DEFAULT NULL,
  `job_finish`    varchar(200) DEFAULT NULL,
  `insert_time`   datetime     DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
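To confirm the table works with the pandas to_sql(..., if_exists='append') call used in the script below, one test row can be written and read back. This is only a sketch, not part of the original workflow; the file name is hypothetical and the connection string reuses the same placeholder credentials as the script:

# Sketch: insert one test row into datax_job_result and read it back.
# "test.log" is a hypothetical file name; credentials are placeholders.
import pandas as pd
import sqlalchemy

engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test?charset=utf8")

row = pd.DataFrame([{
    "log_file": "/opt/datax/job/log/test.log",
    "job_file": "job_table_a.json",
    "start_time": "2018-08-08 01:13:28",
    "end_time": "2018-08-08 01:14:49",
    "seconds": 81,
    "read_record": 159916,
    "failed_record": 0,
}])
row.to_sql("datax_job_result", engine, index=False, if_exists="append")

print(pd.read_sql("SELECT job_file, seconds, read_record FROM datax_job_result", engine))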
The following script is also run on a schedule. Because the DataX jobs run at 1:00 AM, the script picks the newest log file created within the last 82800 seconds (23 hours), so it finds the latest day's log no matter what time of day it runs. It too is scheduled in crontab to run once a day (at a time when the DataX jobs should already have finished).
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# crontab entry:
# 0 5 * * * source /etc/profile && /usr/bin/python2.7 /opt/datax/job/save_log_to_db.py > /dev/null 2>&1

import os
import time

import pandas as pd
import sqlalchemy


def save_to_db(df):
    # Append the parsed results to the datax_job_result table.
    engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test?charset=utf8")
    df.to_sql("datax_job_result", engine, index=False, if_exists='append')


def get_the_latest_file(path):
    # Return the newest .log file created within the last 82800 seconds (23 hours).
    d2 = time.time()
    d1 = d2 - 82800
    for (dirpath, dirnames, filenames) in os.walk(path):
        for filename in sorted(filenames, reverse=True):
            if filename.endswith(".log"):
                f = os.path.join(dirpath, filename)
                ctime = os.stat(f).st_ctime
                if d1 <= ctime <= d2:
                    return f
    return None


def get_job_result_from_logfile(path):
    columns = ['log_file', 'job_file', 'start_time', 'end_time', 'seconds', 'traffic',
               'write_speed', 'read_record', 'failed_record', 'job_start', 'job_finish']
    result = pd.DataFrame(columns=columns)
    log_file = get_the_latest_file(path)
    if log_file is None:
        # No log produced in the last 23 hours: nothing to save.
        return
    index = 0
    with open(log_file, "r") as content:
        for line in content:
            result.loc[index, 'log_file'] = log_file
            if line.startswith('job_start'):
                result.loc[index, 'job_file'] = line.split()[-1].strip()
                result.loc[index, 'job_start'] = line.strip()
            elif line.startswith('Task start time'):
                result.loc[index, 'start_time'] = line.split(':', 1)[1].strip()
            elif line.startswith('Task end time'):
                result.loc[index, 'end_time'] = line.split(':', 1)[1].strip()
            elif line.startswith('Total task time'):
                result.loc[index, 'seconds'] = line.split(':')[-1].strip().replace('s', '')
            elif line.startswith('Task average traffic'):
                result.loc[index, 'traffic'] = line.split(':')[-1].strip()
            elif line.startswith('Record write speed'):
                result.loc[index, 'write_speed'] = line.split(':')[-1].strip()
            elif line.startswith('Total number of read records'):
                result.loc[index, 'read_record'] = line.split(':')[-1].strip()
            elif line.startswith('Total read and write failures'):
                result.loc[index, 'failed_record'] = line.split(':')[-1].strip()
            elif line.startswith('job_finish'):
                # job_finish closes one job's block; move to the next row.
                result.loc[index, 'job_finish'] = line.strip()
                index = index + 1
    save_to_db(result)


get_job_result_from_logfile("/opt/datax/job/log")
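Once results accumulate in datax_job_result, a small follow-up query can flag runs that read zero records or reported read/write failures. This is an optional sketch, not part of the original workflow; table and column names follow the DDL above and the credentials are the same placeholders:

# Sketch: flag today's suspicious DataX runs (optional follow-up check).
import pandas as pd
import sqlalchemy

engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test?charset=utf8")

sql = """
SELECT job_file, start_time, seconds, read_record, failed_record
FROM datax_job_result
WHERE insert_time >= CURDATE()
"""
df = pd.read_sql(sql, engine)

bad = df[(df["failed_record"] > 0) | (df["read_record"] == 0)]
if bad.empty:
    print("All of today's DataX jobs look OK.")
else:
    print("Jobs that need a closer look:")
    print(bad.to_string(index=False))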