修复因无新增客户导致NGV数据新增异常

This commit is contained in:
2026-01-12 16:27:15 +08:00
parent 923c035fd5
commit 1ef81def0f
9 changed files with 507 additions and 78 deletions
+189
View File
@@ -0,0 +1,189 @@
import os
from datetime import datetime, timezone, timedelta
import pandas as pd
from tqdm import tqdm
import json
from yd_api import YDAPI
from api import API
import time
# Directory that receives the exported CSV; created at import time if it does not exist.
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)
# Client from the local `api` module (not referenced in the visible part of this file).
api_instance = API()
# Client used below for token generation, form-structure lookup and instance queries.
yd_api_instance = YDAPI()
def generate_monthly_ranges(start: str, end: str):
    """
    Split ``[start, end)`` into consecutive half-open segments cut at month starts.

    Every segment ends at the first instant of the next calendar month, except
    the last one, which ends exactly at ``end``. The time of day of ``start``
    and ``end`` is preserved in the output, so a range ending "now" really
    reaches the current moment instead of being truncated to midnight.

    Args:
        start: ISO-8601 timestamp, e.g. ``'2025-11-01T00:00:00Z'``.
        end: ISO-8601 timestamp marking the exclusive upper bound.

    Returns:
        A list of ``(segment_start, segment_end)`` string pairs formatted as
        ``%Y-%m-%dT%H:%M:%SZ``, e.g.
        ``[('2025-11-01T00:00:00Z', '2025-12-01T00:00:00Z'), ...]``.
        Empty when ``start`` is not earlier than ``end``.
    """
    stamp_format = "%Y-%m-%dT%H:%M:%SZ"

    def _parse(stamp):
        parsed = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
        if parsed.tzinfo is not None:
            # The output is always labelled "Z", so aware values must be UTC.
            parsed = parsed.astimezone(timezone.utc)
        # Sub-second precision cannot be expressed in the output format.
        return parsed.replace(microsecond=0)

    start_dt = _parse(start)
    end_dt = _parse(end)

    ranges = []
    current = start_dt
    while current < end_dt:
        # First instant (midnight) of the following calendar month.
        if current.month == 12:
            next_month = current.replace(
                year=current.year + 1, month=1, day=1,
                hour=0, minute=0, second=0,
            )
        else:
            next_month = current.replace(
                month=current.month + 1, day=1,
                hour=0, minute=0, second=0,
            )
        # Never run past the requested upper bound.
        segment_end = min(next_month, end_dt)
        ranges.append((
            current.strftime(stamp_format),
            segment_end.strftime(stamp_format),
        ))
        current = next_month
    return ranges
