# Source file: saas/test/展会线索登记.py
# Viewer snapshot: 2026-03-25 09:34:48 +08:00 · 446 lines · 18 KiB · Python
# -*- coding: utf-8 -*-
import sys
import io
import imaplib
import email
import re
from datetime import datetime, timedelta
from email.header import decode_header
import pandas as pd
# The API client class is defined further down in this file; requests performs its HTTP calls.
import requests
from typing import Optional, List, Dict, Any
from decimal import Decimal
import time
import numpy as np
from log_config import configure_task_logger, configure_error_task_logger
import json
# === Force UTF-8 on the standard streams (portable across run environments) ===
# reconfigure() changes the encoding of the existing stream object in place and
# keeps its other settings (notably line buffering), so progress output still
# appears immediately on a terminal. Streams without reconfigure() fall back to
# wrapping the underlying binary buffer; streams that offer neither (some IDE
# consoles, or None under pythonw) are left untouched. Each stream is handled
# independently so a failure on stdout does not skip stderr.
for _stream_name in ("stdout", "stderr"):
    _stream = getattr(sys, _stream_name, None)
    try:
        _stream.reconfigure(encoding='utf-8')
    except AttributeError:
        try:
            setattr(sys, _stream_name, io.TextIOWrapper(_stream.buffer, encoding='utf-8'))
        except AttributeError:
            pass
# ================= Configuration =================
EMAIL_ACCOUNT = "zhangyang@f6car.cn"
# WARNING: credential hard-coded in source. For production, load it from an
# environment variable or secret store instead, and rotate this value.
PASSWORD = "RGBdMggmJ4s2FzZK"
IMAP_SERVER = "imap.qiye.aliyun.com"
IMAP_PORT = 993
SUBJECT_KEYWORD = "展会线索登记"
DAYS_TO_SCAN = 30  # scan the most recent 30 days
OUTPUT_FILE = f"展会线索_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
# Column name for each '|'-separated position of a lead line, in order:
# name | phone | province | city | district | company | remark.
# Every key must be unique: the parser builds a dict from these names, so
# repeated (e.g. blank) names would make later positions overwrite earlier ones.
FIELD_KEYS = ["姓名", "手机号", "省", "市", "区", "公司名称", "备注"]
class API:
    """Small client for the two Jiandaoyun v5 form-data endpoints used by this script."""

    # NOTE(review): the bearer token is hard-coded in source; it should be loaded
    # from the environment or a secret store and rotated.
    _HEADERS = {
        'Authorization': "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN",  # app key of the API test application
        'Content-Type': 'application/json'
    }

    def entry_data_list(self, data: dict, replace: bool = False, max_retries: int = 20) -> Dict:
        """
        Fetch every record of a form, following the cursor page by page.

        :param data: request description with keys
            api_key: application id
            entry_id: form id
            filter: optional filter object, forwarded unchanged
        :param replace: accepted for compatibility; not used by this method
        :param max_retries: how many times a failed page request is retried
        :return: {'data': [...]} where the value is one flat list of record dicts.
            If a page still fails after all retries, a single None is appended as
            a failure marker and fetching stops, so the list may then be partial.
        """
        url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
        all_data_batches = []  # flat list of every record fetched so far
        last_data_id = None  # cursor: _id of the last record of the previous page
        while True:
            payload = json.dumps({
                "app_id": data['api_key'],  # application id
                "entry_id": data['entry_id'],  # form id
                "limit": 90,
                "data_id": last_data_id,
                "filter": data.get('filter', None)
            })
            batch = None  # stays None when the page could not be fetched at all
            retries = 0
            while retries <= max_retries:
                try:
                    res = requests.post(url=url, data=payload, headers=self._HEADERS, timeout=10)
                    res.raise_for_status()  # raises on 4xx/5xx responses
                    # A missing or null 'data' key is treated like an empty page.
                    batch = res.json().get('data') or []
                    break
                except requests.exceptions.RequestException:
                    retries += 1
                    time.sleep(0.5)  # short pause between retries
            if batch is None:
                # Retries exhausted: record the failure and stop. Looping again
                # would resend the identical request forever.
                all_data_batches.append(None)
                break
            if not batch:
                break  # empty page: every record has been read
            all_data_batches.extend(batch)
            last_data_id = batch[-1].get('_id')
            print(f"已获取 {len(all_data_batches)} 条数据")
        return {'data': all_data_batches}

    @staticmethod
    def data_batch_create(data: dict, max_retries: int = 20) -> Optional[Dict[str, Any]]:
        """
        Create a single form record.

        :param data: must contain api_key (application id), entry_id (form id) and
            data (the widget values); may contain is_start_workflow,
            is_start_trigger and transaction_id, which are forwarded as given
        :param max_retries: how many times a failed request is retried
        :return: the parsed JSON body returned by Jiandaoyun, or None when every
            attempt failed
        """
        url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create'
        payload = json.dumps({
            "app_id": data['api_key'],  # application id
            "entry_id": data['entry_id'],  # form id
            "data": data['data'],
            "is_start_workflow": data.get('is_start_workflow', "false"),
            "is_start_trigger": data.get('is_start_trigger', "false"),
            "transaction_id": data.get('transaction_id', "")
        })
        retries = 0
        while retries <= max_retries:
            try:
                res = requests.post(url=url, data=payload, headers=API._HEADERS, timeout=10)
                res.raise_for_status()  # raises on 4xx/5xx responses
                data_get = res.json()
                if res.status_code == 200:
                    return data_get
            except requests.exceptions.RequestException:
                pass
            retries += 1
            time.sleep(3)  # pause between retries
        # 'data_list' is optional in the payload, so fall back to the record itself;
        # indexing it directly raised KeyError here instead of returning None.
        print(
            f"任务 {data.get('data_list', data.get('data'))} 连续{max_retries}次请求失败,放弃此次请求。")
        return None
