Reading datax execution results with Python and saving them to a database

Create a shell script that runs the datax jobs, and schedule it in crontab to run at 1 a.m. every day (the crontab entry is shown as a comment at the end of the script):

The job_start and job_finish lines are printed by the script itself, so that each block of log output can be matched to the table it belongs to.

#!/bin/bash
source /etc/profile
user1="root"
pass1="pwd"
user2="root"
pass2="pwd"
job_path="/opt/datax/job/"

jobfile=(
job_table_a.json
job_table_b.json
)

for filename in "${jobfile[@]}"
do
	echo "job_start:  $(date "+%Y-%m-%d %H:%M:%S") ${filename}"
	python /opt/datax/bin/datax.py -p "-Duser1=${user1} -Dpass1=${pass1} -Duser2=${user2} -Dpass2=${pass2}" "${job_path}${filename}"
	echo "job_finish: $(date "+%Y-%m-%d %H:%M:%S") ${filename}"
done

# 0 1 * * *  /opt/datax/job/dc_to_ods_incr.sh >> /opt/datax/job/log/dc_to_ods_incr_$(date +\%Y\%m\%d_\%H\%M\%S).log 2>&1
# egrep 'Task|speed|Total|job_start|job_finish' /opt/datax/job/log/

datax execution log (key lines only):

job_start:  2018-08-08 01:13:28 job_table_a.json
 Task start time: 2018-08-08 01:13:28
 Task end time: 2018-08-08 01:14:49
 Total task time: 81s
 Task average traffic: 192.82KB/s
 Record write speed: 1998rec/s
 Total number of read records: 159916
 Total read and write failures: 0
job_finish: 2018-08-08 01:14:49 job_table_a.json
job_start:  2018-08-08 01:14:49 job_table_b.json
 Task start time: 2018-08-08 01:14:50
 Task end time: 2018-08-08 01:15:01
 Total task time: 11s
 Task average traffic: 0B/s
 Record write speed: 0rec/s
 Total number of read records: 0
 Total read and write failures: 0
job_finish: 2018-08-08 01:15:01 job_table_b.json
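
Because the wrapper script prints its own job_start and job_finish lines, the wall-clock duration of each job can be derived from them and compared with the "Total task time" that datax reports. The following is only a minimal sketch and not part of the original scripts: the function name wrapper_durations is hypothetical, and it assumes the marker lines have exactly the format shown in the excerpt above.

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"

def wrapper_durations(lines):
    """Return {job_file: seconds} computed from job_start / job_finish lines."""
    started = {}
    durations = {}
    for line in lines:
        parts = line.split()
        # Marker lines look like: job_start:  2018-08-08 01:13:28 job_table_a.json
        if len(parts) != 4 or parts[0] not in ("job_start:", "job_finish:"):
            continue
        stamp = datetime.strptime(parts[1] + " " + parts[2], TS_FORMAT)
        job_file = parts[3]
        if parts[0] == "job_start:":
            started[job_file] = stamp
        elif job_file in started:
            durations[job_file] = int((stamp - started[job_file]).total_seconds())
    return durations

sample = [
    "job_start:  2018-08-08 01:13:28 job_table_a.json",
    "job_finish: 2018-08-08 01:14:49 job_table_a.json",
    "job_start:  2018-08-08 01:14:49 job_table_b.json",
    "job_finish: 2018-08-08 01:15:01 job_table_b.json",
]
print(wrapper_durations(sample))
```

For the excerpt this gives 81 seconds for job_table_a.json, the same as the datax figure, and 12 seconds for job_table_b.json, one second more than the 11s reported by datax, whose task start is logged at 01:14:50.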

Next, this information is read from the log and saved to the database. First create a table to hold it:

CREATE TABLE `datax_job_result` (
  `log_file` varchar(200) DEFAULT NULL,
  `job_file` varchar(200) DEFAULT NULL,
  `start_time` datetime DEFAULT NULL,
  `end_time` datetime DEFAULT NULL,
  `seconds` int(11) DEFAULT NULL,
  `traffic` varchar(50) DEFAULT NULL,
  `write_speed` varchar(50) DEFAULT NULL,
  `read_record` int(11) DEFAULT NULL,
  `failed_record` int(11) DEFAULT NULL,
  `job_start` varchar(200) DEFAULT NULL,
  `job_finish` varchar(200) DEFAULT NULL,
  `insert_time` datetime DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
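
The traffic and write_speed columns hold text such as 192.82KB/s, which is awkward to sort or aggregate. If numeric values are needed later, a conversion along the following lines could be applied when reading the table back. This is only a sketch: the helper name traffic_to_bytes is hypothetical, and it assumes the units are B, KB, MB or GB with 1KB = 1024 bytes, which the log excerpt itself does not confirm.

```python
import re

# Assumed unit factors (binary multiples); adjust if datax uses decimal units.
UNIT_FACTORS = {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}

def traffic_to_bytes(text):
    """Convert a string like '192.82KB/s' to bytes per second as a float."""
    m = re.match(r"^\s*([0-9.]+)\s*(B|KB|MB|GB)/s\s*$", text)
    if m is None:
        raise ValueError("unrecognised traffic value: %r" % text)
    return float(m.group(1)) * UNIT_FACTORS[m.group(2)]

print(traffic_to_bytes("192.82KB/s"))
print(traffic_to_bytes("0B/s"))
```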

The following script is also run on a schedule. Because the datax job runs at 1 a.m., the script picks the most recent log file created within the last 82800 seconds (23 hours), which means it can be run at any time of the day and still find that day's log. It is scheduled to run once a day, at a time when the datax job can be expected to have finished.
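
The window is 23 * 3600 = 82800 seconds. As an illustration kept separate from the script, the selection rule can be written as a pure function that is easy to check by hand. The name pick_latest_in_window and its (name, timestamp) input are hypothetical stand-ins for the directory walk, and the sketch picks the newest timestamp directly rather than relying on the reverse sort of file names.

```python
WINDOW_SECONDS = 23 * 3600  # 82800

def pick_latest_in_window(files, now, window=WINDOW_SECONDS):
    """files: iterable of (name, epoch_seconds).

    Return the name with the newest timestamp inside [now - window, now],
    or None when no file falls inside the window.
    """
    candidates = [(ts, name) for name, ts in files if now - window <= ts <= now]
    if not candidates:
        return None
    return max(candidates)[1]

now = 100000
files = [
    ("old.log", now - 90000),    # older than 23 hours, ignored
    ("recent.log", now - 3600),  # one hour old
    ("earlier.log", now - 7200), # two hours old
]
print(pick_latest_in_window(files, now))
```

The scheduled script itself follows.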

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 0 5 * * * source /etc/profile && /usr/bin/python2.7 /opt/datax/job/save_log_to_db.py > /dev/null 2>&1

import re
import os
import sqlalchemy
import pandas as pd
import datetime as dt

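def save_to_db(df):
	# The charset is passed in the URL; newer SQLAlchemy releases no longer accept encoding=
	engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test?charset=utf8")
	# Append the rows to the table created above
	df.to_sql("datax_job_result", engine, index=False, if_exists='append')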

def get_the_latest_file(path):
	# Return the newest .log file changed within the last 23 hours (82800 s), or None
	t0 = dt.datetime.utcfromtimestamp(0)
	# utcnow() (not now()) so that d2 is true epoch seconds, comparable with st_ctime
	d2 = (dt.datetime.utcnow() - t0).total_seconds()
	d1 = d2 - 82800
	for (dirpath, dirnames, filenames) in os.walk(path):
		for filename in sorted(filenames, reverse=True):
			if filename.endswith(".log"):
				f = os.path.join(dirpath, filename)
				ctime = os.stat(f).st_ctime
				if d1 <= ctime <= d2:
					return f
	return None
			
def get_job_result_from_logfile(path):
	columns = ['log_file','job_file','start_time','end_time','seconds','traffic','write_speed','read_record','failed_record','job_start','job_finish']
	log_file = get_the_latest_file(path)
	if log_file is None:
		return  # no log file inside the time window, nothing to save
	# Map the start of each log line to its table column. The labels follow the
	# English log excerpt above; adjust them if your datax log uses other wording.
	labels = [
		('Task start time', 'start_time'),
		('Task end time', 'end_time'),
		('Total task time', 'seconds'),
		('Task average', 'traffic'),
		('Record write speed', 'write_speed'),
		('Total number of read records', 'read_record'),
		('Total read and write failures', 'failed_record'),
	]
	rows = []
	row = None
	with open(log_file, "r") as content:
		for line in content:
			line = line.strip()
			if line.startswith('job_start'):
				# The job file name is the last field of the job_start line
				row = {'log_file': log_file, 'job_file': line.split()[-1], 'job_start': line}
			elif row is None:
				continue  # ignore anything outside a job_start/job_finish pair
			elif line.startswith('job_finish'):
				row['job_finish'] = line
				rows.append(row)
				row = None
			else:
				for label, column in labels:
					if line.startswith(label):
						# Keep everything after the first colon, e.g. "2018-08-08 01:13:28"
						value = line.split(':', 1)[1].strip()
						if column == 'seconds':
							value = value.rstrip('s')  # "81s" -> "81"
						row[column] = value
						break
	if rows:
		save_to_db(pd.DataFrame(rows, columns=columns))

get_job_result_from_logfile("/opt/datax/job/log")
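
To try the parsing rules without a database or a log directory, the same kind of label matching can be run over a list of lines. The following is a minimal sketch under the assumption that the log uses the English labels shown in the excerpt. The function parse_jobs is hypothetical, covers only a few of the fields, and returns plain dictionaries rather than a DataFrame.

```python
def parse_jobs(lines):
    """Group 'label: value' lines between job_start and job_finish into dicts."""
    fields = {
        "Total task time": "seconds",
        "Record write speed": "write_speed",
        "Total number of read records": "read_record",
        "Total read and write failures": "failed_record",
    }
    jobs, current = [], None
    for raw in lines:
        line = raw.strip()
        if line.startswith("job_start"):
            current = {"job_file": line.split()[-1]}
        elif current is None:
            continue  # outside a job_start/job_finish pair
        elif line.startswith("job_finish"):
            jobs.append(current)
            current = None
        else:
            label, _, value = line.partition(":")
            column = fields.get(label.strip())
            if column:
                current[column] = value.strip()
    return jobs

sample_log = [
    "job_start:  2018-08-08 01:13:28 job_table_a.json",
    " Total task time: 81s",
    " Record write speed: 1998rec/s",
    " Total number of read records: 159916",
    " Total read and write failures: 0",
    "job_finish: 2018-08-08 01:14:49 job_table_a.json",
]
for job in parse_jobs(sample_log):
    print(job)
```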

 


Added by nbaxley on Sun, 05 Jan 2020 08:19:19 +0200