"""Export approval records of a form's process instances and analyse them.

Pipeline (see ``main``):
1. fetch all process instances of the form,
2. fetch the approval records of every instance,
3. split the flat record list into one group per flow,
4. pivot the groups into a wide table (one row per flow),
5. classify flows by their (node name, action) sequence and unpivot again,
6. compute the elapsed time between consecutive approval steps.
"""
import hashlib
from datetime import datetime

import pandas as pd
from tqdm import tqdm

from api import API

# Initialise the API client and obtain an access token.
api_instance = API()
TOKEN = api_instance.generateToken()

FORMID = "FORM-XHA66881FHMAR0F07GT4Y59GGA972DD6B5OHLB"
appType = "APP_RTPWHV37ENXPQUZHTL25"
# NOTE(review): this credential is committed in source; consider loading it
# from an environment variable or a secret store instead.
systemToken = "IA766O61SHFZT6UB0WNOB58GI5RW2K58KCU1LL6"

# Number of process instances requested per page.
_PAGE_SIZE = 100
# Default page cap; keeps a single export below roughly 30k instances.
_DEFAULT_MAX_PAGES = 290
# Node name that marks the first record of a flow.
_SUBMIT_NODE_NAME = "提交申请"
# Actions that make a flow "irregular"; such flows share one group id.
_SPECIAL_ACTIONS = frozenset({"已撤销", "已转交", "已退回", "已拒绝"})
_SPECIAL_GROUP_ID = 5000
# Placeholder columns kept at the front of the wide table so the exported CSV
# schema stays unchanged (they only ever contain "-").
_WIDE_BASE_COLUMNS = [
    '审批时间', '审批节点名', '审批人', '审批动作',
    '序号', '数据id', '流程版本', '流程节点id',
]
# Column order of the long table produced by ``classify_flows``.
_LONG_COLUMNS = [
    'group_id', '审批时间', '审批节点名', '审批人', '审批动作', '序号',
    '审批数据id', '审批流程版本', '审批流程节点id',
    '审批节点id合并', '审批节点名合并',
]


def _step_columns(i):
    """Return the eight wide-table column names that describe step ``i``."""
    return [
        f'审批{i}时间', f'审批{i}节点名', f'审批{i}人', f'审批{i}动作',
        f'序号{i}', f'审批{i}数据id', f'审批{i}流程版本', f'审批{i}流程节点id',
    ]


def _count_step_columns(columns):
    """Count the consecutive ``审批{i}节点名`` columns, starting at ``i = 1``."""
    count = 0
    while f'审批{count + 1}节点名' in columns:
        count += 1
    return count