# ===========================================
def decode_mime_words(s):
    """Decode an RFC 2047 encoded header value (such as a Subject) into text.

    Returns "" for an empty or missing header. Byte chunks are decoded with the
    charset declared in the header, defaulting to UTF-8; bytes that cannot be
    decoded are dropped rather than raising.
    """
    if not s:
        return ""
    decoded_parts = []
    # decode_header yields (bytes-or-str, charset-or-None) pairs.
    for part, encoding in decode_header(s):
        if not isinstance(part, bytes):
            decoded_parts.append(str(part))
            continue
        try:
            decoded_parts.append(part.decode(encoding or 'utf-8', errors='ignore'))
        except LookupError:
            # The header names a charset Python does not know; decode as UTF-8
            # instead of failing the whole message.
            decoded_parts.append(part.decode('utf-8', errors='ignore'))
    return "".join(decoded_parts)
def extract_data_from_body(body_text):
"""
从邮件正文中提取线索数据。
格式:姓名 | 手机号 | 省 | 市 | 区 | 公司 | 备注
"""
if not body_text:
return []
data_list = []
# 【修复点 1】splitlines() 是方法,需要加括号
lines = body_text.splitlines()
for line in lines:
line = line.strip()
# 如果行中没有分隔符,跳过
if '|' not in line:
continue
# 按 '|' 分割并去除首尾空格
parts = [p.strip() for p in line.split('|')]
# 【关键校验】至少需要前两个字段(姓名、手机号)非空
if len(parts) < 2 or not parts[0] or not parts[1]:
continue
# 构建字典,动态映射
record = {}
for i, key in enumerate(FIELD_KEYS):
if i < len(parts):
record[key] = parts[i]
else:
record[key] = "" # 缺失的字段填空字符串
data_list.append(record)
return data_list
def save_to_excel(leads, filename):
    """Build a DataFrame of the leads with a stable, duplicate-free column order.

    Despite the name nothing is written to disk: the to_excel call below is
    disabled, so ``filename`` is currently unused and callers only get the
    DataFrame back.

    :param leads: list of lead dicts
    :param filename: target path for the (disabled) Excel export
    :return: the ordered DataFrame, or None when ``leads`` is empty
    """
    if not leads:
        return None
    df = pd.DataFrame(leads)
    # Preferred column order, following the documented lead format
    # (name, phone, province, city, district, company, remark) plus the mail date.
    cols = ["姓名", "手机号", "省", "市", "区", "公司名称", "备注", "来源邮件时间"]
    # dict.fromkeys drops repeated names while keeping order: selecting the same
    # label twice would duplicate the column and make row lookups return a Series.
    ordered_cols = [c for c in dict.fromkeys(cols) if c in df.columns]
    # Any column not listed above is kept, after the known ones.
    other_cols = [c for c in df.columns if c not in cols]
    df = df[ordered_cols + other_cols]
    # df.to_excel(filename, index=False)
    return df
def _decode_part_text(part):
    """Return the decoded text payload of one MIME part, or None when there is none."""
    try:
        charset = part.get_content_charset() or 'utf-8'
        payload = part.get_payload(decode=True)
        if not payload:
            return None
        if not isinstance(payload, bytes):
            return str(payload)
        try:
            return payload.decode(charset, errors='ignore')
        except LookupError:
            # Unknown charset label: decode as UTF-8 rather than dropping the part.
            return payload.decode('utf-8', errors='ignore')
    except Exception:
        # Best effort: one malformed part must not abort the whole message.
        return None


