Files
F6--/张阳脚本/boss/boss权限自动审批.py
2026-06-02 15:08:26 +08:00

997 lines
44 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime, timezone, timedelta, date, UTC
import time
from typing import Optional, List, Dict, Any
import requests
import json
import pandas as pd
token = "Bearer qygHulymo1fekJk4CIZyNKjyQAzG8CFN"
class API:
def entry_data_list(self, data: dict, replace: bool = False, max_retries: int = 5) -> Dict: # 获取多条表单数据
"""
获取多条表单数据
:param max_retries: 最大重试次数
:param replace: 是否替换字段
:param data:
api_key: 应用id
entry_id: 表单id
:return:
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/list'
headers = {
'Authorization': token, # 曹伟应用api测试 app_key
'Content-Type': 'application/json'
}
all_data_batches = [] # 用于存储每次请求返回的数据批次
last_data_id = None
exit_flag = False
while True:
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"filter": data.get('filter', {}),
"limit": 100,
"data_id": last_data_id
})
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
if data_get["data"]:
all_data_batches.extend(data_get['data'])
last_data_id = data_get['data'][-1].get('_id')
break # 成功则跳出循环
else:
if 'data' not in data_get or len(data_get['data']) == 0:
exit_flag = True
break
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
if retries > max_retries:
all_data_batches.append(None) # 或者可以选择记录失败的payload以便后续处理
if exit_flag:
break
# 构建最终返回的字典
final_data = {
'data': all_data_batches # 'data' 键对应的值是列表的列表
}
return final_data
@staticmethod
def workflow_instance_get(data: dict, max_retries: int = 5) -> dict:
"""
查询实例流程信息
:param max_retries:
:param data: 简道云插件发送过来的data,包含应用id
:return: 查询简道云流程实例信息返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v5/workflow/instance/get'
headers = {
'Authorization': token, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"instance_id": data['data_id'],
"tasks_type": 1
}
)
data_get = None
retries = 0
while retries <= max_retries:
try:
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
# print( "返回结果:", data_get)
if res.status_code == 200:
break # 成功则跳出循环
else:
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
retries += 1
time.sleep(0.1) # 在重试之间稍作停顿
if retries > max_retries:
break
return data_get
@staticmethod
def workflow_task_approve(data: dict) -> dict:
"""
流程待办提交
:param data:应包含username、instance_id(data_id)、task_id等信息
:return:返回简道云流程待办提交的结果
"""
url = 'https://api.jiandaoyun.com/api/v1/workflow/task/approve'
headers = {
'Authorization': token, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"username": data["username"],
"instance_id": data["instance_id"],
"task_id": data['task_id'],
"comment": ""
}
)
res = requests.post(url=url, data=payload, headers=headers, timeout=10)
return res.json()
@staticmethod
def data_batch_create(data: dict, max_retries: int = 5) -> Optional[requests.Response]: # 新建单条数据
"""
新建单条表单数据
:param max_retries: 最大重试次数
:param data: 应该包含应用id、表单id,以及新建的数据data['data']
:return: 返回创建后简道云返回的信息
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/create'
headers = {
'Authorization': token, # 曹伟应用api测试 app_key
'Content-Type': 'application/json'
}
"""
data 样式 # 后续优化发送数据样式 目前输入字段,后续优化输入表单名称
jiandaoyun_data['data'] = {"_widget_1731650067055":{"value":f'{username}{password}'},
"_widget_1731650067056":{"value": f"{group}"}}
"""
# noinspection DuplicatedCode
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"data": data['data'],
"is_start_workflow": data.get('is_start_workflow', "false"),
"is_start_trigger": data.get('is_start_trigger', "false"),
"transaction_id": data.get('transaction_id', "")
}
)
retries = 0
while retries <= max_retries:
try:
res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
if res.status_code == 200:
return data_get
else:
retries += 1
time.sleep(3) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
retries += 1
time.sleep(3) # 在重试之间稍作停顿
if retries > max_retries:
print(
f"任务 {data['data_list']} 连续{max_retries}次请求失败,放弃此次请求。")
return None
@staticmethod
def entry_data_update(data: dict, max_retries: int = 5) -> dict: # 修改数据
"""
修改数据
:param max_retries: 最大重试次数,此处设置100次
:param data: 简道云插件发送过来的data,包含应用id、表单id、数据id等信息
:return: 修改数据后简道云返回的结果
"""
url = 'https://api.jiandaoyun.com/api/v5/app/entry/data/update'
headers = {
'Authorization': token, # 曹伟应用api测试 appKey
'Content-Type': 'application/json'
}
payload = json.dumps({
"app_id": data['api_key'], # 应用ID
"entry_id": data['entry_id'], # 表单ID
"data_id": data['data_id'], # 数据ID
"data": data['data']
}
)
data_get = None
retries = 0
while retries <= max_retries:
try:
res: requests.Response = requests.post(url=url, data=payload, headers=headers, timeout=10)
res.raise_for_status() # 检查HTTP响应状态码,如果不等于200会抛出异常
data_get = res.json()
# print("返回结果:", data_get)
# print(res.status_code)
if res.status_code == 200:
break # 成功则跳出循环
else:
retries += 1
time.sleep(1) # 在重试之间稍作停顿
except requests.exceptions.RequestException as e:
retries += 1
time.sleep(2) # 在重试之间稍作停顿
if retries > max_retries:
print(f"任务 {data['data_id']} 连续{max_retries}次请求失败,放弃此次请求。")
continue
return data_get
class BossPermissionAutoApproval:
"""
Boss权限自动审批
"""
def __init__(self):
self.boss_perm_switch_data = None
self.task_data = None
self.boss_switch_fields_map = {
"权限功能": "_widget_1757483372023",
"一级分类": "_widget_1757554068594",
"二级分类": "_widget_1757553855187",
"权限": "_widget_1757483372025",
"权限开关": "_widget_1757483372026",
}
self.task_data_map = {
"当日新签开户": "_widget_1754896018130",
"新开户门店名": "_widget_1754896018132",
"新开户门店编码": "_widget_1754896018133",
"关联数据": "_widget_1751522231978",
"公司名": "_widget_1751521185014",
"门店名": "_widget_1751521185015",
"门店编码": "_widget_1751521185016",
"分类": "_widget_1751521319795",
"权限功能": "_widget_1751521319796",
"申请理由": "_widget_1751599915625",
"处理人": "_widget_1751521319797",
"提交人": "_widget_1751596512100",
"需开项目-图片": "_widget_1751521386658",
"需开项目-说明": "_widget_1751600131107",
"是否开通": "_widget_1751522755921",
"拒绝原因": "_widget_1754895638981",
"完成时间": "_widget_1751521573695",
"是否新功能": "_widget_1755137980288",
"流水号": "_widget_1751596707486",
}
def load_data(self):
self.task_data = API().entry_data_list(data={
"api_key": "675b900991ad2491c69389ca", # 应用ID
"entry_id": "6866179f6a73c0d879208bcc", # 表单ID
"filter": {
"rel": "and",
"cond": [{"field": "flowState", "type": "flowstate", "method": "eq", "value": 0}]
}
}).get("data") # 获取最新权限列表
self.boss_perm_switch_data = API().entry_data_list(data={
"api_key": "675b900991ad2491c69389ca", # 应用ID
"entry_id": "68c1116b51730ebbc690ae40" # 表单ID
}).get("data") # 权限对应功能
self.get_cookies = API().entry_data_list(data={
"api_key": "675b900991ad2491c69389ca", # 应用ID
"entry_id": "68c237d565832158571e4ff4"
}).get("data")
def map_switch_data(self):
"""
映射功能开关
:return: df
"""
df = pd.DataFrame(self.task_data)
reverse_map = {v: k for k, v in self.task_data_map.items()}
df = df.rename(columns=reverse_map)
# df.to_csv(fr"D:\Idea Project\F6+宜搭+其它(1)\张阳脚本\文件输出\门店开通权限.csv", index=False)
df1 = pd.DataFrame(self.boss_perm_switch_data)
boss_switch_reverse_map = {v: k for k, v in self.boss_switch_fields_map.items()}
df1 = df1.rename(columns=boss_switch_reverse_map)
# df1.to_csv(fr"D:\Idea Project\F6+宜搭+其它(1)\张阳脚本\文件输出\Boss权限对应功能.csv", index=False)
all_data = []
all_data_ids = []
# 获取门店编码与权限功能,并匹配对应的多条功能数据
for index, row in df.iterrows():
# 过滤出 审批节点 的数据
data_id = row.get("_id")
if data_id is None or (isinstance(data_id, float) and pd.isna(data_id)):
print(f"警告:第 {index} 行缺少 _id 字段,跳过该行")
continue
all_data_ids.append(data_id)
get_task_id = API().workflow_instance_get({"data_id": data_id})
flow_name = get_task_id.get("tasks")[-1].get("flow_name")
if flow_name != "审批节点":
continue
# 确定门店编码
org_code = row["新开户门店编码"] if row["当日新签开户"] == "" else row["门店编码"]
# 获取当前行的权限功能
perm_functions = row["权限功能"]
# 在df1中查找匹配的所有功能
matched_functions = df1[df1['权限功能'] == perm_functions] # 假设df1中有'权限功能'列
# 如果没有匹配项,跳过这条记录
if matched_functions.empty:
print(f"跳过:权限功能 '{perm_functions}' 在映射表中找不到匹配")
continue
else:
# 添加所有匹配的功能
for _, func_row in matched_functions.iterrows():
all_data.append({
"数据id": data_id,
"门店编码": org_code,
"权限功能": perm_functions,
"一级分类": func_row["一级分类"],
"二级分类": func_row["二级分类"],
"权限": func_row["权限"],
"开关": func_row["权限开关"]
})
# 如果你想要将这些数据也保存为CSV
result_df = pd.DataFrame(all_data)
# result_df.to_csv(fr"D:\Idea Project\F6+宜搭+其它(1)\张阳脚本\文件输出\门店权限功能映射.csv", index=False)
return result_df
def get_company_id(self, df):
"""
获取公司id
:param df: 含门店编码的df
:return: df
"""
if df is None or df.empty:
if df is None:
df = pd.DataFrame()
if "company_id" not in df.columns:
df["company_id"] = []
return df
unique_codes = df["门店编码"].unique().tolist()
# 初始化存储结果的字典
code_to_company_id = {}
cookie_str = self.get_cookies[0].get("_widget_1757558743223")
print(cookie_str)
cookies = {}
items = cookie_str.split('; ')
for item in items:
if '=' in item:
key, value = item.split('=', 1)
cookies[key.strip()] = value.strip()
headers = {
'accept': 'application/json, text/plain, */*',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'content-type': 'application/json;charset=UTF-8',
'origin': 'https://manage.f6yc.com',
'priority': 'u=1, i',
'referer': 'https://manage.f6yc.com/',
'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Microsoft Edge";v="140"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0',
'x-requested-with': 'XMLHttpRequest',
# 'cookie': 'hive-adminSESSIONID=91f4217e-3007-4762-adc5-afc2c4d11bdd; _yg_prod=EkX_f7K7eYt61spccZtpCE7FHwA2I5PROPsPCa8-iC3ASlLYfszIPQQqjcJjPPEZL0J2pO07cfp8VG-4XSfqBfYzvRdjXVIjT2D3fZ_a80KTwgID0oqvVma_W4j_-PTD1I79TjPNGx-FjivcHXdACpJVZSu_NgJB; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%222268275546837446%22%2C%22first_id%22%3A%221989d4783646e1-07c9f1a149173e8-4c657b58-2073600-1989d4783651ade%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%2C%22%24latest_referrer%22%3A%22%22%7D%2C%22%24device_id%22%3A%221989d4783646e1-07c9f1a149173e8-4c657b58-2073600-1989d4783651ade%22%7D',
}
for code in unique_codes:
json_data = {
'currentPage': 1,
'pageSize': 10,
'licenseNo': '',
'licenseName': '',
'groupName': '',
'code': code,
'name': '',
'orgId': '',
'busiType': '',
'saasVersion': '',
'saasStatus': '',
'province': '',
'city': '',
'area': '',
'orgStatus': 0,
'orgScale': '',
'mainOrgOnly': 0,
'saasEdition': '',
'storeId': '',
'franchiseGroupId': '',
'minCreateTime': '',
'maxCreateTime': '',
'serviceImplPrincipalList': [],
'containTest': True,
}
# 跳过空门店编码
if not code or str(code).strip() == "" or pd.isna(code):
print(f"跳过空门店编码")
code_to_company_id[code] = None
continue
try:
response = requests.post(
'https://manage.f6yc.com/hive-admin/org/getAllOrgList',
cookies=cookies,
headers=headers,
json=json_data,
timeout=10
)
response.raise_for_status() # 检查请求是否成功
response_json = response.json()
data = response_json.get("data", {})
records = data.get("records", [])
# 兼容废弃与停用门店
if len(records) == 0:
json_data['orgStatus'] = 1 # 停用
response = requests.post(
'https://manage.f6yc.com/hive-admin/org/getAllOrgList',
cookies=cookies,
headers=headers,
json=json_data,
timeout=10
)
response_json = response.json()
data = response_json.get("data", {})
records = data.get("records", [])
if len(records) == 0:
json_data['orgStatus'] = 2 # 废弃
response = requests.post(
'https://manage.f6yc.com/hive-admin/org/getAllOrgList',
cookies=cookies,
headers=headers,
json=json_data,
timeout=10
)
response_json = response.json()
data = response_json.get("data", {})
records = data.get("records", [])
# 提取company_id并存储到字典
if len(records) > 0:
company_id = records[0].get('groupId')
if company_id:
code_to_company_id[code] = company_id
print(f"成功查询: 门店编码 {code} -> company_id {company_id}")
else:
print(f"查询结果异常: 门店编码 {code}, groupId为空")
code_to_company_id[code] = None
else:
print(f"查询失败: 门店编码 {code}, 未找到匹配的公司(可能已注销)")
code_to_company_id[code] = None
except Exception as e:
print(f"查询失败: 门店编码 {code}, 错误: {str(e)}")
code_to_company_id[code] = None # 失败时存储None
df['company_id'] = df['门店编码'].map(code_to_company_id)
# df.to_csv(fr"D:\Idea Project\F6+宜搭+其它(1)\张阳脚本\文件输出\门店含id权限功能映射.csv", index=False)
return df
def send_task_error(self, task_start_time: str, task_name: str, error_message: str,
df: pd.DataFrame = None) -> None:
"""
将任务失败情况发送到简道云(影响业务数据时调用)
:param df: 失败文件
:param task_start_time: 任务开始时间(字符串格式:"%Y-%m-%d %H:%M:%S",表示北京时间 UTC+8
:param task_name: 任务名称
:param error_message: 失败详情
"""
try:
# 1. 获取当前 UTC 时间(时区感知对象)
end_time_utc = datetime.now(UTC) # ✅ 替代 utcnow()
# 2. 解析传入的北京时间(UTC+8)
task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S")
# 3. 转换为 UTC 时间(减去 8 小时,并附加 UTC 时区)
task_start_utc = task_start_naive - timedelta(hours=8)
task_start_utc = task_start_utc.replace(tzinfo=timezone.utc) # 显式标记为 UTC
# 4. 计算运行时间(时区感知对象可直接相减)
run_time = end_time_utc - task_start_utc
run_time_sec = int(run_time.total_seconds())
# 5. 格式化时间为 UTC 的 ISO 8601 格式(带 "Z"
today_utc = end_time_utc.strftime("%Y-%m-%d")
task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
# 6.上传附件
UUid = time.strftime("%Y%m%d%H%M%S", time.localtime())
if df is not None:
df.to_excel("upload_file.xlsx", index=False)
file_path = "upload_file.xlsx"
up_data = API().get_upload_token(
{"api_key": "6694d3c4fcb69ca9a111a6c4", "entry_id": "689ae65da00c17578e27cd74",
"transaction_id": UUid})
upload_url = up_data.get("upload_url")
upload_token = up_data.get("upload_token")
upload_result = API().upload_file(
{"upload_url": upload_url, "upload_token": upload_token, "file_path": file_path})
upload_key = upload_result.get("key")
else:
upload_key = ""
# 7. 构造请求数据(所有时间以 UTC 格式发送)
payload = {
"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id":"689ae65da00c17578e27cd74",
"data": {
"_widget_1744873387500": {"value": today_utc}, # UTC 日期
"_widget_1743644977694": {"value": task_name},
"_widget_1744873387501": {"value": task_start_iso}, # UTC 开始时间
"_widget_1744873387502": {"value": task_end_iso}, # UTC 结束时间
"_widget_1744873387504": {"value": run_time_sec},
"_widget_1754981992215": {"value": error_message}, # 错误信息
"_widget_1764830825356": {"value": [upload_key]}
},
"transaction_id": UUid
}
# 8. 发送请求
response = API().data_batch_create(payload)
except Exception as e:
print(e)
def send_task_status(self, task_start_time: str, task_name: str) -> None:
"""
将任务状态发送到简道云(开始时间为北京时间,需转换到 UTC)
:param task_start_time: 任务开始时间(字符串格式:"%Y-%m-%d %H:%M:%S",表示北京时间 UTC+8
:param task_name: 任务名称
"""
try:
# 1. 获取当前 UTC 时间(时区感知对象)
end_time_utc = datetime.now(UTC) # ✅ 替代 utcnow()
# 2. 解析传入的北京时间(UTC+8)
task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S")
# 3. 转换为 UTC 时间(减去 8 小时,并附加 UTC 时区)
task_start_utc = task_start_naive - timedelta(hours=8)
task_start_utc = task_start_utc.replace(tzinfo=timezone.utc) # 显式标记为 UTC
# 4. 计算运行时间(时区感知对象可直接相减)
run_time = end_time_utc - task_start_utc
run_time_sec = int(run_time.total_seconds())
# 5. 格式化时间为 UTC 的 ISO 8601 格式(带 "Z"
today_utc = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
# 6. 构造请求数据(所有时间以 UTC 格式发送)
payload = {
"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "67ede908eb9c22261016466e",
"data": {
"_widget_1744873387500": {"value": today_utc}, # UTC 日期
"_widget_1743644977694": {"value": task_name},
"_widget_1744873387501": {"value": task_start_iso}, # UTC 开始时间
"_widget_1744873387502": {"value": task_end_iso}, # UTC 结束时间
"_widget_1744873387504": {"value": run_time_sec},
}
}
# 7. 发送请求
response = API().data_batch_create(payload)
except Exception as e:
print(e)
def update_permission(self, df):
"""优化版:按门店聚合,减少请求频率"""
if df is None or df.empty:
return pd.DataFrame(columns=["数据id", "门店编码", "一级分类", "二级分类", "权限", "开关", "状态"])
# 检查 cookies 数据是否存在
if not self.get_cookies or len(self.get_cookies) == 0:
print("错误:cookies 数据为空")
return pd.DataFrame(columns=["数据id", "门店编码", "一级分类", "二级分类", "权限", "开关", "状态"])
cookie_str = self.get_cookies[0].get("_widget_1757558743223")
if not cookie_str:
print("错误:cookie 字符串为空")
return pd.DataFrame(columns=["数据id", "门店编码", "一级分类", "二级分类", "权限", "开关", "状态"])
cookies = {}
items = cookie_str.split('; ')
for item in items:
if '=' in item:
key, value = item.split('=', 1)
cookies[key.strip()] = value.strip()
headers = {
'accept': 'application/json, text/plain, */*',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'priority': 'u=1, i',
'referer': 'https://manage.f6yc.com/',
'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Microsoft Edge";v="140"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0',
'x-requested-with': 'XMLHttpRequest',
}
result_list = []
print(f"=== update_permission 调试信息 ===")
print(f"原始数据行数: {len(df)}")
print(f"数据列名: {df.columns.tolist()}")
if "开关" not in df.columns:
print("错误:数据中没有'开关'")
return pd.DataFrame(columns=["数据id", "门店编码", "一级分类", "二级分类", "权限", "开关", "状态"])
df_filtered = df.dropna(subset=['开关']).copy()
print(f"过滤空开关后的数据行数: {len(df_filtered)}")
if len(df_filtered) == 0:
print("警告:过滤后没有数据需要处理")
return pd.DataFrame(columns=["数据id", "门店编码", "一级分类", "二级分类", "权限", "开关", "状态"])
# ========== 核心优化:按company_id聚合 ==========
# 过滤掉company_id为空的行
df_valid = df_filtered[df_filtered['company_id'].notna()].copy()
if len(df_valid) == 0:
print("没有有效的company_id,跳过所有处理")
return pd.DataFrame(columns=["数据id", "门店编码", "一级分类", "二级分类", "权限", "开关", "状态"])
# 按 company_id 分组
grouped = df_valid.groupby('company_id')
print(f"开始处理 {len(grouped)} 个门店的权限...")
for company_id, group in grouped:
print(f"\n===== 处理门店 company_id: {company_id},共 {len(group)} 条权限 =====")
# 1. 获取当前门店的菜单状态(只请求一次)
params = {'groupId': company_id}
try:
response = requests.get(
'https://manage.f6yc.com/hive-admin/company/role/getGroupAdminRoleMenusScope',
params=params,
cookies=cookies,
headers=headers,
timeout=10
)
if response.status_code != 200:
print(f"错误:获取菜单状态失败,状态码: {response.status_code}")
# 该门店所有权限都标记为失败
for _, row in group.iterrows():
result_list.append({
"数据id": row["数据id"],
"门店编码": row["门店编码"],
"一级分类": row["一级分类"],
"二级分类": row["二级分类"],
"权限": row["权限"],
"开关": row["开关"],
"状态": f"获取菜单状态失败 {response.status_code}",
})
continue # 跳过这个门店,处理下一个
response_json = response.json()
data = response_json.get("data")
if not data:
# 可能是cookie过期,发送预警
payload = {
"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "68cb745753594c2570ba4f70",
"data": {
"_widget_1758164058473": {"value": "Boss权限自动审批"},
"_widget_1758164058474": {"value": f"cookies信息失效,请更新cookie"}
},
"is_start_workflow": "true"
}
API().data_batch_create(payload)
# 标记所有行为cookie失效
for _, row in group.iterrows():
result_list.append({
"数据id": row["数据id"],
"门店编码": row["门店编码"],
"一级分类": row["一级分类"],
"二级分类": row["二级分类"],
"权限": row["权限"],
"开关": row["开关"],
"状态": "cookies失效",
})
continue
menus_list = data.get("menus", [])
print(f"菜单项数: {len(menus_list)}")
except Exception as e:
print(f"获取菜单状态异常: {str(e)}")
for _, row in group.iterrows():
result_list.append({
"数据id": row["数据id"],
"门店编码": row["门店编码"],
"一级分类": row["一级分类"],
"二级分类": row["二级分类"],
"权限": row["权限"],
"开关": row["开关"],
"状态": f"获取菜单异常: {str(e)}",
})
continue
# 2. 在同一个menus_list上应用该门店的所有权限变更
has_modification = False # 标记是否有实际修改
for _, row in group.iterrows():
print(f" 应用权限: {row['一级分类']} > {row['二级分类']} > {row['权限']} = {row['开关']}")
if row['一级分类'] is None:
continue
for menu in menus_list:
if menu.get("name") == row["一级分类"]:
# 处理一级分类
if row["开关"] == "":
menu["isChecked"] = 1
has_modification = True
print(f" 开启一级分类: {row['一级分类']}")
for node in menu.get("nodes", []):
if node.get("name") == row["二级分类"]:
# 处理二级分类
if row["开关"] == "":
if node.get("isChecked") != 1:
node["isChecked"] = 1
has_modification = True
print(f" 开启二级分类: {row['二级分类']}")
if row["权限"] != "":
# 处理三级权限
for child_node in node.get("nodes", []):
if child_node.get("name") == row["权限"]:
new_value = 1 if row["开关"] == "" else 0
if child_node.get("isChecked") != new_value:
child_node["isChecked"] = new_value
has_modification = True
print(f" {'开启' if new_value else '关闭'}三级权限: {row['权限']}")
break
else:
# 无三级权限时,统一设置二级及子节点
new_value = 1 if row["开关"] == "" else 0
if node.get("isChecked") != new_value:
node["isChecked"] = new_value
has_modification = True
for child_node in node.get("nodes", []):
if child_node.get("isChecked") != new_value:
child_node["isChecked"] = new_value
has_modification = True
break # 找到匹配的二级分类后跳出
break # 找到匹配的一级分类后跳出
# 3. 如果没有实际修改,直接标记该门店所有权限为成功
if not has_modification:
print(f" 门店 {company_id} 无实际权限变更,跳过更新请求")
for _, row in group.iterrows():
result_list.append({
"数据id": row["数据id"],
"门店编码": row["门店编码"],
"一级分类": row["一级分类"],
"二级分类": row["二级分类"],
"权限": row["权限"],
"开关": row["开关"],
"状态": "无需更新(权限已生效)",
})
continue
# 4. 收集所有选中ID并发送更新请求(只发送一次)
checked_ids = []
def collect_checked_ids(items):
for item in items:
if item.get("isChecked") == 1:
checked_ids.append(str(item.get("id")))
if "nodes" in item:
collect_checked_ids(item["nodes"])
collect_checked_ids(menus_list)
checked_ids_str = ",".join(checked_ids)
print(f" 选中ID数量: {len(checked_ids)}")
# 5. 发送POST更新请求
role = data.get("role", {})
pk_id = role.get("id")
user_name = role.get("name")
if not pk_id or not user_name:
print(f" 错误:无法获取pkId或user_name")
for _, row in group.iterrows():
result_list.append({
"数据id": row["数据id"],
"门店编码": row["门店编码"],
"一级分类": row["一级分类"],
"二级分类": row["二级分类"],
"权限": row["权限"],
"开关": row["开关"],
"状态": "响应数据异常",
})
continue
new_json_data = {
"pkId": pk_id,
"name": user_name,
'isValid': 1,
"menuStr": checked_ids_str,
}
# 添加重试机制
max_retries = 3
update_success = False
update_error_msg = ""
for retry in range(max_retries):
try:
response = requests.post(
'https://manage.f6yc.com/hive-admin/company/role/update',
cookies=cookies,
headers=headers,
json=new_json_data,
timeout=30 # 增加超时时间,因为参数可能很长
)
result = response.json()
print(f" 更新请求结果 (重试{retry + 1}/{max_retries}): {result}")
if result.get("code") == 200:
update_success = True
break
elif result.get("code") == 500:
update_error_msg = f"服务器内部错误 (500)"
if retry < max_retries - 1:
wait_time = (retry + 1) * 2 # 递增等待时间:2s, 4s, 6s
print(f" 500错误,{wait_time}秒后重试...")
time.sleep(wait_time)
else:
update_error_msg = f"更新失败 code={result.get('code')}: {result.get('message')}"
break
except Exception as e:
update_error_msg = f"更新异常: {str(e)}"
if retry < max_retries - 1:
wait_time = (retry + 1) * 2
print(f" 请求异常,{wait_time}秒后重试...")
time.sleep(wait_time)
# 6. 记录该门店所有权限的结果
status = "更新成功" if update_success else f"更新失败: {update_error_msg}"
for _, row in group.iterrows():
result_list.append({
"数据id": row["数据id"],
"门店编码": row["门店编码"],
"一级分类": row["一级分类"],
"二级分类": row["二级分类"],
"权限": row["权限"],
"开关": row["开关"],
"状态": status,
})
# 7. 门店之间添加延时,避免请求过快
time.sleep(1)
result_df = pd.DataFrame(result_list)
return result_df
def send_status_and_approval(self, df):
all_success_ids = [] # 全部成功的 数据id
failed_records_dict = {} # 不成功的记录 {数据id: [不成功的记录]}
# 按 "数据id" 分组,并遍历每个分组
for data_id, group in df.groupby("数据id"):
if (group["状态"] == "更新成功").all():
all_success_ids.append(data_id)
print(f"数据id {data_id} 全部成功,执行提交操作")
get_task_id = API().workflow_instance_get({"data_id": data_id})
# print(get_task_id)
task_id = get_task_id.get("tasks")[-1].get("task_id")
user_name = get_task_id.get("tasks")[-1].get("assignee").get("username")
# 表单提交到下一步
res = API().workflow_task_approve({
"username": user_name,
"task_id": task_id,
"instance_id": data_id,
})
# print(res)
new_res = API().entry_data_update({
"api_key": "675b900991ad2491c69389ca", # 应用ID
"entry_id": "68c1116b51730ebbc690ae40", # 表单ID
"data_id": data_id,
"data": {"_widget_1751522755921": {"value": ""}, # 是否开通
},
})
elif (group["状态"] == "空门店编码跳过执行").all():
print(f"数据id {data_id} 存在空门店编码,跳过执行")
else:
# 情况2:有不成功的 -> 执行操作 b(如组合不成功的记录)
failed_records = group[group["状态"] != "更新成功"].to_dict("records")
failed_records_dict[data_id] = failed_records
print(f"数据id {data_id} 有不成功的记录:{failed_records}")
payload = {
"api_key": "6694d3c4fcb69ca9a111a6c4",
"entry_id": "68cb745753594c2570ba4f70",
"data": {"_widget_1758164058473": {"value": "Boss权限自动审批"}, # 预警类型
"_widget_1758164058474": {"value": f"数据id {data_id} 有不成功的记录:{failed_records}"}
# 预警内容
},
"is_start_workflow": "true"
}
res = API().data_batch_create(payload)
# print(f"已成功提交数据预警信息:{res}")
print(f"全部成功的数据id{all_success_ids}")
def main(self):
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# step1 获取简道云上任务列表
self.load_data()
if not self.task_data:
print("当前没有需要执行的任务,结束。")
self.send_task_status(task_start_time, "boss权限自动审批")
return
# step2 根据权限开通列表获取权限对应功能
payload_df = self.map_switch_data()
if payload_df is None or payload_df.empty:
print("当前没有匹配到需要执行的审批节点数据,结束。")
self.send_task_status(task_start_time, "boss权限自动审批")
return
# step3 根据门店编码查询公司id。
payload_df = self.get_company_id(payload_df)
# step4 批量修改权限
result_df = self.update_permission(payload_df)
# step5 简道云发送状态与同意
self.send_status_and_approval(result_df)
self.send_task_status(task_start_time, "boss权限自动审批")
except Exception as e:
import traceback
error_detail = f"{type(e).__name__}: {e}"
print(f"任务执行异常: {error_detail}")
print(f"堆栈信息:\n{traceback.format_exc()}")
self.send_task_error(task_start_time, "boss权限自动审批失败", error_detail)
if __name__ == '__main__':
BossPermissionAutoApproval().main()