Files
saas/sample_cloud_modules.py
2025-08-12 13:43:10 +08:00

101 lines
3.5 KiB
Python

from api import API
from config import Config
from typing import List
import logging
from log_config import configure_task_logger, configure_error_task_logger
import csv
from datetime import datetime
# 获取已经配置好的常规日志记录器
logger = configure_task_logger()
# 获取已经配置好的错误任务日志记录器
error_task_logger = configure_error_task_logger()
class SampleCloudModules:
def __init__(self):
self.api_instance = API()
def search_scheduled_tasks(self) -> List:
"""
构造查询参数并调用 CloudAPI 获取表单数据。
:return: 返回从简道云获取的多条表单数据(字典格式)
"""
data = {
'api_key': Config.SCHEDULED_TASKS_APP_ID,
'entry_id': Config.SCHEDULED_TASKS_ENTRY_ID
}
try:
data_dict = self.api_instance.entry_data_list(data)
data_list = data_dict['data']
task_list = []
for data in data_list:
task_id = data['_widget_1734403750536']
task_time = data['_widget_1734403750537']
task_switch = data['_widget_1734403750535']
task_list.extend([(task_id, task_time, task_switch)])
# 记录每条任务的详细信息
logging.info(f"Task fetched: ID={task_id}, Time={task_time}, Switch={task_switch}")
return task_list
except Exception as e:
logging.error(f"Error occurred while fetching tasks: {e}")
return []
@staticmethod
def fetch_and_save_tasks():
"""
从云端表单读取数据并保存为 CSV 文件。
"""
# print("执行111")
try:
cloud_modules = SampleCloudModules()
tasks_data = cloud_modules.search_scheduled_tasks()
# 创建任务列表
tasks = []
for task in tasks_data:
unique_id, exec_time_str, is_switch_on_str = task
# 使用 strptime 解析时间字符串
exec_time = datetime.strptime(exec_time_str, '%H:%M').time()
# 将 is_switch_on 字符串转换为布尔值
is_switch_on = is_switch_on_str == ""
status = "待执行"
if not is_switch_on:
status = "已禁用"
# 初始化任务字典
task_dict = {
'unique_id': unique_id,
'exec_time': exec_time.strftime('%H:%M'), # 将时间格式化为字符串
'is_switch_on': is_switch_on, # 确保 is_switch_on 是布尔值
'status': status,
'retry_count': 0,
}
# print(task_dict)
tasks.append(task_dict)
# 将任务列表保存为 CSV 文件
with open('tasks.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f,
fieldnames=['unique_id', 'exec_time', 'is_switch_on', 'status', 'retry_count'])
writer.writeheader() # 写入表头
# print("写入数据")
writer.writerows(tasks) # 写入任务数据
# print("写入完成")
logger.info("任务已从云端获取并保存到 tasks.csv 文件。")
except Exception as e:
error_task_logger.error(f"从云端获取任务时发生异常: {e}")
if __name__ == '__main__':
start = SampleCloudModules()
start.fetch_and_save_tasks()