from datetime import datetime, timezone, timedelta, date, UTC import time from typing import Optional, List, Dict, Any import requests import json import pandas as pd token = "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN" class API: def entry_data_list(self, data: dict, replace: bool = False, max_retries: int = 20) -> Dict: # 获取多条表单数据 """ 获取多条表单数据 :param max_retries: 最大重试次数 :param replace: 是否替换字段 :param data: api_key: 应用id entry_id: 表单id :return: """ url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list' headers = { 'Authorization': token, # 曹伟应用api测试 app_key 'Content-Type': 'application/json' } all_data_batches = [] # 用于存储每次请求返回的数据批次 last_data_id = None exit_flag = False while True: payload = json.dumps({ "app_id": data['api_key'], # 应用ID "entry_id": data['entry_id'], # 表单ID "filter": data.get('filter', {}), "limit": 100, "data_id": last_data_id }) retries = 0 while retries <= max_retries: try: res = requests.post(url=url, data=payload, headers=headers, timeout=10) res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常 data_get = res.json() if data_get["data"]: all_data_batches.extend(data_get['data']) last_data_id = data_get['data'][-1].get('_id') break # 成功则跳出循环 else: if 'data' not in data_get or len(data_get['data']) == 0: exit_flag = True break retries += 1 time.sleep(0.1) # 在重试之间稍作停顿 except requests.exceptions.RequestException as e: retries += 1 time.sleep(0.1) # 在重试之间稍作停顿 if retries > max_retries: all_data_batches.append(None) # 或者可以选择记录失败的payload以便后续处理 if exit_flag: break # 构建最终返回的字典 final_data = { 'data': all_data_batches # 'data' 键对应的值是列表的列表 } return final_data @staticmethod def workflow_instance_get(data: dict, max_retries: int = 20) -> dict: """ 查询实例流程信息 :param max_retries: :param data: 简道云插件发送过来的data,包含应用id :return: 查询简道云流程实例信息返回的结果 """ url = 'https://api.jiandaoyun.com/api/v5/workflow/instance/get' headers = { 'Authorization': token, # 曹伟应用api测试 appKey 'Content-Type': 'application/json' } payload = json.dumps({ "instance_id": data['data_id'], "tasks_type": 1 } ) data_get = None retries = 0 while retries <= max_retries: try: res = requests.post(url=url, data=payload, headers=headers, timeout=10) res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常 data_get = res.json() # print( "返回结果:", data_get) if res.status_code == 200: break # 成功则跳出循环 else: retries += 1 time.sleep(3) # 在重试之间稍作停顿 except requests.exceptions.RequestException as e: retries += 1 time.sleep(0.1) # 在重试之间稍作停顿 if retries > max_retries: break return data_get @staticmethod def workflow_task_approve(data: dict) -> dict: """ 流程待办提交 :param data:应包含username、instance_id(data_id)、task_id等信息 :return:返回简道云流程待办提交的结果 """ url = 'https://api.jiandaoyun.com/api/v1/workflow/task/approve' headers = { 'Authorization': token, # 曹伟应用api测试 appKey 'Content-Type': 'application/json' } payload = json.dumps({ "username": data["username"], "instance_id": data["instance_id"], "task_id": data['task_id'], "comment": "" } ) res = requests.post(url=url, data=payload, headers=headers, timeout=10) return res.json() @staticmethod def data_batch_create(data: dict, max_retries: int = 5) -> Optional[requests.Response]: # 新建单条数据 """ 新建单条表单数据 :param max_retries: 最大重试次数 :param data: 应该包含应用id、表单id,以及新建的数据data['data'] :return: 返回创建后简道云返回的信息 """ url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create' headers = { 'Authorization': token, # 曹伟应用api测试 app_key 'Content-Type': 'application/json' } """ data 样式 # 后续优化发送数据样式 目前输入字段,后续优化输入表单名称 jiandaoyun_data['data'] = {"_widget_1731650067055":{"value":f'{username}{password}'}, "_widget_1731650067056":{"value": f"{group}"}} """ # noinspection DuplicatedCode payload = json.dumps({ "app_id": data['api_key'], # 应用ID "entry_id": data['entry_id'], # 表单ID "data": data['data'], "is_start_workflow": data.get('is_start_workflow', "false"), "is_start_trigger": data.get('is_start_trigger', "false"), "transaction_id": data.get('transaction_id', "") } ) retries = 0 while retries <= max_retries: try: res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10) res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常 data_get = res.json() if res.status_code == 200: return data_get else: retries += 1 time.sleep(3) # 在重试之间稍作停顿 except requests.exceptions.RequestException as e: retries += 1 time.sleep(3) # 在重试之间稍作停顿 if retries > max_retries: print( f"任务 {data['data_list']} 连续{max_retries}次请求失败,放弃此次请求。") return None @staticmethod def entry_data_update(data: dict, max_retries: int = 20) -> dict: # 修改数据 """ 修改数据 :param max_retries: 最大重试次数,此处设置100次 :param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息 :return: 修改数据后简道云返回的结果 """ url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update' headers = { 'Authorization': token, # 曹伟应用api测试 appKey 'Content-Type': 'application/json' } payload = json.dumps({ "app_id": data['api_key'], # 应用ID "entry_id": data['entry_id'], # 表单ID "data_id": data['data_id'], # 数据ID "data": data['data'] } ) data_get = None retries = 0 while retries <= max_retries: try: res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10) res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常 data_get = res.json() # print("返回结果:", data_get) # print(res.status_code) if res.status_code == 200: break # 成功则跳出循环 else: retries += 1 time.sleep(1) # 在重试之间稍作停顿 except requests.exceptions.RequestException as e: retries += 1 time.sleep(2) # 在重试之间稍作停顿 if retries > max_retries: print(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。") continue return data_get class BossPermissionAutoApproval: """ Boss权限自动审批 """ def __init__(self): self.boss_perm_switch_data = None self.task_data = None self.boss_switch_fields_map = { "权限功能": "_widget_1757483372023", "一级分类": "_widget_1757554068594", "二级分类": "_widget_1757553855187", "权限": "_widget_1757483372025", "权限开关": "_widget_1757483372026", } self.task_data_map = { "当日新签开户": "_widget_1754896018130", "新开户门店名": "_widget_1754896018132", "新开户门店编码": "_widget_1754896018133", "关联数据": "_widget_1751522231978", "公司名": "_widget_1751521185014", "门店名": "_widget_1751521185015", "门店编码": "_widget_1751521185016", "分类": "_widget_1751521319795", "权限功能": "_widget_1751521319796", "申请理由": "_widget_1751599915625", "处理人": "_widget_1751521319797", "提交人": "_widget_1751596512100", "需开项目-图片": "_widget_1751521386658", "需开项目-说明": "_widget_1751600131107", "是否开通": "_widget_1751522755921", "拒绝原因": "_widget_1754895638981", "完成时间": "_widget_1751521573695", "是否新功能": "_widget_1755137980288", "流水号": "_widget_1751596707486", } def load_data(self): self.task_data = API().entry_data_list(data={ "api_key": "675b900991ad2491c69389ca", # 应用ID "entry_id": "6866179f6a73c0d879208bcc", # 表单ID "filter": { "rel": "and", "cond": [{"field": "flowState", "type": "flowstate", "method": "eq", "value": 0}] } }).get("data") # 获取最新权限列表 self.boss_perm_switch_data = API().entry_data_list(data={ "api_key": "675b900991ad2491c69389ca", # 应用ID "entry_id": "68c1116b51730ebbc690ae40" # 表单ID }).get("data") # 权限对应功能 self.get_cookies = API().entry_data_list(data={ "api_key": "675b900991ad2491c69389ca", # 应用ID "entry_id": "68c237d565832158571e4ff4" }).get("data") def map_switch_data(self): """ 映射功能开关 :return: df """ df = pd.DataFrame(self.task_data) reverse_map = {v: k for k, v in self.task_data_map.items()} df = df.rename(columns=reverse_map) # df.to_csv(fr"D:\Idea Project\F6+宜搭+其它(1)\张阳脚本\文件输出\门店开通权限.csv", index=False) df1 = pd.DataFrame(self.boss_perm_switch_data) boss_switch_reverse_map = {v: k for k, v in self.boss_switch_fields_map.items()} df1 = df1.rename(columns=boss_switch_reverse_map) # df1.to_csv(fr"D:\Idea Project\F6+宜搭+其它(1)\张阳脚本\文件输出\Boss权限对应功能.csv", index=False) all_data = [] # 获取门店编码与权限功能,并匹配对应的多条功能数据 for index, row in df.iterrows(): # 过滤出 审批节点 的数据 data_id = row["_id"] get_task_id = API().workflow_instance_get({"data_id": data_id}) flow_name = get_task_id.get("tasks")[-1].get("flow_name") if flow_name != "审批节点": continue # 确定门店编码 org_code = row["新开户门店编码"] if row["当日新签开户"] == "是" else row["门店编码"] # 获取当前行的权限功能 perm_functions = row["权限功能"] # 获取任务的数据id data_id = row["_id"] # 在df1中查找匹配的所有功能 matched_functions = df1[df1['权限功能'] == perm_functions] # 假设df1中有'权限功能'列 # 如果没有匹配项,至少添加权限功能本身 if matched_functions.empty: all_data.append({ "数据id": data_id, "门店编码": org_code, "权限功能": perm_functions, "一级分类": None # 或者可以设为perm本身 }) else: # 添加所有匹配的功能 for _, func_row in matched_functions.iterrows(): all_data.append({ "数据id": data_id, "门店编码": org_code, "权限功能": perm_functions, "一级分类": func_row["一级分类"], "二级分类": func_row["二级分类"], "权限": func_row["权限"], "开关": func_row["权限开关"] }) # 如果你想要将这些数据也保存为CSV result_df = pd.DataFrame(all_data) # result_df.to_csv(fr"D:\Idea Project\F6+宜搭+其它(1)\张阳脚本\文件输出\门店权限功能映射.csv", index=False) return result_df def get_company_id(self, df): """ 获取公司id :param df: 含门店编码的df :return: df """ if df is None or df.empty: if df is None: df = pd.DataFrame() if "company_id" not in df.columns: df["company_id"] = [] return df unique_codes = df["门店编码"].unique().tolist() # 初始化存储结果的字典 code_to_company_id = {} cookie_str = self.get_cookies[0].get("_widget_1757558743223") print(cookie_str) cookies = {} items = cookie_str.split('; ') for item in items: if '=' in item: key, value = item.split('=', 1) cookies[key.strip()] = value.strip() headers = { 'accept': 'application/json, text/plain, */*', 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', 'content-type': 'application/json;charset=UTF-8', 'origin': 'https://manage.f6yc.com', 'priority': 'u=1, i', 'referer': 'https://manage.f6yc.com/', 'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Microsoft Edge";v="140"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Windows"', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'same-origin', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0', 'x-requested-with': 'XMLHttpRequest', # 'cookie': 'hive-adminSESSIONID=91f4217e-3007-4762-adc5-afc2c4d11bdd; _yg_prod=EkX_f7K7eYt61spccZtpCE7FHwA2I5PROPsPCa8-iC3ASlLYfszIPQQqjcJjPPEZL0J2pO07cfp8VG-4XSfqBfYzvRdjXVIjT2D3fZ_a80KTwgID0oqvVma_W4j_-PTD1I79TjPNGx-FjivcHXdACpJVZSu_NgJB; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%222268275546837446%22%2C%22first_id%22%3A%221989d4783646e1-07c9f1a149173e8-4c657b58-2073600-1989d4783651ade%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%2C%22%24latest_referrer%22%3A%22%22%7D%2C%22%24device_id%22%3A%221989d4783646e1-07c9f1a149173e8-4c657b58-2073600-1989d4783651ade%22%7D', } for code in unique_codes: json_data = { 'currentPage': 1, 'pageSize': 10, 'licenseNo': '', 'licenseName': '', 'groupName': '', 'code': code, 'name': '', 'orgId': '', 'busiType': '', 'saasVersion': '', 'saasStatus': '', 'province': '', 'city': '', 'area': '', 'orgStatus': 0, 'orgScale': '', 'mainOrgOnly': 0, 'saasEdition': '', 'storeId': '', 'franchiseGroupId': '', 'minCreateTime': '', 'maxCreateTime': '', 'serviceImplPrincipalList': [], 'containTest': True, } try: response = requests.post( 'https://manage.f6yc.com/hive-admin/org/getAllOrgList', cookies=cookies, headers=headers, json=json_data, timeout=10 ) response.raise_for_status() # 检查请求是否成功 # 兼容废弃与停用门店 if len(response.json().get("data").get("records",[]))==0: json_data['orgStatus'] = 1 # 停用 response = requests.post( 'https://manage.f6yc.com/hive-admin/org/getAllOrgList', cookies=cookies, headers=headers, json=json_data, timeout=10 ) if len(response.json().get("data").get("records",[]))==0: json_data['orgStatus'] = 2 # 废弃 response = requests.post( 'https://manage.f6yc.com/hive-admin/org/getAllOrgList', cookies=cookies, headers=headers, json=json_data, timeout=10 ) # 提取company_id并存储到字典 company_id = response.json()['data']['records'][0]['groupId'] code_to_company_id[code] = company_id print(f"成功查询: 门店编码 {code} -> company_id {company_id}") except Exception as e: print(f"查询失败: 门店编码 {code}, 错误: {str(e)}") code_to_company_id[code] = None # 失败时存储None df['company_id'] = df['门店编码'].map(code_to_company_id) # df.to_csv(fr"D:\Idea Project\F6+宜搭+其它(1)\张阳脚本\文件输出\门店含id权限功能映射.csv", index=False) return df def send_task_error(self, task_start_time: str, task_name: str, error_message: str, df: pd.DataFrame = None) -> None: """ 将任务失败情况发送到简道云(影响业务数据时调用) :param df: 失败文件 :param task_start_time: 任务开始时间(字符串格式:"%Y-%m-%d %H:%M:%S",表示北京时间 UTC+8) :param task_name: 任务名称 :param error_message: 失败详情 """ try: # 1. 获取当前 UTC 时间(时区感知对象) end_time_utc = datetime.now(UTC) # ✅ 替代 utcnow() # 2. 解析传入的北京时间(UTC+8) task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S") # 3. 转换为 UTC 时间(减去 8 小时,并附加 UTC 时区) task_start_utc = task_start_naive - timedelta(hours=8) task_start_utc = task_start_utc.replace(tzinfo=timezone.utc) # 显式标记为 UTC # 4. 计算运行时间(时区感知对象可直接相减) run_time = end_time_utc - task_start_utc run_time_sec = int(run_time.total_seconds()) # 5. 格式化时间为 UTC 的 ISO 8601 格式(带 "Z") today_utc = end_time_utc.strftime("%Y-%m-%d") task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ") task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ") # 6.上传附件 UUid = time.strftime("%Y%m%d%H%M%S", time.localtime()) if df is not None: df.to_excel("upload_file.xlsx", index=False) file_path = "upload_file.xlsx" up_data = API().get_upload_token( {"api_key": "6694d3c4fcb69ca9a111a6c4", "entry_id": "689ae65da00c17578e27cd74", "transaction_id": UUid}) upload_url = up_data.get("upload_url") upload_token = up_data.get("upload_token") upload_result = API().upload_file( {"upload_url": upload_url, "upload_token": upload_token, "file_path": file_path}) upload_key = upload_result.get("key") else: upload_key = "" # 7. 构造请求数据(所有时间以 UTC 格式发送) payload = { "api_key": "6694d3c4fcb69ca9a111a6c4", "entry_id":"689ae65da00c17578e27cd74", "data": { "_widget_1744873387500": {"value": today_utc}, # UTC 日期 "_widget_1743644977694": {"value": task_name}, "_widget_1744873387501": {"value": task_start_iso}, # UTC 开始时间 "_widget_1744873387502": {"value": task_end_iso}, # UTC 结束时间 "_widget_1744873387504": {"value": run_time_sec}, "_widget_1754981992215": {"value": error_message}, # 错误信息 "_widget_1764830825356": {"value": [upload_key]} }, "transaction_id": UUid } # 8. 发送请求 response = API().data_batch_create(payload) except Exception as e: print(e) def send_task_status(self, task_start_time: str, task_name: str) -> None: """ 将任务状态发送到简道云(开始时间为北京时间,需转换到 UTC) :param task_start_time: 任务开始时间(字符串格式:"%Y-%m-%d %H:%M:%S",表示北京时间 UTC+8) :param task_name: 任务名称 """ try: # 1. 获取当前 UTC 时间(时区感知对象) end_time_utc = datetime.now(UTC) # ✅ 替代 utcnow() # 2. 解析传入的北京时间(UTC+8) task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S") # 3. 转换为 UTC 时间(减去 8 小时,并附加 UTC 时区) task_start_utc = task_start_naive - timedelta(hours=8) task_start_utc = task_start_utc.replace(tzinfo=timezone.utc) # 显式标记为 UTC # 4. 计算运行时间(时区感知对象可直接相减) run_time = end_time_utc - task_start_utc run_time_sec = int(run_time.total_seconds()) # 5. 格式化时间为 UTC 的 ISO 8601 格式(带 "Z") today_utc = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ") task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ") task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ") # 6. 构造请求数据(所有时间以 UTC 格式发送) payload = { "api_key": "6694d3c4fcb69ca9a111a6c4", "entry_id": "67ede908eb9c22261016466e", "data": { "_widget_1744873387500": {"value": today_utc}, # UTC 日期 "_widget_1743644977694": {"value": task_name}, "_widget_1744873387501": {"value": task_start_iso}, # UTC 开始时间 "_widget_1744873387502": {"value": task_end_iso}, # UTC 结束时间 "_widget_1744873387504": {"value": run_time_sec}, } } # 7. 发送请求 response = API().data_batch_create(payload) except Exception as e: print(e) def update_permission(self, df): if df is None or df.empty: return pd.DataFrame(columns=["数据id", "门店编码", "一级分类", "二级分类", "权限", "开关", "状态"]) cookie_str = self.get_cookies[0].get("_widget_1757558743223") cookies = {} items = cookie_str.split('; ') for item in items: if '=' in item: key, value = item.split('=', 1) cookies[key.strip()] = value.strip() headers = { 'accept': 'application/json, text/plain, */*', 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', 'priority': 'u=1, i', 'referer': 'https://manage.f6yc.com/', 'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Microsoft Edge";v="140"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Windows"', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'same-origin', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0', 'x-requested-with': 'XMLHttpRequest', # 'cookie': 'hive-adminSESSIONID=531d2ecb-893b-471c-8cf9-e76a9dcfa789; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%224210192048793363%22%2C%22first_id%22%3A%221989d4783646e1-07c9f1a149173e8-4c657b58-2073600-1989d4783651ade%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%2C%22%24latest_referrer%22%3A%22%22%7D%2C%22%24device_id%22%3A%221989d4783646e1-07c9f1a149173e8-4c657b58-2073600-1989d4783651ade%22%7D; _yg_prod=EkX_f7K7eYt61spccZtpCE7FGQAxK5DfP_4PB6A0jCrCT1LYfszIPQQqjcJjPPEZL0J2pO07cfp8VG-4XSfqBfYzvRdjXVIjT2D3fZ7X80GeyA8J24qvVma_W4j__vTE3I3zSTbNFRCPiizeGHdACpZXbli5aTRA; _oauth2_proxy=GdOm_1P1BJAoCcjMngUI40McDBpFqkL2vO_GTniJq-QGX6z9l3MieVkOjJJVJiDbEh5U-j6h3SmxsEjWuaOlfsJyWPx6muVdQOPavFTnd8TCr5-0Hz5xYOQncD1IqIGILgkejOJ3127iUvvu--dYjyqKiDwbtEGCN-Gb21Ik6hkb61C4OBppUra1XbJQ7kOXJnZdlBMe8FiMnlkYVOL8zJ1ZDLIzUcsyzq2s1ACRdVZFT8WU7JLhXKRvoiQSSm6qkCYPsuxGLudtHMeJibtvT8IGNp-jdvv8SF5gzomGhd1Abt_4YfCjiGznNbu421biQI866hEiPTjAZPai0EC-5E0VOi8NmfrBIxaqWx4ecIEGM6Z_8G-4ZWJVBNIv3v9AfUPANYR37yC-w8rOYYv9X2FV0YMkGqFoOKJWBnw4JbdxtdI-8UodAIQGUYVeJGLt6zOWWRNdzKeK57Z8X3Aki0CmLaP0SSUjoGW1pMJayMar94eQpzXszBVQxX_-_buXoDtXYg-u5q5jnQrXX3SLXC8W5JNY_sbDWm2ix7H2ezcxp2gypyyPnFMA5gRneE9wcLAWFtkb_YwT3Q_1KD4rUltqNBBV85KCOR77lxGXMYSEANPvJRW8mC9jg9ARD0JyvA1ZHWefjNU_Y_6RsKSiHrH5Cg5k8QU8skF-KgQrFUKGVfjWfpRfDO6IpR_z2lD-6qfwAshliwrFKyqJi1M91bdZuLR1dXubkgSNVXzyKkgjnGZ-4Ma9K8M3iLQwgBE1DoCRv92C-ie7sI3MmJ58vwu3OllkcOkiz7NjYzv1LKIIKTVEU7wgGe0SGKuEoRO2vRBOZcw_NUUW6s9SbwKleCXdsHkth2EFigiF9E7CGGdyw2G5qO1nx5nKIgEhhqPEBs6yq1uPEcza_BT_J5BauG_ffjpWrRR8TaoMFrmJFPdOf4JDON6FPukb4sbo_uANPG3tSpwNiefTv0E4QV7jNyNNOeGlhLxhcWyN2E6LZTOWH8lnVNUl6oLbSnrkcDGAV9Q76nebHhn5Yf6qJY4S1CceJwFlpIlooiz1bUWzt906c1UUcrFgxEydHMKbmScaG2S_dSjvfB76ZFgozWIGQiiZnd1bt8G7pc2VxMVW-szebBoeUf0HzPpzkej-ILRunt-E9Mpq-3afCOtbda0RfZWqR8cQsAt-cABPIdi4WT5v-Cg_jrjxBb8mueXbWZ5Q4gNPjsVmkfnpW5svBf8bKdChTbNyq_KOm2NV9itI6RSAVkIo6ANUBe9npnCs66v6hSQmmqjqbZ3H4GX1HiyT6VlFpTxT1xDry3tZsTZqw9lzvnT7Tin1lIxArQc0LnlZAoVbbLDJJlbfrXXcIF1kcClfXw5d_Smy_tfGOo5kYE9iTmfMOMh-8ZfhhjA43P4kMgSrYdQAvW1fHFMVQtqtHTmUiCSsKQ78Phh3-ampoI83iJooVzH34Yg21cv5wHpAJAn_wVC_Weto-bol3XXZkDHG0DWmoMGROViNkxc0vKxpMubwiG36E-U-1tQXsyhqqa6VqE_Vnn3fizk9m-ZgozyN-VlS5ALV6djm1wa-ISaLv9juHjQlfA0joY6ZLOxM6mcGsZliw9ZQ-mhiy1C9c3vOuPLW3ZWRCeJUX96BaA-5am-3Z3bjK1mp391cXle3idTEPFlMCKNVUUANNFwxeH0W6CKL4rcRkgiEnhWuBEADySvpfCFuYxojH2uOSfS8mCERpZ7KNEgPCIVs9VumX68w8QonxClCQLWG6JRAmfmCfYKi6UeeWWvsqQDPqQiMkowThSv_TmLgFh5baZsB9bJwFqojc6pOzM3ogHuHuY_67VsA8E_cmeeWvBzqWBV9umeciWq5fbC3Cqe_fAsHxtKb39D2vQyqWVjRk9qukzxBmcd8bXzKUV1r6lg0QgVm2PG8dyfAV_ymsR6Y4b_gtPWcVx83J5H8RgN43ofGXwdUFsqi5xnc_Bf-KO_psGgYImKnqU25ftV8eX0ss0K7jsyhafsWDwPSg2k7qHPNK2AbtLGZIQcRcjXe7efTICAeHOn9AJ4vUbU1UpO53WsY8gic1I7tEH0LKkg7Pz7rR7drDuYkDg200gXiO7y5FXl1dqGa-tilF9IL57hx3UcuM7mLZa5zU0bPprhmCelSA4CwG8la5yGUdpvBqBSz22-X-DgkrwCzLpchi_LnpAkx2_jza2yTO5PtCSjn7hFbvuNFm6Neb_AuRg6-TpolwGfl-Bu3nUySmnCoCH8IvAoA4BpXzFH7o21cAlEHdKAndnoEljtyoyxa6DAZoG5Osttn71tbudyEFh26eOiA-wGAbpHJnsg3Slp35gicLgAepJ8oDFAaqdhtyP7-jizBCCB-Bt29hqvWwH22J30wgoXKnJgjFl6F9L4hxmvRi2QHWKV35ITGA1JqrS0VayT9DIrbTUKoRIKzTme0Z5deBxB8YFmiINAEqdS86PCbQi28-tK2RYuvFJ4O_RWxrMxD1fnDhbPWhnDGiY9dGmDjPgWB70vtIVaikGr_M_Bwm_3oa7ec00eTaSnjmX1qzLCLdvX_Ie6hBFgDii7ZU-KrQndqcbLYU9KtkI2yVwXWMihlLzdxNukzuxnU4IK334Mtebpz6ykLX8e8UfvbHJx4D_JgQQKB7Gfoxb-O02owUY6bvR-tgTp8n9-Mic9osDWeBgmmPHmvyNjcWbi4SXdBSVVIML12iS7qvjHlekG01r_EalttUe000hU1MLqE_Kd1XcINRq0vZaed5m6fXEUVcPchVYN9GC7e36HMo7ZlgE0fGNsWY6pFr-seXI2_23UJW9l_edhMqpvJJh5CAJYMRQ-Rtvzfbe8n|1757554489|NPRgqlI6umo48ScNrekwcxSuAvgFZbVOWWnkmHcAJww=', } result_list = [] df.to_csv(fr"D:\Idea Project\F6+宜搭+其它(1)\张阳脚本\文件输出\门店含id权限功能映射.csv", index=False) if "开关" not in df.columns: return pd.DataFrame(columns=["数据id", "门店编码", "一级分类", "二级分类", "权限", "开关", "状态"]) df.dropna(subset=['开关'], inplace=True) for index, row in df.iterrows(): print(row) if row["company_id"] is None: continue if row["门店编码"] == "空门店编码" or row["门店编码"] is None: result_list.append( { "数据id": row["数据id"], "门店编码": row["门店编码"], "一级分类": row["一级分类"], "二级分类": row["二级分类"], "权限": row["权限"], "开关": row["开关"], "状态": "空门店编码跳过执行", } ) continue if row["一级分类"] is not None: params = { 'groupId': row["company_id"], } response = requests.get( 'https://manage.f6yc.com/hive-admin/company/role/getGroupAdminRoleMenusScope', params=params, cookies=cookies, headers=headers, ) # print(response.json()) if response.status_code == 200: pkId = response.json().get("data").get("role").get("id") user_name = response.json().get("data").get("role").get("name") menus_list = response.json().get("data").get("menus") for menu in menus_list: if menu.get("name") == row["一级分类"]: nodes_list = menu.get("nodes") if row["开关"] == "开": # 确保一级权限为开 menu["isChecked"] = 1 for node in nodes_list: if node.get("name") == row["二级分类"]: child_nodes_list = node.get("nodes", []) print(row["二级分类"], row["权限"], row["开关"]) if row["开关"] == "开": # 确保二级权限为开 node["isChecked"] = 1 if row["权限"] != "无": # 处理有三级权限的情况 for child_node in child_nodes_list: if child_node.get("name") == row["权限"]: if row["开关"] == "开": print("更新权限:", child_node["id"], "为开") child_node["isChecked"] = 1 else: child_node["isChecked"] = 0 break # 找到对应的权限后跳出循环 else: print(row["二级分类"], row["权限"], row["开关"], 11111111111) # 处理只有二级权限的情况 if row["开关"] == "开": node["isChecked"] = 1 else: node["isChecked"] = 0 # 同时更新所有子节点的状态 for child_node in child_nodes_list: child_node["isChecked"] = node["isChecked"] # 查询isChecked为1的id checked_ids = [] def collect_checked_ids(items): for item in items: if item.get("isChecked") == 1: checked_ids.append(str(item.get("id"))) if "nodes" in item: collect_checked_ids(item["nodes"]) collect_checked_ids(menus_list) checked_ids_str = ",".join(checked_ids) print(f"更新后选中项的ID: {checked_ids_str}") new_json_data = { "pkId": pkId, "name": user_name, 'isValid': 1, "menuStr": checked_ids_str, } response = requests.post( 'https://manage.f6yc.com/hive-admin/company/role/update', cookies=cookies, headers=headers, json=new_json_data, ) time.sleep(0.5) print(f"请求结果:{response.json()}") if response.json().get("code", 1000) == 200: result_list.append( { "数据id": row["数据id"], "门店编码": row["门店编码"], "一级分类": row["一级分类"], "二级分类": row["二级分类"], "权限": row["权限"], "开关": row["开关"], "状态": "更新成功", } ) else: result_list.append( { "数据id": row["数据id"], "门店编码": row["门店编码"], "一级分类": row["一级分类"], "二级分类": row["二级分类"], "权限": row["权限"], "开关": row["开关"], "状态": "更新失败", } ) else: payload = { "api_key": "6694d3c4fcb69ca9a111a6c4", "entry_id": "68cb745753594c2570ba4f70", "data": {"_widget_1758164058473": {"value": "Boss权限自动审批"}, # 预警类型 "_widget_1758164058474": {"value": f"cookies信息失效,请更新cookie"} # 预警内容 }, "is_start_workflow": "true" } res = API().data_batch_create(payload) result_df = pd.DataFrame(result_list) return result_df def send_status_and_approval(self, df): all_success_ids = [] # 全部成功的 数据id failed_records_dict = {} # 不成功的记录 {数据id: [不成功的记录]} # 按 "数据id" 分组,并遍历每个分组 for data_id, group in df.groupby("数据id"): if (group["状态"] == "更新成功").all(): all_success_ids.append(data_id) print(f"数据id {data_id} 全部成功,执行提交操作") get_task_id = API().workflow_instance_get({"data_id": data_id}) # print(get_task_id) task_id = get_task_id.get("tasks")[-1].get("task_id") user_name = get_task_id.get("tasks")[-1].get("assignee").get("username") # 表单提交到下一步 res = API().workflow_task_approve({ "username": user_name, "task_id": task_id, "instance_id": data_id, }) # print(res) new_res = API().entry_data_update({ "api_key": "675b900991ad2491c69389ca", # 应用ID "entry_id": "68c1116b51730ebbc690ae40", # 表单ID "data_id": data_id, "data": {"_widget_1751522755921": {"value": "是"}, # 是否开通 }, }) elif (group["状态"] == "空门店编码跳过执行").all(): print(f"数据id {data_id} 存在空门店编码,跳过执行") else: # 情况2:有不成功的 -> 执行操作 b(如组合不成功的记录) failed_records = group[group["状态"] != "更新成功"].to_dict("records") failed_records_dict[data_id] = failed_records print(f"数据id {data_id} 有不成功的记录:{failed_records}") payload = { "api_key": "6694d3c4fcb69ca9a111a6c4", "entry_id": "68cb745753594c2570ba4f70", "data": {"_widget_1758164058473": {"value": "Boss权限自动审批"}, # 预警类型 "_widget_1758164058474": {"value": f"数据id {data_id} 有不成功的记录:{failed_records}"} # 预警内容 }, "is_start_workflow": "true" } res = API().data_batch_create(payload) # print(f"已成功提交数据预警信息:{res}") print(f"全部成功的数据id:{all_success_ids}") def main(self): task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") try: # step1 获取简道云上任务列表 self.load_data() if not self.task_data: print("当前没有需要执行的任务,结束。") self.send_task_status(task_start_time, "boss权限自动审批") return # step2 根据权限开通列表获取权限对应功能 payload_df = self.map_switch_data() if payload_df is None or payload_df.empty: print("当前没有匹配到需要执行的审批节点数据,结束。") self.send_task_status(task_start_time, "boss权限自动审批") return # step3 根据门店编码查询公司id。 payload_df = self.get_company_id(payload_df) # step4 批量修改权限 result_df = self.update_permission(payload_df) # step5 简道云发送状态与同意 self.send_status_and_approval(result_df) self.send_task_status(task_start_time, "boss权限自动审批") except Exception as e: self.send_task_error(task_start_time, "boss权限自动审批失败", str(e)) if __name__ == '__main__': BossPermissionAutoApproval().main()