from api import API from config import Config from typing import List import logging from log_config import configure_task_logger, configure_error_task_logger import csv from datetime import datetime # 获取已经配置好的常规日志记录器 logger = configure_task_logger() # 获取已经配置好的错误任务日志记录器 error_task_logger = configure_error_task_logger() class SampleCloudModules: def __init__(self): self.api_instance = API() def search_scheduled_tasks(self) -> List: """ 构造查询参数并调用 CloudAPI 获取表单数据。 :return: 返回从简道云获取的多条表单数据(字典格式) """ data = { 'api_key': Config.SCHEDULED_TASKS_APP_ID, 'entry_id': Config.SCHEDULED_TASKS_ENTRY_ID } try: data_dict = self.api_instance.entry_data_list(data) data_list = data_dict['data'] task_list = [] for data in data_list: task_id = data['_widget_1734403750536'] task_time = data['_widget_1734403750537'] task_switch = data['_widget_1734403750535'] task_list.extend([(task_id, task_time, task_switch)]) # 记录每条任务的详细信息 logging.info(f"Task fetched: ID={task_id}, Time={task_time}, Switch={task_switch}") return task_list except Exception as e: logging.error(f"Error occurred while fetching tasks: {e}") return [] @staticmethod def fetch_and_save_tasks(): """ 从云端表单读取数据并保存为 CSV 文件。 """ # print("执行111") try: cloud_modules = SampleCloudModules() tasks_data = cloud_modules.search_scheduled_tasks() # 创建任务列表 tasks = [] for task in tasks_data: unique_id, exec_time_str, is_switch_on_str = task # 使用 strptime 解析时间字符串 exec_time = datetime.strptime(exec_time_str, '%H:%M').time() # 将 is_switch_on 字符串转换为布尔值 is_switch_on = is_switch_on_str == "开" status = "待执行" if not is_switch_on: status = "已禁用" # 初始化任务字典 task_dict = { 'unique_id': unique_id, 'exec_time': exec_time.strftime('%H:%M'), # 将时间格式化为字符串 'is_switch_on': is_switch_on, # 确保 is_switch_on 是布尔值 'status': status, 'retry_count': 0, } # print(task_dict) tasks.append(task_dict) # 将任务列表保存为 CSV 文件 with open('tasks.csv', 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=['unique_id', 'exec_time', 'is_switch_on', 'status', 'retry_count']) writer.writeheader() # 写入表头 # print("写入数据") writer.writerows(tasks) # 写入任务数据 # print("写入完成") logger.info("任务已从云端获取并保存到 tasks.csv 文件。") except Exception as e: error_task_logger.error(f"从云端获取任务时发生异常: {e}") if __name__ == '__main__': start = SampleCloudModules() start.fetch_and_save_tasks()