def fetch_process_data(max_pages=_DEFAULT_MAX_PAGES):
    """Fetch the process instances of the form, page by page.

    Args:
        max_pages: Upper bound on the number of pages requested. Fewer pages
            are requested when the reported ``totalCount`` needs fewer.

    Returns:
        A list with the ``data`` entries of every page that could be fetched.
        Pages that fail are reported on stdout and skipped (best effort).
    """
    # TODO: incremental updates. The original author planned to filter by
    # creation time (``createFromTimeGMT``, today's midnight in epoch
    # milliseconds); that parameter is not passed yet.
    first_page = api_instance.read_processes_instances(
        token=TOKEN,
        formUuid=FORMID,
        page=1,
        n=_PAGE_SIZE,
        appType=appType,
        systemToken=systemToken,
        instanceStatus="",
    )
    total_count = first_page.get('totalCount') or 0
    # Ceiling division: an exact multiple of the page size needs no extra page.
    needed_pages = max(1, -(-total_count // _PAGE_SIZE))
    page_count = min(needed_pages, max_pages)

    all_process_list = []
    for page in tqdm(range(1, page_count + 1)):
        try:
            page_data = api_instance.read_processes_instances(
                token=TOKEN,
                formUuid=FORMID,
                page=page,
                n=_PAGE_SIZE,
                appType=appType,
                systemToken=systemToken,
                instanceStatus="",
            )
            page_records = page_data.get("data") or []
        except Exception as e:  # best effort: one bad page must not abort the export
            print(f"Error fetching page {page}: {e}")
            continue
        all_process_list.extend(page_records)
    return all_process_list


def extract_approval_records(process_instances):
    """Fetch the approval records of every process instance.

    Each record becomes a 7-element list
    ``[operateTimeGMT, showName, operatorName, action, processInstanceId,
    version, activityId]``. The flat list is also written to ``审批记录.csv``.

    Args:
        process_instances: Dicts providing ``processInstanceId`` and
            ``version`` (as returned by ``fetch_process_data``).

    Returns:
        The flat list of records, in the order they were received. Instances
        whose records cannot be fetched are reported on stdout and skipped.
    """
    all_data_list = []
    for data in tqdm(process_instances, desc="处理流程实例"):
        processInstanceId = data.get("processInstanceId")
        version = data.get("version")
        try:
            res_new = api_instance.get_approval_records(
                token=TOKEN,
                processInstanceId=processInstanceId,
                appType=appType,
                systemToken=systemToken,
            )
            records_new = res_new.get('result') or []
        except Exception as e:  # same best-effort policy as fetch_process_data
            print(f"Error fetching approval records for {processInstanceId}: {e}")
            continue
        for record in records_new:
            all_data_list.append([
                record.get('operateTimeGMT'),
                record.get('showName'),
                record.get('operatorName'),
                record.get('action'),
                processInstanceId,
                version,
                record.get('activityId'),
            ])

    pd.DataFrame(all_data_list).to_csv("审批记录.csv", index=False)
    return all_data_list


def group_by_process(all_data_list):
    """Split the flat record list into one group per flow.

    A record whose node name (element 1) is "提交申请" starts a new group and
    gets step index 0; the following records get 1, 2, ... The index is
    appended as an eighth element to a copy of each record, so the input
    records are left untouched and the function can be called repeatedly.

    Args:
        all_data_list: Records as produced by ``extract_approval_records``.

    Returns:
        A list of groups; each group is a list of 8-element records.
    """
    result_groups = []
    current_group = []
    step_index = 1
    for record in all_data_list:
        if record[1] == _SUBMIT_NODE_NAME:
            if current_group:
                result_groups.append(current_group)
            current_group = [[*record, 0]]
            step_index = 1
        else:
            current_group.append([*record, step_index])
            step_index += 1
    if current_group:
        result_groups.append(current_group)
    return result_groups


def transform_to_wide_table(result_groups):
    """Pivot the grouped records into a wide table (one row per flow).

    Step ``i`` of a flow (1-based, the submission node being step 1) fills
    the columns ``审批{i}时间``, ``审批{i}节点名``, ... Missing cells are
    filled with "-". The table is written to
    ``审批流程行转列结果_with_node_name.csv``.

    Args:
        result_groups: Groups as produced by ``group_by_process``.

    Returns:
        ``(df_final, max_steps)`` where ``max_steps`` is the largest number
        of steps after the submission node (0 when there are no groups).
    """
    flattened_rows = []
    for group in result_groups:
        row_data = {}
        for i, item in enumerate(group, start=1):
            # ``dataId`` holds the process instance id of the record.
            (operateTimeGMT, showName, operatorName, action,
             dataId, version, activity_id, index) = item
            if action == "已撤销":
                showName = "该节点已撤销"
            row_data.update(zip(
                _step_columns(i),
                (operateTimeGMT, showName, operatorName, action,
                 index, dataId, version, activity_id),
            ))
        flattened_rows.append(row_data)

    longest_flow = max((len(group) for group in result_groups), default=0)
    max_steps = max(longest_flow - 1, 0)  # excluding the submission node

    # Steps are numbered 1..longest_flow, so every one of them needs its
    # columns; building only ``max_steps`` of them dropped the last step of
    # the longest flows.
    all_columns = list(_WIDE_BASE_COLUMNS)
    for i in range(1, longest_flow + 1):
        all_columns.extend(_step_columns(i))

    df_final = pd.DataFrame(flattened_rows).reindex(columns=all_columns)
    df_final = df_final.fillna("-")

    df_final.to_csv("审批流程行转列结果_with_node_name.csv", index=False)
    return df_final, max_steps


def classify_flows(df_final, max_steps):
    """Group flows by their (node name, action) sequence and unpivot them.

    Flows with an identical sequence share a ``group_id`` (numbered from 1
    in order of first appearance). Flows containing a revoked, transferred,
    returned or rejected step all get group id 5000. ``df_final`` receives
    the columns ``signature`` and ``group_id`` in place and is written to
    ``审批流程分类结果.csv``; the long table is written to
    ``审批流程分类结果_with_node_name.csv``.

    Args:
        df_final: Wide table as produced by ``transform_to_wide_table``.
        max_steps: Kept for backward compatibility. The number of steps is
            derived from the columns present in ``df_final``, so that no
            trailing step is lost.

    Returns:
        A long-format DataFrame with one row per approval step. Steps
        without a timestamp are skipped.
    """
    step_count = _count_step_columns(df_final.columns)
    steps = range(1, step_count + 1)

    def extract_signature(row):
        """Return the (node name, action) pairs up to the first empty step."""
        signature = []
        for i in steps:
            node_name = row[f'审批{i}节点名']
            if node_name == "-":
                break
            signature.append((node_name, row[f'审批{i}动作']))
        return signature

    signatures = [extract_signature(row) for _, row in df_final.iterrows()]
    # Explicit object Series: keeps each signature list as a single cell.
    df_final['signature'] = pd.Series(
        signatures, index=df_final.index, dtype=object
    )

    group_map = {}
    group_ids = []
    next_group_id = 1
    for sig in signatures:
        if any(action in _SPECIAL_ACTIONS for _, action in sig):
            group_ids.append(_SPECIAL_GROUP_ID)
            continue
        key = str(sig)
        if key not in group_map:
            # Never hand out the id reserved for irregular flows.
            if next_group_id == _SPECIAL_GROUP_ID:
                next_group_id += 1
            group_map[key] = next_group_id
            next_group_id += 1
        group_ids.append(group_map[key])
    df_final['group_id'] = group_ids

    df_final.to_csv("审批流程分类结果.csv", index=False)
    print("✅ 分组完成,已保存至 '审批流程分类结果.csv'")

    result_rows = []
    for _, row in df_final.iterrows():

        def cell(column, row=row):
            """Return the row's value for ``column``, or "-" if it is absent."""
            return row[column] if column in row.index else '-'

        # Node ids / names of the whole flow, shared by all of its rows.
        process_id_list = [
            cell(f'审批{i}流程节点id') for i in steps
            if cell(f'审批{i}流程节点id') != "-"
        ]
        process_list = [
            cell(f'审批{i}节点名') for i in steps
            if cell(f'审批{i}节点名') != "-"
        ]

        for i in steps:
            step_time = cell(f'审批{i}时间')
            if pd.isna(step_time) or step_time == '-':
                continue
            result_rows.append({
                'group_id': row["group_id"],
                '审批时间': step_time,
                '审批节点名': cell(f'审批{i}节点名'),
                '审批人': cell(f'审批{i}人'),
                '审批动作': cell(f'审批{i}动作'),
                '序号': cell(f'序号{i}'),
                '审批数据id': cell(f'审批{i}数据id'),
                '审批流程版本': cell(f'审批{i}流程版本'),
                '审批流程节点id': cell(f'审批{i}流程节点id'),
                '审批节点id合并': process_id_list,
                '审批节点名合并': process_list,
            })

    # Explicit columns keep the schema stable even when there are no rows.
    dfn = pd.DataFrame(result_rows, columns=_LONG_COLUMNS)
    dfn.to_csv("审批流程分类结果_with_node_name.csv", index=False)
    return dfn


def time_calculate(df_final):
    """Compute the elapsed time, in minutes, between consecutive steps.

    The column ``耗时`` is added to ``df_final`` in place. When a ``流程ID``
    column exists the difference is taken within each flow; otherwise it is
    taken between consecutive rows. Rows that start a flow (action "提交申请",
    or step index ``序号`` equal to 0 when that column exists) get 0, as do
    rows without a predecessor. The result is written to ``最终结果.csv``.

    Args:
        df_final: Long table with at least ``审批时间`` and ``审批动作``.

    Returns:
        The rows of ``df_final`` that have a valid ``审批时间``.
    """
    # NOTE(review): pd.to_datetime reads plain numbers as nanoseconds; if the
    # API delivers epoch milliseconds, unit="ms" is required here - confirm.
    df_final["审批时间"] = pd.to_datetime(df_final["审批时间"])

    if '流程ID' in df_final.columns:
        elapsed = df_final.groupby('流程ID')['审批时间'].diff()
    else:
        elapsed = df_final['审批时间'].diff()
    df_final['耗时'] = elapsed.dt.total_seconds().div(60)

    # The first row of a flow would otherwise be measured against the last
    # row of the previous flow.
    starts_flow = df_final['审批动作'] == '提交申请'
    if '序号' in df_final.columns:
        starts_flow = starts_flow | (df_final['序号'] == 0)
    df_final.loc[starts_flow, '耗时'] = 0

    df_final['耗时'] = df_final['耗时'].fillna(0)
    df_final = df_final[df_final['审批时间'].notna()]

    df_final.to_csv("最终结果.csv", index=False)
    return df_final


def main():
    """Run the complete export and analysis pipeline."""
    # Step 1: fetch the process instances
    process_instances = fetch_process_data()
    # Step 2: fetch the approval records
    all_data_list = extract_approval_records(process_instances)
    # Step 3: one group per flow, split at the submission node
    result_groups = group_by_process(all_data_list)
    # Step 4: pivot into the wide table
    df_final, max_steps = transform_to_wide_table(result_groups)
    # Step 5: classify the flows and store the result
    df_final1 = classify_flows(df_final, max_steps)
    # Step 6: elapsed time per step
    time_calculate(df_final1)


if __name__ == "__main__":
    main()