402 lines
16 KiB
Python
402 lines
16 KiB
Python
from yd_api import YDAPI
|
||
from tqdm import tqdm
|
||
import hashlib
|
||
from datetime import datetime, timedelta
|
||
import pandas as pd
|
||
import mysql.connector
|
||
import json
|
||
from back_ground_module import CommonModule
|
||
import numpy as np
|
||
from config import Config
|
||
from log_config import configure_task_logger, configure_error_task_logger
|
||
|
||
# Loggers built by log_config: `logger` for routine progress messages,
# `error_task_logger` for task failures. Created in this order, at import time.
logger, error_task_logger = configure_task_logger(), configure_error_task_logger()
common_module = CommonModule()

# Shared Yida API client. The token is requested once, when this module is imported.
api_instance = YDAPI()
TOKEN = api_instance.generateToken()

# Identifiers of the Yida form / app whose process instances are analysed.
# NOTE(review): systemToken is a credential committed to source; consider moving it into Config.
FORMID, appType, systemToken = (
    "FORM-XHA66881FHMAR0F07GT4Y59GGA972DD6B5OHLB",
    "APP_RTPWHV37ENXPQUZHTL25",
    "IA766O61SHFZT6UB0WNOB58GI5RW2K58KCU1LL6",
)
|
||
|
||
|
||
class TimeConsumingProcess:
    """
    Measure how long each approval step of a Yida workflow form takes and load it into BI.

    ``main`` runs the pipeline: fetch the process instances created yesterday, pull
    their approval records, split the records into one group per submission, pivot
    every group into one wide row, classify groups by their (node name, action) path,
    compute the minutes spent on each step, and insert the result into MySQL.
    """

    # Page size used when listing process instances.
    PAGE_SIZE = 100
    # Flows containing any of these actions are not classified by path; they all
    # receive SPECIAL_FLOW_GROUP_ID instead.
    SPECIAL_ACTIONS = frozenset({"已撤销", "已转交", "已退回", "已拒绝"})
    SPECIAL_FLOW_GROUP_ID = 5000
    # Columns of the long table returned by classify_flows (one row per approval step).
    FLOW_COLUMNS = [
        "group_id", "审批时间", "审批节点名", "审批人", "审批动作", "序号",
        "审批数据id", "审批流程版本", "审批流程节点id", "审批节点id合并", "审批节点名合并",
    ]
    # Detail-page URL prefix; the app / form ids must match module-level appType / FORMID.
    FORM_DETAIL_URL_PREFIX = (
        "https://f6car.aliwork.com/APP_RTPWHV37ENXPQUZHTL25/formDetail/"
        "FORM-XHA66881FHMAR0F07GT4Y59GGA972DD6B5OHLB?formInstId="
    )
    BI_TABLE_NAME = "yida_process_time_statistics"

    @staticmethod
    def _step_columns(step):
        """Return the eight wide-table column names used by approval slot ``step``."""
        return [
            f'审批{step}时间', f'审批{step}节点名', f'审批{step}人', f'审批{step}动作',
            f'序号{step}', f'审批{step}数据id', f'审批{step}流程版本', f'审批{step}流程节点id',
        ]

    @staticmethod
    def _fill_missing(df):
        """Fill NaN with 0 in numeric columns and with "-" in object columns; returns ``df``."""
        numeric_cols = df.select_dtypes(include=['number']).columns
        str_cols = df.select_dtypes(include=['object']).columns
        if len(numeric_cols):
            df[numeric_cols] = df[numeric_cols].fillna(0)
        if len(str_cols):
            df[str_cols] = df[str_cols].fillna("-")
        return df

    def fetch_process_data(self):
        """Fetch every process instance created between yesterday 00:00 and today 00:00.

        Pages through ``read_processes_instances`` PAGE_SIZE rows at a time. The first
        request must succeed (it supplies ``totalCount``); a later page that fails is
        logged and skipped, so the result may then be partial.

        Returns:
            list: the instance dicts taken from each page's ``data`` field.

        Raises:
            ValueError: if the first response carries no ``totalCount``.
        """
        today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        yesterday_midnight = today_midnight - timedelta(days=1)
        query = dict(
            token=TOKEN, formUuid=FORMID, n=self.PAGE_SIZE,
            appType=appType, systemToken=systemToken, instanceStatus="",
            createFromTimeGMT=yesterday_midnight.strftime("%Y-%m-%d"),  # yesterday 00:00
            createToTimeGMT=today_midnight.strftime("%Y-%m-%d"),  # today 00:00
        )  # TODO: switch to incremental updates

        # The first page also reports the total count, so its data is kept rather than re-fetched.
        first_page = api_instance.read_processes_instances(page=1, **query)
        total_count = first_page.get("totalCount")
        if total_count is None:
            raise ValueError(f"流程实例接口返回缺少 totalCount: {first_page}")
        page_count = (total_count + self.PAGE_SIZE - 1) // self.PAGE_SIZE  # ceiling division
        all_process_list = list(first_page.get("data") or [])

        for page in tqdm(range(2, page_count + 1)):
            try:
                page_data = api_instance.read_processes_instances(page=page, **query)
                all_process_list.extend(page_data.get("data") or [])
            except Exception as e:
                # Best effort: one failing page should not abort the whole daily run.
                logger.warning(f"获取流程实例数据时出错: {e}")
        return all_process_list

    def extract_approval_records(self, process_instances: list):
        """Flatten the approval history of every process instance into row lists.

        Args:
            process_instances: instance dicts as returned by ``fetch_process_data``.

        Returns:
            list[list]: one row per approval record, laid out as
            ``[operateTimeGMT, showName, operatorName, action, processInstanceId,
            version, activityId]``, in the order the API returns them, instance by instance.
        """
        all_data_list = []
        for instance in tqdm(process_instances, desc="处理流程实例"):
            process_instance_id = instance.get("processInstanceId")
            version = instance.get("version")
            response = api_instance.get_approval_records(
                token=TOKEN, processInstanceId=process_instance_id,
                appType=appType, systemToken=systemToken,
            )
            for record in response.get('result') or []:
                all_data_list.append([
                    record.get('operateTimeGMT'),
                    record.get('showName'),
                    record.get('operatorName'),
                    record.get('action'),
                    process_instance_id,
                    version,
                    record.get('activityId'),
                ])
        return all_data_list

    def group_by_process(self, all_data_list):
        """Split approval rows into groups, one per submission ("提交申请") of a process.

        A new group starts at every record whose showName is "提交申请" and also whenever
        the processInstanceId changes, so records of two instances are never merged.
        Each output record is a copy of the input row with a step number appended:
        0 for the "提交申请" record, then 1, 2, ... for the following records.
        NOTE(review): relies on each instance's records arriving submission-first.

        Returns:
            list[list[list]]: the groups, in input order. The input rows are not modified.
        """
        result_groups = []
        current_group = []
        current_process_id = None
        step_no = 1
        for record in all_data_list:
            show_name, process_id = record[1], record[4]
            is_submit = show_name == "提交申请"
            if current_group and (is_submit or process_id != current_process_id):
                result_groups.append(current_group)
                current_group = []
                step_no = 1
            if is_submit:
                seq = 0
            else:
                seq = step_no
                step_no += 1
            current_group.append([*record, seq])
            current_process_id = process_id
        if current_group:
            result_groups.append(current_group)
        return result_groups

    def transform_to_wide_table(self, result_groups):
        """Pivot grouped approval records into one wide row per group.

        Record k of a group (1-based, the submission record included) fills the
        columns ``审批{k}时间, 审批{k}节点名, 审批{k}人, 审批{k}动作, 序号{k},
        审批{k}数据id, 审批{k}流程版本, 审批{k}流程节点id``. A record whose action is
        "已撤销" gets the node name "该节点已撤销". Missing cells become 0 in numeric
        columns and "-" in object columns.

        Returns:
            tuple[pd.DataFrame, int]: the wide table and ``max_steps``, the number of
            step slots (the longest group's length, submission included); ``(empty, 0)``
            for no groups.
        """
        if not result_groups:
            return pd.DataFrame(), 0

        rows = []
        for group in result_groups:
            row = {}
            for step, (op_time, show_name, operator, action, data_id, version, activity_id, seq) \
                    in enumerate(group, start=1):
                if action == "已撤销":
                    show_name = "该节点已撤销"
                row.update(zip(
                    self._step_columns(step),
                    (op_time, show_name, operator, action, seq, data_id, version, activity_id),
                ))
            rows.append(row)

        # The submission record occupies slot 1, so the slot count is the full group
        # length; subtracting it (as before) made reindex drop the longest groups' last step.
        max_steps = max(len(group) for group in result_groups)
        all_columns = [col for step in range(1, max_steps + 1) for col in self._step_columns(step)]
        df_final = pd.DataFrame(rows).reindex(columns=all_columns)
        return self._fill_missing(df_final), max_steps

    def classify_flows(self, df_final, max_steps):
        """Assign a group_id by (node name, action) path and unpivot back to one row per step.

        Rows whose path contains any SPECIAL_ACTIONS get SPECIAL_FLOW_GROUP_ID; the
        other paths are numbered 1, 2, ... in order of first appearance. Every filled
        step slot (time present and not "-") becomes one output row that also carries
        the lists of all node ids / node names of its wide row.

        Args:
            df_final: wide table from ``transform_to_wide_table`` (not modified).
            max_steps: number of step slots in ``df_final``.

        Returns:
            pd.DataFrame: columns FLOW_COLUMNS.
        """
        steps = range(1, max_steps + 1)
        wide_rows = df_final.to_dict("records")

        def extract_signature(row):
            # The path stops at the first empty slot ("-" after filling).
            signature = []
            for step in steps:
                node_name = row.get(f'审批{step}节点名', "-")
                if node_name == "-":
                    break
                signature.append((node_name, row[f'审批{step}动作']))
            return signature

        group_map = {}
        group_ids = []
        for row in wide_rows:
            signature = extract_signature(row)
            if any(action in self.SPECIAL_ACTIONS for _, action in signature):
                group_ids.append(self.SPECIAL_FLOW_GROUP_ID)
            else:
                # str() of the path is the grouping key (previously hashed with md5).
                group_ids.append(group_map.setdefault(str(signature), len(group_map) + 1))

        result_rows = []
        for row, group_id in zip(wide_rows, group_ids):
            node_ids = [row[f'审批{s}流程节点id'] for s in steps if row[f'审批{s}流程节点id'] != "-"]
            node_names = [row[f'审批{s}节点名'] for s in steps if row[f'审批{s}节点名'] != "-"]
            for step in steps:
                op_time = row[f'审批{step}时间']
                if pd.isna(op_time) or op_time == "-":
                    continue  # empty slot: this group has fewer steps than max_steps
                result_rows.append({
                    'group_id': group_id,
                    '审批时间': op_time,
                    '审批节点名': row[f'审批{step}节点名'],
                    '审批人': row[f'审批{step}人'],
                    '审批动作': row[f'审批{step}动作'],
                    '序号': row[f'序号{step}'],
                    '审批数据id': row[f'审批{step}数据id'],
                    '审批流程版本': row[f'审批{step}流程版本'],
                    '审批流程节点id': row[f'审批{step}流程节点id'],
                    '审批节点id合并': node_ids,
                    '审批节点名合并': node_names,
                })
        return pd.DataFrame(result_rows, columns=self.FLOW_COLUMNS)

    def time_calculate(self, df_final):
        """Add the elapsed time of every approval step and a link to its form instance.

        ``耗时`` is the gap in MINUTES between a row's 审批时间 and the previous row's.
        It is forced to 0 on the first row of each process instance (审批数据id changes)
        and on submission rows (审批动作 == "提交申请", or 序号 == 0, which
        ``group_by_process`` assigns to "提交申请" records), so no gap spans two
        processes. The list-valued 合并 columns are JSON-encoded, 审批时间 is rendered
        as 'YYYY-MM-DD HH:MM:SS', and ``url`` points at the instance's detail page.

        Returns:
            pd.DataFrame: a new frame; the input is not modified.
        """
        df_final = df_final.copy()
        df_final["审批时间"] = pd.to_datetime(df_final["审批时间"])
        df_final = self._fill_missing(df_final)

        for col in ('审批节点id合并', '审批节点名合并'):
            df_final[col] = df_final[col].apply(
                lambda x: json.dumps(x, ensure_ascii=False) if isinstance(x, (list, dict)) else x
            )

        instance_ids = df_final["审批数据id"]
        restart = (
            instance_ids.ne(instance_ids.shift())
            | (df_final['审批动作'] == '提交申请')
            | (df_final['序号'] == 0)
        )
        elapsed_minutes = df_final['审批时间'].diff().dt.total_seconds().div(60)
        df_final['耗时'] = elapsed_minutes.mask(restart, 0).fillna(0)

        # Replace the whole column (not .loc[:, col]) because the dtype changes to text.
        df_final["审批时间"] = df_final["审批时间"].dt.strftime('%Y-%m-%d %H:%M:%S')
        df_final["url"] = self.FORM_DETAIL_URL_PREFIX + df_final["审批数据id"].astype(str)
        return df_final

    def write_to_bi(self, df_final):
        """Insert the computed rows into the BI MySQL table BI_TABLE_NAME.

        Before inserting: datetime columns are rendered as text; placeholder values
        ('-', '', 'null', 'NULL', 'NA', 'NaN', 'None', NaN/None) become NULL; rows
        missing any of 审批节点名 / 审批时间 / group_id / 审批人 / 审批动作 are dropped,
        as are rows whose 审批时间 is not 'YYYY-MM-DD HH:MM:SS'. Every remaining column
        is inserted into the table column of the same name.

        Raises:
            Exception: any database error is re-raised after rollback, so ``main``
            reports the task as failed instead of successful.
        """
        table_name = self.BI_TABLE_NAME

        df = df_final.copy()
        for col in df.select_dtypes(include=['datetime64[ns]', 'datetimetz']).columns:
            df[col] = pd.to_datetime(df[col]).dt.strftime('%Y-%m-%d %H:%M:%S')

        empty_values = [np.nan, None, '', 'null', 'NULL', 'NA', 'NaN', 'None', '-']
        # Replace with NaN (not None): on older pandas replace(..., None) forward-fills.
        df = df.replace(empty_values, np.nan)
        required_columns = ['审批节点名', '审批时间', 'group_id', '审批人', '审批动作']
        df = df.dropna(subset=required_columns, how='any')
        df = df[df['审批时间'].astype(str).str.match(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$', na=False)]

        if df.empty:
            logger.info(f"没有可写入 {table_name} 的有效记录")
            return

        columns = ', '.join(f"`{col}`" for col in df.columns)
        placeholders = ', '.join(['%s'] * len(df.columns))
        insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
        # astype(object) yields plain Python scalars; NaN must be sent as NULL.
        records = [tuple(None if pd.isna(v) else v for v in row) for row in df.astype(object).to_numpy()]

        connection = mysql.connector.connect(
            host=Config.BI_CONN_host,
            user=Config.BI_CONN_INFO_user,
            password=Config.BI_CONN_INFO_password,
            database=Config.BI_CONN_INFO_database
        )
        try:
            logger.info(f"成功连接 {Config.BI_CONN_INFO_database}")
            cursor = connection.cursor()
            try:
                cursor.executemany(insert_query, records)
                connection.commit()
                logger.info(f"成功导入 {cursor.rowcount} 条记录到 {table_name} 表")
            finally:
                cursor.close()
        except Exception as e:
            connection.rollback()
            logger.error(f"写入失败: {str(e)}")
            logger.error("问题数据示例:\n" + df.head().to_string())
            raise
        finally:
            connection.close()

    def main(self):
        """Run the whole pipeline once and report the task status via ``common_module``.

        A day with no approval records is reported as a success without touching BI.
        Any exception is logged to ``error_task_logger`` and reported as a task error.
        """
        task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            logger.info("开始执行宜搭流程耗时写入BI任务。")
            # Step 1: fetch process instances
            process_instances = self.fetch_process_data()
            logger.info("获取流程实例成功。")

            # Step 2: extract approval records
            all_data_list = self.extract_approval_records(process_instances)
            logger.info("提取审批记录成功。")
            if not all_data_list:
                logger.info("未获取到审批记录,无需写入BI。")
                common_module.send_task_status(task_start_time, "宜搭流程耗时写入BI")
                return

            # Step 3: group by "提交申请"
            result_groups = self.group_by_process(all_data_list)
            logger.info("按 '提交申请' 分组成功。")

            # Step 4: pivot to the wide table
            df_final, max_steps = self.transform_to_wide_table(result_groups)
            logger.info("转换为宽表成功。")

            # Step 5: classify flows
            df_final1 = self.classify_flows(df_final, max_steps)
            logger.info("对流程进行分类并保存结果成功。")

            # Step 6: elapsed time per step
            df_final2 = self.time_calculate(df_final1)
            logger.info("耗时计算成功。")

            # Step 7: write to BI
            self.write_to_bi(df_final2)
            logger.info("向BI写入数据成功。")

            common_module.send_task_status(task_start_time, "宜搭流程耗时写入BI")
            logger.info("宜搭流程耗时写入BI任务执行成功。")
        except Exception as e:
            error_task_logger.error(f"宜搭流程耗时写入BI执行出错: {e}")
            common_module.send_task_error(task_start_time, "宜搭流程耗时写入BI", str(e))
|
||
|
||
|
||
if __name__ == '__main__':
    # Script entry point: run the full pipeline once.
    TimeConsumingProcess().main()
|