# -*- coding: utf-8 -*-
"""Collect exhibition-lead registrations from an IMAP mailbox and sync them to Jiandaoyun.

Flow: search the inbox for mails received in the last ``DAYS_TO_SCAN`` days,
keep those whose subject contains ``SUBJECT_KEYWORD``, parse every
``a | b | c ...`` line of their bodies into a lead (see ``FIELD_KEYS``),
de-duplicate the leads by phone number, then create a Jiandaoyun record for
every phone number that is not already present in the target form.
"""
import email
import html
import imaplib
import io
import json
import os
import re
import sys
import time
import traceback
from datetime import datetime, timedelta
from decimal import Decimal
from email.header import decode_header
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
import requests

from log_config import configure_task_logger, configure_error_task_logger

# === Force UTF-8 on stdout/stderr (the script prints Chinese text) ===
# reconfigure() (Python 3.7+) switches the encoding of the existing stream in
# place instead of wrapping its buffer a second time. Streams that are not a
# TextIOWrapper (some IDE consoles, or None under pythonw) lack it and are
# left untouched, matching the old "ignore AttributeError" behaviour.
for _stream in (sys.stdout, sys.stderr):
    try:
        _stream.reconfigure(encoding="utf-8")
    except AttributeError:
        pass

# ================= Configuration =================
# SECURITY: the fallback credentials below are committed to source control.
# Set the environment variables instead and rotate the hard-coded values.
EMAIL_ACCOUNT = os.environ.get("LEAD_MAIL_ACCOUNT", "zhangyang@f6car.cn")
PASSWORD = os.environ.get("LEAD_MAIL_PASSWORD", "RGBdMggmJ4s2FzZK")
IMAP_SERVER = "imap.qiye.aliyun.com"
IMAP_PORT = 993
SUBJECT_KEYWORD = "展会线索登记"
DAYS_TO_SCAN = 30  # scan mails from the last 30 days
OUTPUT_FILE = f"展会线索_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
# The original code had the Excel write commented out; keep that default but
# make it an explicit switch so the "saved" message is only printed when true.
SAVE_EXCEL = False

# Column order of one lead line: 姓名 | 手机号 | 省 | 市 | 区 | 公司名称 | 备注
FIELD_KEYS = ["姓名", "手机号", "省", "市", "区", "公司名称", "备注"]

# Jiandaoyun (简道云) open API settings.
JDY_API_BASE = "https://api.jiandaoyun.com/api/v5/app/entry/data"
JDY_API_TOKEN = os.environ.get("JDY_API_TOKEN", "qygHulymo1fekJk4CIZyNKjyQAzG8CFN")
JDY_APP_ID = "66b9678280b37f8a276b1d01"
JDY_ENTRY_ID = "69b22dc5434e05c7b6b4b5b2"
JDY_PHONE_WIDGET = "_widget_1692928669587"  # form widget holding the phone number

# English month abbreviations for IMAP SEARCH dates. strftime('%b') follows the
# process locale (e.g. '3月' under zh_CN), which IMAP servers reject.
_IMAP_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")

# Tags that end a visual line in HTML; turned into newlines so each lead row
# stays on its own line after the remaining tags are stripped.
_HTML_BREAK_RE = re.compile(r"<\s*(?:br|/\s*(?:p|div|tr|li))\b[^>]*>", re.IGNORECASE)
_HTML_TAG_RE = re.compile(r"<[^>]+>")


def _jdy_headers() -> Dict[str, str]:
    """Return the HTTP headers shared by every Jiandaoyun request."""
    return {
        "Authorization": f"Bearer {JDY_API_TOKEN}",
        "Content-Type": "application/json",
    }


