This commit is contained in:
2026-01-30 11:28:35 +08:00
commit f1831c31b4
399 changed files with 860978 additions and 0 deletions
+479
View File
@@ -0,0 +1,479 @@
"""
API 模块
"""
import requests
import json
from typing import Optional, List, Dict, Any
from config import Config
from decimal import Decimal
import time
import numpy as np
# from log_config import configure_task_logger, configure_error_task_logger
# 获取已经配置好的常规日志记录器
# logger = configure_task_logger()
#
# # 获取已经配置好的错误任务日志记录器
# error_task_logger = configure_error_task_logger()
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(NpEncoder, self).default(obj)
def replace_decimals(obj):
if isinstance(obj, dict):
return {k: replace_decimals(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [replace_decimals(item) for item in obj]
elif isinstance(obj, Decimal):
return float(obj) # 或者 str(obj)
return obj
class API:
def entry_data_get(self, data: dict, replace: bool = False) -> Dict: # 获取单条表单数据
"""
获取单条表单数据
:param replace: 是否替换字段,默认为关
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return:
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/get'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 app_key
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"data_id": data['data_id'] # 数据ID
})
res = requests.post(url=url, data=payload, headers=headers)
data_get = res.json()
print(data_get)
if replace:
data_get = self.field_replacement(data, data_get) # 字段替换,由id替换为标签名
return data_get
def entry_data_list(self, data: dict, replace: bool = False, max_retries: int = 100) -> Dict: # 获取多条表单数据
"""
获取多条表单数据
:param max_retries: 最大重试次数
:param replace: 是否替换字段
:param data:
api_key: 应用id
entry_id: 表单id
:return:
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 app_key
'Content-Type': 'application/json'
}
all_data_batches = [] # 用于存储每次请求返回的数据批次
last_data_id = None
exit_flag = False
while True:
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"limit": 100,
"data_id": last_data_id
})
# res = requests.post(url=url, data=payload, headers=headers)
# data_get = res.json() # 此时返回的数据是一个字典
# if 'data' not in data_get or len(data_get['data']) == 0:
# break
#
# # 将当前批次的数据作为一个整体添加到 all_data_batches 中
# all_data_batches.extend(data_get['data'])
#
# last_data_id = data_get['data'][-1].get('_id')
# time.sleep(0.2)
# print(f"已获取 {len(all_data_batches)} 条数据")
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
# print("返回结果:", data_get)
if data_get["data"]:
all_data_batches.extend(data_get['data'])
last_data_id = data_get['data'][-1].get('_id')
print(f"已获取 {len(all_data_batches)} 条数据")
break # 成功则跳出循环
else:
if 'data' not in data_get or len(data_get['data']) == 0:
exit_flag = True
break
print("请求失败, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
if retries > max_retries:
print(f"超过最大重试次数({max_retries}),放弃此次请求")
# error_task_logger.error(f"任务 {last_data_id}组 连续{max_retries}次请求失败,放弃此次请求。")
all_data_batches.append(None) # 或者可以选择记录失败的payload以便后续处理
if exit_flag:
break
# 构建最终返回的字典
final_data = {
'data': all_data_batches # 'data' 键对应的值是列表的列表
}
if replace:
print("进行了替换")
final_data = self.field_replacement(data, final_data) # 字段替换,由id替换为标签名
return final_data
@staticmethod
def entry_widget_list(data: dict) -> Optional[Dict[str, Any]]: # 获取表单字段
"""
获取表单字段
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return:
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/widget/list'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 app_key
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'],
"entry_id": data['entry_id'],
})
res = requests.post(url=url, data=payload, headers=headers)
# print(type(res.json()))
return res.json()
def field_replacement(self, data: dict, data_get: dict) -> dict: # 字段替换(id替换为标签名) json_str:请求后的返回数据
"""
字段替换,将id替换为标签名,即唯一值替换为表单中显示字段的名字
:param data: 简道云插件发送过来的data,包含表单id、数据id、应用id
:param data_get: 简道云请求的数据,一般是根据数据id获取到表单的数据
:return: 将根据数据id获取到的表单数据,进行替换,返回替换后的数据
"""
# 获取表单对应字段标签名称
widget_list = self.entry_widget_list(data)
# 检查widget_list是否有效
if not widget_list or 'widgets' not in widget_list or not isinstance(widget_list['widgets'], list):
raise ValueError("映射表没有接受到数据")
# 创建一个映射表,将_widget_名称映射到label
name_to_label = {widget['name']: widget['label'] for widget in widget_list['widgets']}
print(name_to_label)
# 遍历data_get对象中的"data"部分,替换键名
data = data_get.get('data', {})
new_data = {}
if isinstance(data, list): # 多数据查询返回list
# 当 data 是列表时执行的代码
for item in data:
new_data = {}
for key, value in item.items():
if key in name_to_label:
new_key = name_to_label[key]
else:
new_key = key
new_data[new_key] = value
item['data'] = new_data # 对每个item进行更新
elif isinstance(data, dict): # 单数据查询返回dict
# 当 data 是字典时执行的代码
for key, value in data.items():
if key in name_to_label:
new_key = name_to_label[key]
else:
new_key = key
new_data[new_key] = value
data_get['data'] = new_data # 更新JSON对象中的"data"部分
# 将更新后的JSON对象转换回JSON字符串
new_json = json.dumps(data_get, ensure_ascii=False, indent=2)
new_json = json.loads(new_json)
return new_json
@staticmethod
def data_batch_create(data: dict) -> Optional[requests.Response]: # 新建单条数据
"""
新建单条表单数据
:param data: 应该包含应用id、表单id,以及新建的数据data['data']
:return: 返回创建后简道云返回的信息
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 app_key
'Content-Type': 'application/json'
}
"""
data 样式 # 后续优化发送数据样式 目前输入字段,后续优化输入表单名称
jiandaoyun_data['data'] = {"_widget_1731650067055":{"value":f'{username}{password}'},
"_widget_1731650067056":{"value": f"{group}"}}
"""
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"data": data['data'],
"is_start_workflow": data.get('is_start_workflow', "false"),
"is_start_trigger": data.get('is_start_trigger', "false"),
}
)
res = requests.post(url=url, data=payload, headers=headers)
data_get = res.json()
return data_get
@staticmethod
def entry_data_batch_create(
data: dict,
chunk_size: int = 90,
max_retries: int = 100
) -> List[Optional[requests.Response]]: # 新建多条数据 注意简道云限制1次最多100条数据
"""
新建多条数据
:param max_retries: 最大重试次数,此处设置100次
:param data:应包含数据id、表单id、以及需要新建的信息,新建信息应该是一个列表
:param chunk_size: 简道云限制批量新建一次最多100条,这里默认值设置为90条一次
:return:返回请求后的结果
"""
data = replace_decimals(data)
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/batch_create'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
"""
data_list 样式 # 后续优化发送数据样式 目前输入字段,后续优化输入表单名称
jiandaoyun_data_list['data_list'] = [{"_widget_1731650067055":{"value":f'{username}{password}'},
"_widget_1731650067056":{"value": f"{group}"}},
{"_widget_1731650067055":{"value":f'{username}{password}'},
"_widget_1731650067056":{"value": f"{group}"}}]
"""
# 获取data_list长度
total_length = len(data['data_list'])
print(f"多数据写入行数: {total_length}")
# 计算需要发送的次数
num_chunks = (total_length + chunk_size - 1) // chunk_size # //整除向下取证,需要加上chunk_size - 1保证不会有缺失数据
print(num_chunks)
data_get_list = []
for i in range(num_chunks):
start_index = i * chunk_size
end_index = min(start_index + chunk_size, total_length)
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"data_list": data['data_list'][start_index:end_index],
"is_start_workflow": data.get('is_start_workflow', "false"),
"is_start_trigger": data.get('is_start_trigger', "false"),
}, cls=NpEncoder)
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
print(i, "返回结果:", data_get)
if data_get["status"] == "success":
data_get_list.append(data_get)
break # 成功则跳出循环
else:
print("请求失败, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
if retries > max_retries:
print(f"超过最大重试次数({max_retries}),放弃此次请求")
# error_task_logger.error(
# f"任务 {data['data_list'][start_index:end_index]} 连续{max_retries}次请求失败,放弃此次请求。")
data_get_list.append(None) # 或者可以选择记录失败的payload以便后续处理
return data_get_list
@staticmethod
def entry_data_update(data: dict, max_retries: int = 100) -> dict: # 修改数据
"""
修改数据
:param max_retries: 最大重试次数,此处设置100次
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return: 修改数据后简道云返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"data_id": data['data_id'], # 数据ID
"data": data['data']
}
)
print(payload)
data_get = None
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
print("返回结果:", data_get)
if res.status_code == 200:
break # 成功则跳出循环
else:
print("请求失败, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(10) # 在重试之间稍作停顿
if retries > max_retries:
print(f"超过最大重试次数({max_retries}),放弃此次请求")
# error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。")
continue
return data_get
@staticmethod
def entry_data_delete(data: dict, max_retries: int = 100, ) -> dict:
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/delete'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"data_id": data['data_id'], # 数据ID
}
)
retries = 0
delete_status = None
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers)
delete_status = res.json()
# 手动处理状态码 4001
if delete_status == {
"code": 4001,
"msg": "Data does not exist."
}:
print("返回结果:", delete_status)
break # 成功则跳出循环
# 检查其他状态码
res.raise_for_status() # 只对非 4001 的状态码进行检查
print("返回结果:", delete_status)
if res.status_code == 200 :
break # 成功则跳出循环
else:
print("请求失败, 将重新请求")
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}, 将重新请求")
retries += 1
time.sleep(10) # 在重试之间稍作停顿
if retries > max_retries:
print(f"超过最大重试次数({max_retries}),放弃此次请求")
# error_task_logger.error(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。")
continue
return delete_status
@staticmethod
def workflow_instance_get(data: dict) -> dict:
"""
查询实例流程信息
:param data: 简道云插件发送过来的data,包含应用id
:return: 查询简道云流程实例信息返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v5/workflow/instance/get'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"instance_id": data['data_id'],
"tasks_type": 1
}
)
res = requests.post(url=url, data=payload, headers=headers)
return res.json()
@staticmethod
def workflow_task_approve(data: dict) -> dict:
"""
流程待办提交
:param data:应包含username、instance_id(data_id)、task_id等信息
:return:返回简道云流程待办提交的结果
"""
url = 'https://api.jiandaoyun.com/api/v1/workflow/task/approve'
headers = {
'Authorization': Config.JIANDAOYUN_API_TOKEN, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"username": data["username"],
"instance_id": data["instance_id"],
"task_id": data['task_id'],
"comment": "自动转交"
}
)
res = requests.post(url=url, data=payload, headers=headers)
return res.json()
@@ -0,0 +1,67 @@
import requests
import pandas as pd
import os
from typing import Dict, Any
from api import API
import time
from tqdm import tqdm
api_instance = API()
def update_jiandaoyun(data: Dict[str, Any], results: str):
""" 更新简道云表单 """
# 定义简道云数据配置
jiandaoyun_data = {
'api_key': data['api_key'],
'entry_id': data['entry_id'],
'data_id': data['data_id'],
"data": {
'_widget_1731379774828': {"value": "已执行"}, # f6系统批量操作测试 是否执行成功
'_widget_1731381334870': {"value": results} # f6系统批量操作测试 执行明细
}
}
time.sleep(1)
print(jiandaoyun_data)
api_instance.entry_data_update(jiandaoyun_data)
print('更新简道云表单成功')
def create_brand_background(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str):
# 定义请求URL
url = 'https://yunxiu.f6car.cn/camaro/brand/getOrCreate'
# 遍历DataFrame中的每一行数据
results = []
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="创建品牌"):
brand_name = row['品牌名']
try:
response = requests.post(url, cookies=cookies, json={"brandName": brand_name})
response.raise_for_status() # 抛出HTTP错误
results.append({'品牌名': brand_name, '状态': '创建成功'})
except requests.exceptions.RequestException as e:
results.append({'品牌名': brand_name, '状态': f'创建失败: {str(e)}'})
print({'msg': '已执行', 'msg_details': f'{results}'})
os.remove(save_path)
print(f'{save_path}已删除')
print(data)
# 调用api回写改掉 执行明细与执行状态
update_jiandaoyun(data, f'{results}')
def delete_history_background(data: Dict[str, Any], cookies: Dict[str, str], org_id: str, org_name: str):
url = f'https://yunxiu.f6car.cn/maintain-dump/maintainHistory/?orgid={org_id}' # 删除url
res = requests.delete(url=url, cookies=cookies)
res_data = res.json()
if res.status_code == 200 and res_data.get('code') == 200:
results = f'{org_name} 历史维修记录已删除'
print(results)
else:
results = f'删除 {org_name} 历史维修记录失败: {res_data.get("message")}'
print(results)
# 调用api回写改掉 执行明细与执行状态
time.sleep(1)
update_jiandaoyun(data, f'{results}')
+33
View File
@@ -0,0 +1,33 @@
# config.py
"""
配置类,负责加载和提供项目配置。
"""
class Config:
CONN_INFO = {
"database": "f6_bi",
"user": "BASIC$ro_caowei",
"password": "!ro_caowei123",
"host": "hgprecn-cn-nif1vnv0y002-cn-shanghai.hologres.aliyuncs.com",
"port": "80"
} # SaaS-NGV 数据库链接配置-postgresql
# 接车宝数据库链接配置-mysql
JCB_CONN_INFO_database = "f6insight_gzczj"
JCB_CONN_INFO_user = "rw_insight_gzczj"
JCB_CONN_INFO_password = "wEBT5LBHzbbhJisheCsE"
JCB_CONN_host = "rm-uf6r230vbtxf5gdz63o.mysql.rds.aliyuncs.com"
JIANDAOYUN_API_TOKEN = 'Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN' # token
EFFICIENT_CAR_PICKUP_APP_ID = "6717470a0b3975ef583c6df1" # 接车宝应用id
SCHEDULED_TASKS_APP_ID = "6694d3c4fcb69ca9a111a6c4" # 数据支持应用id
SaaS_Tasks_APP_ID = "675b900991ad2491c69389ca" # saas日常回访应用id
SCHEDULED_TASKS_ENTRY_ID = "6760e5e7672af4621ab8128a" # 定时任务表单id
NGV_TASKS_ENTRY_ID = "675bb02bd2d53c2034c665e4" # NGV任务表单id
EFFICIENT_CAR_PICKUP_ENTRY_ID = "67174710da507490d8ac12c1" # 接车宝表单id
EFFICIENT_CAR_PICKUP_CUSTOMER_SERVICE_ID = "67b6f2462f9ac03b783d409a" # 接车宝客服表单id
EFFICIENT_CAR_PICKUP_CUSTOMER_HISTORY_ID = "67c156ba635191b64af8a110" # 接车宝历史派发表单
+171
View File
@@ -0,0 +1,171 @@
from flask import Flask, request, jsonify
from module import module, F6_Plugin_module
from urllib.parse import unquote
from config import Config
import queue
import threading
import logging
import os
from logging.handlers import RotatingFileHandler
from apscheduler.schedulers.background import BackgroundScheduler
import requests
import atexit
app = Flask(__name__)
app.config.from_object(Config)
f6_module = module.F6Module()
f6_plugin_module = F6_Plugin_module.F6PluginModule()
# 定义一个映射表,将操作与处理函数关联起来
action_map = {
'login_in': f6_module.accept_login_message,
'get_company_information': f6_module.get_company_information,
'get_store_information': f6_module.get_store_information,
"keep_alive": f6_module.get_keep_heart,
'check_file': f6_plugin_module.check_file,
'create_brand': f6_plugin_module.create_brand,
'delete_history': f6_plugin_module.delete_history,
# 'delete_customer': f6_plugin_module.delete_customer,
# 'delete_cars': f6_plugin_module.delete_cars,
}
# 创建一个消息队列
task_queue = queue.Queue()
# 创建一个线程来处理队列中的任务
def process_tasks():
while True:
task = task_queue.get()
if task is None:
logger.error("任务处理线程已终止")
break # 停止处理任务
try:
result = task['handler'](task['data'])
task['response'].put(result)
except Exception as e:
logger.error(f"任务执行失败: {str(e)}")
task['response'].put({'msg': f'任务执行失败: {str(e)}'})
finally:
task_queue.task_done()
logger.info("任务处理完成")
# 启动任务处理线程
task_thread = threading.Thread(target=process_tasks, daemon=True)
task_thread.start()
# 配置日志记录器
log_dir = './日志' # 日志文件夹路径
if not os.path.exists(log_dir):
os.makedirs(log_dir) # 如果日志文件夹不存在,则创建它
log_file = os.path.join(log_dir, '简道云日志.log')
# 设置日志格式和级别
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# 创建RotatingFileHandler,支持日志文件滚动
file_handler = RotatingFileHandler(log_file, maxBytes=1024 * 1024 * 5, backupCount=5)
file_handler.setLevel(logging.INFO)
# 定义日志格式
formatter = logging.Formatter('%(asctime)s %(levelname)s:%(name)s:%(message)s')
file_handler.setFormatter(formatter)
# 将处理器添加到记录器
logger.addHandler(file_handler)
# 将Flask的默认日志记录器也配置为使用相同的处理器
app.logger.addHandler(file_handler)
# 创建一个后台调度器
scheduler = BackgroundScheduler()
# 定义一个定时任务,定期发送心跳请求以保持服务活跃
def keep_alive():
try:
data = {
'Action': 'keep_alive'
}
header = {
'Action': 'keep_alive'
}
# 发送一个GET请求到自己的API端点,模拟活动
response = requests.post(url='http://192.168.10.86:5000/webhook', data=data, headers=header, timeout=5)
if response.status_code == 200:
logger.info("Heartbeat sent successfully")
else:
logger.warning(f"Heartbeat failed with status code: {response.status_code}")
except requests.RequestException as e:
logger.error(f"Failed to send heartbeat: {e}")
# 添加定时任务,每隔1分钟执行一次
scheduler.add_job(keep_alive, 'interval', minutes=1)
# 启动调度器
scheduler.start()
# 确保在应用退出时关闭调度器
atexit.register(lambda: scheduler.shutdown())
@app.route('/webhook', methods=['POST'])
def webhook():
logging.info('Received POST request to /webhook')
"""
接受前端请求后将任务放入消息队列
Returns:
any: 返回任务处理的结果
"""
# 获取请求体中的 JSON 数据
data = request.json
header = request.headers
header = decode_headers(header)
action = header.get('Action')
if action == 'F6_Plugin':
check = header.get('Check')
if check == '':
handler = f6_plugin_module.check_file
elif check == '':
sub_action = data.get('Action')
handler = action_map.get(sub_action, lambda x: {'msg': '未执行'})
else:
return jsonify({'msg': '未知的操作'})
else:
handler = action_map.get(action, lambda x: {'msg': '未知的操作'})
# 创建一个队列用于存储任务处理结果
response_queue = queue.Queue()
# 将任务放入消息队列
task_queue.put({
'handler': handler,
'data': data,
'response': response_queue
})
# 等待任务处理结果
result = response_queue.get()
return jsonify(result)
def decode_headers(headers):
"""
解码包含中文字符的 HTTP 请求头。
:param headers: 包含编码后头部字段的字典
:return: 解码后的头部字段字典
"""
return {key: unquote(value, encoding='utf-8') for key, value in headers.items()}
if __name__ == '__main__':
# from waitress import serve
# serve(app, host='192.168.10.86', port=5000)
app.run(debug=True, port=5000)
@@ -0,0 +1,315 @@
import requests
from urllib.parse import quote
import pandas as pd
import os
import urllib.parse
import time
from datetime import datetime
from api import API
from typing import Optional, Dict, Any, Tuple
from config import Config
from module.module import F6Module
import threading
import back_ground_tasks
api_instance = API()
class F6PluginModule:
@staticmethod
def accept_file(data: Dict[str, Any]) -> Tuple[Optional[str], Dict[str, Any]]: # 接收文件
"""
接收文件。
此方法用于处理前端上传的文件,下载文件并保存到指定目录。主要步骤包括:
1. 处理前端传递的数据,获取文件的URL。
2. 解析URL以获取文件名。
3. 根据当前时间生成新的文件名,以避免文件名冲突。
4. 下载文件并保存到指定目录。
5. 返回文件保存路径和处理后的数据。
Args:
data (dict): 包含文件URL和其他必要信息的字典。
Returns:
tuple: 包含文件保存路径和处理后的数据的元组。如果文件保存成功,返回保存路径和数据;如果失败,返回 None 和数据。
"""
data = api_instance.entry_data_get(data=data)
try:
url = data['data']['附件'][0]['url']
print(url)
except Exception as e:
print(f'上传url未读取到,或无上传文件:{e}')
save_path = ''
return save_path, data
parsed_url = urllib.parse.urlparse(url)
query_params = urllib.parse.parse_qs(parsed_url.query)
attname = query_params.get('attname', [''])[0]
filename = urllib.parse.unquote(attname)
# 获取当前时间并格式化为指定格式的字符串
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
# 分离文件名和扩展名
name_part, ext_part = filename.rsplit('.', 1) if '.' in filename else (filename, '')
# 构建新文件名
new_filename = f"{name_part}{timestamp}.{ext_part}" if ext_part else f"{name_part}{timestamp}"
save_path = os.path.join(Config.SAVE_DIRECTORY, new_filename)
print(save_path)
response = requests.get(url, stream=True)
if response.status_code == 200:
with open(save_path, 'wb') as file:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
file.write(chunk)
return save_path, data
else:
return None, data
def check_file(self, data: Dict[str, Any]) -> Dict[str, str]: # 校验上传文件
"""
校验上传文件。
此方法负责接收前端上传的文件,并根据文件类型和操作指令进行相应的校验。主要步骤包括:
1. 调用 `accept_file` 方法处理前端传递的数据,获取文件保存路径和处理后的数据。
2. 根据数据中的 `Action` 字段判断需要执行的操作类型。
3. 如果文件保存路径有效,继续执行以下步骤:
- 如果操作类型为 `create_brand`,则读取文件和模板文件,校验文件格式是否正确。
- 如果文件格式正确,返回成功消息;否则返回错误消息。
4. 如果文件保存路径无效,返回相应的错误消息。
5. 如果读取文件过程中发生异常,捕获异常并返回错误消息。
Args:
data (dict): 前端请求发送过来的数据,包含文件信息和其他必要参数。
Returns:
dict: 包含文件校验结果的消息字典。如果校验成功,则返回文件路径和校验标志;如果失败,则返回错误消息。
"""
save_path, data1 = self.accept_file(data)
action = data1['data']['Action(隐藏)']
if save_path:
try:
if action == 'create_brand':
template_path = os.path.join(Config.MODE_DIRECTORY, '品牌名.xlsx')
# 读取指定的工作表
df1 = pd.read_excel(save_path, sheet_name='品牌名')
df2 = pd.read_excel(template_path, sheet_name='品牌名')
if df1.columns.tolist() == df2.columns.tolist(): # 校验表头名字
print('文件校验成功')
return {'msg': f'{save_path}', 'check': ''}
else:
print("'msg':'文件上传格式错误'")
return {'msg': '文件上传格式错误'}
elif action == 'delete_customer':
pass
elif action == 'delete_cars':
pass
else:
pass
except Exception as e:
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
else:
return {'msg': '当前节点无附件上传', 'check': ''} # 注意前端上传url必须为必填的,否则这个返回会出现错误
@staticmethod
def create_brand(data: Dict[str, Any]) -> Dict[str, str]: # 创建品牌
"""
创建品牌函数。
此方法用于处理前端发送的品牌创建请求,并执行相应的品牌创建逻辑。
Args:
data (dict): 包含品牌信息及用户凭证等数据的字典。
Returns:
dict: 包含操作结果的消息字典。如果操作成功,则返回成功消息和详情;如果失败,则返回错误消息。
Process:
1. 调用 `entry_data_get` 方法处理前端传递的数据。
2. 从处理后的数据中提取用户名、密码、公司名称以及文件保存地址。
3. 使用提供的用户名和密码尝试登录。
4. 如果登录失败,立即返回错误消息。
5. 登录成功后,读取指定路径的Excel文件,该文件应包含待创建品牌的列表。
6. 若读取Excel文件失败,返回相应的错误消息。
7. 利用登录获取到的cookies构造请求头,准备发起创建品牌的POST请求。
8. 遍历Excel文件中的每一条记录(即每个品牌),向指定的API发送创建请求。
9. 每个品牌的创建结果(成功或失败)都会被记录下来。
10. 所有品牌的创建请求完成后,删除原始的Excel文件。
11. 最后,将所有操作的结果汇总成字典并返回给调用者。
"""
entry_data = api_instance.entry_data_get(data=data) # 获取表单数据
# print(data)
print('执行 品牌批量新建')
username = entry_data['data']['账号']
password = entry_data['data']['密码']
company_name = entry_data['data']['公司名称']
save_path = entry_data['data']['文件保存地址']
# 尝试登录
login_response = F6Module.login_in(username, password, company_name)
if login_response is None:
return {'msg': '登录失败'}
# 读取Excel文件
try:
df = pd.read_excel(save_path, dtype='string')
except Exception as e:
return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'}
# 获取登录cookies
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
# 启动后台线程执行品牌创建任务
try:
thread = threading.Thread(target=back_ground_tasks.create_brand_background, args=(data, cookies, df, save_path))
thread.start()
except Exception as e:
print(f'创建线程失败: {str(e)}')
# 立即返回消息
return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}
@staticmethod
def delete_history(data: Dict[str, Any]) -> Dict[str, str]: # 删除历史维修记录
# endregion
"""
删除历史维修记录。
此方法用于处理用户的请求,删除指定门店的历史维修记录。主要步骤包括:
1. 处理前端传递的数据,提取必要的登录信息和门店信息。
2. 尝试登录系统,获取登录cookies。
3. 获取所有门店的信息,找到与用户提供的门店名称匹配的门店ID。
4. 使用门店ID发送删除请求,删除历史维修记录。
5. 返回操作结果。
Args:
data (dict): 包含登录信息和门店信息的字典。
Returns:
dict: 包含操作结果的消息字典。如果删除成功,返回成功消息和详细信息;如果失败,返回错误消息。
"""
entry_data = api_instance.entry_data_get(data=data) # 获取表单数据,将ID替换为中文
username = entry_data['data']['账号']
password = entry_data['data']['密码']
company_name = entry_data['data']['公司名称']
org_name = entry_data['data']['门店名称']
login_response = F6Module.login_in(username, password, company_name) # 尝试登录
if login_response is None:
return {'msg': '未执行', 'msg_details': '登录失败'}
# 获取登录cookies
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
# 获取门店列表
url = 'https://yunxiu.f6car.cn/hive/org/getPageOrgGroupMembers?currentPage=1&pageSize=1000&name=' # 获取门店名称url
res = requests.get(url=url, cookies=cookies)
store_data = res.json()
org_lists = store_data['data']['list']
org_id = ''
org_name1 = ''
for org in org_lists: # 确认删除门店信息
org_name1 = org['orgName']
if org_name == org['orgName']:
org_id = org['orgId']
if org_id:
# 启动后台线程执行删除任务
thread = threading.Thread(target=back_ground_tasks.delete_history_background, args=(data, cookies, org_id, org_name1))
thread.start()
# 立即返回消息
return {'msg': '正在执行中', 'msg_details': '请稍后查看结果'}
else:
return {'msg': '未执行', 'msg_details': '门店信息错误'}
@staticmethod
def delete_customer(data): # 客户信息删除(删除所有客户信息)
username = data['username']
password = data['password']
company_name = data['company_name']
res = F6Module.login_in(username, password, company_name)
if res is not None:
cookies = requests.utils.dict_from_cookiejar(res.cookies)
url = "https://yunxiu.f6car.cn/member/customer/listForPermission?pageSize=5&pageNo=1" # 获取客户信息列表
res = requests.get(url, cookies=cookies)
json = res.json()
for item in json['data']['data']:
idCustomer = item['idCustomer']
phone = item['cellPhone']
try:
url = f"https://yunxiu.f6car.cn/member/customer/{idCustomer}" # 客户信息删除url
res = requests.delete(url, cookies=cookies) # 客户信息删除
print(res.json, idCustomer, phone)
time.sleep(0.2)
except Exception as e:
print("删除失败,", res.json, idCustomer, phone, e)
@staticmethod
def delete_cars(data):
"""
删除客户车辆信息
参数:
data (dict): 包含用户名、密码和公司名称的字典。
- username (str): 用户名
- password (str): 密码
- company_name (str): 公司名称
返回:
功能描述:
1. 使用提供的用户名、密码和公司名称登录系统。
2. 如果登录成功,获取登录后的 cookies 和操作组织 ID。
3. 发送 POST 请求获取客户车辆信息列表。
4. 遍历车辆信息列表,逐个删除客户车辆信息。
5. 每次删除操作后,打印删除成功或失败的信息,并暂停 0.2 秒。
"""
username = data['username']
password = data['password']
company_name = data['company_name']
res = F6Module.login_in(username, password, company_name)
if res is not None:
cookies = requests.utils.dict_from_cookiejar(res.cookies)
operateOrgId = cookies['prodOrg']
url = "https://yunxiu.f6car.cn/member/car/carListForPermission" # 获取客户车辆信息列表
header = {
'Referer': 'https://yunxiu.f6car.cn/erp/view/index.html',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0'
}
data = {"pageSize": 10, "pageNo": 2} # 根据需求修改
res = requests.post(url=url, cookies=cookies, json=data, headers=header) # 注意需要用json发送数据
data = res.json()
for item in data['data']['data']:
carId = item['tmCarInfo']['pkId']
customerId = item['tmCustomerInfo']['pkId']
try:
url = f"https://yunxiu.f6car.cn/macan/coupon/car/batchRemove?customerId={customerId}&carId={carId}&operateOrgId={operateOrgId}" # 删除客户车辆信息url
res = requests.delete(url, cookies=cookies) # 客户车辆信息删除
print("删除成功:", res.json, customerId, carId)
time.sleep(0.2)
except Exception as e:
print("删除失败:", res.json, customerId, operateOrgId, e)
+232
View File
@@ -0,0 +1,232 @@
import requests
import hashlib
from urllib.parse import quote
from datetime import datetime
from api import API
from typing import Optional, Dict
api_instance = API()
class F6Module:
@staticmethod
def login_in(username: str, password: str, company_name: str = '默认门店') -> Optional[requests.Response]:
"""
登录模块。
此方法用于处理用户的登录请求,包括认证和选择门店的整个过程。
Args:
username (str): 用户名。
password (str): 密码。
company_name (str, optional): 默认为 '默认门店',表示用户要登录的门店名称。
Returns:
Response or None: 如果登录成功,返回包含登录信息的响应对象;如果登录失败,返回 None。
"""
url = "https://yunxiu.f6car.com/kzf6/login/confirm"
session = requests.Session()
header = {
'Referer': url,
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/129.0.0.0'
}
data = {
'username': username,
'password': hashlib.md5(password.encode('utf-8')).hexdigest(),
}
try:
res = session.post(url=url, headers=header, data=data)
res_json = res.json()
if res_json.get("data") is None:
return res
else:
group_id = ''
for group in res_json.get('data', []):
if group["groupName"] == company_name:
group_id = group.get("groupId")
token = quote(res_json['token']) # URL 编码
url = f'https://yunxiu.f6car.cn/kzf6/user/loginAfterChooseGroup?token={token}&groupId={group_id}&macAddress='
res1 = session.get(url, cookies=res.cookies)
return res1
except Exception as e:
print(f"Error during login: {e}")
return None
def accept_login_message(self, data: Dict[str, str]) -> Dict[str, str]: # 获取登录信息
"""
获取登录信息。
此方法用于处理用户的登录请求,验证登录信息的正确性,并返回相应的登录状态。
Args:
data (dict): 包含用户名、密码和门店名称的字典。
Returns:
dict: 包含登录状态的消息字典。
"""
username = data['username']
password = data['password']
company_name = data['company_name']
res = self.login_in(username, password, company_name)
if res is not None:
cookies = requests.utils.dict_from_cookiejar(res.cookies)
json = res.json()
url = 'https://yunxiu.f6car.cn/hive/company/getGroupName'
res1 = requests.get(url=url, cookies=cookies)
data1 = res1.json()
if data1['code'] == 200:
if data1['data'] == company_name:
if json['status'] == 'success':
json['status'] = '登录成功'
elif json['status'] == 'Error':
json['status'] = '登录失败,请检查账号密码'
else:
json['status'] = '公司名称不正确,请重试'
else:
json['status'] = '请输入正确的账号密码并选择公司名称'
return json
else:
return {"status": "登录失败,请检查公司名称"}
def get_company_information(self, data: Dict[str, str]) -> Dict[str, str]: # 获取登录账号下公司信息
"""
获取登录账号下公司信息。前端事件触发
此方法用于处理用户的登录请求,获取与该用户关联的所有门店名称。
Args:
data (dict): 包含用户名和密码的字典。
Returns:
dict: 包含门店信息的消息字典。如果成功获取门店信息,返回门店列表;如果失败,返回错误消息。
"""
username = data['username']
password = data['password']
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print(username)
url = "https://yunxiu.f6car.com/kzf6/login/confirm"
session = requests.Session()
header = {
'Referer': url,
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0'
}
data = {
'username': username,
'password': hashlib.md5(password.encode('utf-8')).hexdigest(),
}
try:
res = session.post(url=url, headers=header, data=data)
res_json = res.json()
if res_json.get('message') == '请输入图形验证码':
pass
# 添加公司名称映射表单ID应用ID
jiandaoyun_data = {'api_key': '6694d3c4fcb69ca9a111a6c4', 'entry_id': '6736e2112ad50045f041a827'}
if res_json.get("data") is None:
print('单店')
res = self.login_in(username, password)
if res is not None:
cookies = requests.utils.dict_from_cookiejar(res.cookies)
url = 'https://yunxiu.f6car.cn/hive/company/getGroupName' # 获取门店名称url
res = requests.get(url=url, cookies=cookies)
data = res.json()
store_name = data['data'] # 获取店铺名称
jiandaoyun_data['data_list'] = [
{"_widget_1731650067055": {"value": f'{username}{password}{timestamp}'}, # 公司名称映射表 公司名称唯一值
"_widget_1731650067056": {"value": f"{store_name}"}}] # 公司名称映射表 公司名称
api_instance.entry_data_batch_create(jiandaoyun_data) # 调用api写入数据
res = {'msg': f'{username}{password}{timestamp}'}
else:
jiandaoyun_data_list = []
for group in res_json.get('data', []):
append_data = {"_widget_1731650067055": {"value": f'{username}{password}{timestamp}'},
"_widget_1731650067056": {"value": f"{group['groupName']}"}}
jiandaoyun_data_list.append(append_data)
jiandaoyun_data['data_list'] = jiandaoyun_data_list
res = api_instance.entry_data_batch_create(jiandaoyun_data) # 调用api写入数据
print(res)
res = {'msg': f'{username}{password}{timestamp}'}
return res
except Exception as e:
print(f"获取公司名称失败: {e}")
res = {'msg': '获取公司名称失败,请重新获取'}
return res
def get_store_information(self, data: Dict[str, str]) -> Dict[str, str]: # 获取登录公司下门店信息
"""
获取门店登录信息。前端事件触发
此方法用于处理用户的登录请求,获取与该用户关联的所有门店名称,并将这些信息连同用户登录信息一并记录至简道云表单中。
Args:
data (dict): 包含用户名、密码及公司名称的字典。
Returns:
dict: 包含操作结果的消息字典。如果成功获取门店信息并记录至简道云,返回成功消息及记录信息;如果登录或数据记录过程中出现错误,返回相应的错误消息。
"""
username = data['username']
password = data['password']
company_name = data['company_name']
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
login_response = self.login_in(username, password, company_name) # 尝试登录
if login_response is None:
return {'msg': '未执行', 'msg_details': '登录失败'}
# 获取登录cookies
cookies = requests.utils.dict_from_cookiejar(login_response.cookies)
# 获取门店列表
url = 'https://yunxiu.f6car.cn/hive/org/getPageOrgGroupMembers?currentPage=1&pageSize=100&name=' # 获取门店名称url
res = requests.get(url=url, cookies=cookies)
data = res.json()
org_lists = data['data']['list']
# print(org_lists)
# noinspection SpellCheckingInspection
jiandaoyun_data = {'api_key': '6694d3c4fcb69ca9a111a6c4',
'entry_id': '673c38ccca57a5cf266eb18c'} # 添加公司名称映射表单ID应用ID
jiandaoyun_data_list = []
for org in org_lists:
append_data = {"_widget_1731999948708": {"value": f'{username}{password}{company_name}{timestamp}'},
# 公司门店映射表 公司名称唯一值
"_widget_1731999948709": {"value": f"{org['orgName']}"}} # 公司门店映射表 公司名称
jiandaoyun_data_list.append(append_data)
jiandaoyun_data['data_list'] = jiandaoyun_data_list
api_instance.entry_data_batch_create(jiandaoyun_data) # 调用api写入数据
res = {'msg': f'{username}{password}{company_name}{timestamp}'}
return res
@staticmethod
def get_keep_heart(data: Dict[str, str]) -> Dict[str, str]:
"""
获取心跳信息。前端事件触发
Args:
data(dict):包含心跳内容
returns:
dict:返回心跳内容
"""
return data
+17
View File
@@ -0,0 +1,17 @@
from api import API
import pandas as pd
from config import Config
api_instance = API()
df = pd.read_excel(r"C:\Users\Administrator.DESKTOP-7IC2USJ\Downloads\接车宝日常回访单_20250313091252.xlsx",
sheet_name='Sheet3', dtype='string')
for index,row in df.iterrows():# 在列表格式固定
data_id = row['data_id']
delete_data = {"api_key": Config.EFFICIENT_CAR_PICKUP_APP_ID,
"entry_id": "67174710da507490d8ac12c1",
"data_id": data_id}
api_instance.entry_data_delete(delete_data)