简道云V2.0

This commit is contained in:
z66
2025-11-14 11:04:01 +08:00
parent 073f0646a1
commit 49fc75214f
33 changed files with 1811 additions and 4454 deletions
-186
View File
@@ -1,186 +0,0 @@
# API模块更新说明
## 更新内容
根据 `fastapi_app/api.py`(最新版)更新了 `fastapi_app/app/api.py`,主要更新包括:
### 1. 添加失败重试机制
所有API函数都已添加重试机制,参考 `entry_data_list` 的实现模式:
- **默认重试次数**:20次(部分函数为10次)
- **重试逻辑**
- 捕获 `requests.exceptions.RequestException` 异常
- 重试前等待时间:0.1秒(快速请求)或3-10秒(慢速请求)
- 超过最大重试次数后记录错误日志并返回None或抛出异常
### 2. 新增功能函数
添加了以下新函数(原版本中没有的):
- `entry_data_banch_update()` - 批量修改数据
- `entry_data_delete()` - 删除单条数据
- `entry_data_batch_delete()` - 批量删除数据
- `workflow_task_hand_over()` - 流程待办转交
- `get_upload_token()` - 获取文件上传凭证
- `upload_file()` - 上传文件
### 3. 功能增强
- **`entry_data_get()`**
- 添加 `replace` 参数(默认True,保持向后兼容)
- 添加 `max_retries` 参数
- 添加重试机制
- **`entry_data_list()`**
- 添加 `replace` 参数(默认True,保持向后兼容)
- 添加 `max_retries` 参数
- 改进重试逻辑,支持分页重试
- **`entry_widget_list()`**
- 添加 `max_retries` 参数
- 添加重试机制
- **`data_batch_create()`**
- 添加 `max_retries` 参数
- 添加重试机制
- 支持 `is_start_workflow``is_start_trigger``transaction_id` 参数
- **`entry_data_batch_create()`**
- 添加 `max_retries` 参数
- 添加重试机制
- 支持 `is_start_workflow``is_start_trigger` 参数
- 使用 `NpEncoder` 处理NumPy数据类型
- **`entry_data_update()`**
- 添加 `max_retries` 参数
- 添加重试机制
- **`workflow_instance_get()`**
- 添加 `max_retries` 参数
- 添加重试机制
- **`workflow_task_approve()`**
- 添加 `max_retries` 参数
- 添加重试机制
- `comment` 参数支持自定义(默认"自动转交")
### 4. 工具函数
- **`NpEncoder`**:JSON编码器,处理NumPy数据类型
- **`replace_decimals()`**:递归替换Decimal类型为float
### 5. 字段替换优化
`field_replacement()` 方法使用递归实现,更优雅地处理嵌套数据结构。
## 兼容性保证
### 向后兼容
1. **默认参数**
- `entry_data_get()``entry_data_list()``replace` 参数默认为 `True`,保持原有行为
- 所有新增的 `max_retries` 参数都有合理的默认值
2. **导入路径**
- 使用 `from app.config import Config`(而非 `from config import Config`
- 使用 `logging.getLogger('app')`(而非 `log_config`
3. **函数签名**
- 所有原有函数的调用方式保持不变
- 新增参数都是可选参数
### 日志系统
- 使用 `logging.getLogger('app')` 作为常规日志记录器
- 使用 `logging.getLogger('app.error')` 作为错误日志记录器
- 与 fastapi_app 项目的日志系统完全兼容
## 使用示例
### 基本使用(保持原有方式)
```python
from app.api import API
api = API()
# 获取单条数据(自动替换字段)
data = api.entry_data_get({
'api_key': 'xxx',
'entry_id': 'xxx',
'data_id': 'xxx'
})
# 获取多条数据(自动替换字段)
data_list = api.entry_data_list({
'api_key': 'xxx',
'entry_id': 'xxx'
})
```
### 使用新功能
```python
# 不替换字段
data = api.entry_data_get({
'api_key': 'xxx',
'entry_id': 'xxx',
'data_id': 'xxx'
}, replace=False)
# 自定义重试次数
data = api.entry_data_get({
'api_key': 'xxx',
'entry_id': 'xxx',
'data_id': 'xxx'
}, max_retries=10)
# 批量删除
result = api.entry_data_batch_delete({
'api_key': 'xxx',
'entry_id': 'xxx',
'data_ids': ['id1', 'id2', 'id3']
}, chunk_size=90, max_retries=20)
```
## 重试机制说明
### 重试策略
1. **快速请求**0.1秒延迟):
- `entry_data_list()` - 分页请求
- `entry_data_batch_create()` - 批量创建
- `entry_data_batch_delete()` - 批量删除
- `workflow_instance_get()` - 流程查询
2. **慢速请求**3-10秒延迟):
- `data_batch_create()` - 3秒延迟
- `entry_data_update()` - 10秒延迟
- `entry_data_banch_update()` - 10秒延迟
- `entry_data_delete()` - 10秒延迟
- `workflow_task_approve()` - 3秒延迟
- `workflow_task_hand_over()` - 3秒延迟
- `get_upload_token()` - 3秒延迟
- `upload_file()` - 3秒延迟
### 错误处理
- 所有重试失败的操作都会记录到错误日志
- 部分函数在重试失败后返回 `None`,部分会抛出异常
- 错误日志包含失败的任务标识信息
## 注意事项
1. **超时设置**:所有请求都设置了 `timeout=10`
2. **状态码检查**:使用 `res.raise_for_status()` 检查HTTP状态码
3. **文件上传**`upload_file()` 使用 `with` 语句确保文件正确关闭
4. **数据类型处理**:批量操作函数使用 `NpEncoder``replace_decimals()` 处理特殊数据类型
## 测试建议
1. 测试所有原有功能的兼容性
2. 测试新添加的函数
3. 测试重试机制(可以模拟网络错误)
4. 测试 `replace=False` 参数的使用
-2
View File
@@ -1,2 +0,0 @@
__all__ = []
-793
View File
@@ -1,793 +0,0 @@
"""
API 模块 - 简道云API接口封装
支持失败重试机制,兼容现有代码
"""
import requests
import json
import time
import logging
from typing import Optional, List, Dict, Any
from decimal import Decimal
import numpy as np
from app.config import Config
# 获取日志记录器
logger = logging.getLogger('app')
error_logger = logging.getLogger('app.error') # 错误日志记录器
class NpEncoder(json.JSONEncoder):
"""NumPy数据类型JSON编码器"""
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
def replace_decimals(obj):
"""递归替换Decimal类型为float"""
if isinstance(obj, dict):
return {k: replace_decimals(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [replace_decimals(item) for item in obj]
elif isinstance(obj, Decimal):
return float(obj)
return obj
class API:
def entry_data_get(self, data: dict, replace: bool = True, max_retries: int = 20) -> Dict:
"""
获取单条表单数据
:param replace: 是否替换字段,默认为True(保持向后兼容)
:param max_retries: 最大重试次数
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return: 表单数据
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/get'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_id": data['data_id']
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
print(data_get)
if replace:
data_get = self.field_replacement(data, data_get)
return data_get
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
if retries <= max_retries:
time.sleep(0.1)
if retries > max_retries:
error_logger.error(f"任务 {data.get('data_id')} 连续{max_retries}次请求失败,放弃此次请求。")
raise Exception(f"获取单条表单数据失败,已重试{max_retries}")
def entry_data_list(self, data: dict, replace: bool = True, max_retries: int = 20) -> Dict:
"""
获取多条表单数据
:param max_retries: 最大重试次数
:param replace: 是否替换字段,默认为True(保持向后兼容)
:param data: 简道云插件发送过来的data,包含应用id、表单id等信息
:return: 表单数据列表
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
all_data_batches = []
last_data_id = None
exit_flag = False
while True:
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"limit": 100,
"data_id": last_data_id
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
if data_get.get("data"):
all_data_batches.extend(data_get['data'])
last_data_id = data_get['data'][-1].get('_id')
print(f"已获取 {len(all_data_batches)} 条数据")
break
else:
if 'data' not in data_get or len(data_get['data']) == 0:
exit_flag = True
break
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(0.1)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1)
if retries > max_retries:
error_logger.error(f"任务 {last_data_id}组 连续{max_retries}次请求失败,放弃此次请求。")
all_data_batches.append(None)
if exit_flag:
break
final_data = {
'data': all_data_batches
}
logger.info(f"获取了{len(all_data_batches)}条数据")
if replace:
print("进行了替换")
return self.field_replacement(data, final_data)
else:
return final_data
@staticmethod
def entry_widget_list(data: dict, max_retries: int = 20) -> Optional[Dict[str, Any]]:
"""
获取表单字段
:param max_retries: 最大重试次数
:param data: 简道云插件发送过来的data,包含应用id、表单id等信息
:return: 表单字段信息
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/widget/list'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
return res.json()
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
if retries <= max_retries:
time.sleep(0.1)
if retries > max_retries:
error_logger.error(f"获取表单字段失败,已重试{max_retries}")
return None
def field_replacement(self, data: dict, data_get: dict) -> dict:
"""
字段替换,将id替换为标签名,即唯一值替换为表单中显示字段的名字
:param data: 简道云插件发送过来的data,包含表单id、数据id、应用id
:param data_get: 简道云请求的数据,一般是根据数据id获取到表单的数据
:return: 替换后的数据
"""
widget_list = self.entry_widget_list(data)
if not widget_list or 'widgets' not in widget_list or not isinstance(widget_list['widgets'], list):
raise ValueError("映射表没有接受到数据")
name_to_label = {widget['name']: widget['label'] for widget in widget_list['widgets']}
def replace_keys(obj):
"""递归替换字典中的键名"""
if isinstance(obj, dict):
new_dict = {}
for key, value in obj.items():
new_key = name_to_label.get(key, key)
new_dict[new_key] = replace_keys(value)
return new_dict
elif isinstance(obj, list):
return [replace_keys(item) for item in obj]
else:
return obj
data_get_copy = json.loads(json.dumps(data_get))
if 'data' in data_get_copy:
data_get_copy['data'] = replace_keys(data_get_copy['data'])
return data_get_copy
@staticmethod
def data_batch_create(data: dict, max_retries: int = 20) -> Optional[Dict]:
"""
新建单条表单数据
:param max_retries: 最大重试次数
:param data: 应该包含应用id、表单id,以及新建的数据data['data']
:return: 返回创建后简道云返回的信息
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data": data['data'],
"is_start_workflow": data.get('is_start_workflow', "false"),
"is_start_trigger": data.get('is_start_trigger', "false"),
"transaction_id": data.get('transaction_id', "")
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
if res.status_code == 200:
return data_get
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(3)
if retries > max_retries:
error_logger.error(f"任务 {data.get('data')} 连续{max_retries}次请求失败,放弃此次请求。")
return None
@staticmethod
def entry_data_batch_create(
data: dict,
chunk_size: int = 90,
max_retries: int = 20
) -> List[Optional[Dict]]:
"""
新建多条数据
:param max_retries: 最大重试次数
:param data: 应包含数据id、表单id、以及需要新建的信息,新建信息应该是一个列表
:param chunk_size: 简道云限制批量新建一次最多100条,这里默认值设置为90条一次
:return: 返回请求后的结果
"""
data = replace_decimals(data)
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_create'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
total_length = len(data['data_list'])
logger.info(f"多数据写入行数: {total_length}")
num_chunks = (total_length + chunk_size - 1) // chunk_size
data_get_list = []
for i in range(num_chunks):
start_index = i * chunk_size
end_index = min(start_index + chunk_size, total_length)
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_list": data['data_list'][start_index:end_index],
"is_start_workflow": data.get('is_start_workflow', "false"),
"is_start_trigger": data.get('is_start_trigger', "false"),
}, cls=NpEncoder)
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
if data_get.get("status") == "success":
data_get_list.append(data_get)
break
else:
logger.warning(f"请求异常,将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1)
if retries > max_retries:
error_logger.error(
f"任务 {data['data_list'][start_index:end_index]} 连续{max_retries}次请求失败,放弃此次请求。")
data_get_list.append(None)
return data_get_list
@staticmethod
def entry_data_update(data: dict, max_retries: int = 20) -> dict:
"""
修改数据
:param max_retries: 最大重试次数
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return: 修改数据后简道云返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_id": data['data_id'],
"data": data['data']
})
data_get = None
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
if res.status_code == 200:
break
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(10)
if retries > max_retries:
error_logger.error(f"任务 {data.get('data_id')} 连续{max_retries}次请求失败,放弃此次请求。")
return data_get
@staticmethod
def entry_data_banch_update(data: dict, max_retries: int = 20, chunk_size: int = 90) -> List[dict]:
"""
批量修改数据
:param chunk_size: 批量修改块大小
:param max_retries: 最大重试次数
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return: 修改数据后简道云返回的结果列表
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_update'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
total_length = len(data['data_ids'])
logger.info(f"多数据修改行数: {total_length}")
num_chunks = (total_length + chunk_size - 1) // chunk_size
data_get_list = []
for i in range(num_chunks):
start_index = i * chunk_size
end_index = min(start_index + chunk_size, total_length)
payload = {
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_ids": data['data_ids'][start_index:end_index],
"data": data['data']
}
if "transaction_id" in data:
payload["transaction_id"] = data["transaction_id"]
payload = json.dumps(payload, cls=NpEncoder)
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
if res.status_code == 200:
data_get_list.append(data_get)
break
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(10)
if retries > max_retries:
error_logger.error(f"任务 {data.get('data_ids')} 连续{max_retries}次请求失败,放弃此次请求。")
continue
return data_get_list
@staticmethod
def entry_data_delete(data: dict, max_retries: int = 20) -> dict:
"""
删除单条数据
:param data: 应包含应用ID、表单ID、数据ID
:param max_retries: 最大重试次数,默认20
:return: 删除结果
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/delete'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_id": data['data_id'],
})
retries = 0
delete_status = None
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
delete_status = res.json()
# 手动处理状态码 4001(数据不存在)
if delete_status == {
"code": 4001,
"msg": "Data does not exist."
}:
logger.info(f"返回结果: {delete_status}")
break
res.raise_for_status()
if res.status_code == 200:
break
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(10)
if retries > max_retries:
error_logger.error(f"任务 {data.get('data_id')} 连续{max_retries}次请求失败,放弃此次请求。")
continue
return delete_status
@staticmethod
def entry_data_batch_delete(
data: dict,
chunk_size: int = 90,
max_retries: int = 20
) -> List[Optional[Dict]]:
"""
批量删除数据
:param data: 应包含应用ID、表单ID、数据ID列表
:param chunk_size: 单次删除最大条数,默认90
:param max_retries: 重试次数,默认20
:return: 删除结果列表
"""
data = replace_decimals(data)
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_delete'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
total_length = len(data['data_ids'])
logger.info(f"多数据删除行数: {total_length}")
num_chunks = (total_length + chunk_size - 1) // chunk_size
data_get_list = []
for i in range(num_chunks):
start_index = i * chunk_size
end_index = min(start_index + chunk_size, total_length)
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"data_ids": data['data_ids'][start_index:end_index],
}, cls=NpEncoder)
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
logger.info(f"{i}页 返回结果: {data_get}")
if data_get.get("status") == "success":
data_get_list.append(data_get)
break
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1)
if retries > max_retries:
error_logger.error(
f"批量删除任务第{i+1}批 连续{max_retries}次请求失败,放弃此次请求。")
data_get_list.append(None)
return data_get_list
@staticmethod
def workflow_instance_get(data: dict, max_retries: int = 20) -> dict:
"""
查询实例流程信息
:param max_retries: 最大重试次数
:param data: 简道云插件发送过来的data,包含应用id
:return: 查询简道云流程实例信息返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v5/workflow/instance/get'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"instance_id": data['data_id'],
"tasks_type": 1
})
data_get = None
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
data_get = res.json()
if res.status_code == 200:
break
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1)
if retries > max_retries:
error_logger.error(f"任务 {data.get('data_id')} 连续{max_retries}次请求失败,放弃此次请求。")
return data_get
@staticmethod
def workflow_task_approve(data: dict, max_retries: int = 20) -> dict:
"""
流程待办提交
:param max_retries: 最大重试次数
:param data: 应包含username、instance_id(data_id)、task_id等信息
:return: 返回简道云流程待办提交的结果
"""
url = 'https://api.jiandaoyun.com/api/v1/workflow/task/approve'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"username": data["username"],
"instance_id": data["instance_id"],
"task_id": data['task_id'],
"comment": data.get("comment", "自动转交")
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
if res.status_code == 200:
return res.json()
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(3)
if retries > max_retries:
error_logger.error(f"任务 {data.get('task_id')} 连续{max_retries}次请求失败,放弃此次请求。")
return {}
@staticmethod
def workflow_task_hand_over(data: dict, max_retries: int = 10) -> Optional[dict]:
"""
流程待办转交
:param max_retries: 最大重试次数
:param data: 应包含username、instance_id(data_id)、task_id、transfer_username等信息
:return: 返回简道云流程待办转交的结果
"""
url = 'https://api.jiandaoyun.com/api/v1/workflow/task/transfer'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"username": data["username"],
"instance_id": data["instance_id"],
"task_id": data['task_id'],
"transfer_username": data['transfer_username'],
"comment": data.get("comment", "转交")
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
if res.status_code == 200:
return res.json()
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(3)
if retries > max_retries:
error_logger.error(f"任务转交失败,已重试{max_retries}")
return None
@staticmethod
def get_upload_token(data: dict, max_retries: int = 10) -> Optional[Dict[str, Any]]:
"""
获取文件上传凭证
:param max_retries: 最大重试次数
:param data: 应包含应用ID、表单ID、事务ID
:return: 返回upload_url、upload_token
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/file/get_upload_token'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
"transaction_id": data['transaction_id'],
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status()
res_j = res.json()
# 检查 token_and_url_list 是否存在且不为空
token_list = res_j.get('token_and_url_list', [])
if not token_list or len(token_list) == 0:
logger.warning(f"未获取到上传凭证列表,将重新请求")
retries += 1
time.sleep(3)
continue
token_item = token_list[0]
upload_url = token_item.get('url')
upload_token = token_item.get('token')
if not upload_url or not upload_token:
logger.warning(f"上传凭证信息不完整,将重新请求")
retries += 1
time.sleep(3)
continue
logger.info(f"返回结果: {upload_url}, {upload_token}")
if res.status_code == 200:
return {
'upload_url': upload_url,
'upload_token': upload_token
}
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except (requests.exceptions.RequestException, KeyError, IndexError, TypeError) as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(3)
if retries > max_retries:
error_logger.error(f"获取上传凭证失败,已重试{max_retries}")
return None
@staticmethod
def upload_file(data: dict, max_retries: int = 10) -> Optional[Any]:
"""
上传文件
:param max_retries: 最大重试次数
:param data: 应包含上传文件路径、上传文件url、上传文件token
:return: 返回上传文件结果
"""
url = data['upload_url']
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN,
}
file_path = data['file_path']
payload = {
"token": data['upload_token'],
}
retries = 0
result = None
while retries <= max_retries:
try:
with open(file_path, 'rb') as f:
files = {"file": f}
res = requests.post(url=url, data=payload, headers=headers, files=files, timeout=10)
res.raise_for_status()
data_get = res.json()
logger.info(f"返回结果: {data_get}")
if res.status_code == 200:
result = data_get
break
else:
logger.warning(f"请求异常, 将重新请求")
retries += 1
time.sleep(3)
except requests.exceptions.RequestException as e:
logger.warning(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(3)
if retries > max_retries:
error_logger.error(f"上传文件失败,已重试{max_retries}")
return result
+148
View File
@@ -0,0 +1,148 @@
"""
FastAPI 依赖注入模块
本模块提供 FastAPI 的依赖注入函数,用于在路由中注入可复用的依赖项。
使用依赖注入可以减少代码重复,提高可测试性和可维护性。
提供的依赖项:
- get_logger: 获取日志记录器
- get_app_tools: 获取应用工具实例
- get_f6_module: 获取 F6Module 实例
- get_f6_plugin_module: 获取 F6PluginModule 实例
- get_other_module: 获取 OtherPluginModule 实例
- get_action_map: 获取操作映射表
"""
from fastapi import Request
from typing import Optional
import logging
def get_logger(request: Request) -> logging.Logger:
"""
获取日志记录器依赖项
从应用状态中获取日志记录器,如果未初始化则返回一个基本的 logger。
Args:
request: FastAPI 请求对象
Returns:
logging.Logger: 日志记录器实例
"""
logger = getattr(request.app.state, 'logger', None)
if logger is None:
# 如果 logger 未初始化,返回一个基本的 logger
logger = logging.getLogger('app')
return logger
def get_app_tools(request: Request):
"""
获取应用工具实例依赖项
从应用状态中获取 AppTools 实例,用于任务队列管理等操作。
Args:
request: FastAPI 请求对象
Returns:
AppTools: 应用工具实例
Raises:
RuntimeError: 如果 AppTools 未初始化
"""
from app.utils.app_tools import AppTools
app_tools = getattr(request.app.state, 'app_tools', None)
if app_tools is None:
raise RuntimeError("AppTools 未初始化")
return app_tools
def get_f6_module(request: Request):
"""
获取 F6Module 实例依赖项
从应用状态中获取 F6Module 实例,用于 F6 系统相关操作。
Args:
request: FastAPI 请求对象
Returns:
F6Module: F6Module 实例
Raises:
RuntimeError: 如果 F6Module 未初始化
"""
f6_module = getattr(request.app.state, 'f6_module', None)
if f6_module is None:
raise RuntimeError("F6Module 未初始化")
return f6_module
def get_f6_plugin_module(request: Request):
"""
获取 F6PluginModule 实例依赖项
从应用状态中获取 F6PluginModule 实例,用于 F6 插件相关操作。
Args:
request: FastAPI 请求对象
Returns:
F6PluginModule: F6PluginModule 实例
Raises:
RuntimeError: 如果 F6PluginModule 未初始化
"""
f6_plugin_module = getattr(request.app.state, 'f6_plugin_module', None)
if f6_plugin_module is None:
raise RuntimeError("F6PluginModule 未初始化")
return f6_plugin_module
def get_other_module(request: Request):
"""
获取 OtherPluginModule 实例依赖项
从应用状态中获取 OtherPluginModule 实例,用于其他插件相关操作。
Args:
request: FastAPI 请求对象
Returns:
OtherPluginModule: OtherPluginModule 实例
Raises:
RuntimeError: 如果 OtherPluginModule 未初始化
"""
other_module = getattr(request.app.state, 'other_module', None)
if other_module is None:
raise RuntimeError("OtherPluginModule 未初始化")
return other_module
def get_action_map(request: Request) -> dict:
"""
获取操作映射表依赖项
从模块注册表中获取所有注册的操作,并转换为字典格式(操作名 -> 处理函数)。
Args:
request: FastAPI 请求对象
Returns:
dict: 操作映射表,格式为 {操作名: 处理函数}
"""
from app.core.module_registry import registry
# 从 registry 获取所有注册的操作
actions = registry.get_all_actions()
# 转换为字典格式(handler 函数)
action_map = {
action_name: config.handler
for action_name, config in actions.items()
}
return action_map
+171
View File
@@ -0,0 +1,171 @@
"""
API 路由定义模块
本模块定义所有 API 路由端点,包括:
- /health: 健康检查端点
- /webhook: Webhook 端点,处理简道云插件的请求
所有路由都使用 FastAPI 的依赖注入系统,通过 dependencies.py 中的函数注入依赖项。
"""
from fastapi import APIRouter, Request, HTTPException, status, Depends
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from typing import Dict, Any
import json
import anyio
import asyncio
import logging
from app.schemas import WebhookRequest, WebhookResponse, HealthResponse
from app.api.dependencies import (
get_logger,
get_app_tools,
get_f6_plugin_module,
get_action_map
)
from app.utils.app_tools import AppTools
# 创建路由器
# 使用 APIRouter 分离路由,便于管理和维护
router = APIRouter()
@router.get("/health", response_model=HealthResponse, tags=["系统"])
async def healthcheck():
"""
健康检查端点
用于检查服务是否正常运行
"""
return HealthResponse(status="ok", version="1.0.0")
@router.post("/webhook", response_model=WebhookResponse, tags=["业务"])
async def webhook(
request: Request,
logger: logging.Logger = Depends(get_logger),
app_tools: AppTools = Depends(get_app_tools),
f6_plugin_module = Depends(get_f6_plugin_module),
action_map: Dict[str, Any] = Depends(get_action_map)
):
"""
接受前端请求后将任务放入消息队列
此端点接收简道云插件的请求,根据请求头中的 Action 字段路由到相应的处理函数。
支持的操作包括:登录、获取公司信息、文件校验、品牌创建等。
Args:
request: FastAPI 请求对象,包含请求体和请求头
logger: 日志记录器
app_tools: 应用工具实例
f6_plugin_module: F6插件模块实例
action_map: 操作映射表
Returns:
WebhookResponse: 任务处理结果
Raises:
HTTPException: 当操作类型无效或任务执行超时时抛出
"""
try:
# 获取请求数据并验证
try:
raw_data = await request.json()
# 使用 Pydantic 进行数据验证(允许额外字段)
webhook_data = WebhookRequest(**raw_data)
data = webhook_data.dict(exclude_none=True)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="请求体必须是有效的 JSON 格式"
)
except Exception as e:
logger.warning(f"请求数据验证失败: {str(e)}")
# 如果验证失败,仍然尝试使用原始数据(向后兼容)
data = raw_data if 'raw_data' in locals() else {}
# 获取并解码请求头
header = request.headers
decoded_header = app_tools.decode_headers(header)
# 验证 Action 字段
action = decoded_header.get('Action')
if not action:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="请求头中缺少必需的 Action 字段"
)
# 处理 F6_Plugin 特殊逻辑
if action == 'F6_Plugin':
check = decoded_header.get('Check')
if check == '':
handler = f6_plugin_module.check_file
elif check == '':
sub_action = data.get('Action')
if not sub_action:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="F6_Plugin 操作需要提供 Action 字段"
)
handler = action_map.get(sub_action)
if not handler:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"未知的子操作类型: {sub_action}"
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"F6_Plugin 操作需要提供有效的 Check 字段(是/否),当前值: {check}"
)
else:
handler = action_map.get(action)
if not handler:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"未知的操作类型: {action}。支持的操作: {', '.join(action_map.keys())}"
)
logger.info(f"接收到操作请求: {action}, 数据ID: {data.get('data_id', 'N/A')}")
# 将任务放入消息队列
response_queue = app_tools.enqueue_task(handler, data)
# 等待任务处理结果(添加超时保护,简道云默认60秒)
try:
# 使用 asyncio.wait_for 添加超时
result = await asyncio.wait_for(
anyio.to_thread.run_sync(response_queue.get),
timeout=55.0
)
except asyncio.TimeoutError:
logger.error(f"任务执行超时: {action}, 数据ID: {data.get('data_id', 'N/A')}")
raise HTTPException(
status_code=status.HTTP_504_GATEWAY_TIMEOUT,
detail="任务执行超时,请稍后重试"
)
# 验证返回结果格式
if not isinstance(result, dict):
result = {"msg": str(result)}
if "msg" not in result:
result["msg"] = "操作完成"
logger.info(f"操作完成: {action}, 结果: {json.dumps(result, ensure_ascii=False)}")
# 返回响应(使用 Pydantic 模型验证)
return WebhookResponse(**result)
except HTTPException:
# 重新抛出 HTTP 异常
raise
except Exception as e:
# 捕获其他未预期的异常
logger.error(f"处理请求时发生未预期的错误: {type(e).__name__} - {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"处理请求时发生错误: {str(e)}"
)
-24
View File
@@ -1,24 +0,0 @@
"""
后台任务模块 - 向后兼容入口
此文件保持向后兼容,实际功能已拆分到 app.tasks 模块中
"""
# 从新的 tasks 模块导入所有函数,保持向后兼容
from app.tasks import (
update_jiandaoyun,
approve_workflow,
create_brand_background,
delete_history_background,
delete_customer_background,
delete_car_background,
modify_customer_info_background,
)
__all__ = [
'update_jiandaoyun',
'approve_workflow',
'create_brand_background',
'delete_history_background',
'delete_customer_background',
'delete_car_background',
'modify_customer_info_background',
]
+31 -1
View File
@@ -1,29 +1,59 @@
"""
应用配置模块
本模块负责管理应用的所有配置项,包括:
- 目录路径配置
- API Token 配置
- 日志配置
注意:生产环境建议将敏感信息(如 API Token)移至环境变量。
"""
from pathlib import Path
# 获取当前文件所在的目录
# 当前文件位于 app/config.pyparent.parent 获取项目根目录
BASE_DIR = Path(__file__).resolve().parent.parent # 项目根目录
# 构建保存下载文件的目录路径
# 用于存储从简道云下载的文件
SAVE_DIRECTORY = BASE_DIR / '下载文件'
# 构建保存模板文件的目录路径
# 用于存储模板文件
MODE_DIRECTORY = BASE_DIR / '模板文件'
# 构建日志文件的目录路径
# 用于存储应用日志文件
LOGS_DIRECTORY = BASE_DIR / 'logs'
# 日志文件路径
LOG_FILE = LOGS_DIRECTORY / '简道云.log'
# 确保目录存在,如果不存在则创建
# 在应用启动时自动创建必要的目录
SAVE_DIRECTORY.mkdir(parents=True, exist_ok=True)
MODE_DIRECTORY.mkdir(parents=True, exist_ok=True)
# API 配置
# 简道云 API Token,用于调用简道云 API
# 注意:生产环境建议使用环境变量管理此配置
JIANDAOYUN_API_TOKEN = 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN' # 曹伟应用api测试 app_key
# 导出配置
class Config:
"""
应用配置类
统一管理应用的所有配置项,方便在应用中使用。
属性:
BASE_DIR: 项目根目录路径
SAVE_DIRECTORY: 下载文件保存目录
MODE_DIRECTORY: 模板文件保存目录
JIANDAOYUN_API_TOKEN: 简道云 API Token
LOGS_DIRECTORY: 日志文件目录
LOG_FILE: 日志文件路径
"""
BASE_DIR = BASE_DIR
SAVE_DIRECTORY = SAVE_DIRECTORY
MODE_DIRECTORY = MODE_DIRECTORY
+16 -5
View File
@@ -1,17 +1,28 @@
"""
核心模块初始化
统一初始化请求头管理器、模块注册表等核心组件
本模块统一初始化和管理核心组件,包括:
- ModuleRegistry: 模块注册表
- CoreManager: 核心管理器
提供统一的接口来管理这些核心组件。
"""
from typing import Dict, Any, Callable
from app.core.header_manager import HeaderManager
from app.core.module_registry import ModuleRegistry, registry
class CoreManager:
"""核心管理器 - 统一管理所有核心组件"""
"""
核心管理器
统一管理所有核心组件,提供便捷的方法来初始化和注册模块。
属性:
registry: 模块注册表实例
"""
def __init__(self):
self.header_manager = HeaderManager
"""初始化核心管理器"""
self.registry = registry
def initialize_modules(self, modules: Dict[str, Any]):
@@ -49,12 +60,12 @@ class CoreManager:
# 全局核心管理器实例
# 在应用启动时使用此实例来注册模块和操作
core_manager = CoreManager()
# 导出常用类和函数
__all__ = [
'core_manager',
'HeaderManager',
'ModuleRegistry',
'registry',
]
-118
View File
@@ -1,118 +0,0 @@
"""
请求头管理器
统一管理不同模块的请求头配置
"""
from typing import Dict, Optional
from dataclasses import dataclass, field
@dataclass
class HeaderConfig:
"""请求头配置"""
referer: Optional[str] = None
user_agent: Optional[str] = None
content_type: Optional[str] = None
authorization: Optional[str] = None
custom_headers: Dict[str, str] = field(default_factory=dict)
def to_dict(self) -> Dict[str, str]:
"""转换为字典格式"""
headers = {}
if self.referer:
headers['Referer'] = self.referer
if self.user_agent:
headers['User-Agent'] = self.user_agent
if self.content_type:
headers['Content-Type'] = self.content_type
if self.authorization:
headers['Authorization'] = self.authorization
# 添加自定义请求头
headers.update(self.custom_headers)
return headers
class HeaderManager:
"""请求头管理器"""
# 默认请求头配置
DEFAULT_HEADERS = HeaderConfig(
referer='https://yunxiu.f6car.cn/erp/view/index.html',
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0'
)
# F6系统登录请求头
F6_LOGIN_HEADERS = HeaderConfig(
referer='https://yunxiu.f6car.com/kzf6/login/confirm',
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/129.0.0.0'
)
# 简道云API请求头
JIANDAOYUN_API_HEADERS = HeaderConfig(
content_type='application/json',
# authorization 应该从配置中获取,这里只是示例
)
# 模块特定的请求头配置
_module_headers: Dict[str, HeaderConfig] = {
'default': DEFAULT_HEADERS,
'f6_login': F6_LOGIN_HEADERS,
'jiandaoyun_api': JIANDAOYUN_API_HEADERS,
}
@classmethod
def get_headers(cls, module_name: str = 'default', **overrides) -> Dict[str, str]:
"""
获取指定模块的请求头
Args:
module_name: 模块名称
**overrides: 覆盖的请求头配置
Returns:
请求头字典
"""
# 获取基础配置
config = cls._module_headers.get(module_name, cls.DEFAULT_HEADERS)
# 创建配置副本
header_config = HeaderConfig(
referer=overrides.get('referer', config.referer),
user_agent=overrides.get('user_agent', config.user_agent),
content_type=overrides.get('content_type', config.content_type),
authorization=overrides.get('authorization', config.authorization),
custom_headers={**config.custom_headers, **overrides.get('custom_headers', {})}
)
return header_config.to_dict()
@classmethod
def register_module_headers(cls, module_name: str, config: HeaderConfig):
"""
注册模块的请求头配置
Args:
module_name: 模块名称
config: 请求头配置
"""
cls._module_headers[module_name] = config
@classmethod
def update_module_headers(cls, module_name: str, **updates):
"""
更新模块的请求头配置
Args:
module_name: 模块名称
**updates: 要更新的配置项
"""
if module_name in cls._module_headers:
config = cls._module_headers[module_name]
for key, value in updates.items():
if hasattr(config, key):
setattr(config, key, value)
else:
config.custom_headers[key] = value
+38 -5
View File
@@ -1,6 +1,12 @@
"""
模块注册和路由管理
提供统一的模块注册机制,方便添加新功能模块
模块注册表模块
本模块提供统一的模块注册机制,用于管理所有业务模块和操作。
使用注册表模式可以:
- 统一管理所有操作
- 方便添加新功能模块
- 支持动态路由和操作查找
- 提供操作元数据管理
"""
from typing import Dict, Callable, Optional, Any
from dataclasses import dataclass
@@ -8,7 +14,18 @@ from dataclasses import dataclass
@dataclass
class ActionConfig:
"""操作配置"""
"""
操作配置数据类
存储操作的配置信息,包括处理函数、所属模块、描述等。
属性:
handler: 处理函数,执行具体业务逻辑
module_name: 所属模块名称
description: 操作描述,用于文档和日志
requires_auth: 是否需要认证,默认 True
header_module: 使用的请求头模块名称,可选
"""
handler: Callable # 处理函数
module_name: str # 所属模块名称
description: Optional[str] = None # 描述
@@ -17,9 +34,19 @@ class ActionConfig:
class ModuleRegistry:
"""模块注册表"""
"""
模块注册表类
统一管理所有业务模块和操作的注册表。
支持注册模块实例和操作,并提供查询功能。
属性:
_actions: 操作字典,格式为 {操作名: ActionConfig}
_modules: 模块字典,格式为 {模块名: 模块信息}
"""
def __init__(self):
"""初始化模块注册表"""
self._actions: Dict[str, ActionConfig] = {}
self._modules: Dict[str, Dict[str, Any]] = {}
@@ -65,7 +92,12 @@ class ModuleRegistry:
return self._actions.get(action_name)
def get_all_actions(self) -> Dict[str, ActionConfig]:
"""获取所有注册的操作"""
"""
获取所有注册的操作
Returns:
Dict[str, ActionConfig]: 所有注册的操作字典的副本
"""
return self._actions.copy()
def register_module(self, module_name: str, module_instance: Any, **metadata):
@@ -113,4 +145,5 @@ class ModuleRegistry:
# 全局模块注册表实例
# 在应用启动时使用此实例来注册所有模块和操作
registry = ModuleRegistry()
+154 -13
View File
@@ -1,26 +1,57 @@
"""
F6 插件模块
本模块提供 F6 插件相关的功能,包括:
- 文件上传和校验
- 品牌批量创建
- 历史记录删除
- 客户信息管理
- 车辆信息管理
依赖:
- requests: HTTP 请求
- pandas: Excel 文件处理
- threading: 后台任务处理
"""
import requests
from urllib.parse import quote
import pandas as pd
import os
import urllib.parse
from datetime import datetime
from app.api import API
from typing import Optional, Dict, Any, Tuple
from app.config import Config
from app.module import F6Module
import threading
from app import back_ground_tasks
from app.api import API
from app.config import Config
from app.module.module import F6Module
from app.tasks.brand_tasks import create_brand_background
from app.tasks.delete_tasks import (
delete_history_background,
delete_customer_background,
delete_car_background
)
from app.tasks.customer_tasks import modify_customer_info_background
from app.tasks.bi_tasks import bi_task_background
# 简道云 API 实例,用于调用简道云 API
api_instance = API()
class F6PluginModule:
"""
F6 插件模块类
提供 F6 插件相关的所有功能,包括文件处理、品牌管理、数据删除等。
"""
@staticmethod
def accept_file(data: Dict[str, Any]) -> Tuple[Optional[str], Dict[str, Any]]: # 接收文件
def accept_file(data: Dict[str, Any]) -> Tuple[Optional[str], Dict[str, Any]]:
"""
接收文件
接收文件
处理前端上传的文件,下载文件并保存到指定目录。
此方法用于处理前端上传的文件,下载文件并保存到指定目录。主要步骤包括:
1. 处理前端传递的数据,获取文件的URL。
2. 解析URL以获取文件名。
@@ -147,7 +178,19 @@ class F6PluginModule:
@staticmethod
def create_brand(data: Dict[str, Any]) -> Dict[str, str]: # 创建品牌
def create_brand(data: Dict[str, Any]) -> Dict[str, str]:
"""
创建品牌
从简道云获取品牌创建请求,读取 Excel 文件,并在后台线程中批量创建品牌。
立即返回"正在执行"的提示,实际创建在后台线程中执行。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
Returns:
Dict[str, str]: 包含执行状态的字典,{'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
"""
entry_data = api_instance.entry_data_get(data=data)
print('执行 品牌批量新建')
username = entry_data['data']['账号']
@@ -167,7 +210,7 @@ class F6PluginModule:
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
try:
thread = threading.Thread(target=back_ground_tasks.create_brand_background,
thread = threading.Thread(target=create_brand_background,
args=(data, cookies, df, save_path))
thread.start()
except Exception as e:
@@ -177,6 +220,18 @@ class F6PluginModule:
@staticmethod
def delete_history(data: Dict[str, Any]) -> Dict[str, str]:
"""
删除历史记录
从简道云获取删除历史记录请求,在后台线程中删除指定门店的历史维修记录。
立即返回"正在执行中"的提示,实际删除在后台线程中执行。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
Returns:
Dict[str, str]: 包含执行状态的字典
"""
entry_data = api_instance.entry_data_get(data=data)
username = entry_data['data']['账号']
password = entry_data['data']['密码']
@@ -202,7 +257,7 @@ class F6PluginModule:
org_id = org['orgId']
if org_id:
thread = threading.Thread(target=back_ground_tasks.delete_history_background,
thread = threading.Thread(target=delete_history_background,
args=(data, cookies, org_id, org_name1))
thread.start()
return {'msg': '正在执行中', 'msg_details': '请稍后查看结果'}
@@ -211,6 +266,18 @@ class F6PluginModule:
@staticmethod
def delete_customer(data):
"""
删除客户
从简道云获取删除客户请求,在后台线程中批量删除客户信息。
立即返回"正在执行中"的提示,实际删除在后台线程中执行。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
Returns:
Dict[str, str]: 包含执行状态的字典
"""
entry_data = api_instance.entry_data_get(data=data)
username = entry_data['data']['账号']
password = entry_data['data']['密码']
@@ -225,7 +292,7 @@ class F6PluginModule:
json = res.json()
if json:
thread = threading.Thread(target=back_ground_tasks.delete_customer_background,
thread = threading.Thread(target=delete_customer_background,
args=(data, cookies, json['data']['data'],))
thread.start()
return {'msg': '正在执行中', 'msg_details': '8-20点3.5s一条数据,其余时间1.5s一条数据'}
@@ -236,6 +303,18 @@ class F6PluginModule:
@staticmethod
def delete_cars(data):
"""
删除车辆
从简道云获取删除车辆请求,在后台线程中批量删除客户车辆信息。
立即返回"正在执行中"的提示,实际删除在后台线程中执行。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
Returns:
Dict[str, str]: 包含执行状态的字典
"""
entry_data = api_instance.entry_data_get(data=data)
username = entry_data['data']['账号']
password = entry_data['data']['密码']
@@ -259,7 +338,7 @@ class F6PluginModule:
all_page = total_items // 100 + (total_items % 100 > 0)
if res_data:
thread = threading.Thread(target=back_ground_tasks.delete_car_background,
thread = threading.Thread(target=delete_car_background,
args=(data, url, cookies, header, all_page))
thread.start()
return {'msg': '正在执行中', 'msg_details': '8-20点3.5s一条数据,其余时间1.5s一条数据'}
@@ -270,6 +349,18 @@ class F6PluginModule:
return {'msg': '未执行', 'msg_details': '登录失败'}
def modify_customer_info(self, data: Dict[str, str]):
"""
修改客户信息
从简道云获取修改客户信息请求,读取 Excel 文件,并在后台线程中批量修改客户信息。
立即返回"正在执行中"的提示,实际修改在后台线程中执行。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
Returns:
Dict[str, str]: 包含执行状态的字典
"""
entry_data = api_instance.entry_data_get(data=data)
username = entry_data['data']['账号']
password = entry_data['data']['密码']
@@ -288,11 +379,61 @@ class F6PluginModule:
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
if cookies:
thread = threading.Thread(target=back_ground_tasks.modify_customer_info_background,
thread = threading.Thread(target=modify_customer_info_background,
args=(data, cookies, df, save_path))
thread.start()
return {'msg': '正在执行中', 'msg_details': '请稍后查看结果'}
else:
return {'msg': '未执行', 'msg_details': 'cookies获取失败'}
@staticmethod
def bi_task(data: Dict[str, Any]) -> Dict[str, str]:
"""
BI任务
从简道云获取BI任务请求,读取 Excel 文件(如果需要),并在后台线程中执行BI任务。
立即返回"正在执行"的提示,实际执行在后台线程中完成。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
Returns:
Dict[str, str]: 包含执行状态的字典,{'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
"""
entry_data = api_instance.entry_data_get(data=data)
print('执行 BI任务')
# 获取必要的参数(根据实际需求调整)
username = entry_data['data'].get('账号')
password = entry_data['data'].get('密码')
company_name = entry_data['data'].get('公司名称')
save_path = entry_data['data'].get('文件保存地址')
# 如果需要登录F6系统
cookies = None
if username and password and company_name:
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
return {'msg': '登录失败', 'msg_details': '无法登录F6系统'}
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
# 如果需要读取Excel文件
df = None
if save_path:
try:
df = pd.read_excel(save_path, sheet_name=0, dtype='string')
except Exception as e:
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
# 启动后台线程执行BI任务
try:
thread = threading.Thread(target=bi_task_background,
args=(data, cookies, df, save_path))
thread.start()
except Exception as e:
print(f'创建线程失败: {str(e)}')
return {'msg': '任务启动失败', 'msg_details': f'无法启动后台任务: {str(e)}'}
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
+8
View File
@@ -1,2 +1,10 @@
"""
业务模块包
本包包含所有业务模块,包括:
- module.py: F6Module - F6系统相关功能
- f6_plugin_module.py: F6PluginModule - F6插件功能
- other_module.py: OtherPluginModule - 其他功能模块
"""
__all__ = []
+96 -1
View File
@@ -1,3 +1,18 @@
"""
F6 系统模块
本模块提供 F6 系统相关的功能,包括:
- 登录和认证
- 验证码识别
- 公司信息获取
- 门店信息获取
- 保持连接
依赖:
- requests: HTTP 请求
- PIL: 图像处理
- pytesseract: OCR 识别
"""
import requests
import hashlib
from urllib.parse import quote
@@ -7,15 +22,33 @@ from typing import Optional, Dict, AnyStr
from PIL import Image, ImageEnhance
import pytesseract
import logging
from datetime import datetime
# 简道云 API 实例,用于调用简道云 API
api_instance = API()
# 日志记录器
logger = logging.getLogger('app')
class F6Module:
"""
F6 系统模块类
提供 F6 系统相关的所有功能,包括登录、信息获取等。
"""
@staticmethod
def get_captcha() -> AnyStr:
"""
获取并识别验证码
从 F6 系统获取验证码图片,使用 OCR 识别验证码文本。
Returns:
AnyStr: 识别出的验证码文本
注意:
需要系统安装 Tesseract OCR 才能正常工作
"""
captcha_url = 'https://yunxiu.f6car.cn/kzf6/login/captcha-image'
response = requests.get(captcha_url)
with open('captcha.png', 'wb') as f:
@@ -37,6 +70,23 @@ class F6Module:
@staticmethod
def login_in(username: str, password: str, company_name: str = '默认门店',) -> Optional[requests.Response]:
"""
F6 系统登录
使用用户名和密码登录 F6 系统,并选择指定的公司。
如果触发验证码,会自动识别并重试登录。
Args:
username: 用户名
password: 密码(明文,方法内部会进行 MD5 加密)
company_name: 公司名称,默认为'默认门店'
Returns:
Optional[requests.Response]: 登录响应对象,登录失败返回 None
注意:
密码会在方法内部进行 MD5 加密处理
"""
url = "https://yunxiu.f6car.com/kzf6/login/confirm"
session = requests.Session()
header = {
@@ -82,6 +132,17 @@ class F6Module:
return None
def accept_login_message(self, data: Dict[str, str]) -> Dict[str, str]:
"""
接受登录消息并处理
处理简道云插件发送的登录请求,执行登录并返回结果。
Args:
data: 包含用户名、密码、公司名称的字典
Returns:
Dict[str, str]: 登录结果,包含状态信息
"""
username = data['username']
password = data['password']
company_name = data['company_name']
@@ -110,6 +171,17 @@ class F6Module:
return {"status": "登录失败,请检查公司名称"}
def get_company_information(self, data: Dict[str, str]) -> Dict[str, str]:
"""
获取公司信息
根据用户名和密码获取 F6 系统中的公司信息,并将结果保存到简道云。
Args:
data: 包含用户名、密码的字典
Returns:
Dict[str, str]: 包含时间戳的消息,用于后续查询
"""
username = data['username']
password = data['password']
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
@@ -173,6 +245,18 @@ class F6Module:
return res
def get_store_information(self, data: Dict[str, str]) -> Dict[str, dict[str, str]]:
"""
获取门店信息
根据用户名、密码和公司名称获取 F6 系统中的门店信息,
包括门店列表、客户车辆数量、客户数量等。
Args:
data: 包含用户名、密码、公司名称的字典
Returns:
Dict[str, dict[str, str]]: 包含时间戳、门店信息、统计数据的结果
"""
username = data['username']
password = data['password']
company_name = data['company_name']
@@ -221,6 +305,17 @@ class F6Module:
@staticmethod
def get_keep_heart(data: Dict[str, str]) -> Dict[str, str]:
"""
保持连接
用于保持连接的心跳检测,直接返回接收到的数据。
Args:
data: 接收到的数据字典
Returns:
Dict[str, str]: 原样返回接收到的数据
"""
return data
+18 -14
View File
@@ -1,22 +1,26 @@
import requests
from urllib.parse import quote
import pandas as pd
import os
import urllib.parse
from datetime import datetime
from app.api import API
from typing import Optional, Dict, Any, Tuple
from app.config import Config
from app.module import F6Module
import threading
from app import back_ground_tasks
"""
其他插件模块
api_instance = API()
本模块提供其他插件相关的功能,目前包括短信签名状态查询等功能。
"""
class OtherPluginModule:
"""
其他插件模块类
提供其他插件相关的功能,如短信签名状态等。
"""
def sms_signature_status(self):
"""
短信签名状态
查询短信签名状态(待实现)。
Returns:
待实现
"""
pass
+77
View File
@@ -0,0 +1,77 @@
"""
Pydantic 数据模型定义
用于 FastAPI 请求和响应的数据验证
"""
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
from enum import Enum
class ActionType(str, Enum):
"""支持的操作类型枚举"""
LOGIN_IN = "login_in"
GET_COMPANY_INFORMATION = "get_company_information"
GET_STORE_INFORMATION = "get_store_information"
KEEP_ALIVE = "keep_alive"
CHECK_FILE = "check_file"
CREATE_BRAND = "create_brand"
DELETE_HISTORY = "delete_history"
DELETE_CUSTOMER = "delete_customer"
DELETE_CARS = "delete_cars"
SMS_SIGNATURE_STATUS = "sms_signature_status"
MODIFY_CUSTOMER_INFO = "modify_customer_info"
F6_PLUGIN = "F6_Plugin"
class WebhookRequest(BaseModel):
"""Webhook 请求体数据模型"""
# 通用字段
api_key: Optional[str] = Field(None, description="简道云应用ID")
entry_id: Optional[str] = Field(None, description="简道云表单ID")
data_id: Optional[str] = Field(None, description="简道云数据ID")
Action: Optional[str] = Field(None, description="操作类型")
# 登录相关字段
username: Optional[str] = Field(None, description="用户名")
password: Optional[str] = Field(None, description="密码")
company_name: Optional[str] = Field(None, description="公司名称")
# 文件相关字段
file_path: Optional[str] = Field(None, description="文件保存路径")
# 其他字段(允许任意额外字段)
class Config:
extra = "allow" # 允许额外字段,因为简道云可能传递其他字段
class WebhookHeader(BaseModel):
"""Webhook 请求头数据模型"""
Action: Optional[str] = Field(None, description="操作类型")
Check: Optional[str] = Field(None, description="检查标志(是/否)")
class Config:
extra = "allow" # 允许额外请求头
class WebhookResponse(BaseModel):
"""Webhook 响应数据模型"""
msg: str = Field(..., description="响应消息")
msg_details: Optional[str] = Field(None, description="详细信息")
check: Optional[str] = Field(None, description="检查结果")
status: Optional[str] = Field(None, description="状态")
class Config:
extra = "allow" # 允许额外字段
class HealthResponse(BaseModel):
"""健康检查响应模型"""
status: str = Field("ok", description="服务状态")
version: Optional[str] = Field(None, description="服务版本")
class ErrorResponse(BaseModel):
"""错误响应模型"""
detail: str = Field(..., description="错误详情")
error_code: Optional[str] = Field(None, description="错误代码")
-89
View File
@@ -1,89 +0,0 @@
# 后台任务模块结构说明
## 模块结构
后台任务已按功能拆分为以下模块:
```
app/tasks/
├── __init__.py # 统一导出入口
├── common.py # 通用功能模块(简道云表单更新、工作流审批)
├── brand_tasks.py # 品牌相关任务
├── delete_tasks.py # 删除相关任务
└── customer_tasks.py # 客户相关任务
```
## 模块说明
### common.py - 通用功能模块
包含所有任务共用的功能:
- `update_jiandaoyun()` - 更新简道云表单
- `approve_workflow()` - 工作流审批
### brand_tasks.py - 品牌任务模块
品牌相关的后台任务:
- `create_brand_background()` - 品牌批量创建
### delete_tasks.py - 删除任务模块
删除相关的后台任务:
- `delete_history_background()` - 删除历史维修记录
- `delete_customer_background()` - 删除客户信息
- `delete_car_background()` - 删除客户车辆信息
### customer_tasks.py - 客户任务模块
客户相关的后台任务:
- `modify_customer_info_background()` - 修改客户信息
## 向后兼容
原有的 `app.back_ground_tasks` 模块仍然可用,它现在作为向后兼容的入口,实际功能已拆分到 `app.tasks` 模块中。
## 添加新功能模块
如需添加新的功能模块,请按以下步骤:
1.`app/tasks/` 目录下创建新的模块文件,例如 `new_feature_tasks.py`
2. 在新模块中实现相关功能函数
3.`app/tasks/__init__.py` 中导入并导出新函数
4.`app/back_ground_tasks.py` 中导入新函数以保持向后兼容
示例:
```python
# app/tasks/new_feature_tasks.py
from app.tasks.common import update_jiandaoyun, approve_workflow
def new_feature_background(data, cookies):
# 实现新功能
result = "执行结果"
msg = update_jiandaoyun(data, result)
if msg.get('msg'):
approve_workflow(data)
```
```python
# app/tasks/__init__.py 中添加
from app.tasks.new_feature_tasks import new_feature_background
__all__ = [
# ... 其他函数
'new_feature_background',
]
```
## 使用方式
### 方式一:使用新的模块结构(推荐)
```python
from app.tasks import create_brand_background
from app.tasks.brand_tasks import create_brand_background # 也可以直接导入
```
### 方式二:使用向后兼容的导入方式
```python
from app import back_ground_tasks
back_ground_tasks.create_brand_background(...)
```
两种方式都可以正常工作,代码执行逻辑完全一致。
+14 -1
View File
@@ -1,6 +1,14 @@
"""
后台任务模块统一导出入口
保持向后兼容,所有原有导入方式仍然有效
本模块统一导出所有后台任务函数,保持向后兼容。
所有原有导入方式仍然有效。
导出的任务包括:
- 通用功能: update_jiandaoyun, approve_workflow
- 品牌任务: create_brand_background
- 删除任务: delete_history_background, delete_customer_background, delete_car_background
- 客户任务: modify_customer_info_background
"""
# 通用功能
from app.tasks.common import update_jiandaoyun, approve_workflow
@@ -18,6 +26,9 @@ from app.tasks.delete_tasks import (
# 客户相关任务
from app.tasks.customer_tasks import modify_customer_info_background
# BI相关任务
from app.tasks.bi_tasks import bi_task_background
__all__ = [
# 通用功能
'update_jiandaoyun',
@@ -30,5 +41,7 @@ __all__ = [
'delete_car_background',
# 客户任务
'modify_customer_info_background',
# BI任务
'bi_task_background',
]
+86
View File
@@ -0,0 +1,86 @@
"""
BI相关后台任务模块
本模块包含BI相关的后台任务,包括:
- BI数据处理
- BI报表生成
这些任务在后台线程中执行,不会阻塞主请求。
执行完成后会更新简道云表单并自动提交工作流。
"""
import logging
import os
import requests
import pandas as pd
from typing import Dict, Any
from tqdm import tqdm
from app.tasks.common import update_jiandaoyun, approve_workflow
logger = logging.getLogger('app')
def bi_task_background(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame = None, save_path: str = None):
"""
BI任务后台执行函数
在后台线程中执行BI相关任务,如数据处理、报表生成等。
执行完成后会更新简道云表单并自动提交工作流。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息(如果需要)
df: Excel 文件读取的内容,DataFrame 格式(如果需要)
save_path: Excel 文件保存的地址,执行完成后会删除此文件(如果需要)
Returns:
None
注意:
- 这是一个示例函数,需要根据实际BI任务需求进行实现
- 执行完成后会自动删除上传的文件(如果提供了save_path)
- 执行结果会更新到简道云表单
"""
try:
# TODO: 在这里实现具体的BI任务逻辑
# 例如:数据处理、报表生成、数据同步等
# 示例:处理数据
results = []
if df is not None:
df = df.where(pd.notnull(df), None)
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="处理BI数据"):
# TODO: 实现具体的数据处理逻辑
# 例如:调用BI API、生成报表、数据转换等
result_item = {
'行号': index + 1,
'状态': '处理成功'
}
results.append(result_item)
else:
# 如果没有DataFrame,执行其他BI任务
# TODO: 实现其他BI任务逻辑
results.append({'状态': 'BI任务执行成功'})
# 删除文件(如果提供了save_path)
if save_path and os.path.exists(save_path):
os.remove(save_path)
logger.info(f'{save_path}已删除')
# 格式化结果
results_str = f'{results}' if results else 'BI任务执行完成'
logger.info(f"BI任务执行结果: {results_str}")
# 调用api回写改掉 执行明细与执行状态
msg = update_jiandaoyun(data, results_str)
if msg.get('msg'):
approve_workflow(data)
logger.info('表单已自动提交至下一步')
except Exception as e:
error_msg = f'BI任务执行失败: {str(e)}'
logger.error(error_msg, exc_info=True)
msg = update_jiandaoyun(data, error_msg)
if msg.get('msg'):
approve_workflow(data)
+23 -7
View File
@@ -1,6 +1,10 @@
"""
品牌相关后台任务模块
包含品牌批量创建等功能
本模块包含品牌相关的后台任务,包括:
- 品牌批量创建
这些任务在后台线程中执行,不会阻塞主请求。
"""
import logging
import os
@@ -15,12 +19,24 @@ logger = logging.getLogger('app')
def create_brand_background(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str):
"""
品牌批量创建后台运行函数
:param data: 包含表单id、数据id等的字典
:param cookies: 用户登录f6系统的cookies信息
:param df: 表格读取到的内容,DataFrame格式
:param save_path: 文件保存的地址
:return: None
品牌批量创建后台任务
在后台线程中批量创建品牌,从 Excel 文件中读取品牌名称并创建。
执行完成后会更新简道云表单并自动提交工作流。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息
df: Excel 文件读取的内容,DataFrame 格式,第一列为品牌名称
save_path: Excel 文件保存的地址,执行完成后会删除此文件
Returns:
None
注意:
- 无效的品牌名(None、空字符串)会被跳过
- 执行完成后会自动删除上传的文件
- 执行结果会更新到简道云表单
"""
df = df.where(pd.notnull(df), None)
# 定义请求URL
+136 -7
View File
@@ -1,10 +1,19 @@
"""
通用后台任务模块
包含简道云表单更新和工作流审批等通用功能
本模块包含所有后台任务通用的功能,包括:
- 简道云表单更新
- 工作流审批
- 获取门店ID
- 获取会员卡列表
这些功能被多个后台任务模块复用。
"""
import logging
import time
from typing import Dict, Any
import requests
from typing import Dict, Any, List, Optional, Callable
from tqdm import tqdm
from app.api import API
api_instance = API()
@@ -14,9 +23,15 @@ logger = logging.getLogger('app')
def update_jiandaoyun(data: Dict[str, Any], results: str):
"""
更新简道云表单
:param data: 包含表单id、应用id、数据id的字典
:param results: 执行结果信息
:return: 更新结果字典
将后台任务的执行结果更新到简道云表单中。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
results: 执行结果信息,将写入到表单的执行明细字段
Returns:
Dict: 更新结果字典,{'msg': True} 表示成功,{'msg': False} 表示失败
"""
# 定义简道云数据配置
jiandaoyun_data = {
@@ -44,8 +59,17 @@ def update_jiandaoyun(data: Dict[str, Any], results: str):
def approve_workflow(data: Dict[str, Any]):
"""
获取简道云当前流程节点并直接提交
:param data: 包含表单id、应用id、数据id的字典
:return: None
获取简道云工作流的当前待处理任务,并自动提交到下一步。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
Returns:
None
注意:
如果未找到待处理任务,函数会记录错误并返回,不会抛出异常
"""
# 获取简道云当前流程列表
json = api_instance.workflow_instance_get(data)
@@ -92,3 +116,108 @@ def approve_workflow(data: Dict[str, Any]):
except Exception as e:
logger.error(f"简道云工作流任务提交失败: {e}")
def get_operate_org_id(cookies: Dict[str, str]) -> Optional[str]:
"""
获取操作门店ID
从F6系统获取第一个门店的组织ID,用于后续操作。
Args:
cookies: 用户登录 F6 系统的 cookies 信息
Returns:
Optional[str]: 门店ID,如果获取失败返回 None
注意:
如果未获取到门店信息或门店ID为空,会记录错误日志并返回 None
"""
org_url = "https://yunxiu.f6car.cn/hive/org/getPageOrgGroupMembers?currentPage=1&pageSize=10&name="
try:
org_res = requests.get(url=org_url, cookies=cookies)
org_data = org_res.json().get("data", {})
org_list = org_data.get("list", [])
if not org_list or len(org_list) == 0:
logger.error("未获取到门店信息")
return None
operate_org_id = org_list[0].get("orgId")
if not operate_org_id:
logger.error("门店ID为空")
return None
logger.info(f"获取门店ID成功: {operate_org_id}")
return operate_org_id
except Exception as e:
logger.error(f"获取门店ID时发生错误: {e}")
return None
def get_card_list(
cookies: Dict[str, str],
operate_org_id: str,
extract_func: Callable[[Dict], Optional[str]] = None
) -> List[str]:
"""
获取会员卡列表
从F6系统获取指定门店的会员卡列表,支持自定义提取逻辑。
Args:
cookies: 用户登录 F6 系统的 cookies 信息
operate_org_id: 门店ID
extract_func: 自定义提取函数,用于从会员卡数据中提取ID
如果不提供,默认提取 idCustomer 字段
Returns:
List[str]: 会员卡ID列表
注意:
- 默认每页100条数据,会自动分页获取所有数据
- 每页请求间隔0.2秒,避免请求过快
"""
card_list = []
try:
# 获取第一页,确定总页数
card_url = f"https://yunxiu.f6car.cn/marketing/card/paging?useStationIdOwnOrgList={operate_org_id}&pageSize=100&pageNo=1"
card_res = requests.get(url=card_url, cookies=cookies)
total_card = int(card_res.json().get("data", {}).get("total", 0))
if total_card == 0:
logger.info("未找到会员卡数据")
return card_list
total_page = total_card // 100 + (total_card % 100 > 0)
logger.info(f"会员卡总数: {total_card}, 总页数: {total_page}")
# 定义默认提取函数(提取客户ID
if extract_func is None:
def default_extract(card_item: Dict) -> Optional[str]:
return card_item.get("idCustomer")
extract_func = default_extract
# 分页获取所有会员卡数据
for page in tqdm(range(1, total_page + 1), desc="查询会员卡"):
card_url = (f"https://yunxiu.f6car.cn/marketing/card/paging?useStationIdOwnOrgList={operate_org_id}"
f"&pageSize=100&pageNo={page}")
card_res = requests.get(url=card_url, cookies=cookies)
card_data_list = card_res.json().get("data", {}).get("data", [])
# 使用提取函数提取ID
for card_item in card_data_list:
extracted_id = extract_func(card_item)
if extracted_id is not None:
card_list.append(extracted_id)
time.sleep(0.2)
logger.info(f"获取会员卡列表成功,共 {len(card_list)}")
return card_list
except Exception as e:
logger.error(f"获取会员卡列表时发生错误: {e}")
return card_list
+20 -8
View File
@@ -1,6 +1,10 @@
"""
客户相关后台任务模块
包含修改客户信息等功能
本模块包含客户相关的后台任务,包括:
- 客户信息批量修改
这些任务在后台线程中执行,不会阻塞主请求。
"""
import logging
import requests
@@ -15,16 +19,24 @@ logger = logging.getLogger('app')
def modify_customer_info_background(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str):
"""
修改客户信息后台任务
此函数用于后台任务,用于修改会员信息。
修改客户信息后台任务
在后台线程中批量修改客户信息,从 Excel 文件中读取客户手机号和修改信息。
执行完成后会更新简道云表单并自动提交工作流。
Args:
data (Dict[str, Any]): 前端请求发送过来的数据,包含文件信息和其他必要参数。
cookies (Dict[str, str]): 登录用户的Cookies
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息
df: Excel 文件读取的内容,DataFrame 格式,第一列为客户手机号
save_path: Excel 文件保存的地址,执行完成后会删除此文件
Returns:
None
注意:
- 根据客户手机号匹配客户信息
- 执行完成后会自动删除上传的文件
- 执行结果会更新到简道云表单
"""
df = df.where(pd.notnull(df), None)
params = {
+94 -98
View File
@@ -1,27 +1,41 @@
"""
删除相关后台任务模块
包含删除历史维修记录、删除客户信息、删除客户车辆信息等功能
本模块包含删除相关的后台任务,包括:
- 删除历史维修记录
- 删除客户信息
- 删除客户车辆信息
这些任务在后台线程中执行,不会阻塞主请求。
执行完成后会更新简道云表单并自动提交工作流。
"""
import logging
import traceback
import requests
import time
from typing import Dict, Any, List
from typing import Dict, Any, List, Optional
from datetime import datetime
from tqdm import tqdm
from app.tasks.common import update_jiandaoyun, approve_workflow
from app.tasks.common import update_jiandaoyun, approve_workflow, get_operate_org_id, get_card_list
logger = logging.getLogger('app')
def delete_history_background(data: Dict[str, Any], cookies: Dict[str, str], org_id: str, org_name: str):
"""
删除历史维修数据后台运行函数
:param data: 包含表单id、数据id等的字典
:param cookies: 用户登录F6系统的cookies信息
:param org_id: 需要删除历史维修记录的门店id
:param org_name: 需要删除历史维修记录的门店名称
:return: None
删除历史维修记录后台任务
在后台线程中删除指定门店的历史维修记录。
执行完成后会更新简道云表单并自动提交工作流。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息
org_id: 需要删除历史维修记录的门店ID
org_name: 需要删除历史维修记录的门店名称
Returns:
None
"""
url = f'https://yunxiu.f6car.cn/maintain-dump/maintainHistory/?orgid={org_id}' # 删除url
res = requests.delete(url=url, cookies=cookies)
@@ -47,60 +61,37 @@ def delete_history_background(data: Dict[str, Any], cookies: Dict[str, str], org
def delete_customer_background(data: Dict[str, Any], cookies: Dict[str, str], json_data: List[Dict[str, Any]]):
"""
删除客户信息后台运行函数
:param data: 包含表单id、数据id等字典
:param cookies: 用户登录f6系统的cookies信息
:param json_data: 获取到的客户信息列表,列表最大值取决url里面的值
:return: None
删除客户信息后台任务
在后台线程中批量删除客户信息
执行完成后会更新简道云表单并自动提交工作流。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
cookies: 用户登录 F6 系统的 cookies 信息
json_data: 获取到的客户信息列表,包含要删除的客户信息
Returns:
None
注意:
- 8-20点之间每3.5秒删除一条数据,其余时间每1.5秒删除一条数据
- 执行结果会更新到简道云表单
"""
success = 0
fail = 0
# 获取门店ID
org_url = "https://yunxiu.f6car.cn/hive/org/getPageOrgGroupMembers?currentPage=1&pageSize=10&name="
org_res = requests.get(url=org_url, cookies=cookies)
# 安全地获取门店ID
org_data = org_res.json().get("data", {})
org_list = org_data.get("list", [])
if not org_list or len(org_list) == 0:
logger.error("未获取到门店信息")
operate_org_id = get_operate_org_id(cookies)
if not operate_org_id:
msg = update_jiandaoyun(data, '删除失败: 未获取到门店信息')
if msg.get('msg'):
approve_workflow(data)
return
operate_org_id = org_list[0].get("orgId")
if not operate_org_id:
logger.error("门店ID为空")
msg = update_jiandaoyun(data, '删除失败: 门店ID为空')
if msg.get('msg'):
approve_workflow(data)
return
print(operate_org_id)
# 获取会员卡列表
card_url = f"https://yunxiu.f6car.cn/marketing/card/paging?useStationIdOwnOrgList={operate_org_id}&pageSize=100&pageNo=1"
card_res = requests.get(url=card_url, cookies=cookies)
total_card = int(card_res.json().get("data").get("total"))
print(total_card)
total_page = total_card // 100 + (total_card % 100 > 0)
card_list_customers = []
for page in tqdm(range(1, total_page + 1), desc="查询会员卡"):
card_url = (f"https://yunxiu.f6car.cn/marketing/card/paging?useStationIdOwnOrgList={operate_org_id}"
f"&pageSize=100&pageNo={page}")
card_res = requests.get(url=card_url, cookies=cookies)
card_cars_list = card_res.json().get("data").get("data")
for card_customer in card_cars_list:
if card_customer.get("idCustomer") is None:
continue
else:
card_list_customers.append(card_customer.get("idCustomer", None))
time.sleep(0.2)
# 获取会员卡列表(提取客户ID
card_list_customers = get_card_list(cookies, operate_org_id)
for item in tqdm(json_data, desc="删除客户"):
id_customer = item['idCustomer']
@@ -135,10 +126,10 @@ def delete_customer_background(data: Dict[str, Any], cookies: Dict[str, str], js
continue
now = datetime.now()
if 20 <= now.hour <= 8:
time.sleep(1)
if 8 <= now.hour <= 20:
time.sleep(3.5)
else:
time.sleep(3)
time.sleep(1.5)
logger.info(f"客户删除结果: 成功次数={success}, 失败次数={fail}")
@@ -152,13 +143,26 @@ def delete_customer_background(data: Dict[str, Any], cookies: Dict[str, str], js
def delete_car_background(data: Dict[str, Any], url: str, cookies: Dict[str, str], header: Dict[str, Any],
all_page: str):
"""
删除客户车辆信息后台运行函数
:param header: 应包含账号登录的请求头
:param url: 包含请求客户车辆信息的url
:param all_page: 客户车辆信息的页数
:param data: 包含表单id、数据id等的字典
:param cookies: 登录F6系统后的请求信息
:return: None
删除客户车辆信息后台任务
在后台线程中批量删除客户车辆信息
会检查车辆是否有会员卡或最近消费记录,有则跳过删除。
执行完成后会更新简道云表单并自动提交工作流。
Args:
data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典
url: 获取车辆列表的 API URL
cookies: 用户登录 F6 系统的 cookies 信息
header: HTTP 请求头字典,应包含账号登录的请求头
all_page: 总页数(字符串或整数),用于分页获取车辆列表
Returns:
None
注意:
- 8-20点之间每3.5秒删除一条数据,其余时间每1.5秒删除一条数据
- 有会员卡或最近消费记录的车辆会被跳过
- 执行结果会更新到简道云表单
"""
print(cookies)
success = 0
@@ -168,50 +172,42 @@ def delete_car_background(data: Dict[str, Any], url: str, cookies: Dict[str, str
all_page = int(all_page)
# 获取门店ID
org_url = "https://yunxiu.f6car.cn/hive/org/getPageOrgGroupMembers?currentPage=1&pageSize=10&name="
org_res = requests.get(url=org_url, cookies=cookies)
# 安全地获取门店ID
org_data = org_res.json().get("data", {})
org_list = org_data.get("list", [])
if not org_list or len(org_list) == 0:
logger.error("未获取到门店信息")
operate_org_id = get_operate_org_id(cookies)
if not operate_org_id:
msg = update_jiandaoyun(data, '删除失败: 未获取到门店信息')
if msg.get('msg'):
approve_workflow(data)
return
operate_org_id = org_list[0].get("orgId")
if not operate_org_id:
logger.error("门店ID为空")
msg = update_jiandaoyun(data, '删除失败: 门店ID为空')
if msg.get('msg'):
approve_workflow(data)
return
print(operate_org_id)
# 获取会员卡列表
card_url = (
f"https://yunxiu.f6car.cn/marketing/card/paging?useStationIdOwnOrgList={operate_org_id}&pageSize=100&pageNo=1"
)
card_res = requests.get(url=card_url, cookies=cookies)
total_card = int(card_res.json().get("data").get("total"))
print(total_card)
total_page = total_card // 100 + (total_card % 100 > 0)
# 获取会员卡列表(提取车辆ID
# 注意:需要获取所有车辆的ID,所以不能直接使用 get_card_list
# 需要自定义提取逻辑,返回所有车辆的ID列表
card_list_cars = []
for page in tqdm(range(1, total_page + 1), desc="查询会员卡"):
card_url = (f"https://yunxiu.f6car.cn/marketing/card/paging?useStationIdOwnOrgList={operate_org_id}"
f"&pageSize=100&pageNo={page}")
try:
# 获取第一页,确定总页数
card_url = f"https://yunxiu.f6car.cn/marketing/card/paging?useStationIdOwnOrgList={operate_org_id}&pageSize=100&pageNo=1"
card_res = requests.get(url=card_url, cookies=cookies)
card_cars_list = card_res.json().get("data").get("data")
for card_car in card_cars_list:
if card_car.get("cars") is None:
continue
for car in card_car.get("cars", []):
card_list_cars.append(car.get("idCar", None))
time.sleep(0.2)
total_card = int(card_res.json().get("data", {}).get("total", 0))
if total_card > 0:
total_page = total_card // 100 + (total_card % 100 > 0)
for page in tqdm(range(1, total_page + 1), desc="查询会员卡"):
card_url = (f"https://yunxiu.f6car.cn/marketing/card/paging?useStationIdOwnOrgList={operate_org_id}"
f"&pageSize=100&pageNo={page}")
card_res = requests.get(url=card_url, cookies=cookies)
card_cars_list = card_res.json().get("data", {}).get("data", [])
for card_car in card_cars_list:
if card_car.get("cars") is None:
continue
for car in card_car.get("cars", []):
car_id = car.get("idCar")
if car_id:
card_list_cars.append(car_id)
time.sleep(0.2)
except Exception as e:
logger.error(f"获取会员卡列表时发生错误: {e}")
itemlist = []
# 使用 range() 创建一个可迭代的对象
+107
View File
@@ -1,3 +1,14 @@
"""
应用工具模块
本模块提供应用级的工具类,包括:
- 日志记录器配置(支持日志轮转)
- 后台任务调度器
- 任务队列管理
- 请求头解码工具
这些工具在整个应用中被广泛使用。
"""
import logging
import os
from logging.handlers import RotatingFileHandler
@@ -8,7 +19,27 @@ from urllib.parse import unquote
class AppTools:
"""
应用级工具集合类
提供应用级别的工具功能,包括:
- 初始化轮转日志记录器
- 初始化后台调度器(进程退出时自动关闭)
- 维护一个简单的任务队列与后台处理线程
属性:
config: 配置对象
task_queue: 任务队列
logger: 日志记录器
scheduler: 后台调度器
"""
def __init__(self, config):
"""
初始化应用工具
Args:
config: 配置对象,包含日志、目录等配置信息
"""
self.config = config
self.task_queue = Queue()
self.logger = self._setup_logger()
@@ -16,6 +47,19 @@ class AppTools:
self._start_task_thread()
def _setup_logger(self):
"""
配置带轮转的文件日志记录器
创建支持日志轮转的文件日志记录器,避免重复添加相同文件处理器。
Returns:
logging.Logger: 配置好的日志记录器
注意:
- 日志文件最大 5MB
- 保留 5 个备份文件
- 使用 UTF-8 编码
"""
log_dir = self.config.LOGS_DIRECTORY
if not os.path.exists(log_dir):
os.makedirs(log_dir)
@@ -25,6 +69,7 @@ class AppTools:
logger = logging.getLogger("app")
logger.setLevel(logging.INFO)
# 若未绑定目标日志文件的 RotatingFileHandler,则创建并绑定
if not any(isinstance(h, RotatingFileHandler) and getattr(h, 'baseFilename', None) == str(log_file)
for h in logger.handlers):
file_handler = RotatingFileHandler(log_file, maxBytes=1024 * 1024 * 5, backupCount=5, encoding='utf-8')
@@ -36,19 +81,43 @@ class AppTools:
return logger
def _setup_scheduler(self):
"""
初始化后台调度器
创建后台调度器,并在进程退出时优雅关闭,防止资源泄漏。
Returns:
BackgroundScheduler: 后台调度器实例
注意:
调度器会在进程退出时自动关闭
"""
scheduler = BackgroundScheduler()
import atexit
atexit.register(lambda: scheduler.shutdown(wait=False))
return scheduler
def _start_task_thread(self):
"""
启动后台任务处理线程
启动一个守护线程来处理任务队列中的任务,守护模式运行,随主线程退出。
"""
task_thread = threading.Thread(target=self.process_tasks, daemon=True)
task_thread.start()
def process_tasks(self):
"""
后台消费队列中的任务:
任务结构为 {'handler': callable, 'data': any, 'response': Queue}
- 正常执行时将 handler(data) 的结果放入 response 队列
- 发生异常时记录错误并将失败信息放入 response 队列
- 每次任务完成后调用 task_done()
"""
while True:
task = self.task_queue.get()
if task is None:
# 外部以 None 作为结束信号
self.logger.error("任务处理线程已终止")
break
try:
@@ -62,6 +131,18 @@ class AppTools:
self.logger.info("任务处理完成")
def enqueue_task(self, handler, data):
"""
将任务入队
将任务放入任务队列,并返回一个响应队列,调用方可以从响应队列中获取执行结果。
Args:
handler: 处理函数,执行具体业务逻辑
data: 传递给处理函数的数据
Returns:
Queue: 响应队列,用于获取任务执行结果
"""
response_queue = Queue()
self.task_queue.put({
'handler': handler,
@@ -72,13 +153,39 @@ class AppTools:
@staticmethod
def decode_headers(headers):
"""
解码请求头
对请求头字典进行 URL 解码(UTF-8),返回解码后的副本。
主要用于处理包含中文字符的请求头。
Args:
headers: 请求头字典
Returns:
dict: 解码后的请求头字典
"""
return {key: unquote(value, encoding='utf-8') for key, value in headers.items()}
# 全局日志记录器变量
# 用于存储全局日志记录器实例,避免重复初始化
logger = None
def setup_global_logger(config):
"""
设置全局日志记录器
懒加载并返回全局 logger,若未初始化则构建 AppTools 并复用其中的 logger。
避免重复初始化日志处理器。
Args:
config: 配置对象
Returns:
logging.Logger: 全局日志记录器实例
"""
global logger
if logger is None:
app_tools = AppTools(config)