def _message_body_text(mime_msg):
    """Collect the text of a message from its non-attachment text parts, HTML tags removed."""
    if not mime_msg.is_multipart():
        text = _decode_part_text(mime_msg) or ""
        # Strip tags for single-part HTML mails too, as is done for multipart ones.
        if mime_msg.get_content_type() == "text/html":
            text = re.sub(r'<[^>]+>', ' ', text)
        return text
    chunks = []
    for part in mime_msg.walk():
        disposition = part.get_content_disposition()
        if disposition and "attachment" in str(disposition):
            continue
        content_type = part.get_content_type()
        if content_type not in ("text/plain", "text/html"):
            continue
        text = _decode_part_text(part)
        if text is None:
            continue
        if content_type == "text/html":
            text = re.sub(r'<[^>]+>', ' ', text)
        chunks.append(text + "\n")
    return "".join(chunks)


def _collect_leads(mail, mail_ids):
    """Fetch each message, keep those whose subject matches, and parse their leads.

    Returns (leads, matched_message_count). A failure on one message is reported
    and does not stop the scan.
    """
    all_leads = []
    count_processed = 0
    for mail_id in mail_ids:
        try:
            # NOTE(review): fetching RFC822 from a mailbox selected read-write
            # normally marks every scanned message as seen - confirm that is intended.
            status, msg_data = mail.fetch(mail_id, "(RFC822)")
            if status != "OK":
                continue
            raw_email = msg_data[0][1]
            if isinstance(raw_email, str):
                mime_msg = email.message_from_string(raw_email)
            else:
                mime_msg = email.message_from_bytes(raw_email)
            subject = decode_mime_words(mime_msg.get("Subject"))
            if SUBJECT_KEYWORD not in subject:
                continue
            count_processed += 1
            date_str_full = mime_msg.get("Date")
            leads = extract_data_from_body(_message_body_text(mime_msg))
            for lead in leads:
                lead["来源邮件时间"] = date_str_full
            if leads:
                print(f"[{subject}] -> 提取 {len(leads)}")
                all_leads.extend(leads)
        except Exception as e:
            print(f"处理邮件 ID {mail_id} 时出错:{e}")
            continue
    return all_leads, count_processed


def _dedupe_by_phone(leads):
    """Keep the first lead per phone number; leads with an empty phone are dropped."""
    seen_phones = set()
    unique_leads = []
    for lead in leads:
        phone = str(lead.get("手机号", "")).strip()
        if not phone or phone in seen_phones:
            continue
        seen_phones.add(phone)
        unique_leads.append(lead)
    return unique_leads


def _row_text(row, key):
    """Read one cell of a DataFrame row as text ("" when missing or empty)."""
    value = row.get(key, "")
    # Duplicate column labels make the lookup return a Series; take its first
    # element so the value stays a JSON-serialisable scalar.
    if isinstance(value, pd.Series):
        value = value.iloc[0] if len(value) else ""
    if value is None or (isinstance(value, float) and pd.isna(value)):
        return ""
    return str(value)


