662 lines
26 KiB
Python
662 lines
26 KiB
Python
from datetime import datetime, timezone, timedelta, date, UTC
|
||
import holidays
|
||
from config import Config
|
||
import psycopg2
|
||
import pandas as pd
|
||
import pymysql
|
||
from api import API
|
||
from log_config import configure_task_logger, configure_error_task_logger
|
||
|
||
|
||
api_instance = API()
|
||
# 获取已经配置好的常规日志记录器
|
||
logger = configure_task_logger()
|
||
|
||
# 获取已经配置好的错误任务日志记录器
|
||
error_task_logger = configure_error_task_logger()
|
||
|
||
|
||
class CommonModule:
|
||
"""工具类"""
|
||
|
||
def __init__(self):
|
||
# 初始化中国节假日对象
|
||
self.cn_holidays = holidays.country_holidays('CN', years=2025)
|
||
# 创建一个存储日期的集合,用于去重
|
||
self.date_set = set()
|
||
self.conn = Config.CONN_INFO
|
||
|
||
def time_to_UTC(self, time_input):
|
||
"""
|
||
时间转为UTC格式(兼容字符串和Timestamp输入)
|
||
:param time_input: 支持以下格式:
|
||
- 字符串格式:"2025-06-05 14:30:00" 或 "2025-06-05"
|
||
- Pandas Timestamp对象
|
||
:return: ISO8601格式的UTC时间字符串(如"2025-06-05T06:30:00Z")
|
||
"""
|
||
# 处理Timestamp输入
|
||
if isinstance(time_input, pd.Timestamp):
|
||
dt = time_input.to_pydatetime()
|
||
# 处理字符串输入
|
||
else:
|
||
try:
|
||
dt = datetime.strptime(time_input, "%Y-%m-%d %H:%M:%S")
|
||
except ValueError:
|
||
try:
|
||
dt = datetime.strptime(time_input, "%Y-%m-%d")
|
||
except ValueError as e:
|
||
raise ValueError(f"不支持的日期格式: {time_input}") from e
|
||
|
||
# 转换为UTC时间(假设原始时间为东八区)
|
||
dt_utc = dt - timedelta(hours=8)
|
||
return dt_utc.replace(tzinfo=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
|
||
def get_holiday_list(self, year=None):
|
||
"""
|
||
获取指定年份的节假日和周日列表。
|
||
如果没有提供年份,则使用当前年份。
|
||
"""
|
||
if year is None:
|
||
year = date.today().year
|
||
|
||
# 清空集合,防止重复添加数据
|
||
self.date_set.clear()
|
||
|
||
# 添加节假日到集合
|
||
for holiday in self.cn_holidays:
|
||
if holiday.year == year:
|
||
self.date_set.add(str(holiday))
|
||
|
||
# 添加该年的所有周日到集合
|
||
start_date = date(year, 1, 1)
|
||
end_date = date(year, 12, 31)
|
||
current_date = start_date
|
||
while current_date <= end_date:
|
||
if current_date.weekday() == 6: # 周日的weekday值为6
|
||
self.date_set.add(str(current_date))
|
||
current_date += timedelta(days=1)
|
||
|
||
# 将集合转换为排序后的列表
|
||
return sorted(list(self.date_set), reverse=False)
|
||
|
||
def get_ngv_details(self, days_back=1):
|
||
"""
|
||
从固定的数据库中获取前几天的NGV明细。
|
||
参数 `days_back` 表示相对于今天的天数偏移量,默认为1(即前一天)。
|
||
返回包含NGV明细的pandas DataFrame。
|
||
"""
|
||
try:
|
||
# 获得连接
|
||
conn = psycopg2.connect(**self.conn)
|
||
cursor = conn.cursor()
|
||
|
||
# 获取指定天数前的日期
|
||
now_time = datetime.now()
|
||
target_time = now_time + timedelta(days=-days_back)
|
||
target_date_id = int(target_time.strftime('%Y%m%d')) # 获取目标日期
|
||
|
||
# sql语句查询
|
||
sql = f"""
|
||
SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" WHERE "date_id" = '{target_date_id}' ;
|
||
"""
|
||
|
||
# 执行语句并获取结果集
|
||
cursor.execute(sql)
|
||
rows = cursor.fetchall()
|
||
all_fields = cursor.description
|
||
|
||
# 执行结果转化为dataframe
|
||
col = [i[0] for i in all_fields]
|
||
data_NGV = pd.DataFrame(rows, columns=col)
|
||
|
||
# 尝试自动解析日期时间字符串
|
||
time_format = "%Y-%m-%d %H:%M:%S"
|
||
if 'saas_create_time' in data_NGV.columns:
|
||
data_NGV['saas_create_time'] = pd.to_datetime(data_NGV['saas_create_time'], format=time_format,
|
||
errors='coerce')
|
||
data_NGV['saas_create_time'] = data_NGV['saas_create_time'].dt.strftime('%Y-%m-%d')
|
||
|
||
# 关闭游标和连接
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
return data_NGV
|
||
|
||
except Exception as e:
|
||
error_task_logger.error(f"获取NGV明细失败: {e}")
|
||
return None
|
||
|
||
def get_yichang_details(self, days_back=1):
|
||
"""
|
||
从固定的数据库中获取前几天的异常明细。
|
||
参数 `days_back` 表示相对于今天的天数偏移量,默认为1(即前一天)。
|
||
返回包含异常明细的pandas DataFrame。
|
||
"""
|
||
try:
|
||
# 获得连接
|
||
conn = psycopg2.connect(**self.conn)
|
||
cursor = conn.cursor()
|
||
|
||
# 获取指定天数前的日期
|
||
now_time = datetime.now()
|
||
|
||
target_time = now_time + timedelta(days=-days_back)
|
||
target_date_id = int(target_time.strftime('%Y%m%d')) # 获取目标日期
|
||
|
||
# sql语句查询
|
||
sql = f""" SELECT * FROM "public"."holo_ads_dataservice_saas_org_health_warning" WHERE "pt" = '{target_date_id}' and "org_type" = '一般';"""
|
||
# sql = f""" SELECT * FROM "public"."holo_ads_dataservice_saas_org_health_warning" """
|
||
|
||
# 执行语句并获取结果集
|
||
cursor.execute(sql)
|
||
rows = cursor.fetchall()
|
||
all_fields = cursor.description
|
||
|
||
# 执行结果转化为dataframe
|
||
col = [i[0] for i in all_fields]
|
||
data_yichang = pd.DataFrame(rows, columns=col)
|
||
# print(data_yichang.head(10))
|
||
|
||
# 尝试自动解析日期时间字符串
|
||
time_format = "%Y-%m-%d %H:%M:%S"
|
||
if 'saas_create_time' in data_yichang.columns:
|
||
data_yichang['saas_create_time'] = pd.to_datetime(data_yichang['saas_create_time'], format=time_format,
|
||
errors='coerce')
|
||
data_yichang['saas_create_time'] = data_yichang['saas_create_time'].dt.strftime('%Y-%m-%d')
|
||
|
||
# 关闭游标和连接
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
|
||
return data_yichang
|
||
|
||
except Exception as e:
|
||
error_task_logger.error(f"获取异常明细时出错: {e}")
|
||
return None
|
||
|
||
def get_renewal_details(self, ):
|
||
"""
|
||
从固定的数据库中获取续约待办数据
|
||
"""
|
||
try:
|
||
# 获得连接
|
||
conn = psycopg2.connect(**self.conn)
|
||
cursor = conn.cursor()
|
||
|
||
# 获取指定天数前的日期
|
||
now_time = datetime.now()
|
||
yes_time = now_time + timedelta(days=-2)
|
||
yes_time_nyr = int(yes_time.strftime('%Y%m%d')) # 获取前两天日期
|
||
|
||
# 获取指定天数前的日期
|
||
today = date.today()
|
||
days_to_add = 120
|
||
future_date = str(today + timedelta(days=days_to_add))
|
||
|
||
print("距离今天还有{}天的日期是:{}".format(days_to_add, future_date))
|
||
|
||
sql = f"""SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" WHERE "date_id" = '{yes_time_nyr}' and "expiry_time" like '%{future_date}%';"""
|
||
# 执行语句并获取结果集
|
||
cursor.execute(sql)
|
||
rows = cursor.fetchall()
|
||
all_fields = cursor.description # 获取所有字段名
|
||
|
||
# 执行结果转化为dataframe
|
||
col = [i[0] for i in all_fields]
|
||
data_NGV = pd.DataFrame(list(rows), columns=col)
|
||
|
||
# 关闭数据库连接
|
||
cursor.close()
|
||
conn.close()
|
||
|
||
return data_NGV
|
||
except Exception as e:
|
||
error_task_logger.error(f"获取续约待办数据时出错: {e}")
|
||
return None
|
||
|
||
|
||
def get_jcb_details(self, ):
|
||
"""
|
||
从固定的数据库中获取前几天的NGV明细。
|
||
参数 `days_back` 表示相对于今天的天数偏移量,默认为1(即前一天)。
|
||
返回包含NGV明细的pandas DataFrame。
|
||
"""
|
||
# 保存为CSV文件
|
||
output_dir = "output" # 设置输出目录
|
||
|
||
# 创建输出目录(如果不存在)
|
||
import os
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
try:
|
||
# 获得连接并创建游标
|
||
conn = pymysql.connect(
|
||
host=Config.BI_CONN_host,
|
||
database=Config.BI_CONN_INFO_database,
|
||
user=Config.BI_CONN_INFO_user,
|
||
password=Config.BI_CONN_INFO_password,
|
||
# charset='utf8mb4', # 设置字符集以避免编码问题
|
||
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
|
||
)
|
||
cursor = conn.cursor()
|
||
|
||
# 获取指定天数前的日期
|
||
# now_time = datetime.now()
|
||
# target_time = now_time + timedelta(days=-days_back)
|
||
target_date_id = "接车宝" # 获取目标日期
|
||
|
||
# SQL 查询语句
|
||
sql = f"""
|
||
SELECT * FROM jdy_hs_holo_dws_sales_magic_box_ngv_d WHERE 产品名称 = '{target_date_id}';
|
||
"""
|
||
|
||
# 执行查询并获取结果
|
||
cursor.execute(sql)
|
||
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
|
||
|
||
# 将结果转换为 DataFrame
|
||
if rows: # 如果有数据
|
||
data_NGV = pd.DataFrame(rows)
|
||
if data_NGV.shape[1] > 16:
|
||
data_NGV = data_NGV.iloc[:, :16]
|
||
else: # 如果没有数据,返回空 DataFrame
|
||
data_NGV = pd.DataFrame()
|
||
|
||
# 尝试自动解析日期时间字符串
|
||
time_format = "%Y-%m-%d %H:%M:%S"
|
||
if 'saas_create_time' in data_NGV.columns:
|
||
data_NGV['saas_create_time'] = pd.to_datetime(data_NGV['saas_create_time'], format=time_format,
|
||
errors='coerce')
|
||
data_NGV['saas_create_time'] = data_NGV['saas_create_time'].dt.strftime('%Y-%m-%d')
|
||
|
||
# 关闭游标
|
||
cursor.close()
|
||
headers = [
|
||
"日期",
|
||
"产品名称",
|
||
"账号", # 唯一值
|
||
"联系手机号",
|
||
"开户日",
|
||
"使用时长",
|
||
"续约日",
|
||
"到期日",
|
||
"客户状态",
|
||
"近一周开单量",
|
||
"近一周是否活跃",
|
||
"G状态:近30天开单大于等于10天",
|
||
"当月开单天数",
|
||
"近30天开单天数",
|
||
"当月G天数",
|
||
"日分区",
|
||
]
|
||
data_NGV.columns = headers
|
||
|
||
cols_to_str = ["日期", "开户日", "续约日", "到期日"]
|
||
data_NGV[cols_to_str] = data_NGV[cols_to_str].apply(lambda x: x.astype(str))
|
||
|
||
data_NGV.to_csv(os.path.join(output_dir, f"{target_date_id}.csv"), index=False)
|
||
|
||
return data_NGV
|
||
|
||
except Exception as e:
|
||
error_task_logger.error(f"获取借车宝NGV明细时出错: {e}")
|
||
return None
|
||
|
||
def get_syxcx_details(self, ):
|
||
"""
|
||
从f6operation_data_relay数据库中获取私域小程序数据。
|
||
返回pandas DataFrame。
|
||
"""
|
||
|
||
try:
|
||
# 获得连接并创建游标
|
||
conn = pymysql.connect(
|
||
host=Config.BI_CONN_host,
|
||
database=Config.BI_CONN_INFO_database,
|
||
user=Config.BI_CONN_INFO_user,
|
||
password=Config.BI_CONN_INFO_password,
|
||
# charset='utf8mb4', # 设置字符集以避免编码问题
|
||
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
|
||
)
|
||
cursor = conn.cursor()
|
||
|
||
# 获取指定天数前的日期
|
||
# now_time = datetime.now()
|
||
# target_time = now_time + timedelta(days=-days_back)
|
||
|
||
# SQL 查询语句
|
||
sql = f"""
|
||
SELECT * FROM jiandaoyun_dailyvisit_privateminiprogram;
|
||
"""
|
||
|
||
# 执行查询并获取结果
|
||
cursor.execute(sql)
|
||
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
|
||
|
||
# 将结果转换为 DataFrame
|
||
if rows: # 如果有数据
|
||
data_SY = pd.DataFrame(rows)
|
||
else: # 如果没有数据,返回空 DataFrame
|
||
data_SY = pd.DataFrame()
|
||
# 关闭游标
|
||
cursor.close()
|
||
headers = [
|
||
"公司ID", "公众号名称", "小程序模板类型", "是否开通微信商城", "上架商品数",
|
||
"上架套餐卡数", "上架实体商品数", "上架项目数", "上架套餐数", "微信公众号粉丝数",
|
||
"微信公众号绑定手机粉丝数", "是否开通微信小程序", "微信小程序绑定用户数", "微信小程序绑定手机用户量",
|
||
"微信订单数", "微信支付订单量", "线下订单数"
|
||
]
|
||
|
||
data_SY.columns = headers
|
||
|
||
return data_SY
|
||
|
||
except Exception as e:
|
||
error_task_logger.error(f"获取私域小程序数据时出错: {e}")
|
||
return None
|
||
|
||
def get_commission_details(self, ):
|
||
"""
|
||
从f6operation_data_relay数据库中获取小六提成数据。
|
||
返回pandas DataFrame。
|
||
"""
|
||
|
||
try:
|
||
# 获得连接并创建游标
|
||
conn = pymysql.connect(
|
||
host=Config.BI_CONN_host,
|
||
database=Config.BI_CONN_INFO_database,
|
||
user=Config.BI_CONN_INFO_user,
|
||
password=Config.BI_CONN_INFO_password,
|
||
# charset='utf8mb4', # 设置字符集以避免编码问题
|
||
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
|
||
)
|
||
cursor = conn.cursor()
|
||
|
||
# 获取指定天数前的日期
|
||
# now_time = datetime.now()
|
||
# target_time = now_time + timedelta(days=-days_back)
|
||
|
||
# SQL 查询语句
|
||
sql = f"""
|
||
SELECT * FROM JianDaoYun_DailyVisit_Commission;
|
||
"""
|
||
|
||
# 执行查询并获取结果
|
||
cursor.execute(sql)
|
||
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
|
||
|
||
# 将结果转换为 DataFrame
|
||
if rows: # 如果有数据
|
||
data_commission = pd.DataFrame(rows)
|
||
else: # 如果没有数据,返回空 DataFrame
|
||
data_commission = pd.DataFrame()
|
||
# 关闭游标
|
||
cursor.close()
|
||
headers = [
|
||
"门店id", "提成类型_二级分类", "提成基数(本月)", "提成基数(上月)", "公司id", "门店编码", "门店名称"
|
||
]
|
||
|
||
data_commission.columns = headers
|
||
|
||
return data_commission
|
||
|
||
except Exception as e:
|
||
error_task_logger.error(f"获取小六提成数据时出错: {e}")
|
||
return None
|
||
|
||
def get_differentindustries_details(self, ):
|
||
"""
|
||
从f6operation_data_relay数据库中获取异业合作数据。
|
||
返回pandas DataFrame。
|
||
"""
|
||
|
||
try:
|
||
# 获得连接并创建游标
|
||
conn = pymysql.connect(
|
||
host=Config.BI_CONN_host,
|
||
database=Config.BI_CONN_INFO_database,
|
||
user=Config.BI_CONN_INFO_user,
|
||
password=Config.BI_CONN_INFO_password,
|
||
# charset='utf8mb4', # 设置字符集以避免编码问题
|
||
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
|
||
)
|
||
cursor = conn.cursor()
|
||
|
||
# 获取指定天数前的日期
|
||
# now_time = datetime.now()
|
||
# target_time = now_time + timedelta(days=-days_back)
|
||
|
||
# SQL 查询语句
|
||
sql = f"""
|
||
SELECT * FROM JianDaoYun_DailyVisit_DifferentIndustries;
|
||
"""
|
||
|
||
# 执行查询并获取结果
|
||
cursor.execute(sql)
|
||
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
|
||
|
||
# 将结果转换为 DataFrame
|
||
if rows: # 如果有数据
|
||
data_commission = pd.DataFrame(rows)
|
||
else: # 如果没有数据,返回空 DataFrame
|
||
data_commission = pd.DataFrame()
|
||
# 关闭游标
|
||
cursor.close()
|
||
headers = [
|
||
"门店id", "商品名称", "服务期结束时间", "门店名称", "公司id", "门店编码"
|
||
]
|
||
|
||
data_commission.columns = headers
|
||
|
||
return data_commission
|
||
|
||
except Exception as e:
|
||
error_task_logger.error(f"获取异业合作数据时出错: {e}")
|
||
return None
|
||
|
||
def get_perforamnce_details(self, ):
|
||
"""
|
||
从f6operation_data_relay数据库中获取履约表数据
|
||
返回pandas DataFrame。
|
||
"""
|
||
|
||
try:
|
||
# 获得连接并创建游标
|
||
conn = pymysql.connect(
|
||
host=Config.BI_CONN_host,
|
||
database=Config.BI_CONN_INFO_database,
|
||
user=Config.BI_CONN_INFO_user,
|
||
password=Config.BI_CONN_INFO_password,
|
||
# charset='utf8mb4', # 设置字符集以避免编码问题
|
||
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
|
||
)
|
||
cursor = conn.cursor()
|
||
|
||
# SQL 查询语句
|
||
sql = f"""
|
||
SELECT * FROM SAAS_Fulfillment_Details;
|
||
"""
|
||
|
||
# 执行查询并获取结果
|
||
cursor.execute(sql)
|
||
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
|
||
|
||
# 将结果转换为 DataFrame
|
||
if rows: # 如果有数据
|
||
data_commission = pd.DataFrame(rows)
|
||
else: # 如果没有数据,返回空 DataFrame
|
||
data_commission = pd.DataFrame()
|
||
# 关闭游标
|
||
cursor.close()
|
||
headers = ["订单id", "f6订单编号", "宜搭的实例id", "商品id", "商品名称", "发布商品类型", "发布商品类型描述",
|
||
"门店id", "门店名称", "门店编码", "运营负责人", "区域经理", "商户中心id", "saas开户时间",
|
||
"公司id", "公司名称", "产生来源", "产生来源描述", "类型", "类型描述", "服务年份",
|
||
"订单服务期第几年", "服务期起始时间", "服务期结束时间", "提成业务类型", "提成类别",
|
||
"下单支付成功时间", "实付金额", "系统成本价", "版本费", "服务费", "介绍人员工ID",
|
||
"介绍业绩归属人员工ID", "处理人ID employee_id", "业绩归属人员工ID", "操作时间",
|
||
"处理人是否跟进,0: 未跟进,1: 已跟进", "满意度评分", "评价完成时间", "介绍人用户类型",
|
||
"下单支付成功日期", "培训完成时间", "订单所处阶段", "日分区"]
|
||
data_commission.columns = headers
|
||
|
||
float_columns = ["实付金额", "系统成本价", "版本费", "服务费"]
|
||
for col in float_columns:
|
||
data_commission[col] = data_commission[col].astype(float)
|
||
|
||
return data_commission
|
||
|
||
except Exception as e:
|
||
error_task_logger.error(f"获取履约表数据时出错: {e}")
|
||
return None
|
||
|
||
|
||
def get_GroupNotification_details(self, ):
|
||
"""
|
||
从f6operation_data_relay数据库中获取短信数据支撑数据。
|
||
返回pandas DataFrame。
|
||
"""
|
||
|
||
try:
|
||
# 获得连接并创建游标
|
||
conn = pymysql.connect(
|
||
host=Config.BI_CONN_host,
|
||
database=Config.BI_CONN_INFO_database,
|
||
user=Config.BI_CONN_INFO_user,
|
||
password=Config.BI_CONN_INFO_password,
|
||
# charset='utf8mb4', # 设置字符集以避免编码问题
|
||
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
|
||
)
|
||
cursor = conn.cursor()
|
||
|
||
# 获取指定天数前的日期
|
||
# now_time = datetime.now()
|
||
# target_time = now_time + timedelta(days=-days_back)
|
||
|
||
# SQL 查询语句
|
||
sql = f"""
|
||
SELECT * FROM jiandaoyun_dailyvisit_groupnotification;
|
||
"""
|
||
|
||
# 执行查询并获取结果
|
||
cursor.execute(sql)
|
||
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
|
||
|
||
# 将结果转换为 DataFrame
|
||
if rows: # 如果有数据
|
||
data_commission = pd.DataFrame(rows)
|
||
else: # 如果没有数据,返回空 DataFrame
|
||
data_commission = pd.DataFrame()
|
||
# 关闭游标
|
||
cursor.close()
|
||
headers = [
|
||
"公司id", "是否启动短信功能", "是否购买短信包", "累计购买总数", "累计发送成功总人数", "剩余短信条数",
|
||
"实付总金额", "短信剩余量是否小于20%", "第一次短信购买时间", "最近一次短信购买时间"
|
||
]
|
||
|
||
data_commission.columns = headers
|
||
|
||
return data_commission
|
||
|
||
except Exception as e:
|
||
error_task_logger.error(f"获取短信数据支撑数据时出错: {e}")
|
||
return None
|
||
|
||
from datetime import datetime, timedelta, UTC, timezone
|
||
|
||
def send_task_status(self, task_start_time: str, task_name: str) -> None:
|
||
"""
|
||
将任务状态发送到简道云(开始时间为北京时间,需转换到 UTC)
|
||
:param task_start_time: 任务开始时间(字符串格式:"%Y-%m-%d %H:%M:%S",表示北京时间 UTC+8)
|
||
:param task_name: 任务名称
|
||
"""
|
||
try:
|
||
# 1. 获取当前 UTC 时间(时区感知对象)
|
||
end_time_utc = datetime.now(UTC) # ✅ 替代 utcnow()
|
||
|
||
# 2. 解析传入的北京时间(UTC+8)
|
||
task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S")
|
||
|
||
# 3. 转换为 UTC 时间(减去 8 小时,并附加 UTC 时区)
|
||
task_start_utc = task_start_naive - timedelta(hours=8)
|
||
task_start_utc = task_start_utc.replace(tzinfo=timezone.utc) # 显式标记为 UTC
|
||
|
||
# 4. 计算运行时间(时区感知对象可直接相减)
|
||
run_time = end_time_utc - task_start_utc
|
||
run_time_sec = int(run_time.total_seconds())
|
||
|
||
# 5. 格式化时间为 UTC 的 ISO 8601 格式(带 "Z")
|
||
today_utc = end_time_utc.strftime("%Y-%m-%d")
|
||
task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
|
||
# 6. 构造请求数据(所有时间以 UTC 格式发送)
|
||
payload = {
|
||
"api_key": Config.SCHEDULED_TASKS_APP_ID,
|
||
"entry_id": Config.JDY_TASKS_ENTRY_ID,
|
||
"data": {
|
||
"_widget_1744873387500": {"value": today_utc}, # UTC 日期
|
||
"_widget_1743644977694": {"value": task_name},
|
||
"_widget_1744873387501": {"value": task_start_iso}, # UTC 开始时间
|
||
"_widget_1744873387502": {"value": task_end_iso}, # UTC 结束时间
|
||
"_widget_1744873387504": {"value": run_time_sec},
|
||
}
|
||
}
|
||
|
||
# 7. 发送请求
|
||
response = api_instance.data_batch_create(payload)
|
||
logger.info(f"任务状态发送成功: {response}")
|
||
|
||
except Exception as e:
|
||
error_task_logger.error(f"任务状态发送失败: {e}")
|
||
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
CommonModule.send_task_error(task_start_time, "发送任务状态", e)
|
||
|
||
def send_task_error(self, task_start_time: str, task_name: str, error_message: str) -> None:
|
||
"""
|
||
将任务失败情况发送到简道云(影响业务数据时调用)
|
||
:param task_start_time: 任务开始时间(字符串格式:"%Y-%m-%d %H:%M:%S",表示北京时间 UTC+8)
|
||
:param task_name: 任务名称
|
||
:param error_message: 失败详情
|
||
"""
|
||
try:
|
||
# 1. 获取当前 UTC 时间(时区感知对象)
|
||
end_time_utc = datetime.now(UTC) # ✅ 替代 utcnow()
|
||
|
||
# 2. 解析传入的北京时间(UTC+8)
|
||
task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S")
|
||
|
||
# 3. 转换为 UTC 时间(减去 8 小时,并附加 UTC 时区)
|
||
task_start_utc = task_start_naive - timedelta(hours=8)
|
||
task_start_utc = task_start_utc.replace(tzinfo=timezone.utc) # 显式标记为 UTC
|
||
|
||
# 4. 计算运行时间(时区感知对象可直接相减)
|
||
run_time = end_time_utc - task_start_utc
|
||
run_time_sec = int(run_time.total_seconds())
|
||
|
||
# 5. 格式化时间为 UTC 的 ISO 8601 格式(带 "Z")
|
||
today_utc = end_time_utc.strftime("%Y-%m-%d")
|
||
task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
|
||
# 6. 构造请求数据(所有时间以 UTC 格式发送)
|
||
payload = {
|
||
"api_key": Config.SCHEDULED_TASKS_APP_ID,
|
||
"entry_id": Config.JDY_TASKS_ERROR_ENTRY_ID,
|
||
"data": {
|
||
"_widget_1744873387500": {"value": today_utc}, # UTC 日期
|
||
"_widget_1743644977694": {"value": task_name},
|
||
"_widget_1744873387501": {"value": task_start_iso}, # UTC 开始时间
|
||
"_widget_1744873387502": {"value": task_end_iso}, # UTC 结束时间
|
||
"_widget_1744873387504": {"value": run_time_sec},
|
||
"_widget_1754981992215": {"value": error_message}, # 错误信息
|
||
}
|
||
}
|
||
|
||
# 7. 发送请求
|
||
response = api_instance.data_batch_create(payload)
|
||
logger.info(f"任务错误发生成功: {response}")
|
||
|
||
except Exception as e:
|
||
error_task_logger.error(f"任务错误发送失败: {e}")
|