class API:
    """Minimal client for the Jiandaoyun v5 form-data endpoints used by this script."""

    def entry_data_list(self, data: dict, replace: bool = False, max_retries: int = 20) -> Dict:
        """Fetch every record of a form, following ``data_id`` pagination.

        :param data: must contain ``api_key`` (app id) and ``entry_id`` (form id);
            an optional ``filter`` is forwarded unchanged.
        :param replace: unused; kept for backward compatibility.
        :param max_retries: extra attempts per page after the first one fails.
        :return: ``{'data': [record, ...], 'complete': bool}``. ``data`` is a flat
            list of record dicts. ``complete`` is False when a page could not be
            fetched; ``data`` then holds only the pages fetched before the failure.
        """
        url = f"{JDY_API_BASE}/list"
        headers = _jdy_headers()
        page_limit = 90
        records: List[dict] = []
        last_data_id = None
        complete = True

        while True:
            payload = json.dumps({
                "app_id": data["api_key"],
                "entry_id": data["entry_id"],
                "limit": page_limit,
                "data_id": last_data_id,  # cursor: id of the last record already fetched
                "filter": data.get("filter", None),
            })

            page: Optional[list] = None
            last_error: Any = None
            for attempt in range(max_retries + 1):
                try:
                    res = requests.post(url=url, data=payload, headers=headers, timeout=10)
                    res.raise_for_status()  # 4xx/5xx -> RequestException
                    body = res.json()
                except (requests.exceptions.RequestException, ValueError) as err:
                    # ValueError covers invalid JSON on older requests versions.
                    last_error = err
                else:
                    if isinstance(body, dict) and isinstance(body.get("data"), list):
                        page = body["data"]
                        break
                    last_error = body  # well-formed HTTP reply without a data list
                if attempt < max_retries:
                    time.sleep(0.5)

            if page is None:
                # Give up on this page. Retrying it forever (the old behaviour)
                # never terminates; returning a partial list flagged as
                # incomplete lets the caller decide what to do.
                complete = False
                print(f"❌ 简道云数据拉取连续 {max_retries + 1} 次失败,最后错误:{last_error}")
                break
            if not page:
                break  # empty page: nothing left to fetch

            records.extend(page)
            last_data_id = page[-1].get("_id")
            print(f"已获取 {len(records)} 条数据")
            # A short page is the last one; a missing _id would reset the cursor
            # to the first page and loop forever.
            if len(page) < page_limit or last_data_id is None:
                break

        return {"data": records, "complete": complete}

    @staticmethod
    def data_batch_create(data: dict, max_retries: int = 20) -> Optional[Dict]:
        """Create one record in a Jiandaoyun form.

        :param data: must contain ``api_key`` (app id), ``entry_id`` (form id) and
            ``data`` (the widget-value mapping of the new record). Optional keys
            ``is_start_workflow``, ``is_start_trigger`` and ``transaction_id``
            are forwarded.
        :param max_retries: extra attempts after the first one fails.
        :return: the decoded JSON reply, or ``None`` when every attempt failed.

        Note: a request that times out may still have been applied on the
        server, so a retry can create a duplicate record.
        """
        url = f"{JDY_API_BASE}/create"
        headers = _jdy_headers()
        # NOTE(review): the workflow/trigger flags default to the *string*
        # "false"; confirm the API treats that as boolean false.
        payload = json.dumps({
            "app_id": data["api_key"],
            "entry_id": data["entry_id"],
            "data": data["data"],
            "is_start_workflow": data.get("is_start_workflow", "false"),
            "is_start_trigger": data.get("is_start_trigger", "false"),
            "transaction_id": data.get("transaction_id", ""),
        })

        last_error: Any = None
        for attempt in range(max_retries + 1):
            try:
                res = requests.post(url=url, data=payload, headers=headers, timeout=10)
                res.raise_for_status()
                # Any non-error status is a success. Re-posting on e.g. 201
                # (as the old status_code == 200 check did) would duplicate it.
                return res.json()
            except (requests.exceptions.RequestException, ValueError) as err:
                last_error = err
            if attempt < max_retries:
                time.sleep(3)

        # The old message read data['data_list'], a key that is never set, and
        # raised KeyError here instead of returning None.
        print(f"任务 {data.get('data')} 连续{max_retries}次请求失败,放弃此次请求。最后错误:{last_error}")
        return None


