import os
from datetime import UTC, date, datetime, timedelta, timezone

import holidays
import pandas as pd
import psycopg2
import pymysql

from api import API
from config import Config
from log_config import configure_error_task_logger, configure_task_logger

api_instance = API()
# Pre-configured logger for regular task messages.
logger = configure_task_logger()
# Pre-configured logger for task failures.
error_task_logger = configure_error_task_logger()

# Input timestamps are treated as Beijing time (UTC+8) throughout this module.
_BEIJING_OFFSET = timedelta(hours=8)
_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
_ISO_UTC_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


class CommonModule:
    """Shared helpers: time conversion, holiday lists, data loaders, task reporting."""

    def __init__(self):
        # Chinese public-holiday calendar, pre-populated for 2025; other years
        # are built on demand by get_holiday_list.
        self.cn_holidays = holidays.country_holidays('CN', years=2025)
        # Set holding the dates produced by the last get_holiday_list call.
        self.date_set = set()
        # Keyword arguments passed to psycopg2.connect.
        self.conn = Config.CONN_INFO

    # ------------------------------------------------------------------
    # Time helpers
    # ------------------------------------------------------------------
    def time_to_UTC(self, time_input):
        """
        Convert a time value to an ISO 8601 UTC string.

        :param time_input: one of
            - a string "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD"
            - a pandas Timestamp or a datetime
          Naive values are assumed to be Beijing time (UTC+8); timezone-aware
          values are converted from their own zone.
        :return: UTC time formatted like "2025-06-05T06:30:00Z"
        :raises ValueError: if a string matches neither supported format
        """
        if isinstance(time_input, pd.Timestamp):
            dt = time_input.to_pydatetime()
        elif isinstance(time_input, datetime):
            dt = time_input
        else:
            dt = None
            last_error = None
            for fmt in (_DATETIME_FORMAT, "%Y-%m-%d"):
                try:
                    dt = datetime.strptime(time_input, fmt)
                    break
                except ValueError as exc:
                    last_error = exc
            if dt is None:
                raise ValueError(f"不支持的日期格式: {time_input}") from last_error

        if dt.tzinfo is not None:
            dt_utc = dt.astimezone(timezone.utc)
        else:
            # Naive input: shift from UTC+8 and label the result as UTC.
            dt_utc = (dt - _BEIJING_OFFSET).replace(tzinfo=timezone.utc)
        return dt_utc.strftime(_ISO_UTC_FORMAT)

    def get_holiday_list(self, year=None):
        """
        Return the sorted list of public holidays and Sundays of a year.

        :param year: calendar year; defaults to the current year
        :return: ascending list of "YYYY-MM-DD" strings (also left in
                 ``self.date_set``)
        """
        if year is None:
            year = date.today().year

        # self.cn_holidays only contains the years it was populated for, so
        # build a calendar for the requested year when it is missing.
        calendar = self.cn_holidays
        if year not in calendar.years:
            calendar = holidays.country_holidays('CN', years=year)

        # Reset the shared set so results of earlier calls do not leak in.
        self.date_set.clear()
        self.date_set.update(str(day) for day in calendar if day.year == year)

        # Jump to the first Sunday (weekday() == 6), then step a week at a time.
        sunday = date(year, 1, 1)
        sunday += timedelta(days=(6 - sunday.weekday()) % 7)
        while sunday.year == year:
            self.date_set.add(str(sunday))
            sunday += timedelta(days=7)

        return sorted(self.date_set)

    # ------------------------------------------------------------------
    # Private database helpers
    # ------------------------------------------------------------------
    def _query_postgres(self, sql, params=None):
        """Run a query against the PostgreSQL-compatible source and return a DataFrame."""
        conn = psycopg2.connect(**self.conn)
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                rows = cursor.fetchall()
                columns = [field[0] for field in cursor.description]
        finally:
            # Always release the connection, even when the query fails.
            conn.close()
        return pd.DataFrame(rows, columns=columns)

    @staticmethod
    def _normalize_create_date(frame):
        """Reduce the 'saas_create_time' column (if present) to 'YYYY-MM-DD' strings in place."""
        if 'saas_create_time' in frame.columns:
            parsed = pd.to_datetime(frame['saas_create_time'],
                                    format=_DATETIME_FORMAT, errors='coerce')
            frame['saas_create_time'] = parsed.dt.strftime('%Y-%m-%d')
        return frame

    @staticmethod
    def _date_id(days_back):
        """Return the 'YYYYMMDD' partition id of the day ``days_back`` days before now."""
        return (datetime.now() - timedelta(days=days_back)).strftime('%Y%m%d')

    def _query_mysql(self, sql, params=None):
        """Run a query against the BI MySQL database and return the raw rows."""
        conn = pymysql.connect(
            host=Config.BI_CONN_host,
            database=Config.BI_CONN_INFO_database,
            user=Config.BI_CONN_INFO_user,
            password=Config.BI_CONN_INFO_password,
        )
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                return cursor.fetchall()
        finally:
            conn.close()

    def _fetch_mysql_frame(self, sql, headers, params=None, truncate=False):
        """
        Run a MySQL query and label the result columns with ``headers``.

        An empty result yields an empty DataFrame that still carries the
        headers. With ``truncate=True`` extra trailing columns are dropped
        before the headers are applied. A column-count mismatch raises
        ValueError.
        """
        rows = self._query_mysql(sql, params)
        if not rows:
            return pd.DataFrame(columns=headers)
        frame = pd.DataFrame(list(rows))
        if truncate and frame.shape[1] > len(headers):
            frame = frame.iloc[:, :len(headers)]
        frame.columns = headers
        return frame

    # ------------------------------------------------------------------
    # PostgreSQL loaders
    # ------------------------------------------------------------------
    def get_ngv_details(self, days_back=1):
        """
        Load the NGV detail rows of a past day.

        :param days_back: offset in days before today (default 1 = yesterday)
        :return: DataFrame of the rows, or None if loading failed
        """
        sql = """
            SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d"
            WHERE "date_id" = %s;
        """
        try:
            # The id is passed as a string so it is rendered as a quoted literal.
            frame = self._query_postgres(sql, (self._date_id(days_back),))
            return self._normalize_create_date(frame)
        except Exception as e:
            error_task_logger.error(f"获取NGV明细失败: {e}")
            return None

    def get_yichang_details(self, days_back=1):
        """
        Load the org health-warning rows of type '一般' of a past day.

        :param days_back: offset in days before today (default 1 = yesterday)
        :return: DataFrame of the rows, or None if loading failed
        """
        sql = """
            SELECT * FROM "public"."holo_ads_dataservice_saas_org_health_warning"
            WHERE "pt" = %s and "org_type" = '一般';
        """
        try:
            frame = self._query_postgres(sql, (self._date_id(days_back),))
            return self._normalize_create_date(frame)
        except Exception as e:
            error_task_logger.error(f"获取异常明细时出错: {e}")
            return None

    def get_renewal_details(self, days_back=2, days_ahead=120):
        """
        Load renewal to-do rows: NGV detail rows of a past partition whose
        expiry time contains a given future date.

        :param days_back: partition offset in days before today (default 2)
        :param days_ahead: expiry date offset in days after today (default 120)
        :return: DataFrame of the rows, or None if loading failed
        """
        sql = """
            SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d"
            WHERE "date_id" = %s and "expiry_time" like %s;
        """
        try:
            future_date = str(date.today() + timedelta(days=days_ahead))
            logger.info(f"距离今天还有{days_ahead}天的日期是:{future_date}")
            return self._query_postgres(
                sql, (self._date_id(days_back), f"%{future_date}%"))
        except Exception as e:
            error_task_logger.error(f"获取续约待办数据时出错: {e}")
            return None

    # ------------------------------------------------------------------
    # MySQL loaders
    # ------------------------------------------------------------------
    def get_jcb_details(self):
        """
        Load the NGV rows of product '接车宝' from the BI database and also
        write them to ``output/接车宝.csv``.

        Only the first 16 result columns are kept.

        :return: DataFrame with the 16 labelled columns, or None if loading failed
        """
        output_dir = "output"
        os.makedirs(output_dir, exist_ok=True)

        product_name = "接车宝"
        headers = [
            "日期",
            "产品名称",
            "账号",  # unique value
            "联系手机号",
            "开户日",
            "使用时长",
            "续约日",
            "到期日",
            "客户状态",
            "近一周开单量",
            "近一周是否活跃",
            "G状态:近30天开单大于等于10天",
            "当月开单天数",
            "近30天开单天数",
            "当月G天数",
            "日分区",
        ]
        date_columns = ["日期", "开户日", "续约日", "到期日"]
        sql = """
            SELECT * FROM jdy_hs_holo_dws_sales_magic_box_ngv_d
            WHERE 产品名称 = %s;
        """
        try:
            frame = self._fetch_mysql_frame(
                sql, headers, params=(product_name,), truncate=True)
            # Dates are exported as plain strings.
            frame[date_columns] = frame[date_columns].astype(str)
            frame.to_csv(os.path.join(output_dir, f"{product_name}.csv"), index=False)
            return frame
        except Exception as e:
            error_task_logger.error(f"获取借车宝NGV明细时出错: {e}")
            return None

    def get_syxcx_details(self):
        """
        Load the private-domain mini-program table from the BI database.

        :return: DataFrame with labelled columns, or None if loading failed
        """
        headers = [
            "公司ID", "公众号名称", "小程序模板类型", "是否开通微信商城",
            "上架商品数", "上架套餐卡数", "上架实体商品数", "上架项目数",
            "上架套餐数", "微信公众号粉丝数", "微信公众号绑定手机粉丝数",
            "是否开通微信小程序", "微信小程序绑定用户数",
            "微信小程序绑定手机用户量", "微信订单数", "微信支付订单量",
            "线下订单数",
        ]
        try:
            return self._fetch_mysql_frame(
                "SELECT * FROM jiandaoyun_dailyvisit_privateminiprogram;", headers)
        except Exception as e:
            error_task_logger.error(f"获取私域小程序数据时出错: {e}")
            return None

    def get_commission_details(self):
        """
        Load the commission table from the BI database.

        :return: DataFrame with labelled columns, or None if loading failed
        """
        headers = [
            "门店id", "提成类型_二级分类", "提成基数(本月)", "提成基数(上月)",
            "公司id", "门店编码", "门店名称",
        ]
        try:
            return self._fetch_mysql_frame(
                "SELECT * FROM JianDaoYun_DailyVisit_Commission;", headers)
        except Exception as e:
            error_task_logger.error(f"获取小六提成数据时出错: {e}")
            return None

    def get_differentindustries_details(self):
        """
        Load the cross-industry cooperation table from the BI database.

        :return: DataFrame with labelled columns, or None if loading failed
        """
        headers = [
            "门店id", "商品名称", "服务期结束时间", "门店名称", "公司id", "门店编码",
        ]
        try:
            return self._fetch_mysql_frame(
                "SELECT * FROM JianDaoYun_DailyVisit_DifferentIndustries;", headers)
        except Exception as e:
            error_task_logger.error(f"获取异业合作数据时出错: {e}")
            return None

    def get_perforamnce_details(self):
        """
        Load the fulfillment detail table from the BI database.

        The four amount columns are converted to float.

        :return: DataFrame with labelled columns, or None if loading failed
        """
        headers = [
            "订单id", "f6订单编号", "宜搭的实例id", "商品id", "商品名称",
            "发布商品类型", "发布商品类型描述", "门店id", "门店名称", "门店编码",
            "运营负责人", "区域经理", "商户中心id", "saas开户时间", "公司id",
            "公司名称", "产生来源", "产生来源描述", "类型", "类型描述",
            "服务年份", "订单服务期第几年", "服务期起始时间", "服务期结束时间",
            "提成业务类型", "提成类别", "下单支付成功时间", "实付金额",
            "系统成本价", "版本费", "服务费", "介绍人员工ID",
            "介绍业绩归属人员工ID", "处理人ID employee_id", "业绩归属人员工ID",
            "操作时间", "处理人是否跟进,0: 未跟进,1: 已跟进", "满意度评分",
            "评价完成时间", "介绍人用户类型", "下单支付成功日期", "培训完成时间",
            "订单所处阶段", "日分区",
        ]
        float_columns = ["实付金额", "系统成本价", "版本费", "服务费"]
        try:
            frame = self._fetch_mysql_frame(
                "SELECT * FROM SAAS_Fulfillment_Details;", headers)
            for column in float_columns:
                frame[column] = frame[column].astype(float)
            return frame
        except Exception as e:
            error_task_logger.error(f"获取履约表数据时出错: {e}")
            return None

    # Correctly spelled alias; the misspelled name is kept for existing callers.
    get_performance_details = get_perforamnce_details

    def get_GroupNotification_details(self):
        """
        Load the SMS support-data table from the BI database.

        :return: DataFrame with labelled columns, or None if loading failed
        """
        headers = [
            "公司id", "是否启动短信功能", "是否购买短信包", "累计购买总数",
            "累计发送成功总人数", "剩余短信条数", "实付总金额",
            "短信剩余量是否小于20%", "第一次短信购买时间", "最近一次短信购买时间",
        ]
        try:
            return self._fetch_mysql_frame(
                "SELECT * FROM jiandaoyun_dailyvisit_groupnotification;", headers)
        except Exception as e:
            error_task_logger.error(f"获取短信数据支撑数据时出错: {e}")
            return None

    # ------------------------------------------------------------------
    # Task reporting
    # ------------------------------------------------------------------
    @staticmethod
    def _build_task_payload(entry_id, task_start_time, task_name, error_message=None):
        """
        Build the request payload describing one task run.

        ``task_start_time`` is a "%Y-%m-%d %H:%M:%S" string in Beijing time;
        all timestamps in the payload are UTC. The error widget is included
        only when ``error_message`` is given.
        """
        end_time_utc = datetime.now(UTC)
        start_naive = datetime.strptime(task_start_time, _DATETIME_FORMAT)
        start_utc = (start_naive - _BEIJING_OFFSET).replace(tzinfo=timezone.utc)
        run_time_sec = int((end_time_utc - start_utc).total_seconds())

        data = {
            "_widget_1744873387500": {"value": end_time_utc.strftime("%Y-%m-%d")},  # UTC date
            "_widget_1743644977694": {"value": task_name},
            "_widget_1744873387501": {"value": start_utc.strftime(_ISO_UTC_FORMAT)},  # UTC start
            "_widget_1744873387502": {"value": end_time_utc.strftime(_ISO_UTC_FORMAT)},  # UTC end
            "_widget_1744873387504": {"value": run_time_sec},
        }
        if error_message is not None:
            data["_widget_1754981992215"] = {"value": error_message}  # error details
        return {
            "api_key": Config.SCHEDULED_TASKS_APP_ID,
            "entry_id": entry_id,
            "data": data,
        }

    def send_task_status(self, task_start_time: str, task_name: str) -> None:
        """
        Report a finished task run through ``api_instance.data_batch_create``.

        If reporting fails, the failure is logged and reported through
        send_task_error.

        :param task_start_time: task start, "%Y-%m-%d %H:%M:%S" in Beijing time (UTC+8)
        :param task_name: name of the task
        """
        try:
            payload = self._build_task_payload(
                Config.JDY_TASKS_ENTRY_ID, task_start_time, task_name)
            response = api_instance.data_batch_create(payload)
            logger.info(f"任务状态发送成功: {response}")
        except Exception as e:
            error_task_logger.error(f"任务状态发送失败: {e}")
            # The failure report uses the current time as its own start time.
            task_start_time = datetime.now().strftime(_DATETIME_FORMAT)
            self.send_task_error(task_start_time, "发送任务状态", str(e))

    def send_task_error(self, task_start_time: str, task_name: str, error_message: str) -> None:
        """
        Report a failed task run through ``api_instance.data_batch_create``.

        Failures while reporting are logged and not re-raised.

        :param task_start_time: task start, "%Y-%m-%d %H:%M:%S" in Beijing time (UTC+8)
        :param task_name: name of the task
        :param error_message: failure details
        """
        try:
            payload = self._build_task_payload(
                Config.JDY_TASKS_ERROR_ENTRY_ID, task_start_time, task_name,
                error_message=error_message)
            response = api_instance.data_batch_create(payload)
            logger.info(f"任务错误发送成功: {response}")
        except Exception as e:
            error_task_logger.error(f"任务错误发送失败: {e}")