""" F6 后台执行模块 本模块提供 F6 插件相关的功能,包括: - 文件上传和校验 - 品牌批量创建 - 历史记录删除 - 客户信息管理 - 车辆信息管理 - 项目信息批量启停 - 材料信息批量修改 依赖: - requests: HTTP 请求 - pandas: Excel 文件处理 - threading: 后台任务处理 """ import requests from urllib.parse import quote import pandas as pd import os import urllib.parse from datetime import datetime from typing import Optional, Dict, Any, Tuple import threading from app.api import API from app.config import Config from app.module.module import F6Module from app.tasks.brand_tasks import create_brand_background from app.tasks.delete_tasks import ( delete_history_background, delete_customer_background, delete_car_background ) from app.tasks.material_tasks import ( batch_disable_projects, batch_modify_materials, ) from app.tasks.customer_tasks import modify_customer_info_background from app.tasks.bi_tasks import bi_task_background # 简道云 API 实例,用于调用简道云 API api_instance = API() class F6PluginModule: """ F6 插件模块类 提供 F6 插件相关的所有功能,包括文件处理、品牌管理、数据删除等。 """ @staticmethod def accept_file(data: Dict[str, Any]) -> Tuple[Optional[str], Dict[str, Any]]: """ 接收文件 处理前端上传的文件,下载文件并保存到指定目录。 此方法用于处理前端上传的文件,下载文件并保存到指定目录。主要步骤包括: 1. 处理前端传递的数据,获取文件的URL。 2. 解析URL以获取文件名。 3. 根据当前时间生成新的文件名,以避免文件名冲突。 4. 下载文件并保存到指定目录。 5. 返回文件保存路径和处理后的数据。 Args: data (dict): 包含文件URL和其他必要信息的字典。 Returns: tuple: 包含文件保存路径和处理后的数据的元组。如果文件保存成功,返回保存路径和数据;如果失败,返回 None 和数据。 """ data = api_instance.entry_data_get(data=data, replace=True) print(data) try: # 安全地访问附件信息 data_dict = data.get('data', {}) attachments = data_dict.get('附件', []) if not attachments or len(attachments) == 0: print('上传url未读取到,或无上传文件: 附件列表为空') save_path = '' return save_path, data first_attachment = attachments[0] url = first_attachment.get('url') if not url: print('上传url未读取到,或无上传文件: URL为空') save_path = '' return save_path, data print(url) except (KeyError, IndexError, TypeError) as e: print(f'上传url未读取到,或无上传文件:{e}') save_path = '' return save_path, data parsed_url = urllib.parse.urlparse(url) query_params = urllib.parse.parse_qs(parsed_url.query) attname = query_params.get('attname', [''])[0] filename = urllib.parse.unquote(attname) # 获取当前时间并格式化为指定格式的字符串 timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S") # 分离文件名和扩展名 name_part, ext_part = filename.rsplit('.', 1) if '.' in filename else (filename, '') # 构建新文件名 new_filename = f"{name_part}{timestamp}.{ext_part}" if ext_part else f"{name_part}{timestamp}" save_path = os.path.join(Config.SAVE_DIRECTORY, new_filename) print(save_path) response = requests.get(url, stream=True) if response.status_code == 200: with open(save_path, 'wb') as file: for chunk in response.iter_content(chunk_size=1024): if chunk: file.write(chunk) return save_path, data else: return None, data def check_file(self, data: Dict[str, Any]) -> dict[str, str] | None: # 校验上传文件 """ 校验上传文件。 此方法负责接收前端上传的文件,并根据文件类型和操作指令进行相应的校验。主要步骤包括: 1. 调用 `accept_file` 方法处理前端传递的数据,获取文件保存路径和处理后的数据。 2. 根据数据中的 `Action` 字段判断需要执行的操作类型。 3. 如果文件保存路径有效,继续执行以下步骤: - 如果操作类型为 `create_brand`,则读取文件和模板文件,校验文件格式是否正确。 - 如果文件格式正确,返回成功消息;否则返回错误消息。 4. 如果文件保存路径无效,返回相应的错误消息。 5. 如果读取文件过程中发生异常,捕获异常并返回错误消息。 Args: data (dict): 前端请求发送过来的数据,包含文件信息和其他必要参数。 Returns: dict: 包含文件校验结果的消息字典。如果校验成功,则返回文件路径和校验标志;如果失败,则返回错误消息。 """ save_path, data1 = self.accept_file(data) # 安全地获取 Action 字段 data_dict = data1.get('data', {}) action = data_dict.get('Action(隐藏)') if not action: return {'msg': '缺少Action字段,无法校验文件'} if save_path: try: if action == 'create_brand': df1 = pd.read_excel(save_path, sheet_name=0) if "品牌" in df1.columns[0]: # 校验表头名字 print('文件校验成功') return {'msg': f'{save_path}', 'check': '是'} else: print("'msg':'文件上传格式错误'") return {'msg': '文件上传格式错误'} elif action == 'modify_customer_info': df = pd.read_excel(save_path, sheet_name=0) if "客户手机号" in df.columns[0]: # 校验表头名字 print('文件校验成功') return {'msg': f'{save_path}', 'check': '是'} else: print("'msg':'文件上传格式错误'") return {'msg': '文件上传格式错误'} elif action == 'delete_cars': pass else: pass except Exception as e: return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'} else: return {'msg': '当前节点无附件上传', 'check': '是'} @staticmethod def create_brand(data: Dict[str, Any]) -> Dict[str, str]: """ 创建品牌 从简道云获取品牌创建请求,读取 Excel 文件,并在后台线程中批量创建品牌。 立即返回"正在执行"的提示,实际创建在后台线程中执行。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 Returns: Dict[str, str]: 包含执行状态的字典,{'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'} """ entry_data = api_instance.entry_data_get(data=data, replace=True) print('执行 品牌批量新建') username = entry_data['data']['账号'] password = entry_data['data']['密码'] company_name = entry_data['data']['公司名称'] save_path = entry_data['data']['文件保存地址'] login_response = F6Module.login_in(username, password, company_name) if login_response is None: return {'msg': '登录失败'} try: df = pd.read_excel(save_path, sheet_name=0, dtype='string') except Exception as e: return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'} cookies = requests.utils.dict_from_cookiejar(login_response.cookies) try: thread = threading.Thread(target=create_brand_background, args=(data, cookies, df, save_path)) thread.start() except Exception as e: print(f'创建线程失败: {str(e)}') return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'} @staticmethod def delete_history(data: Dict[str, Any]) -> Dict[str, str]: """ 删除历史记录 从简道云获取删除历史记录请求,在后台线程中删除指定门店的历史维修记录。 立即返回"正在执行中"的提示,实际删除在后台线程中执行。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 Returns: Dict[str, str]: 包含执行状态的字典 """ entry_data = api_instance.entry_data_get(data=data, replace=True) username = entry_data['data']['账号'] password = entry_data['data']['密码'] company_name = entry_data['data']['公司名称'] org_name = entry_data['data']['门店名称'] login_response = F6Module.login_in(username, password, company_name) if login_response is None: return {'msg': '未执行', 'msg_details': '登录失败'} cookies = requests.utils.dict_from_cookiejar(login_response.cookies) url = 'https://yunxiu.f6car.cn/hive/org/getPageOrgGroupMembers?currentPage=1&pageSize=1000&name=' res = requests.get(url=url, cookies=cookies) store_data = res.json() org_lists = store_data['data']['list'] org_id = '' org_name1 = '' for org in org_lists: org_name1 = org['orgName'] if org_name == org['orgName']: org_id = org['orgId'] if org_id: thread = threading.Thread(target=delete_history_background, args=(data, cookies, org_id, org_name1)) thread.start() return {'msg': '正在执行中', 'msg_details': '请稍后查看结果'} else: return {'msg': '未执行', 'msg_details': '门店信息错误'} @staticmethod def delete_customer(data): """ 删除客户 从简道云获取删除客户请求,在后台线程中批量删除客户信息。 立即返回"正在执行中"的提示,实际删除在后台线程中执行。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 Returns: Dict[str, str]: 包含执行状态的字典 """ print('执行 删除客户') entry_data = api_instance.entry_data_get(data=data, replace=True) username = entry_data['data']['账号'] password = entry_data['data']['密码'] company_name = entry_data['data']['公司名称'] res = F6Module.login_in(username, password, company_name) print(res.json()) if res is not None: cookies = requests.utils.dict_from_cookiejar(res.cookies) url = "https://yunxiu.f6car.cn/member/customer/listForPermission?pageSize=100&pageNo=1" res = requests.get(url, cookies=cookies) total = res.json().get('data', {}).get('total', 0) if total: total = int(total) thread = threading.Thread(target=delete_customer_background, args=(data, cookies, total,)) thread.start() return {'msg': '正在执行中', 'msg_details': f'总计{total}条数据,8-20点3.5s一条数据,其余时间1.5s一条数据'} else: return {'msg': '未执行', 'msg_details': '无客户信息'} else: return {'msg': '未执行', 'msg_details': '登录失败'} @staticmethod def delete_cars(data): """ 删除车辆 从简道云获取删除车辆请求,在后台线程中批量删除客户车辆信息。 立即返回"正在执行中"的提示,实际删除在后台线程中执行。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 Returns: Dict[str, str]: 包含执行状态的字典 """ entry_data = api_instance.entry_data_get(data=data, replace=True) username = entry_data['data']['账号'] password = entry_data['data']['密码'] company_name = entry_data['data']['公司名称'] res = F6Module.login_in(username, password, company_name) if res is not None: cookies = requests.utils.dict_from_cookiejar(res.cookies) url = "https://yunxiu.f6car.cn/member/car/carListForPermission" header = { 'Referer': 'https://yunxiu.f6car.cn/erp/view/index.html', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0' } json_data = {"pageSize": 100, "pageNo": 1} res = requests.post(url=url, cookies=cookies, json=json_data, headers=header) res_data = res.json() total_items = int(res_data["data"]['total']) all_page = total_items // 100 + (total_items % 100 > 0) if res_data: thread = threading.Thread(target=delete_car_background, args=(data, url, cookies, header, all_page)) thread.start() return {'msg': '正在执行中', 'msg_details': '8-20点3.5s一条数据,其余时间1.5s一条数据'} else: return {'msg': '未执行', 'msg_details': '无客户车辆信息'} else: return {'msg': '未执行', 'msg_details': '登录失败'} @staticmethod def modify_customer_info(data: Dict[str, str]): """ 修改客户信息 从简道云获取修改客户信息请求,读取 Excel 文件,并在后台线程中批量修改客户信息。 立即返回"正在执行中"的提示,实际修改在后台线程中执行。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 Returns: Dict[str, str]: 包含执行状态的字典 """ entry_data = api_instance.entry_data_get(data=data, replace=True) username = entry_data['data']['账号'] password = entry_data['data']['密码'] company_name = entry_data['data']['公司名称'] save_path = entry_data['data']['文件保存地址'] login_response = F6Module.login_in(username, password, company_name) if login_response is None: return {'msg': '未执行', 'msg_details': '登录失败'} cookies = requests.utils.dict_from_cookiejar(login_response.cookies) try: df = pd.read_excel(save_path, sheet_name=0, dtype='string') except Exception as e: return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'} if cookies: thread = threading.Thread(target=modify_customer_info_background, args=(data, cookies, df, save_path)) thread.start() return {'msg': '正在执行中', 'msg_details': '请稍后查看结果'} else: return {'msg': '未执行', 'msg_details': 'cookies获取失败'} @staticmethod def disable_projects(data: Dict[str, Any]) -> Dict[str, str]: """ 项目批量启停 从简道云获取项目批量启停请求,读取 Excel 文件,并在后台线程中批量启停项目。 立即返回"正在执行"的提示,实际创建在后台线程中执行。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 Returns: Dict[str, str]: 包含执行状态的字典,{'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'} """ entry_data = api_instance.entry_data_get(data=data, replace=True) print('执行 项目批量停用/启用') username = entry_data['data']['账号'] password = entry_data['data']['密码'] company_name = entry_data['data']['公司名称'] save_path = entry_data['data']['文件保存地址'] option = entry_data['data']['项目材料批量操作'] login_response = F6Module.login_in(username, password, company_name) if login_response is None: return {'msg': '登录失败'} try: df = pd.read_excel(save_path, sheet_name=0, dtype='string') except Exception as e: return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'} cookies = requests.utils.dict_from_cookiejar(login_response.cookies) try: thread = threading.Thread(target=batch_disable_projects, args=(data, cookies, df, save_path,option)) thread.start() except Exception as e: print(f'创建线程失败: {str(e)}') return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'} @staticmethod def disable_material(data: Dict[str, Any]) -> Dict[str, str]: """ 材料批量启停 从简道云获取材料批量启停请求,读取 Excel 文件,并在后台线程中批量启停材料。 立即返回"正在执行"的提示,实际创建在后台线程中执行。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 Returns: Dict[str, str]: 包含执行状态的字典,{'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'} """ entry_data = api_instance.entry_data_get(data=data, replace=True) print('执行 材料批量停用/启用') username = entry_data['data']['账号'] password = entry_data['data']['密码'] company_name = entry_data['data']['公司名称'] save_path = entry_data['data']['文件保存地址'] option = entry_data['data']['项目材料批量操作'] login_response = F6Module.login_in(username, password, company_name) if login_response is None: return {'msg': '登录失败'} try: df = pd.read_excel(save_path, sheet_name=0, dtype='string') except Exception as e: return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'} cookies = requests.utils.dict_from_cookiejar(login_response.cookies) try: thread = threading.Thread(target=batch_modify_materials, args=(data, cookies, df, save_path, option)) thread.start() except Exception as e: print(f'创建线程失败: {str(e)}') return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'} @staticmethod def bi_task(data: Dict[str, Any]) -> Dict[str, str]: """ BI任务 从简道云获取BI任务请求,读取 Excel 文件(如果需要),并在后台线程中执行BI任务。 立即返回"正在执行"的提示,实际执行在后台线程中完成。 Args: data: 包含表单ID(api_key)、表单ID(entry_id)、数据ID(data_id)的字典 Returns: Dict[str, str]: 包含执行状态的字典,{'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'} """ entry_data = api_instance.entry_data_get(data=data, replace=True) print('执行 BI任务') # 获取必要的参数(根据实际需求调整) username = entry_data['data'].get('账号') password = entry_data['data'].get('密码') company_name = entry_data['data'].get('公司名称') save_path = entry_data['data'].get('文件保存地址') # 如果需要登录F6系统 cookies = None if username and password and company_name: login_response = F6Module.login_in(username, password, company_name) if login_response is None: return {'msg': '登录失败', 'msg_details': '无法登录F6系统'} cookies = requests.utils.dict_from_cookiejar(login_response.cookies) # 如果需要读取Excel文件 df = None if save_path: try: df = pd.read_excel(save_path, sheet_name=0, dtype='string') except Exception as e: return {'msg': f'读取Excel文件失败: {str(e)},文件路径:{save_path}'} # 启动后台线程执行BI任务 try: thread = threading.Thread(target=bi_task_background, args=(data, cookies, df, save_path)) thread.start() except Exception as e: print(f'创建线程失败: {str(e)}') return {'msg': '任务启动失败', 'msg_details': f'无法启动后台任务: {str(e)}'} return {'msg': '正在执行', 'msg_details': '正在执行,请稍后看结果'}