Files
F6--/张阳脚本/简道云/back_ground_tasks.py
T
2026-01-30 11:28:35 +08:00

68 lines
2.4 KiB
Python

import requests
import pandas as pd
import os
from typing import Dict, Any
from api import API
import time
from tqdm import tqdm
api_instance = API()
def update_jiandaoyun(data: Dict[str, Any], results: str):
""" 更新简道云表单 """
# 定义简道云数据配置
jiandaoyun_data = {
'api_key': data['api_key'],
'entry_id': data['entry_id'],
'data_id': data['data_id'],
"data": {
'_widget_1731379774828': {"value": "已执行"}, # f6系统批量操作测试 是否执行成功
'_widget_1731381334870': {"value": results} # f6系统批量操作测试 执行明细
}
}
time.sleep(1)
print(jiandaoyun_data)
api_instance.entry_data_update(jiandaoyun_data)
print('更新简道云表单成功')
def create_brand_background(data: Dict[str, Any], cookies: Dict[str, str], df: pd.DataFrame, save_path: str):
# 定义请求URL
url = 'https://yunxiu.f6car.cn/camaro/brand/getOrCreate'
# 遍历DataFrame中的每一行数据
results = []
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="创建品牌"):
brand_name = row['品牌名']
try:
response = requests.post(url, cookies=cookies, json={"brandName": brand_name})
response.raise_for_status() # 抛出HTTP错误
results.append({'品牌名': brand_name, '状态': '创建成功'})
except requests.exceptions.RequestException as e:
results.append({'品牌名': brand_name, '状态': f'创建失败: {str(e)}'})
print({'msg': '已执行', 'msg_details': f'{results}'})
os.remove(save_path)
print(f'{save_path}已删除')
print(data)
# 调用api回写改掉 执行明细与执行状态
update_jiandaoyun(data, f'{results}')
def delete_history_background(data: Dict[str, Any], cookies: Dict[str, str], org_id: str, org_name: str):
url = f'https://yunxiu.f6car.cn/maintain-dump/maintainHistory/?orgid={org_id}' # 删除url
res = requests.delete(url=url, cookies=cookies)
res_data = res.json()
if res.status_code == 200 and res_data.get('code') == 200:
results = f'{org_name} 历史维修记录已删除'
print(results)
else:
results = f'删除 {org_name} 历史维修记录失败: {res_data.get("message")}'
print(results)
# 调用api回写改掉 执行明细与执行状态
time.sleep(1)
update_jiandaoyun(data, f'{results}')