I. Requirement Description
Customers need to receive specific activity data every Monday, generated as Excel or CSV files and sent to designated recipients by email. Preliminary analysis of the requirement leads to the following conclusions:
1. The data the customer wants is not complicated; without special processing, it can simply be treated as the output of SQL query results.
2. Exporting query results to CSV files and sending email are both mature, widely used techniques.
3. The crond service of the Linux system supports scheduled tasks.
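The weekly schedule can then be wired up with a single crontab entry. The deployment path and run time below are illustrative placeholders only:

```shell
# Illustrative crontab entry (edit with `crontab -e`):
# run the report job every Monday at 08:00.
# Adjust the paths to wherever the job is actually deployed.
0 8 * * 1 python -u /opt/emailjob/bin/hive-emailjob.py /etc/pythoncfg/emailjob.ini
```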
II. System Environment Requirements
- Linux CentOS 6.x
- Hadoop 2.x
- Python 2.7
III. Python Dependency Libraries
- PyHive
- ppytools2
- thrift
- thrift-sasl
- sasl
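Assuming `pip` is available on the machine, the dependencies can be installed in one step (package names assumed to match PyPI):

```shell
# Install the dependency libraries listed above.
pip install PyHive ppytools2 thrift thrift-sasl sasl
```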
#### 1. Global configuration settings.py
```python
from ppytools.cfgreader import ConfReader

import logging

'''Log output format '''
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s [%(levelname)s] {%(name)-10s} - %(message)s')


class ProjectConfig(object):
    """ProjectConfig"""

    def __init__(self, *conf_paths):
        self.cr = ConfReader(*conf_paths)

    def getHiveConf(self):
        return self.cr.getValues('HiveServer')

    def getEmailServer(self):
        return self.cr.getValues('EmailServer')

    def getJobInfo(self):
        return self.cr.getValues('JobInfo')

    def getCSVFolder(self):
        return self.cr.getValues('CSVFolder')['folder']

    def getCSVFile(self):
        return self.cr.getValues('CSVFile')

    def getCSVHead(self):
        return self.cr.getValues('CSVHead')

    def getHQLScript(self):
        return self.cr.getValues('HQLScript')

    def getEmailInfo(self):
        return self.cr.getValues('EmailInfo')
```
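The usage above suggests that each `ConfReader.getValues()` call returns one INI section as a plain dict (main.py later unpacks these with `**kwargs`). A minimal stand-in sketch with the stdlib `ConfigParser` (Python 3 spelling shown; `get_values` and the sample text are illustrative, not the ppytools API):

```python
from configparser import ConfigParser

# A sample section in the same shape as hive.ini below.
SAMPLE = """\
[HiveServer]
host=127.0.0.1
port=10000
user=hive
db=default
"""

def get_values(parser, section):
    # Mirror ConfReader.getValues(): one INI section as a plain dict.
    return dict(parser.items(section))

parser = ConfigParser()
parser.read_string(SAMPLE)
hive_conf = get_values(parser, 'HiveServer')
print(hive_conf['host'])  # -> 127.0.0.1
```

Note that all values come back as strings, which is why numeric settings such as `port` may need an explicit cast before use.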
#### 2. Core Code main.py
```python
from hiveemailjob.settings import ProjectConfig
from ppytools.csvhelper import write
from ppytools.emailclient import EmailClient
from ppytools.hiveclient import HiveClient
from ppytools.lang.timerhelper import timeMeter

import datetime
import logging
import sys
import time

logger = logging.getLogger(__name__)


def build_rk(ts):
    """Build HBase row key value

    :param ts: date time
    :return: row key
    """
    return hex(int(time.mktime(ts.timetuple()) * 1000))[2:]


def email_att(folder, name):
    return '{}/{}_{}.csv'.format(folder, name,
                                 datetime.datetime.now().strftime('%Y%m%d%H%M%S'))


@timeMeter()
def run(args):
    """Email Job program execute entrance

    :param args: 1. job file path
                 2. start time, format: 2018-01-30 17:09:38 (optional)
                 3. stop time (optional)
    :return: Empty
    """

    '''Read system args start '''
    args_len = len(args)
    if args_len != 2 and args_len != 4:
        logger.error('Entered args are wrong. Please check!!!')
        logger.error('1: job file path.')
        logger.error('2: start time, format: 2018-01-30 17:09:38 (optional)')
        logger.error('3: stop time (optional)')
        sys.exit(1)
    elif args_len == 4:
        try:
            start_time = datetime.datetime.strptime(args[2], '%Y-%m-%d %H:%M:%S')
            stop_time = datetime.datetime.strptime(args[3], '%Y-%m-%d %H:%M:%S')
        except Exception as e:
            raise RuntimeError('Parse start or stop time failed!!!\n', e)
    else:
        stop_time = datetime.date.today()
        start_time = stop_time - datetime.timedelta(days=1)

    job_file = args[1]
    start_rk = build_rk(start_time)
    stop_rk = build_rk(stop_time)

    '''System settings files (hard coded) '''
    hive_conf = '/etc/pythoncfg/hive.ini'
    email_conf = '/etc/pythoncfg/email.ini'

    sets = ProjectConfig(hive_conf, email_conf, job_file)
    job_info = sets.getJobInfo()
    csv_folder = sets.getCSVFolder()

    logger.info('Now running %s Email Job...', job_info['title'])
    logger.info('Start time: %s', start_time)
    logger.info('Stop time: %s', stop_time)

    hc = HiveClient(**sets.getHiveConf())

    csv_file = sets.getCSVFile().items()
    csv_file.sort()
    file_list = []

    logger.info('File name list: ')
    for (k, v) in csv_file:
        logging.info('%s: %s', k, v)
        file_list.append(v)

    csv_head = sets.getCSVHead().items()
    csv_head.sort()
    head_list = []

    logger.info('CSV file head list: ')
    for (k, v) in csv_head:
        logging.info('%s: %s', k, v)
        head_list.append(v)

    hql_scripts = sets.getHQLScript().items()
    hql_scripts.sort()

    email_atts = []
    index = 0

    for (k, hql) in hql_scripts:
        logging.info('%s: %s', k, hql)
        '''Place your query logic here. '''
        result, size = hc.execQuery(hql.format(start_rk, stop_rk))
        if size == 0:
            logging.info('The above HQL script did not find any data!!!')
        else:
            csv_file = email_att(csv_folder, file_list[index])
            email_atts.append(csv_file)
            write(csv_file, head_list[index].split(','), result)
        index += 1

    '''Close the Hive Server connection. '''
    hc.closeConn()

    email_sub = sets.getEmailInfo()['subject'] % start_time
    email_body = sets.getEmailInfo()['body']
    email_to = sets.getEmailInfo()['to'].split(';')
    email_cc = sets.getEmailInfo()['cc'].split(';')

    if len(email_atts) == 0:
        email_body = 'Sorry, no data has been found.\n\n' + email_body

    ec = EmailClient(**sets.getEmailServer())
    ec.send(email_to, email_cc, email_sub, email_body, email_atts, False)
    ec.quit()

    logger.info('Finished %s Email Job.', job_info['title'])
```
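The row-key scheme in `build_rk` is simply the epoch time in milliseconds rendered as a bare hex string. A quick self-contained check (the exact value is timezone-dependent because `time.mktime` uses local time):

```python
import datetime
import time

def build_rk(ts):
    # Same logic as in main.py: epoch milliseconds as a hex string
    # with the '0x' prefix stripped.
    return hex(int(time.mktime(ts.timetuple()) * 1000))[2:]

rk = build_rk(datetime.datetime(2018, 1, 30, 17, 9, 38))
print(rk)  # exact digits depend on the local timezone
```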
#### 3. System configuration files hive.ini and email.ini
```ini
# /etc/pythoncfg/hive.ini
[HiveServer]
host=127.0.0.1
port=10000
user=hive
db=default

# /etc/pythoncfg/email.ini
[EmailServer]
server=mail.163.com
port=25
user=elkan1788@gmail.com
passwd=xxxxxx
mode=TSL
```
Note: the above two files must be placed under the specified directory `/etc/pythoncfg/`.
#### 4. Mail Job Configuration Reference to emailjob.ini
```ini
[JobInfo]
title=Mail Report Task Test

[CSVFolder]
folder=/opt/csv_files/

# Please notice that CSVFile, CSVHead and HQLScript must have the same length.
# It is suggested to use prefix+number to flag and order the entries.
[CSVFile]
file1=Provincial grouping statistics
file2=Urban grouping statistics

[CSVHead]
head1=Province,Cumulative
head2=Province,City,Cumulative

[HQLScript]
script1=select cn_state,count(1) m from ext_act_ja1
script2=select cn_state,cn_city,count(1) m from ext_act_ja2

[EmailInfo]
to=elkan1788@gmail.com;
cc=2292706174@qq.com;
# %s will be replaced with the start date.
subject=%s Regional lottery statistics[test]
body=This email is sent automatically by the system, please do not reply, thank you!
```
Note: the number of CSVFile, CSVHead and HQLScript entries must be consistent, including their order. The prefix + number naming format is recommended.
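Because main.py calls `hql.format(start_rk, stop_rk)`, a script in `[HQLScript]` can embed the row-key range through `{0}`/`{1}` placeholders. The `where` clause below is a hypothetical illustration (the sample scripts above contain no placeholders, in which case `format()` leaves the text unchanged):

```python
# Hypothetical HQL with row-key range placeholders; the rk column name
# and the sample row-key values are made up for this demo.
hql = ("select cn_state,count(1) m from ext_act_ja1 "
       "where rk >= '{0}' and rk < '{1}'")
print(hql.format('16134f2d1f0', '16136a3b2a8'))
```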
#### 5. Bin file hive-emailjob.py
```python
from hiveemailjob import main

import sys

if __name__ == '__main__':
    main.run(sys.argv)
```
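Given the argument handling in main.py, the job can be launched with only the job file (defaulting to the previous day's window) or with an explicit time range. Paths below are illustrative:

```shell
# Default window (yesterday through today):
python -u bin/hive-emailjob.py /etc/pythoncfg/emailjob.ini

# Explicit window (start and stop must be supplied together):
python -u bin/hive-emailjob.py /etc/pythoncfg/emailjob.ini \
    "2018-01-30 17:09:38" "2018-02-05 17:09:38"
```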
#### 6. Implementation effect
Type `python -u bin/hive_email_job.py` in the system terminal; the output is as follows:
```
2018-02-20 16:28:21,561 [INFO] {__main__  } - Now running Mail Report Task Test Email Job...
2018-02-20 16:28:21,561 [INFO] {__main__  } - Start time: 2018-02-22
2018-02-20 16:28:21,562 [INFO] {__main__  } - Stop time: 2018-02-20
2018-02-20 16:28:21,691 [INFO] {pyhive.hive} - USE `default`
2018-02-20 16:28:21,731 [INFO] {ppytools.hive_client} - Hive server connect is ready. Transport open: True
2018-02-20 16:28:31,957 [INFO] {ppytools.email_client} - Email SMTP server connect ready.
2018-02-20 16:28:31,957 [INFO] {root      } - File name list:
2018-02-20 16:28:31,957 [INFO] {root      } - file1: Provincial grouping statistics
2018-02-20 16:28:31,957 [INFO] {root      } - file2: Urban grouping statistics
2018-02-20 16:28:31,957 [INFO] {root      } - CSV file head list:
2018-02-20 16:28:31,957 [INFO] {root      } - head1: Province,Cumulative
2018-02-20 16:28:31,957 [INFO] {root      } - head2: Province,City,Cumulative
2018-02-20 16:28:31,957 [INFO] {root      } - script1: select cn_state,count(1) m from ext_act_ja2
2018-02-20 16:28:31,958 [INFO] {pyhive.hive} - select cn_state,count(1) m from ext_act_ja2
2018-02-20 16:29:04,258 [INFO] {ppytools.hive_client} - Hive client query completed. Records found: 31
2018-02-20 16:29:04,259 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.hive_client.execQuery> method cost 32.3012499809 seconds.
2018-02-20 16:29:04,261 [INFO] {ppytools.csv_helper} - Write a CSV file successful. --> /opt/csv_files/Provincial grouping statistics_20180223162904.csv
2018-02-20 16:29:04,262 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.csv_helper.write> method cost 0.00222992897034 seconds.
2018-02-20 16:29:04,262 [INFO] {root      } - script2: select cn_state,cn_city,count(1) m from ext_act_ja2
2018-02-20 16:29:04,262 [INFO] {pyhive.hive} - select cn_state,cn_city,count(1) m from ext_act_ja2
2018-02-20 16:29:23,462 [INFO] {ppytools.hive_client} - Hive client query completed. Records found: 367
2018-02-20 16:29:23,463 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.hive_client.execQuery> method cost 19.2005498409 seconds.
2018-02-20 16:29:23,465 [INFO] {ppytools.csv_helper} - Write a CSV file successful. --> /opt/csv_files/Urban grouping statistics_20180223162923.csv
2018-02-20 16:29:23,465 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.csv_helper.write> method cost 0.00227284431458 seconds.
2018-02-20 16:29:23,669 [INFO] {ppytools.email_client} - Send email[2018-02-22 Regional lottery statistics[test]] success. To users: elkan1788@163.com.
2018-02-20 16:29:23,669 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.email_client.send> method cost 0.204078912735 seconds.
2018-02-20 16:29:23,714 [INFO] {__main__  } - Finished Mail Report Task Test Email Job.
2018-02-20 16:29:23,715 [INFO] {ppytools.lang.timer_helper} - Execute <emailjob.main.run> method cost 62.1566159725 seconds.
```