import os
import poplib
import time
from datetime import datetime
from email.header import decode_header
from email.parser import Parser
from email.utils import parseaddr, parsedate_to_datetime

import pandas as pd
import pymysql
from pymysql import Error

from config import Config
from api import API
from back_ground_module import CommonModule

api_instance = API()
common_module = CommonModule()


class EmailProcessor:
    """Write the daily Thailand CRM e-mail report into Jiandaoyun and the BI database.

    The processor can (1) log in to a POP3 mailbox and save the attachment of
    today's report mail, (2) push the workbook rows to Jiandaoyun through
    ``api_instance`` and (3) bulk-insert the same rows into a MySQL table.
    """

    # Workbook read by ``update_email`` and ``main`` when no other path is given.
    # NOTE(review): this is a machine-specific path; it looks like a manual
    # back-fill setting rather than the attachment saved to ``write_path`` --
    # confirm before scheduling this job elsewhere.
    DEFAULT_REPORT_PATH = r"C:\Users\Administrator.DESKTOP-7IC2USJ\Desktop\新建文件夹\门店使用数据周报2025-07-11.xlsx"

    def __init__(self):
        """Set mailbox settings, create the output directory and build the field map."""
        # Mailbox configuration.
        # NOTE(review): credentials are hard-coded in source; they should be
        # moved to Config or the environment.
        self.user_email_address = 'caowei@f6car.cn'
        self.user_password = 'Cw@340826'
        self.pop_server_host = 'pop.qiye.aliyun.com'
        self.pop_server_port = '995'
        self.send_name = "f6car"
        self.send_addr = 'noreplay@notice.f6car.com'

        # Create the output directory if it does not exist yet.
        output_dir = "email"
        os.makedirs(output_dir, exist_ok=True)
        self.write_path = os.path.join(output_dir, 'email_data.xlsx')

        # Workbook column name -> Jiandaoyun widget id.
        self.field_mapping = {
            "指标归属日期": "_widget_1742174728275",
            "公司ID": "_widget_1742091963874",
            "公司名称": "_widget_1742091963875",
            "门店ID": "_widget_1742091963876",
            "门店名称": "_widget_1742091963877",
            "门店简称": "_widget_1742091963878",
            "门店创建时间": "_widget_1742091963879",
            "指标类型": "_widget_1742091963880",
            "指标值": "_widget_1742091963882",
            "指标子类型": "_widget_1742091963881",
        }

    def connect_email_by_pop3(self):
        """Log in to the POP3 server over SSL and process the mailbox.

        Raises:
            SystemExit: with exit code 1 when the connection, the user name or
                the password/authorization code is rejected.
        """
        try:
            # Connect to the POP server.
            email_server = poplib.POP3_SSL(
                host=self.pop_server_host,
                port=int(self.pop_server_port),
                timeout=10,
            )
            print("POP服务器连接成功,开始用户邮箱验证")
        except Exception as e:
            print(f"POP服务器连接失败。错误: {e}")
            # SystemExit is what the site-provided exit() raises; raising it
            # directly also works where the ``exit`` builtin is not installed.
            raise SystemExit(1)

        try:
            # Send the mailbox user name.
            email_server.user(self.user_email_address)
            print("用户邮箱验证成功,开始授权码验证")
        except Exception as e:
            print(f"用户邮箱验证失败。错误: {e}")
            raise SystemExit(1)

        try:
            # Send the password / authorization code.
            email_server.pass_(self.user_password)
            print("授权码验证成功,开始处理邮件")
        except Exception as e:
            print(f"授权码验证失败。错误: {e}")
            raise SystemExit(1)

        # Process the mails.
        self.parse_email_server(email_server)

    def parse_email_server(self, email_server):
        """Walk the mailbox newest-first and handle every mail dated today.

        Iteration stops at the first mail older than today's local midnight.
        A mail that fails to download or parse is reported and skipped. The
        POP3 session is always closed, even when listing the mailbox fails.

        Args:
            email_server: an authenticated ``poplib.POP3``/``POP3_SSL`` session.
        """
        try:
            # Fetch the list of all mails.
            _, mails, _ = email_server.list()
            mail_count = len(mails)

            # Timestamp of today's local midnight.
            now = datetime.now()
            today_start = datetime(now.year, now.month, now.day)
            today_start_timestamp = int(today_start.timestamp())

            # Newest mail first (POP3 message numbers start at 1).
            for i in range(mail_count, 0, -1):
                try:
                    _, lines, _ = email_server.retr(i)
                    # errors='ignore' avoids decode failures on stray bytes.
                    msg_content = b'\r\n'.join(lines).decode('utf-8', errors='ignore')
                    msg = Parser().parsestr(msg_content)

                    # parse_mail_time raises when the Date header is missing
                    # or unparseable; the handler below then skips the mail.
                    mail_datetime = self.parse_mail_time(msg.get("date"))
                    mail_timestamp = int(mail_datetime.timestamp())

                    # Stop at the first mail that is not from today.
                    if mail_timestamp < today_start_timestamp:
                        break

                    mail_time_str = mail_datetime.strftime('%Y-%m-%d %H:%M:%S')
                    print(f"邮件接收时间: {mail_time_str}")

                    self.parser_content(msg, 0)
                except Exception as e:
                    print(f"Error processing email {i}: {e}")
                    continue
        finally:
            # Leave the server.
            email_server.quit()

    def _decode_address(self, raw_header):
        """Return ``(display_name, address)`` for a From/To header value."""
        hdr, addr = parseaddr(raw_header or '')
        return self.decode_str(hdr), addr

    def parser_content(self, msg, indent):
        """Print a message recursively and save attachments of the report mail.

        For the top-level message (``indent == 0``) the headers are printed
        and, when the sender's display name equals ``self.send_name``, every
        part that carries a file name is written to ``self.write_path``
        (a later attachment overwrites an earlier one).

        Args:
            msg: an ``email.message.Message`` (or one of its sub-parts).
            indent: nesting depth; 0 for the top-level message.
        """
        print("邮件处理")
        if indent == 0:
            self.parser_email_header(msg)

            # Sender information.
            name, addr = self._decode_address(msg['From'])
            print(f'发件人姓名: {name}, 发件人邮箱: {addr}')

            if name == self.send_name:
                # Download attachments.
                for part in msg.walk():
                    file_name = part.get_filename()
                    if file_name is None:
                        continue
                    filename = self.decode_str(file_name)
                    data = part.get_payload(decode=True)
                    if data is None:
                        # Multipart containers have no decodable payload.
                        continue
                    try:
                        with open(self.write_path, 'wb') as att_file:
                            att_file.write(data)
                        print(f"附件保存成功: {self.write_path}+{filename}")
                    except Exception as e:
                        print(f"附件保存失败: {e}")

        if msg.is_multipart():
            for part in msg.get_payload():
                self.parser_content(part, indent + 1)
            return

        # Mail body.
        if msg.get_content_type() in ('text/plain', 'text/html'):
            content = msg.get_payload(decode=True)
            charset = self.guess_charset(msg)
            if isinstance(content, bytes):
                # Fall back to UTF-8 so the body is printed as text rather
                # than as a bytes repr when no charset is declared.
                content = content.decode(charset or 'utf-8', errors='replace')
            print(f"{' ' * indent}邮件内容: {content}")

    def parser_email_header(self, msg):
        """Print the decoded subject, sender and recipient of ``msg``."""
        # Subject.
        print(f'邮件主题: {self.decode_str(msg["Subject"])}')

        # Sender.
        name, addr = self._decode_address(msg['From'])
        print(f'发件人姓名: {name}, 发件人邮箱: {addr}')

        # Recipient.
        name, addr = self._decode_address(msg['To'])
        print(f'收件人姓名: {name}, 收件人邮箱: {addr}')

    @staticmethod
    def decode_str(s):
        """Decode an RFC 2047 header value into text.

        All encoded-word fragments are decoded and concatenated, so long
        subjects or names are not truncated to their first fragment.

        Args:
            s: raw header value; ``None`` yields an empty string.

        Returns:
            The decoded header text.
        """
        if s is None:
            return ''
        pieces = []
        for value, charset in decode_header(s):
            if isinstance(value, bytes):
                try:
                    value = value.decode(charset or 'utf-8', errors='replace')
                except LookupError:
                    # Unknown charset label in the header.
                    value = value.decode('utf-8', errors='replace')
            pieces.append(value)
        return ''.join(pieces)

    @staticmethod
    def guess_charset(msg):
        """Return the charset name of a message part, or ``None`` if undeclared.

        Uses ``Message.get_content_charset`` so that quoted parameters such as
        ``charset="utf-8"`` yield a usable codec name.
        """
        charset = msg.get_charset()
        if charset is not None:
            # get_charset() returns a Charset object; callers need a name.
            return str(charset)
        return msg.get_content_charset()

    @staticmethod
    def parse_mail_time(mail_datetime):
        """Parse an RFC 2822 ``Date`` header into a naive local ``datetime``.

        Any numeric or named UTC offset is honoured: an offset-aware value is
        converted to the system's local time zone before the offset is
        dropped, so ``.timestamp()`` on the result gives the mail's real
        instant. A value without zone information is returned unchanged.

        Args:
            mail_datetime: the raw ``Date`` header value.

        Returns:
            A naive ``datetime`` in local time.

        Raises:
            ValueError: when the header is missing or cannot be parsed.
        """
        if not mail_datetime:
            raise ValueError("邮件时间格式解析错误")
        try:
            parsed = parsedate_to_datetime(mail_datetime)
        except (TypeError, ValueError) as err:
            # Older Python versions raise TypeError for unparseable dates.
            raise ValueError("邮件时间格式解析错误") from err
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    @staticmethod
    def row_to_dict(row, field_mapping):
        """Convert one data row into the ``{widget_id: {"value": ...}}`` form.

        Columns absent from ``row`` are skipped; missing values (as reported
        by ``pd.isna``) become ``None``.
        """
        return {
            widget_id: {"value": None if pd.isna(row[col_name]) else row[col_name]}
            for col_name, widget_id in field_mapping.items()
            if col_name in row
        }

    def update_email(self, file_path=None):
        """Read the report workbook and submit its rows via ``api_instance``.

        Args:
            file_path: workbook to read; defaults to ``DEFAULT_REPORT_PATH``.

        Returns:
            A copy of the data frame taken before the date columns are passed
            through ``common_module.time_to_UTC``.
        """
        print(self.write_path)
        source_path = file_path or self.DEFAULT_REPORT_PATH
        email_df = pd.read_excel(source_path, sheet_name="Sheet0")
        print(email_df.head())

        email_df['公司ID'] = email_df['公司ID'].astype(str)
        email_df['门店ID'] = email_df['门店ID'].astype(str)
        email_df['指标归属日期'] = pd.to_datetime(
            email_df['指标归属日期'], format="%Y/%m/%d"
        ).dt.strftime("%Y-%m-%d")
        email_df["门店创建时间"] = pd.to_datetime(
            email_df['门店创建时间'], format="%Y-%m-%d %H:%M:%S"
        )

        # Snapshot returned to the caller, taken before the UTC conversion.
        new_email_df = email_df.copy()

        # Kept as a row-wise assignment: the return type of time_to_UTC is not
        # visible here, so the original cell-by-cell semantics are preserved.
        for index, row in email_df.iterrows():
            email_df.loc[index, '指标归属日期'] = common_module.time_to_UTC(row['指标归属日期'])
            email_df.loc[index, '门店创建时间'] = common_module.time_to_UTC(row['门店创建时间'])

        email_data = [
            self.row_to_dict(row, self.field_mapping)
            for _, row in email_df.iterrows()
        ]
        new_email_data = {
            'api_key': "673457d6837e60a418e0e56b",
            'entry_id': "67d636bb6212b7619a7a4231",
            # 'entry_id': "684157deab0c4c9ec636ed36",  # test form
            "data_list": email_data,
        }
        api_instance.entry_data_batch_create(new_email_data)
        # os.remove(self.write_path)
        return new_email_df

    def up_to_BI(self, df):
        """Bulk-insert ``df`` into the ``thailand_store_data_email`` MySQL table.

        Connection settings come from ``Config.HS_DB_Config``. Database errors
        (``pymysql.Error``) are reported and rolled back instead of raised.

        Args:
            df: data frame whose column names match the table's columns.
        """
        db_config = Config.HS_DB_Config
        table_name = "thailand_store_data_email"

        # Bound before the try so the handlers below never hit an unbound
        # name when pymysql.connect() itself fails.
        connection = None
        try:
            connection = pymysql.connect(
                host=db_config["host"],
                user=db_config["user"],
                password=db_config["password"],
                database=db_config["database"],
                charset='utf8mb4',
            )
            print(f"成功连接 {db_config['database']}")

            # Cast to object first: on numeric columns ``where(..., None)``
            # would otherwise turn None straight back into NaN.
            cleaned = df.astype(object).where(pd.notna(df), None)

            # Back-tick quote identifiers so names with spaces, reserved
            # words or non-ASCII characters are valid in the statement.
            quoted_columns = [
                "`" + str(col).replace("`", "``") + "`" for col in cleaned.columns
            ]
            columns = ', '.join(quoted_columns)
            placeholders = ', '.join(['%s'] * len(quoted_columns))
            insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

            records = list(cleaned.itertuples(index=False, name=None))
            with connection.cursor() as cursor:
                cursor.executemany(insert_query, records)
                inserted = cursor.rowcount
            connection.commit()
            print(f"成功导入 {inserted} 条记录到 {table_name} 表")
        except Error as e:
            print(f"数据库操作出错: {e}")
            if connection is not None:
                connection.rollback()
        finally:
            if connection is not None:
                connection.close()

    @classmethod
    def main(cls):
        """Entry point: load the report workbook, push it to BI, report status."""
        task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        processor = cls()
        # The POP3 download and the Jiandaoyun upload are currently disabled:
        # processor.connect_email_by_pop3()
        df = pd.read_excel(cls.DEFAULT_REPORT_PATH, sheet_name="Sheet0")
        # email_df = processor.update_email()
        processor.up_to_BI(df)  # send to BI
        common_module.send_task_status(task_start_time, "海外邮件推送")


if __name__ == "__main__":
    EmailProcessor.main()