# ===========================================
def _decode_bytes(raw: bytes, charset: Optional[str]) -> str:
    """Decode *raw* with *charset*, tolerating missing or unknown charsets.

    gb2312/gbk are decoded as gb18030, a superset that also covers characters
    mislabelled mail clients put in "gb2312" bodies. Charsets Python does not
    know (e.g. 'unknown-8bit') fall back to UTF-8 instead of raising LookupError.
    """
    encoding = (charset or "utf-8").strip().lower()
    if encoding in ("gb2312", "gbk"):
        encoding = "gb18030"
    try:
        return raw.decode(encoding, errors="ignore")
    except LookupError:
        return raw.decode("utf-8", errors="ignore")


def decode_mime_words(s):
    """Decode an RFC 2047 header value (e.g. ``=?utf-8?b?...?=``) into plain text.

    Returns "" for an empty or missing header.
    """
    if not s:
        return ""
    # decode_header yields (bytes-or-str, charset-or-None) pairs.
    return "".join(
        _decode_bytes(part, encoding) if isinstance(part, bytes) else str(part)
        for part, encoding in decode_header(s)
    )


def extract_data_from_body(body_text):
    """Extract lead records from a mail body.

    Each lead is one line of the form ``姓名 | 手机号 | 省 | 市 | 区 | 公司名称 | 备注``.
    Lines without ``|`` or with an empty name or phone are ignored, as is a
    header row that repeats the column titles. Missing trailing fields become
    ""; fields beyond ``FIELD_KEYS`` are dropped.

    :return: list of dicts keyed by ``FIELD_KEYS``.
    """
    if not body_text:
        return []

    records = []
    for raw_line in body_text.splitlines():
        line = raw_line.strip()
        if "|" not in line:
            continue

        parts = [p.strip() for p in line.split("|")]
        # Name and phone are mandatory.
        if len(parts) < 2 or not parts[0] or not parts[1]:
            continue
        # A template/header row such as "姓名 | 手机号 | ..." is not a lead.
        if parts[0] == FIELD_KEYS[0] and parts[1] == FIELD_KEYS[1]:
            continue

        padded = parts + [""] * (len(FIELD_KEYS) - len(parts))
        records.append(dict(zip(FIELD_KEYS, padded)))
    return records


def save_to_excel(leads, filename, write_file: bool = False):
    """Build a DataFrame of *leads* with a fixed column order.

    Columns follow ``FIELD_KEYS`` + ``来源邮件时间``; any unexpected extra
    columns are appended after them.

    :param leads: list of lead dicts.
    :param filename: target .xlsx path, used only when *write_file* is true.
    :param write_file: actually write the Excel file (needs an Excel engine
        such as openpyxl). Defaults to False, matching the previous behaviour
        where the write was commented out.
    :return: the DataFrame, or ``None`` when *leads* is empty.
    """
    if not leads:
        return None

    df = pd.DataFrame(leads)
    preferred = FIELD_KEYS + ["来源邮件时间"]
    ordered = [c for c in preferred if c in df.columns]
    extra = [c for c in df.columns if c not in preferred]
    df = df[ordered + extra]

    if write_file:
        df.to_excel(filename, index=False)
    return df


def _imap_date(day: datetime) -> str:
    """Format *day* as an IMAP SEARCH date, e.g. '05-MAR-2025', independent of locale."""
    return f"{day.day:02d}-{_IMAP_MONTHS[day.month - 1]}-{day.year}".upper()


def _html_to_text(text: str) -> str:
    """Strip HTML tags, keeping line breaks and decoding entities like &nbsp;."""
    text = _HTML_BREAK_RE.sub("\n", text)
    text = _HTML_TAG_RE.sub(" ", text)
    return html.unescape(text)


