linux环境下用python读取hive数据并写入到excel(定时调度、自定义列宽)
功能点:
1、调用自定义类,实现对excel格式的调整
内嵌定时功能,实现定时执行
可对标题等进行颜色标注
可实现自定义列宽
可实现数据居左、居右、居中对齐方式
并将生成的excel通过邮件发送
# -*- coding:utf-8 -*-
"""
定时任务
多线程,每⽇执⾏
"""
import os
import smtplib
import time
from datetime import datetime
from email.header import Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from functools import wraps
from multiprocessing import Process
from os import remove, path

import openpyxl
import pandas as pd
from dateutil.relativedelta import relativedelta
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Alignment
from openpyxl.styles import Font
from openpyxl.styles import PatternFill
from pyhive import hive
def get_delta_time(mode, times):
    """Return the next scheduled run time and the seconds remaining until it.

    :param mode: one of ``"daily"``, ``"monthly"`` or ``"weekly"``.
    :param times: schedule fields (ints or numeric strings):
        ``[hour, minute, second]`` when mode is daily;
        ``[monthday, hour, minute, second]`` when mode is monthly;
        ``[weekday, hour, minute, second]`` when mode is weekly, where
        weekday is 1..6 for Monday..Saturday and 0 for Sunday.
    :return: ``(target_time, time_delta)`` - the nearest run time that is not
        earlier than now (naive local time) and the seconds left until it.
    :raises ValueError: if ``mode`` is unknown or ``times`` does not describe
        a valid schedule.
    """
    # Function-scope import: the module itself only imports `datetime`.
    from datetime import timedelta

    current_time = datetime.now()

    if mode == "daily":
        hour, minute, second = (int(part) for part in times)
        target_time = current_time.replace(
            hour=hour, minute=minute, second=second, microsecond=0)
        if target_time < current_time:
            # Today's slot has already passed: run tomorrow.
            target_time += timedelta(days=1)

    elif mode == "monthly":
        monthday, hour, minute, second = (int(part) for part in times)
        year, month = current_time.year, current_time.month
        target_time = None
        # Walk forward month by month. A month that lacks the requested day
        # (e.g. day 31 in April) is skipped instead of raising; 13 steps are
        # enough to reach any valid day-of-month at least once.
        for _ in range(13):
            try:
                candidate = datetime(year, month, monthday, hour, minute, second)
            except ValueError:
                candidate = None
            if candidate is not None and candidate >= current_time:
                target_time = candidate
                break
            year, month = (year + 1, 1) if month == 12 else (year, month + 1)
        if target_time is None:
            raise ValueError("invalid monthly schedule: {!r}".format(times))

    elif mode == "weekly":
        weekday, hour, minute, second = (int(part) for part in times)
        if not 0 <= weekday <= 6:
            raise ValueError("weekday must be in 0..6, got {!r}".format(weekday))
        # isoweekday() % 7 uses the same numbering as the parameter:
        # Monday..Saturday = 1..6, Sunday = 0.
        days_ahead = (weekday - current_time.isoweekday() % 7) % 7
        target_time = current_time.replace(
            hour=hour, minute=minute, second=second, microsecond=0
        ) + timedelta(days=days_ahead)
        if target_time < current_time:
            # Same weekday but the time has passed: run next week.
            target_time += timedelta(weeks=1)

    else:
        raise ValueError("unknown mode: {!r}".format(mode))

    time_delta = (target_time - current_time).total_seconds()  # countdown in seconds
    return target_time, time_delta