def _sync_to_jiandaoyun(df):
    """Create a Jiandaoyun record for every lead whose phone is not in the form yet."""
    print("\n开始同步至简道云...")
    api_instance = API()
    app_id = "66b9678280b37f8a276b1d01"
    entry_id = "69b22dc5434e05c7b6b4b5b2"
    phone_widget_id = "_widget_1692928669587"
    try:
        response = api_instance.entry_data_list({"api_key": app_id, "entry_id": entry_id})
        now_data = response.get("data", []) if response else []
        # A non-dict entry (None) marks a page that could not be fetched. Going
        # on with a partial phone list could create duplicates, so stop here.
        if any(not isinstance(item, dict) for item in now_data):
            raise RuntimeError("获取简道云现有数据失败,已中止同步以避免重复写入")
        existing_phones = {
            str(item[phone_widget_id]).strip()
            for item in now_data
            if item.get(phone_widget_id)
        }
        print(f"简道云现有手机号数量:{len(existing_phones)}")
        new_count = 0
        for _, row in df.iterrows():
            current_phone = _row_text(row, "手机号").strip()
            if not current_phone:
                continue
            # Skip leads that already exist in the cloud form.
            if current_phone in existing_phones:
                print(f"跳过 (云端已存在): {current_phone}")
                continue
            name = _row_text(row, "姓名")
            province = _row_text(row, "省")
            city = _row_text(row, "市")
            district = _row_text(row, "区")
            new_payload = {
                "api_key": app_id,
                "entry_id": entry_id,
                "data": {
                    "_widget_1690785229260": {"value": name},
                    "_widget_1690785229261": {"value": _row_text(row, "公司名称")},
                    "_widget_1692928669587": {"value": _row_text(row, "手机号")},
                    "_widget_1690785229266": {"value": _row_text(row, "备注")},
                    "_widget_1690785326597": {"value": {"province": province,
                                                        "city": city,
                                                        "district": district,
                                                        "detail": province + city + district,
                                                        }
                                              },
                    # NOTE(review): the source column name for these three widgets
                    # is blank in the original; confirm which fields they should carry.
                    "_widget_1690785229279": {"value": _row_text(row, "")},
                    "_widget_1773381838511": {"value": _row_text(row, "")},
                    "_widget_1692070309987": {"value": _row_text(row, "")},
                }
            }
            result = api_instance.data_batch_create(new_payload)
            if result and (result.get("success") or result.get("code") == 200 or "error" not in result):
                new_count += 1
                print(f"新增成功:{current_phone} ({name})")
            else:
                # Conservative: only an explicit success is counted.
                print(f"提交结果:{current_phone}, 返回:{result}")
        print(f"\n✅ 同步完成,本次新增 {new_count} 条数据。")
    except Exception as api_err:
        print(f"\n❌ 简道云 API 交互错误:{api_err}")
        import traceback
        traceback.print_exc()


def main():
    """Scan recent mail for lead registrations, de-duplicate them and sync new ones.

    Steps: log in over IMAP, search INBOX for messages since DAYS_TO_SCAN days
    ago, parse leads from messages whose subject contains SUBJECT_KEYWORD, drop
    duplicates by phone number, then create the leads missing from the
    Jiandaoyun form. Errors are printed; the IMAP session is always released.
    """
    print(f"正在连接 IMAP 服务器:{IMAP_SERVER} ...")
    mail = None
    start_date = datetime.now() - timedelta(days=DAYS_TO_SCAN)
    date_str = start_date.strftime("%d-%b-%Y").upper()
    try:
        mail = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT)
        mail.login(EMAIL_ACCOUNT, PASSWORD)
        mail.select("INBOX")
        print(f"正在搜索 [{date_str}] 之后的邮件...")
        status, messages = mail.search(None, f'(SINCE "{date_str}")')
        if status != "OK":
            print("❌ 搜索失败")
            return
        mail_ids = messages[0].split()
        if not mail_ids:
            print(f"✅ 未找到 {date_str} 之后的新邮件。")
            return
        print(f"📩 找到 {len(mail_ids)} 封近期邮件,开始详细扫描...")
        all_leads, count_processed = _collect_leads(mail, mail_ids)

        # Local de-duplication by phone number.
        original_count = len(all_leads)
        if original_count > 0:
            all_leads = _dedupe_by_phone(all_leads)
            removed_count = original_count - len(all_leads)
            if removed_count > 0:
                print(
                    f"\n⚠️ 检测到重复数据,已根据【手机号】去重:原始 {original_count} 条 -> 去重后 {len(all_leads)} 条 (移除 {removed_count} 条)")
            else:
                print(f"\n✅ 数据检查完成,无重复手机号。共 {len(all_leads)} 条。")

        df = save_to_excel(all_leads, OUTPUT_FILE)
        if df is None:
            print(f"\n⚠️ 扫描完成,但在 {count_processed} 封近期邮件中未找到符合格式的数据。")
            return  # nothing to sync
        print(f"\n✅ 成功!共扫描 {count_processed} 封匹配邮件,最终有效线索 {len(all_leads)} 条。")
        # NOTE(review): save_to_excel currently has its to_excel call disabled, so
        # no file is written even though this message says so.
        print(f"文件已保存至:{OUTPUT_FILE}")
        if all_leads:
            _sync_to_jiandaoyun(df)
    except imaplib.IMAP4.error as e:
        print("\n❌ IMAP 协议错误:")
        print(f"错误详情:{e}")
    except Exception:
        print("\n❌ 发生严重错误:")
        import traceback
        traceback.print_exc()
    finally:
        if mail is not None:
            # close() fails when no mailbox was selected (e.g. login failed);
            # run logout() regardless so the connection is always released.
            for cleanup in (mail.close, mail.logout):
                try:
                    cleanup()
                except (imaplib.IMAP4.error, OSError):
                    pass


if __name__ == "__main__":
    main()