def _part_text(part) -> str:
    """Decode one MIME part to text; text/html parts are reduced to plain text."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    if isinstance(payload, bytes):
        text = _decode_bytes(payload, part.get_content_charset())
    else:
        text = str(payload)
    if part.get_content_type() == "text/html":
        text = _html_to_text(text)
    return text


def _extract_body(mime_msg) -> str:
    """Return the readable body text of a parsed mail message.

    For multipart mail, every non-attachment text/plain and text/html part is
    concatenated. Both alternatives of the same content may therefore appear;
    the phone-number de-duplication in main() absorbs the repeats.
    """
    if not mime_msg.is_multipart():
        try:
            return _part_text(mime_msg)
        except Exception as err:  # best effort: an undecodable body yields no leads
            print(f"⚠️ 邮件正文解析失败,已跳过:{err}")
            return ""

    chunks = []
    for part in mime_msg.walk():
        disposition = part.get_content_disposition()
        if disposition and "attachment" in str(disposition):
            continue
        if part.get_content_type() not in ("text/plain", "text/html"):
            continue
        try:
            text = _part_text(part)
        except Exception as err:  # best effort: one bad part must not drop the mail
            print(f"⚠️ 邮件正文片段解析失败,已跳过:{err}")
            continue
        if text:
            chunks.append(text + "\n")
    return "".join(chunks)


def _dedupe_by_phone(leads):
    """Keep the first lead per (stripped) phone number; drop leads without a phone."""
    seen = set()
    unique = []
    for lead in leads:
        phone = str(lead.get("手机号", "")).strip()
        if not phone or phone in seen:
            continue
        seen.add(phone)
        unique.append(lead)
    return unique


def _close_mailbox(mail) -> None:
    """Close the selected mailbox and log out, each step best-effort.

    close() fails when no mailbox is selected (e.g. after a login failure).
    That must not prevent logout(), which releases the connection.
    """
    if mail is None:
        return
    try:
        mail.close()
    except (imaplib.IMAP4.error, OSError):
        pass
    try:
        mail.logout()
    except (imaplib.IMAP4.error, OSError):
        pass


def _sync_to_jiandaoyun(df) -> None:
    """Create a Jiandaoyun record for every lead whose phone is not in the form yet."""
    print("\n开始同步至简道云...")
    api_instance = API()
    response = api_instance.entry_data_list({"api_key": JDY_APP_ID, "entry_id": JDY_ENTRY_ID})

    # Without the complete list of existing phones, the duplicate check below
    # is unreliable, so skip the sync rather than risk creating duplicates.
    if not response or not response.get("complete", True):
        print("\n❌ 未能完整获取简道云现有数据,为避免重复提交,本次跳过同步。")
        return

    existing_phones = {
        str(item.get(JDY_PHONE_WIDGET)).strip()
        for item in response.get("data", [])
        if isinstance(item, dict) and item.get(JDY_PHONE_WIDGET)
    }
    print(f"简道云现有手机号数量:{len(existing_phones)}")

    new_count = 0
    for _, row in df.iterrows():
        current_phone = str(row["手机号"]).strip()
        if not current_phone:
            continue
        if current_phone in existing_phones:
            print(f"跳过 (云端已存在): {current_phone}")
            continue

        province = row.get("省", "")
        city = row.get("市", "")
        district = row.get("区", "")
        new_payload = {
            "api_key": JDY_APP_ID,
            "entry_id": JDY_ENTRY_ID,
            "data": {
                "_widget_1690785229260": {"value": row.get("姓名", "")},
                "_widget_1690785229261": {"value": row.get("公司名称", "")},
                JDY_PHONE_WIDGET: {"value": row.get("手机号", "")},
                "_widget_1690785229266": {"value": row.get("备注", "")},
                "_widget_1690785326597": {"value": {
                    "province": province,
                    "city": city,
                    "district": district,
                    "detail": province + city + district,
                }},
                "_widget_1690785229279": {"value": city},
                "_widget_1773381838511": {"value": province},
                "_widget_1692070309987": {"value": district},
            },
        }

        result = api_instance.data_batch_create(new_payload)
        # NOTE(review): `"error" not in result` accepts almost any dict reply as
        # success; tighten once the exact success shape of the API is confirmed.
        if isinstance(result, dict) and (
                result.get("success") or result.get("code") == 200 or "error" not in result):
            new_count += 1
            existing_phones.add(current_phone)
            print(f"新增成功:{current_phone} ({row.get('姓名')})")
        else:
            print(f"提交结果:{current_phone}, 返回:{result}")

    print(f"\n✅ 同步完成,本次新增 {new_count} 条数据。")


def main():
    """Scan the mailbox, de-duplicate the extracted leads and sync them to Jiandaoyun."""
    print(f"正在连接 IMAP 服务器:{IMAP_SERVER} ...")
    mail = None
    date_str = _imap_date(datetime.now() - timedelta(days=DAYS_TO_SCAN))
    all_leads = []
    count_processed = 0

    try:
        mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
        mail.login(EMAIL_ACCOUNT, PASSWORD)
        # Read-only: fetching RFC822 from a read-write mailbox sets \Seen on
        # every scanned mail, silently marking the whole inbox as read.
        status, _ = mail.select("INBOX", readonly=True)
        if status != "OK":
            print("❌ 打开收件箱失败")
            return

        print(f"正在搜索 [{date_str}] 之后的邮件...")
        status, messages = mail.search(None, f'(SINCE "{date_str}")')
        if status != "OK":
            print("❌ 搜索失败")
            return

        mail_ids = messages[0].split()
        if not mail_ids:
            print(f"✅ 未找到 {date_str} 之后的新邮件。")
            return

        print(f"📩 找到 {len(mail_ids)} 封近期邮件,开始详细扫描...")
        for mail_id in mail_ids:
            try:
                status, msg_data = mail.fetch(mail_id, "(RFC822)")
                if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                    continue

                raw_email = msg_data[0][1]
                if isinstance(raw_email, bytes):
                    mime_msg = email.message_from_bytes(raw_email)
                else:
                    # str has no .decode(); the old branch raised AttributeError.
                    mime_msg = email.message_from_string(str(raw_email))

                subject = decode_mime_words(mime_msg.get("Subject"))
                if SUBJECT_KEYWORD not in subject:
                    continue

                count_processed += 1
                date_str_full = mime_msg.get("Date")
                leads = extract_data_from_body(_extract_body(mime_msg))
                for lead in leads:
                    lead["来源邮件时间"] = date_str_full

                if leads:
                    print(f"[{subject}] -> 提取 {len(leads)} 条")
                    all_leads.extend(leads)
            except Exception as e:  # one broken mail must not abort the scan
                shown_id = mail_id.decode(errors="ignore") if isinstance(mail_id, bytes) else mail_id
                print(f"处理邮件 ID {shown_id} 时出错:{e}")
                continue

        # ---- Local de-duplication by phone number ----
        original_count = len(all_leads)
        if original_count > 0:
            all_leads = _dedupe_by_phone(all_leads)
            removed_count = original_count - len(all_leads)
            if removed_count > 0:
                print(
                    f"\n⚠️ 检测到重复数据,已根据【手机号】去重:原始 {original_count} 条 -> 去重后 {len(all_leads)} 条 (移除 {removed_count} 条)")
            else:
                print(f"\n✅ 数据检查完成,无重复手机号。共 {len(all_leads)} 条。")

        df = save_to_excel(all_leads, OUTPUT_FILE, write_file=SAVE_EXCEL)
        if df is None:
            print(f"\n⚠️ 扫描完成,但在 {count_processed} 封近期邮件中未找到符合格式的数据。")
            return  # nothing to sync

        print(f"\n✅ 成功!共扫描 {count_processed} 封匹配邮件,最终有效线索 {len(all_leads)} 条。")
        if SAVE_EXCEL:
            # Only claim the file was saved when it actually was.
            print(f"文件已保存至:{OUTPUT_FILE}")

        try:
            _sync_to_jiandaoyun(df)
        except Exception as api_err:
            print(f"\n❌ 简道云 API 交互错误:{api_err}")
            traceback.print_exc()

    except imaplib.IMAP4.error as e:
        print("\n❌ IMAP 协议错误:")
        print(f"错误详情:{e}")
    except Exception:
        print("\n❌ 发生严重错误:")
        traceback.print_exc()
    finally:
        _close_mailbox(mail)


if __name__ == "__main__":
    main()