def days_schedule(mode, times):
    """Build a decorator that re-runs the wrapped function on a schedule.

    :param mode: schedule mode forwarded to ``get_delta_time``.
    :param times: schedule fields forwarded to ``get_delta_time``.
    :return: a decorator. Calling the decorated function blocks forever:
        it sleeps until the next scheduled time, calls the function with the
        given arguments, then computes the next slot and repeats.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # The loop has no exit, so wrapper never returns a value.
            # NOTE(review): the source had a bare `return1` after the loop; it
            # could never execute and has been dropped - confirm no caller
            # expected a return value.
            while True:
                target_time, time_delta = get_delta_time(mode, times)
                print("{}下⼀次执⾏时间: {}".format(func.__name__, target_time))
                # time.sleep() raises ValueError on a negative argument.
                time.sleep(max(time_delta, 0))
                func(*args, **kwargs)
        return wrapper
    return decorator
class XlsxSaver:
    """Export a DataFrame to a formatted sheet of an xlsx workbook.

    The sheet is filled when the object is created; column widths, fills and
    alignment can then be adjusted before calling ``save``.
    """

    def __init__(self, df_in, filename='a.xlsx', sheet_name='Sheet1'):
        """Copy ``df_in`` into a sheet of ``filename``.

        df_in : DataFrame supplying the header row and the data rows
        filename : path of the xlsx file to write
        sheet_name : title of the sheet that receives the data
        """
        self.filename = filename  # path the workbook is saved to
        # Columns whose width was set through set_width(); the automatic
        # width pass skips them.
        self.user_def = []
        if os.path.exists(filename):
            # The file already exists: open it and add a new sheet.
            self.wb = load_workbook(filename)
            self.sheet = self.wb.create_sheet(sheet_name)
        else:
            # No file yet: create a workbook and reuse its default sheet.
            self.wb = Workbook()
            self.sheet = self.wb.active
            self.sheet.title = sheet_name
        # Keep a private copy of the frame.
        self.df = df_in.copy()
        # Header goes in row 1, data rows follow from row 2.
        self.sheet.append(list(self.df.columns))
        for record in self.df.itertuples(index=False, name=None):
            self.sheet.append(list(record))

    @staticmethod
    def _letter_for(position):
        """Return the Excel column letter for a zero-based column position."""
        # Function-scope import; handles columns beyond 'Z' (AA, AB, ...).
        from openpyxl.utils import get_column_letter
        return get_column_letter(position + 1)

    def _letter_of(self, col_name):
        """Return the column letter of ``col_name``.

        Raises ValueError if the frame has no column with that name.
        """
        return self._letter_for(list(self.df.columns).index(col_name))

    def remove_file(self):
        """Delete the file at ``self.filename``."""
        remove(self.filename)

    def set_sheet_name(self, sheet_name):
        """Rename the sheet."""
        self.sheet.title = sheet_name

    def set_filename(self, filename):
        """Change the path used by ``save`` and ``remove_file``."""
        self.filename = filename

    def get_maxlength(self, series_in, col):
        """Return the widest display length among a column's values and header.

        Characters with a code point above 256 count as two units, all
        others as one. Missing values are measured as '-'.

        series_in : Series holding the column values
        col : header label; it takes part in the comparison so that a long
            header is not truncated
        """
        series = series_in.fillna('-')  # avoid measuring NaN
        widths = []
        # str() lets non-string values and headers be measured too.
        for elem in list(series) + [col]:
            widths.append(sum(1 if ord(c) <= 256 else 2 for c in str(elem)))
        return max(widths)

    def __auto_width(self):
        """Bold the header, size and left-align every column not in user_def."""
        for i, col in enumerate(self.df.columns):
            if col in self.user_def:
                continue
            self.sheet.cell(1, i + 1).font = Font(bold=True)  # bold header
            letter = self._letter_for(i)
            # Positional access keeps this working with duplicate column names.
            max_len = self.get_maxlength(self.df.iloc[:, i].astype(str), col)
            if max_len <= 12:
                width = 12
            elif max_len <= 50:
                width = max_len + 2
            else:
                width = 50
            self.sheet.column_dimensions[letter].width = width
            for cell in self.sheet[letter]:
                cell.alignment = Alignment(horizontal='left', wrap_text=True)

    def set_width(self, col_name, width):
        """Set an explicit width for ``col_name`` and exclude it from auto-sizing."""
        letter = self._letter_of(col_name)
        self.sheet.column_dimensions[letter].width = width
        self.user_def.append(col_name)

    def set_color(self, col_name, color, rule):
        """Fill each cell of ``col_name`` (header included) for which ``rule`` is true.

        color : colour value passed to PatternFill as start and end colour
        rule : callable taking the cell value and returning a truthy value
            when the cell should be filled
        """
        for cell in self.sheet[self._letter_of(col_name)]:
            if rule(cell.value):
                cell.fill = PatternFill(fill_type="solid", start_color=color, end_color=color)

    def set_center_alignment(self, col_name):
        """Apply wrapped, left-aligned formatting to every cell of ``col_name``.

        NOTE(review): despite the method name the alignment applied is
        'left'; kept as found - confirm whether 'center' was intended.
        """
        for cell in self.sheet[self._letter_of(col_name)]:
            cell.alignment = Alignment(wrap_text=True, horizontal='left')

    def save(self):
        """Run the automatic width pass, then write the workbook to disk."""
        self.__auto_width()
        self.wb.save(self.filename)
def get_data_from_hive_1(sql):
    """Run ``sql`` against the ``cn_ec_wmt_dl_secure`` Hive database.

    :param sql: query text handed to ``pandas.read_sql``.
    :return: the query result as a DataFrame.
    """
    conn = hive.Connection(host='oser406436.wal-mart',
                           port=10000,
                           username='svccnahahs',
                           database='cn_ec_wmt_dl_secure',
                           auth='KERBEROS',
                           kerberos_service_name='hive')
    try:
        return pd.read_sql(sql, conn)
    finally:
        # Release the connection even if the query fails.
        conn.close()
def get_data_from_hive_2(sql):
    """Run ``sql`` against the ``cn_ods_aloha_order`` Hive database.

    :param sql: query text handed to ``pandas.read_sql``.
    :return: the query result as a DataFrame.
    """
    conn = hive.Connection(host='oser406436.wal-mart',
                           port=10000,
                           username='svccnahahs',
                           database='cn_ods_aloha_order',
                           auth='KERBEROS',
                           kerberos_service_name='hive')
    try:
        return pd.read_sql(sql, conn)
    finally:
        # Release the connection even if the query fails.
        conn.close()
def get_sql(yesterday_str):
    """Load the four check queries from ``./sql_file``.

    Paths are relative to the current working directory.

    :param yesterday_str: value substituted into the ``%s`` placeholder of
        ``check_flash_data.sql``; the other three files are returned as read.
    :return: ``(sql01, sql02, sql03, sql04)`` as strings.
    """
    def _read(sql_path):
        # Explicit encoding so the result does not depend on the locale.
        # NOTE(review): assumes the .sql files are UTF-8 - confirm.
        with open(sql_path, 'r', encoding='utf-8') as f:
            return f.read()

    # One-element tuple: `% (x)` without the comma is not a tuple.
    sql01 = _read('./sql_file/check_flash_data.sql') % (yesterday_str,)
    sql02 = _read('./sql_file/check_ec_data.sql')
    sql03 = _read('./sql_file/check_ods_data.sql')
    sql04 = _read('./sql_file/check_divide_data.sql')
    return sql01, sql02, sql03, sql04
def send_mail(smtp, from_addr, to_addrs, title, content, attachments):
    """Send a plain-text e-mail with optional file attachments.

    :param smtp: dict with the server's ``"host"`` and ``"port"``.
    :param from_addr: sender address.
    :param to_addrs: list of recipient addresses.
    :param title: subject line.
    :param content: plain-text body.
    :param attachments: paths of files to attach; may be empty or ``None``.

    SMTP errors are reported on stdout and not re-raised.
    """
    # Function-scope import: not part of the module's import block.
    from email.mime.application import MIMEApplication

    message = MIMEMultipart()
    # Body
    message.attach(MIMEText(content, 'plain', 'utf-8'))
    # Headers
    message['Subject'] = Header(title, 'utf-8')
    message['From'] = from_addr
    message['To'] = ', '.join(to_addrs)  # address lists are comma-separated
    # Attachments
    for att_file in attachments or ():
        with open(att_file, 'rb') as fh:
            # MIMEApplication defaults to application/octet-stream, base64-encoded.
            att = MIMEApplication(fh.read())
        # Advertise only the file name, not the local path.
        att.add_header('Content-Disposition', 'attachment',
                       filename=os.path.basename(att_file))
        message.attach(att)
    # Send
    try:
        # The context manager closes the connection on exit.
        with smtplib.SMTP(host=smtp["host"], port=smtp["port"]) as smtpObj:
            smtpObj.sendmail(
                from_addr=from_addr,
                to_addrs=to_addrs,
                msg=message.as_string()
            )
        print("邮件发送成功")
    except smtplib.SMTPException as exc:
        print("Error: ⽆法发送邮件", exc)
def _write_sheet(frame, file_name, sheet_name, aligned_columns):
    """Save ``frame`` as sheet ``sheet_name`` of ``file_name``, aligning the given columns."""
    saver = XlsxSaver(frame, file_name, sheet_name)
    for col_name in aligned_columns:
        saver.set_center_alignment(col_name)
    saver.save()


def get_data(nowTime, yesterday_str):
    """Query Hive four times and write each result to its own sheet.

    The workbook is written to ``./<nowTime>_check_data.xlsx``.

    :param nowTime: value used in the output file name.
    :param yesterday_str: date string forwarded to ``get_sql``.
    """
    file_name = './{}_check_data.xlsx'.format(nowTime)
    # Query column name -> header shown in the workbook. The source listed
    # 'upd_ts' and 'ts' twice; a dict literal keeps the last value, which is
    # what is spelled out here.
    rename_dic = {
        't_type': '数据源',
        'ts': '分区时间ts',
        'count': '数据量',
        'upd_ts': '数据更新时间',
        'quy_time': '查询时间',
        'tb_name': '数据来源',
        'ts_t_2': 'ts_t_2',
        'etl_load_time_t_2': 'etl_load_time_t_2',
        'cnt_t_2': 'cnt_t_2',
        'ts_t_1': 'ts_t_1',
        'etl_t_1': 'etl_t_1',
        'cnt_t_1': 'cnt_t_1',
        'per': 'per',
        'load_ts': '数据更新时间',
    }
    # NOTE(review): the source checks for an existing file under a "delete
    # the file" comment but the statement itself is missing; removing the
    # stale workbook is assumed - confirm. Without it XlsxSaver would add new
    # sheets to the old file.
    if os.path.exists(file_name):
        os.remove(file_name)

    sql01, sql02, sql03, sql04 = get_sql(yesterday_str)

    # NOTE(review): rename_dic is never applied in the source as received,
    # yet the alignment calls below use the renamed headers; applying it to
    # every result is assumed - confirm. Keys absent from a result are ignored.
    print("开始获取第⼀模块数据....")
    result01 = get_data_from_hive_1(sql01).rename(columns=rename_dic)
    _write_sheet(result01, file_name, 'flashreport汇总表数据',
                 ['cnt_t_2', 'per', '数据更新时间', 'query_time'])
    print("第⼀模块完成")

    print("开始获取第⼆模块数据......")
    result02 = get_data_from_hive_1(sql02).rename(columns=rename_dic)
    _write_sheet(result02, file_name, 'ec模型数据',
                 ['数据更新时间', '数据量', '查询时间'])

    print("开始获取第三模块数据......")
    result03 = get_data_from_hive_1(sql03).rename(columns=rename_dic)
    _write_sheet(result03, file_name, 'ods层源表数据',
                 ['数据量', '数据更新时间', '查询时间'])

    print("开始获取第四模块数据......")
    # The fourth query runs against the other Hive database.
    result04 = get_data_from_hive_2(sql04).rename(columns=rename_dic)
    _write_sheet(result04, file_name, 'flash四张表数据',
                 ['数据量', '数据更新时间', '查询时间'])
    print("数据获取完成")
def send_main(nowTime,yesterday):
yesterday_str = yesterday.strftime('%Y-%m-%d')
yesterday_month = h
yesterday_day = yesterday.day
file_name ='./{}_check_data.xlsx'.format(nowTime)
path ='./{}_check_data.xlsx'.format(yesterday_str)# ⽂件路径
if ists(path): # 如果⽂件存在
# 删除⽂件,可使⽤以下两种⽅法。
#os.unlink(path)
else:
print('no such file:%s_check_data.xlsx'%yesterday_str)# 则返回⽂件不存在
smtp = {'host': "xxxcom",'port': 25}
from_addr ='xxx@wal-mart'
to_addrs=['xxx@sskjdata']python怎么读取excel的数据
title ='%s⽉%s⽇flashreport数据监控邮件'%(yesterday_month,yesterday_day)
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论