101 lines
3.5 KiB
Python
101 lines
3.5 KiB
Python
from api import API
|
|
from config import Config
|
|
from typing import List
|
|
import logging
|
|
from log_config import configure_task_logger, configure_error_task_logger
|
|
import csv
|
|
from datetime import datetime
|
|
|
|
# 获取已经配置好的常规日志记录器
|
|
logger = configure_task_logger()
|
|
|
|
# 获取已经配置好的错误任务日志记录器
|
|
error_task_logger = configure_error_task_logger()
|
|
|
|
|
|
class SampleCloudModules:
|
|
def __init__(self):
|
|
self.api_instance = API()
|
|
|
|
def search_scheduled_tasks(self) -> List:
|
|
"""
|
|
构造查询参数并调用 CloudAPI 获取表单数据。
|
|
:return: 返回从简道云获取的多条表单数据(字典格式)
|
|
"""
|
|
data = {
|
|
'api_key': Config.SCHEDULED_TASKS_APP_ID,
|
|
'entry_id': Config.SCHEDULED_TASKS_ENTRY_ID
|
|
}
|
|
|
|
try:
|
|
data_dict = self.api_instance.entry_data_list(data)
|
|
data_list = data_dict['data']
|
|
task_list = []
|
|
for data in data_list:
|
|
task_id = data['_widget_1734403750536']
|
|
task_time = data['_widget_1734403750537']
|
|
task_switch = data['_widget_1734403750535']
|
|
task_list.extend([(task_id, task_time, task_switch)])
|
|
|
|
# 记录每条任务的详细信息
|
|
logging.info(f"Task fetched: ID={task_id}, Time={task_time}, Switch={task_switch}")
|
|
|
|
return task_list
|
|
except Exception as e:
|
|
logging.error(f"Error occurred while fetching tasks: {e}")
|
|
return []
|
|
|
|
@staticmethod
|
|
def fetch_and_save_tasks():
|
|
"""
|
|
从云端表单读取数据并保存为 CSV 文件。
|
|
"""
|
|
# print("执行111")
|
|
try:
|
|
cloud_modules = SampleCloudModules()
|
|
tasks_data = cloud_modules.search_scheduled_tasks()
|
|
|
|
# 创建任务列表
|
|
tasks = []
|
|
for task in tasks_data:
|
|
unique_id, exec_time_str, is_switch_on_str = task
|
|
|
|
# 使用 strptime 解析时间字符串
|
|
exec_time = datetime.strptime(exec_time_str, '%H:%M').time()
|
|
|
|
# 将 is_switch_on 字符串转换为布尔值
|
|
is_switch_on = is_switch_on_str == "开"
|
|
status = "待执行"
|
|
if not is_switch_on:
|
|
status = "已禁用"
|
|
|
|
# 初始化任务字典
|
|
task_dict = {
|
|
'unique_id': unique_id,
|
|
'exec_time': exec_time.strftime('%H:%M'), # 将时间格式化为字符串
|
|
'is_switch_on': is_switch_on, # 确保 is_switch_on 是布尔值
|
|
'status': status,
|
|
'retry_count': 0,
|
|
}
|
|
# print(task_dict)
|
|
|
|
tasks.append(task_dict)
|
|
|
|
# 将任务列表保存为 CSV 文件
|
|
with open('tasks.csv', 'w', newline='', encoding='utf-8') as f:
|
|
writer = csv.DictWriter(f,
|
|
fieldnames=['unique_id', 'exec_time', 'is_switch_on', 'status', 'retry_count'])
|
|
writer.writeheader() # 写入表头
|
|
# print("写入数据")
|
|
writer.writerows(tasks) # 写入任务数据
|
|
# print("写入完成")
|
|
|
|
logger.info("任务已从云端获取并保存到 tasks.csv 文件。")
|
|
|
|
except Exception as e:
|
|
error_task_logger.error(f"从云端获取任务时发生异常: {e}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
start = SampleCloudModules()
|
|
start.fetch_and_save_tasks() |