class GetYDData:
    """Pull RUNNING process instances of one form and export them to CSV.

    The export is split into time windows (one fixed window followed by
    monthly windows up to the current UTC time). Option values of select and
    radio fields are replaced by their display labels before saving.
    """

    # Number of instances requested per page.
    _PAGE_SIZE = 100

    def __init__(self):
        """Store the form identifiers and compute the time windows to fetch."""
        self.FORMID = "FORM-PE866MD1MJMU0WGLYRFLYEN5YN9L1I55Z7ZUK22"
        self.appType = "APP_UYZ0KG6L0CCNV80GZ66O"
        # NOTE(review): credential committed in source - consider loading it
        # from configuration instead.
        self.systemToken = "XA966F81JAJOFCVVVKO64E9MIIZV1EWE5SFMKJ2"

        # First window: 2025-01-01 up to 2025-11-01.
        first_segment = ("2025-01-01T00:00:00Z", "2025-11-01T00:00:00Z")

        # Second part: 2025-11-01 up to the current UTC time, split by month.
        now_utc_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        monthly_segments = generate_monthly_ranges("2025-11-01T00:00:00Z", now_utc_str)

        # All windows, in chronological order.
        self.time_ranges = [first_segment] + monthly_segments

        print("📅 计划拉取以下时间段:")
        for i, (s, e) in enumerate(self.time_ranges, 1):
            print(f" {i}. {s} ~ {e}")

    def build_value_to_label_map(self, form_structure):
        """Build ``{fieldId: {option value (str): display label}}``.

        Only ``SelectField`` and ``RadioField`` components with a non-empty
        ``props.dataSource`` are considered. For each option the label is
        chosen as follows:

        * ``text`` is a dict with ``zh_CN``: that value;
        * ``text`` is a dict with only ``value``: the string with its
          surrounding double quotes removed when it is a quoted string,
          otherwise the option value itself;
        * ``text`` is missing, ``None`` or an empty dict: the option value;
        * ``text`` is anything else: ``str(text)``.

        Args:
            form_structure: Response dict whose ``result`` entry lists the
                form fields. Missing or ``None`` entries are tolerated.

        Returns:
            The nested mapping; fields without usable options are omitted.
        """
        value_to_label_map = {}
        fields = (form_structure or {}).get("result") or []
        for field in fields:
            if field.get("componentName") not in ("SelectField", "RadioField"):
                continue
            props = field.get("props") or {}
            data_source = props.get("dataSource") or []

            option_map = {}
            for opt in data_source:
                val = opt.get("value")
                if val is None:
                    continue
                option_map[str(val)] = self._option_label(opt.get("text"), val)

            if option_map:
                value_to_label_map[field.get("fieldId")] = option_map
        return value_to_label_map

    @staticmethod
    def _option_label(text_obj, val):
        """Return the display label for one option (rules in build_value_to_label_map)."""
        if text_obj is None:
            # Missing text: fall back to the raw option value, never "None".
            return str(val)
        if not isinstance(text_obj, dict):
            return str(text_obj)

        zh_text = text_obj.get("zh_CN")
        if zh_text is not None:
            return zh_text
        if "value" in text_obj:
            raw = text_obj["value"]
            # Only a double-quoted string is unwrapped into a label; any other
            # content falls through to the option value.
            if isinstance(raw, str) and raw.startswith('"') and raw.endswith('"'):
                return raw[1:-1]
        return str(val)

    def convert_record_values(self, record, value_map):
        """Return a copy of ``record`` with mapped option values replaced by labels.

        Args:
            record: One instance's form data (``{fieldId: value}``); ``None``
                is treated as an empty record.
            value_map: Mapping produced by ``build_value_to_label_map``.

        Returns:
            A new dict. Values of unmapped fields, ``None`` values and values
            without a known label are kept unchanged.
        """
        converted = {}
        for key, val in (record or {}).items():
            if key in value_map and val is not None:
                converted[key] = value_map[key].get(str(val), val)
            else:
                converted[key] = val
        return converted

    def _query_page(self, token, page, start_time, end_time):
        """Request one page of RUNNING instances modified within the window."""
        return yd_api_instance.read_processes_instances(
            token=token,
            formUuid=self.FORMID,
            page=page,
            n=self._PAGE_SIZE,
            appType=self.appType,
            systemToken=self.systemToken,
            instanceStatus="RUNNING",
            modifiedFromTimeGMT=start_time,
            modifiedToTimeGMT=end_time,
        )

    def fetch_records_in_range(self, token, start_time, end_time):
        """Fetch every record of the given modified-time window.

        Args:
            token: Access token passed through to the API client.
            start_time: Lower bound of the window.
            end_time: Upper bound of the window.

        Returns:
            A list of instance dicts. Empty when the first request fails; a
            failing later page is reported on stdout and skipped, so the
            result may then be incomplete.
        """
        try:
            first_page = self._query_page(token, 1, start_time, end_time) or {}
        except Exception as e:  # API boundary: report and give up on this window.
            print(f"❌ 首页请求失败 ({start_time} {end_time}): {e}")
            return []

        total_count = first_page.get("totalCount") or 0
        # Ceiling division: a partially filled last page still counts.
        total_pages = -(-total_count // self._PAGE_SIZE)
        print(f"📊 [{start_time[:10]} {end_time[:10]}] 总记录数: {total_count}, 共 {total_pages}")

        all_records = []
        if total_count > 0:
            all_records.extend(first_page.get("data") or [])
            for page in tqdm(range(2, total_pages + 1), desc=f"{start_time[:7]}"):
                try:
                    resp = self._query_page(token, page, start_time, end_time) or {}
                    all_records.extend(resp.get("data") or [])
                    time.sleep(0.15)  # Small pause between successful requests.
                except Exception as e:  # Best effort: keep the other pages.
                    print(f"⚠️ 第 {page} 页失败 ({start_time[:10]}): {e}")
                    continue
        return all_records

    def main(self):
        """Run the export: load labels, fetch all windows, convert, save CSV."""
        # Step 1: fetch the form structure and build the value -> label map.
        token = yd_api_instance.generateToken()
        form_struct = yd_api_instance.get_form_structures(
            token=token,
            formUuid=self.FORMID
        )
        value_map = self.build_value_to_label_map(form_struct)
        print("\n✅ 表单选项映射构建完成")

        # Step 2: fetch window by window.
        all_records = []
        for start_time, end_time in self.time_ranges:
            print(f"\n⏳ 拉取: {start_time} ~ {end_time}")
            all_records.extend(self.fetch_records_in_range(token, start_time, end_time))
        print(f"\n📥 总共获取 {len(all_records)} 条流程实例")

        # Step 3: convert each instance's formData (a missing one becomes {}).
        converted_records = [
            self.convert_record_values(inst.get("formData") or {}, value_map)
            for inst in all_records
        ]

        # Step 4: save; utf_8_sig writes a BOM at the start of the file.
        if converted_records:
            df = pd.DataFrame(converted_records)
            output_path = os.path.join(output_dir, "converted_yd_data.csv")
            df.to_csv(output_path, index=False, encoding="utf_8_sig")
            print(f"\n✅ 成功保存 {len(converted_records)} 条记录至: {output_path}")
        else:
            print("\n❌ 无有效数据")
if __name__ == "__main__":
    # Run the export only when executed as a script, not when imported.
    GetYDData().main()
+158
View File
@@ -0,0 +1,158 @@
{
"cells": [
{
"cell_type": "code",
"id": "initial_id",
"metadata": {
"collapsed": true,
"ExecuteTime": {
"end_time": "2026-01-07T02:17:11.661841100Z",
"start_time": "2026-01-07T02:17:11.600589500Z"
}
},
"source": [
"from back_ground_module import CommonModule\n",
"from api import API\n",
"from log_config import configure_task_logger, configure_error_task_logger\n",
"from datetime import datetime, timedelta, timezone\n",
"import pandas as pd\n",
"import os\n",
"\n",
"# 获取已经配置好的常规日志记录器\n",
"logger = configure_task_logger()\n",
"# 获取已经配置好的错误任务日志记录器\n",
"error_task_logger = configure_error_task_logger()\n",
"# 保存为CSV文件\n",
"output_dir = \"output\" # 设置输出目录\n",
"# 创建输出目录(如果不存在)\n",
"import os\n",
"\n",
"os.makedirs(output_dir, exist_ok=True)\n",
"common_module = CommonModule()\n",
"api_instance = API()\n",
"\n",
"data_JCB = common_module.get_jcb_details()\n",
"current_local = datetime.now() + timedelta(days=-1) # tz-naive,代表本地时间\n",
"current_date_str = current_local.strftime(\"%Y-%m-%d\")\n",
"# 计算30天前的本地日期(用于开户日判断)\n",
"thirty_days_ago_local = (current_local - timedelta(days=30)).date()\n",
"payload = {\"api_key\": \"6717470a0b3975ef583c6df1\",\n",
" \"entry_id\": \"67174710da507490d8ac12c1\",\n",
" }\n",
"daily_revisit = api_instance.entry_data_list(payload)\n",
"daily_revisit_list = daily_revisit.get(\"data\") # api请求格式,将数据封装在data字典里\n",
"abnormal_data = []\n",
"for index, row in data_JCB.iterrows():\n",
" try:\n",
" # 开户日是本地日期字符串,解析为 date 对象\n",
" open_date = datetime.strptime(str(row['开户日']), \"%Y-%m-%d\").date()\n",
" except (ValueError, TypeError):\n",
" continue # 跳过无效日期\n",
"\n",
" if (\n",
" open_date < thirty_days_ago_local\n",
" and row['近30天开单天数'] == 0\n",
" and row['客户状态'] == \"留存\"\n",
" ):\n",
" new_row = row.copy()\n",
" new_row[\"日期\"] = open_date.strftime(\"%Y-%m-%d\")\n",
" abnormal_data.append(new_row)\n",
"\n",
"abnormal_data = pd.DataFrame(abnormal_data) if abnormal_data else pd.DataFrame()\n",
"\n",
"if not abnormal_data.empty:\n",
" abnormal_data[\"表单类型\"] = \"异常待办\"\n",
" abnormal_data[\"派发日期\"] = current_date_str\n",
"\n",
" # 清洗手机号(仅去除浮点型 .0)\n",
" def clean_phone(x):\n",
" if pd.isna(x) or x == \"\" or x == \"None\":\n",
" return \"\"\n",
" s = str(x)\n",
" if s.endswith('.0') and s[:-2].isdigit():\n",
" return s[:-2]\n",
" return s\n",
"\n",
" abnormal_data['联系手机号'] = abnormal_data['联系手机号'].apply(clean_phone)\n",
"\n",
"# 构建云端已派发记录 DataFrame\n",
"df_cloud = pd.DataFrame([\n",
" {\n",
" \"数据id\": item.get(\"_id\", \"\"),\n",
" \"账号\": item.get(\"_widget_1739258942667\", \"\"),\n",
" \"提交时间\": item.get(\"createTime\", \"\"),\n",
" \"表单类型\": item.get(\"_widget_1739951204545\", \"\")\n",
" }\n",
" for item in daily_revisit_list\n",
"])\n",
"\n",
"recent_accounts = set()\n",
"if not df_cloud.empty and not abnormal_data.empty:\n",
" # 将 createTime 转为 UTC 时间(强制统一时区)\n",
" df_cloud[\"提交时间\"] = pd.to_datetime(df_cloud[\"提交时间\"], utc=True, errors=\"coerce\")\n",
" df_cloud = df_cloud.dropna(subset=[\"提交时间\"])\n",
"\n",
" # 筛选“异常待办”\n",
" df_abnormal_cloud = df_cloud[df_cloud[\"表单类型\"] == \"异常待办\"]\n",
"\n",
" if not df_abnormal_cloud.empty:\n",
" # 每个账号保留最新一条\n",
" df_recent = df_abnormal_cloud.sort_values(\"提交时间\").groupby(\"账号\", as_index=False).tail(1)\n",
"\n",
" current_utc = datetime.now(timezone.utc)\n",
" cutoff_utc = pd.Timestamp(current_utc) - pd.Timedelta(days=30)\n",
"\n",
" # 安全比较:两边都是 UTC\n",
" recent_accounts = set(df_recent[df_recent[\"提交时间\"] > cutoff_utc][\"账号\"])\n",
"\n",
"# 剔除已派发账号 + 过滤有效手机号\n",
"if not abnormal_data.empty:\n",
" abnormal_data = abnormal_data[\n",
" (~abnormal_data[\"账号\"].isin(recent_accounts)) &\n",
" (abnormal_data[\"联系手机号\"].notna()) &\n",
" (abnormal_data[\"联系手机号\"] != \"\") &\n",
" (abnormal_data[\"联系手机号\"] != \"None\")\n",
" ]\n",
"\n",
"# # 保存结果\n",
"output_path = os.path.join(output_dir, \"异常待办1.csv\")\n",
"abnormal_data.to_csv(output_path, index=False)"
],
"outputs": [
{
"ename": "ModuleNotFoundError",
"evalue": "No module named 'back_ground_module'",
"output_type": "error",
"traceback": [
"\u001B[31m---------------------------------------------------------------------------\u001B[39m",
"\u001B[31mModuleNotFoundError\u001B[39m Traceback (most recent call last)",
"\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[3]\u001B[39m\u001B[32m, line 1\u001B[39m\n\u001B[32m----> \u001B[39m\u001B[32m1\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mback_ground_module\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m CommonModule\n\u001B[32m 2\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mapi\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m API\n\u001B[32m 3\u001B[39m \u001B[38;5;28;01mfrom\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[34;01mlog_config\u001B[39;00m\u001B[38;5;250m \u001B[39m\u001B[38;5;28;01mimport\u001B[39;00m configure_task_logger, configure_error_task_logger\n",
"\u001B[31mModuleNotFoundError\u001B[39m: No module named 'back_ground_module'"
]
}
],
"execution_count": 3
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
+112
View File
@@ -0,0 +1,112 @@
from back_ground_module import CommonModule
from api import API
from log_config import configure_task_logger, configure_error_task_logger
from datetime import datetime, timedelta, timezone
import pandas as pd
import os
# Obtain the pre-configured regular task logger.
logger = configure_task_logger()
# Obtain the pre-configured error-task logger.
error_task_logger = configure_error_task_logger()
# Directory that receives the CSV snapshots written below.
output_dir = "output"
import os

# Create the output directory if it does not exist yet.
os.makedirs(output_dir, exist_ok=True)
common_module = CommonModule()
api_instance = API()


def clean_phone(x):
    """Normalise a phone value to a string.

    Missing values, the empty string and the literal ``"None"`` become ``""``;
    a float-style ``"<digits>.0"`` loses its ``.0`` suffix; anything else is
    returned as ``str(x)``.
    """
    if pd.isna(x) or x == "" or x == "None":
        return ""
    s = str(x)
    if s.endswith('.0') and s[:-2].isdigit():
        return s[:-2]
    return s


# Snapshot of the source data as returned by get_jcb_details().
data_JCB = common_module.get_jcb_details()
output_path = os.path.join(output_dir, "借车包明细.csv")
data_JCB.to_csv(output_path, index=False)

# Reference day is yesterday, as a tz-naive local datetime.
current_local = datetime.now() + timedelta(days=-1)
current_date_str = current_local.strftime("%Y-%m-%d")
# Local date 30 days before the reference day (compared with the open date).
thirty_days_ago_local = (current_local - timedelta(days=30)).date()

payload = {
    "api_key": "6717470a0b3975ef583c6df1",
    "entry_id": "67174710da507490d8ac12c1",
}
daily_revisit = api_instance.entry_data_list(payload)
# The records are wrapped in the "data" entry; a missing or None payload is
# treated as "no cloud records" instead of crashing the comprehension below.
daily_revisit_list = (daily_revisit or {}).get("data") or []

# Select rows opened more than 30 days ago, with zero order days in the last
# 30 days, whose customer status is "留存".
abnormal_rows = []
for index, row in data_JCB.iterrows():
    try:
        # The open date is expected as a "%Y-%m-%d" string.
        open_date = datetime.strptime(str(row['开户日']), "%Y-%m-%d").date()
    except (ValueError, TypeError):
        continue  # Skip rows whose open date cannot be parsed.

    if (
        open_date < thirty_days_ago_local
        and row['近30天开单天数'] == 0
        and row['客户状态'] == "留存"
    ):
        new_row = row.copy()
        new_row["日期"] = open_date.strftime("%Y-%m-%d")
        abnormal_rows.append(new_row)

abnormal_data = pd.DataFrame(abnormal_rows) if abnormal_rows else pd.DataFrame()
output_path = os.path.join(output_dir, "异常待办.csv")
abnormal_data.to_csv(output_path, index=False)

if not abnormal_data.empty:
    abnormal_data["表单类型"] = "异常待办"
    abnormal_data["派发日期"] = current_date_str
    # Clean phone numbers (only strips a float-style ".0" suffix).
    abnormal_data['联系手机号'] = abnormal_data['联系手机号'].apply(clean_phone)

# Records already dispatched according to the cloud form.
df_cloud = pd.DataFrame([
    {
        "数据id": item.get("_id", ""),
        "账号": item.get("_widget_1739258942667", ""),
        "提交时间": item.get("createTime", ""),
        "表单类型": item.get("_widget_1739951204545", ""),
    }
    for item in daily_revisit_list
])
output_path = os.path.join(output_dir, "异常待办云端.csv")
df_cloud.to_csv(output_path, index=False)

recent_accounts = set()
if not df_cloud.empty and not abnormal_data.empty:
    # Parse createTime as UTC so both sides of the comparison are tz-aware;
    # unparseable timestamps become NaT and are dropped.
    df_cloud["提交时间"] = pd.to_datetime(df_cloud["提交时间"], utc=True, errors="coerce")
    df_cloud = df_cloud.dropna(subset=["提交时间"])

    # Keep only "异常待办" submissions.
    df_abnormal_cloud = df_cloud[df_cloud["表单类型"] == "异常待办"]

    if not df_abnormal_cloud.empty:
        # Latest submission per account.
        df_recent = (
            df_abnormal_cloud
            .sort_values("提交时间")
            .groupby("账号", as_index=False)
            .tail(1)
        )

        current_utc = datetime.now(timezone.utc)
        cutoff_utc = pd.Timestamp(current_utc) - pd.Timedelta(days=30)

        # Accounts whose latest submission is newer than 30 days.
        recent_accounts = set(df_recent[df_recent["提交时间"] > cutoff_utc]["账号"])

# Drop recently dispatched accounts and rows without a phone number.
if not abnormal_data.empty:
    # NOTE(review): isin() compares values as-is; if "账号" is numeric locally
    # but a string in the cloud data nothing will match - confirm the dtypes.
    # clean_phone() already mapped NaN / "" / "None" to "", so one test suffices.
    abnormal_data = abnormal_data[
        (~abnormal_data["账号"].isin(recent_accounts)) &
        (abnormal_data["联系手机号"] != "")
    ]

# Save the final result.
output_path = os.path.join(output_dir, "异常待办1.csv")
abnormal_data.to_csv(output_path, index=False)
-33
View File
@@ -1,33 +0,0 @@
from datetime import datetime
import os
from config import Config
import pandas as pd
from back_ground_module import CommonModule
from api import API
from log_config import configure_task_logger, configure_error_task_logger
# Loggers returned by the project's log_config helpers.
logger = configure_task_logger()
error_task_logger = configure_error_task_logger()
output_dir = "output" # Output directory.
# Create the output directory at import time if it does not exist.
os.makedirs(output_dir, exist_ok=True)
# Module-level instances of the project helper and API client.
common_module = CommonModule()
api_instance = API()
class YdToJDYRenewalToDo:
    """Placeholder task class; none of its methods does any work yet."""

    def __init__(self):
        """Create the task object; there is no state to initialise."""
        pass

    def load_all_data(self):
        """
        Load data from the various sources (not implemented yet).

        :return: None
        """

    def main(self):
        """Entry point of the task (not implemented yet)."""
        pass
if __name__ == '__main__':
    # Run the task only when executed as a script, not when imported.
    yd_to_jd_renewal_to_do = YdToJDYRenewalToDo()
    yd_to_jd_renewal_to_do.main()