Files
saas/test/获取流程表单数据.py
T
2025-08-12 13:43:10 +08:00

343 lines
13 KiB
Python

from yd_api import YDAPI
import pandas as pd
from tqdm import tqdm
import hashlib
from datetime import datetime
import pandas as pd
import mysql.connector
from mysql.connector import Error
import json
# Yida identifiers shared by every API call in this script: the process form,
# the owning application, and the application's system token.
# NOTE(review): systemToken is a credential committed to source; consider
# loading it from configuration/environment instead.
FORMID = "FORM-XHA66881FHMAR0F07GT4Y59GGA972DD6B5OHLB"
appType = "APP_RTPWHV37ENXPQUZHTL25"
systemToken = "IA766O61SHFZT6UB0WNOB58GI5RW2K58KCU1LL6"
# One API client and one access token, created at import time and reused by
# all methods of TimeConsumingProcess.
api_instance = YDAPI()
TOKEN = api_instance.generateToken()
class TimeConsumingProcess():
    """
    Measure how long each step of a Yida process form takes.

    Pipeline (see ``main``): fetch process instances -> fetch each instance's
    approval records -> split records into submission groups -> pivot to a
    wide table -> classify groups by their (node name, action) path ->
    compute per-step durations -> insert the result into the BI MySQL table.
    """

    def fetch_process_data(self, max_pages=290, page_size=100):
        """Fetch all process instances of the form, page by page.

        Args:
            max_pages: hard cap on the number of pages requested. The default
                (290 pages x 100 rows) keeps a run under ~30k instances.
            page_size: number of rows requested per page.

        Returns:
            list of instance dicts taken from each page's ``data`` field.
            Pages whose request raises are logged and skipped.
        """
        # TODO: incremental update (e.g. filtering with createFromTimeGMT set
        # to today's 00:00 in ms) is not implemented; every run reads all
        # instances up to the page cap.
        all_process_list = []
        total_pages = max_pages
        for page in tqdm(range(1, max_pages + 1)):
            if page > total_pages:
                break
            try:
                response = api_instance.read_processes_instances(
                    token=TOKEN, formUuid=FORMID, page=page, n=page_size,
                    appType=appType, systemToken=systemToken, instanceStatus=""
                )
            except Exception as e:
                print(f"Error fetching page {page}: {e}")
                continue
            response = response or {}
            total_count = response.get("totalCount")
            if page == 1 and isinstance(total_count, int):
                # Stop after the last populated page (ceil division), but
                # never request more than max_pages.
                total_pages = min(max_pages, -(-total_count // page_size))
            # A page may lack "data" (or carry None); treat it as empty.
            all_process_list.extend(response.get("data") or [])
        return all_process_list

    def extract_approval_records(self, process_instances):
        """Fetch the approval history of every process instance.

        Args:
            process_instances: instance dicts as returned by
                ``fetch_process_data``; ``processInstanceId`` and ``version``
                are read from each.

        Returns:
            list of 7-element lists, one per approval record, in API order:
            [operateTimeGMT, showName, operatorName, action,
             processInstanceId, version, activityId].
            Records of one instance are contiguous, which
            ``group_by_process`` relies on.
        """
        all_data_list = []
        for data in tqdm(process_instances, desc="处理流程实例"):
            process_instance_id = data.get("processInstanceId")
            version = data.get("version")
            try:
                response = api_instance.get_approval_records(
                    token=TOKEN, processInstanceId=process_instance_id,
                    appType=appType, systemToken=systemToken
                )
            except Exception as e:
                # Same log-and-continue policy as fetch_process_data: one
                # failing instance should not abort the whole run.
                print(f"Error fetching approval records for {process_instance_id}: {e}")
                continue
            for record in (response or {}).get('result') or []:
                all_data_list.append([
                    record.get('operateTimeGMT'),
                    record.get('showName'),
                    record.get('operatorName'),
                    record.get('action'),
                    process_instance_id,
                    version,
                    record.get('activityId'),
                ])
        return all_data_list

    def group_by_process(self, all_data_list):
        """Split approval records into groups, one per submission.

        A new group starts at every "提交申请" (submit) record and whenever the
        process instance id (field 4) changes, so an instance that has no
        submit record is not merged into the previous instance's group.

        Returns:
            list of groups (lists of records). Each record is copied with a
            step index appended: 0 for the submit record, then 1, 2, ... for
            the records after it. The input lists are not modified.
        """
        result_groups = []
        current_group = []
        current_process_id = None
        next_step = 1
        for record in all_data_list:
            is_submit = record[1] == "提交申请"
            process_id = record[4]
            if current_group and (is_submit or process_id != current_process_id):
                result_groups.append(current_group)
                current_group = []
                next_step = 1
            if is_submit:
                step = 0
            else:
                step = next_step
                next_step += 1
            # Copy instead of appending in place so the caller's records keep
            # their original 7 fields (calling twice stays safe).
            current_group.append([*record, step])
            current_process_id = process_id
        if current_group:
            result_groups.append(current_group)
        return result_groups

    def transform_to_wide_table(self, result_groups):
        """Pivot grouped approval records from long to wide format.

        Each group becomes one row. The i-th record of a group (1-based, the
        submit record included) fills the columns 审批{i}时间, 审批{i}节点名,
        审批{i}, 审批{i}动作, 序号{i}, 审批{i}数据id, 审批{i}流程版本 and
        审批{i}流程节点id. A revoked step ("已撤销") gets the node name
        "该节点已撤销".

        Returns:
            (df_final, max_steps): the wide DataFrame, with cells of steps a
            group does not have filled with "-", and the length of the longest
            group, i.e. the number of 审批{i}* column sets.
        """
        flattened_rows = []
        for group in result_groups:
            row_data = {}
            for i, item in enumerate(group, start=1):
                (operate_time, show_name, operator_name, action,
                 process_instance_id, version, activity_id, index) = item
                if action == "已撤销":
                    show_name = "该节点已撤销"
                row_data.update({
                    f'审批{i}时间': operate_time,
                    f'审批{i}节点名': show_name,
                    f'审批{i}': operator_name,
                    f'审批{i}动作': action,
                    f'序号{i}': index,
                    f'审批{i}数据id': process_instance_id,
                    f'审批{i}流程版本': version,
                    f'审批{i}流程节点id': activity_id,
                })
            flattened_rows.append(row_data)
        # Column set i holds the i-th record (submit included), so the longest
        # group needs exactly len(group) sets; fewer would drop its last steps.
        max_steps = max((len(group) for group in result_groups), default=0)
        all_columns = [
            column
            for i in range(1, max_steps + 1)
            for column in (
                f'审批{i}时间',
                f'审批{i}节点名',
                f'审批{i}',
                f'审批{i}动作',
                f'序号{i}',
                f'审批{i}数据id',
                f'审批{i}流程版本',
                f'审批{i}流程节点id',
            )
        ]
        # Unify the column layout and mark missing steps with "-".
        df_final = pd.DataFrame(flattened_rows).reindex(columns=all_columns).fillna("-")
        return df_final, max_steps

    def classify_flows(self, df_final, max_steps):
        """Classify flows by their (node name, action) path and unpivot them.

        Rows whose path contains a special action (revoked, transferred,
        returned, rejected) get group_id 5000. Every other distinct path gets a
        sequential group_id (1, 2, ...) in order of first appearance.

        Args:
            df_final: wide table from ``transform_to_wide_table``.
            max_steps: number of 审批{i}* column sets in ``df_final``.

        Returns:
            long DataFrame with one row per existing approval step and the
            columns group_id, 审批时间, 审批节点名, 审批人, 审批动作, 序号,
            审批数据id, 审批流程版本, 审批流程节点id, 审批节点id合并 (list of
            the row's node ids) and 审批节点名合并 (list of the row's node
            names).
        """
        special_actions = {"已撤销", "已转交", "已退回", "已拒绝"}
        output_columns = [
            'group_id', '审批时间', '审批节点名', '审批人', '审批动作', '序号',
            '审批数据id', '审批流程版本', '审批流程节点id',
            '审批节点id合并', '审批节点名合并',
        ]

        def extract_signature(row):
            # Ordered (node name, action) pairs up to the first missing step.
            signature = []
            i = 1
            while f'审批{i}节点名' in row:
                node_name = row[f'审批{i}节点名']
                if node_name == "-":
                    break
                signature.append((node_name, row[f'审批{i}动作']))
                i += 1
            return tuple(signature)

        steps = range(1, max_steps + 1)
        group_map = {}
        result_rows = []
        for _, row in df_final.iterrows():
            signature = extract_signature(row)
            if any(action in special_actions for _, action in signature):
                group_id = 5000
            else:
                # The tuple is hashable, so it serves as the key directly.
                group_id = group_map.setdefault(signature, len(group_map) + 1)

            process_id_list = [
                row[f'审批{i}流程节点id'] for i in steps
                if row.get(f'审批{i}流程节点id', "-") != "-"
            ]
            process_list = [
                row[f'审批{i}节点名'] for i in steps
                if row.get(f'审批{i}节点名', "-") != "-"
            ]
            for i in steps:
                prefix = f'审批{i}'
                operate_time = row.get(f'{prefix}时间', "-")
                # Skip steps this row does not have instead of emitting
                # rows that carry only a group_id.
                if pd.isna(operate_time) or operate_time == '-':
                    continue
                result_rows.append({
                    'group_id': group_id,
                    '审批时间': operate_time,
                    '审批节点名': row[f'{prefix}节点名'],
                    '审批人': row[prefix],
                    '审批动作': row[f'{prefix}动作'],
                    '序号': row.get(f'序号{i}', '-'),
                    '审批数据id': row.get(f'{prefix}数据id', '-'),
                    '审批流程版本': row.get(f'{prefix}流程版本', '-'),
                    '审批流程节点id': row.get(f'{prefix}流程节点id', '-'),
                    '审批节点id合并': process_id_list,
                    '审批节点名合并': process_list,
                })
        print("✅ 分组完成")
        return pd.DataFrame(result_rows, columns=output_columns)

    def time_calculate(self, df_final):
        """Compute the minutes elapsed since the previous step of the same flow.

        Durations are diffed only within one submission cycle: a new cycle
        starts at 序号 == 0 (the submit record) or when 审批数据id (the
        process instance id) changes. If a '流程ID' column exists it is used
        as the grouping key instead. The first step of a cycle and rows whose
        审批动作 is "提交申请" get 0. The list columns 审批节点id合并 and
        审批节点名合并 are serialised to JSON strings. The input is not
        modified.
        """
        df = df_final.copy()
        # NOTE(review): if 审批时间 holds epoch milliseconds rather than date
        # strings, this needs unit='ms' — confirm the API's format.
        df["审批时间"] = pd.to_datetime(df["审批时间"])
        # Drop rows without a time before diffing so they cannot break chains.
        df = df[df["审批时间"].notna()].copy()

        times = df["审批时间"]
        if '流程ID' in df.columns:
            durations = times.groupby(df['流程ID']).diff()
        elif {'序号', '审批数据id'}.issubset(df.columns):
            process_ids = df['审批数据id']
            cycle_starts = df['序号'].eq(0) | process_ids.ne(process_ids.shift())
            durations = times.groupby(cycle_starts.cumsum()).diff()
        else:
            durations = times.diff()
        df['耗时'] = durations.dt.total_seconds().div(60)
        df.loc[df['审批动作'] == '提交申请', '耗时'] = 0
        # The first row of each cycle has no predecessor (NaN) -> 0.
        df['耗时'] = df['耗时'].fillna(0)
        # Lists cannot be inserted into MySQL; store them as JSON text.
        df['审批节点id合并'] = df['审批节点id合并'].apply(json.dumps, ensure_ascii=False)
        df['审批节点名合并'] = df['审批节点名合并'].apply(json.dumps, ensure_ascii=False)
        return df

    def write_to_bi(self, df_final):
        """Bulk-insert ``df_final`` into the BI MySQL table.

        The DataFrame's column names are used verbatim as the target table's
        column names. The connection settings can be overridden with the
        BI_DB_HOST / BI_DB_USER / BI_DB_PASSWORD / BI_DB_NAME environment
        variables; the built-in values are used when they are unset. The
        cursor and connection are closed even if the insert fails.
        """
        import os

        # NOTE(review): the fallback password is committed to source control;
        # it should be rotated and supplied only via the environment.
        host = os.environ.get("BI_DB_HOST", "rm-uf6r230vbtxf5gdz63o.mysql.rds.aliyuncs.com")
        user = os.environ.get("BI_DB_USER", "rw_operation_data_relay")
        password = os.environ.get("BI_DB_PASSWORD", "m+q5Z4%IVuF9bf")
        database = os.environ.get("BI_DB_NAME", "f6operation_data_relay")
        table_name = "yida_process_time_statistics"

        connection = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )
        print(f"成功连接 {database}")
        try:
            cursor = connection.cursor()
            try:
                # Cast to object first so NaN/NaT become real None (SQL NULL);
                # `where(..., None)` on a float column may keep NaN.
                df = df_final.astype(object).where(pd.notna(df_final), None)
                columns = ', '.join(f"`{column}`" for column in df.columns)
                placeholders = ', '.join(['%s'] * len(df.columns))
                insert_query = f"INSERT INTO `{table_name}` ({columns}) VALUES ({placeholders})"
                records = list(df.itertuples(index=False, name=None))
                cursor.executemany(insert_query, records)
                connection.commit()
                print(f"成功导入 {cursor.rowcount} 条记录到 {table_name}")
            finally:
                cursor.close()
        finally:
            connection.close()

    def main(self):
        """Run the whole pipeline; skip the BI write when there is no data."""
        # Step 1: fetch process instances
        process_instances = self.fetch_process_data()
        # Step 2: fetch approval records
        all_data_list = self.extract_approval_records(process_instances)
        if not all_data_list:
            print("没有获取到审批记录,跳过写入")
            return
        # Step 3: split records into submission groups
        result_groups = self.group_by_process(all_data_list)
        # Step 4: pivot to a wide table
        df_final, max_steps = self.transform_to_wide_table(result_groups)
        # Step 5: classify flows and unpivot
        df_final1 = self.classify_flows(df_final, max_steps)
        # Step 6: compute step durations
        df_final2 = self.time_calculate(df_final1)
        if df_final2.empty:
            print("没有可写入的审批记录,跳过写入")
            return
        # Step 7: write to BI
        self.write_to_bi(df_final2)
if __name__ == '__main__':
    # Script entry point: run the fetch -> transform -> BI-write pipeline once.
    TimeConsumingProcess().main()