Files
saas/back_ground_module/common_module.py
T

682 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime, timezone, timedelta, date, UTC
import holidays
from config import Config
import psycopg2
import pandas as pd
import pymysql
from api import API
from log_config import configure_task_logger, configure_error_task_logger
import time
# Shared API client; CommonModule calls data_batch_create / get_upload_token /
# upload_file on it to report task status and task errors to Jiandaoyun.
api_instance = API()
# Obtain the pre-configured logger for routine task messages (from log_config).
logger = configure_task_logger()
# Obtain the pre-configured logger for task errors (from log_config).
error_task_logger = configure_error_task_logger()
class CommonModule:
"""工具类"""
def __init__(self):
# 初始化中国节假日对象
self.cn_holidays = holidays.country_holidays('CN', years=2025)
# 创建一个存储日期的集合,用于去重
self.date_set = set()
self.conn = Config.CONN_INFO
def time_to_UTC(self, time_input):
"""
时间转为UTC格式(兼容字符串和Timestamp输入)
:param time_input: 支持以下格式:
- 字符串格式:"2025-06-05 14:30:00""2025-06-05"
- Pandas Timestamp对象
:return: ISO8601格式的UTC时间字符串(如"2025-06-05T06:30:00Z"
"""
# 处理Timestamp输入
if isinstance(time_input, pd.Timestamp):
dt = time_input.to_pydatetime()
# 处理字符串输入
else:
try:
dt = datetime.strptime(time_input, "%Y-%m-%d %H:%M:%S")
except ValueError:
try:
dt = datetime.strptime(time_input, "%Y-%m-%d")
except ValueError as e:
raise ValueError(f"不支持的日期格式: {time_input}") from e
# 转换为UTC时间(假设原始时间为东八区)
dt_utc = dt - timedelta(hours=8)
return dt_utc.replace(tzinfo=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def get_holiday_list(self, year=None):
"""
获取指定年份的节假日和周日列表。
如果没有提供年份,则使用当前年份。
"""
if year is None:
year = date.today().year
# 清空集合,防止重复添加数据
self.date_set.clear()
# 添加节假日到集合
for holiday in self.cn_holidays:
if holiday.year == year:
self.date_set.add(str(holiday))
# 添加该年的所有周日到集合
start_date = date(year, 1, 1)
end_date = date(year, 12, 31)
current_date = start_date
while current_date <= end_date:
if current_date.weekday() == 6: # 周日的weekday值为6
self.date_set.add(str(current_date))
current_date += timedelta(days=1)
# 将集合转换为排序后的列表
return sorted(list(self.date_set), reverse=False)
def get_ngv_details(self, days_back=1):
"""
从固定的数据库中获取前几天的NGV明细。
参数 `days_back` 表示相对于今天的天数偏移量,默认为1(即前一天)。
返回包含NGV明细的pandas DataFrame。
"""
try:
# 获得连接
conn = psycopg2.connect(**self.conn)
cursor = conn.cursor()
# 获取指定天数前的日期
now_time = datetime.now()
target_time = now_time + timedelta(days=-days_back)
target_date_id = int(target_time.strftime('%Y%m%d')) # 获取目标日期
# sql语句查询
sql = f"""
SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" WHERE "date_id" = '{target_date_id}' ;
"""
# 执行语句并获取结果集
cursor.execute(sql)
rows = cursor.fetchall()
all_fields = cursor.description
# 执行结果转化为dataframe
col = [i[0] for i in all_fields]
data_NGV = pd.DataFrame(rows, columns=col)
# 尝试自动解析日期时间字符串
time_format = "%Y-%m-%d %H:%M:%S"
if 'saas_create_time' in data_NGV.columns:
data_NGV['saas_create_time'] = pd.to_datetime(data_NGV['saas_create_time'], format=time_format,
errors='coerce')
data_NGV['saas_create_time'] = data_NGV['saas_create_time'].dt.strftime('%Y-%m-%d')
# 关闭游标和连接
cursor.close()
conn.close()
return data_NGV
except Exception as e:
error_task_logger.error(f"获取NGV明细失败: {e}")
return None
def get_yichang_details(self, days_back=1):
"""
从固定的数据库中获取前几天的异常明细。
参数 `days_back` 表示相对于今天的天数偏移量,默认为1(即前一天)。
返回包含异常明细的pandas DataFrame。
"""
try:
# 获得连接
conn = psycopg2.connect(**self.conn)
cursor = conn.cursor()
# 获取指定天数前的日期
now_time = datetime.now()
target_time = now_time + timedelta(days=-days_back)
target_date_id = int(target_time.strftime('%Y%m%d')) # 获取目标日期
# sql语句查询
sql = f""" SELECT * FROM "public"."holo_ads_dataservice_saas_org_health_warning" WHERE "pt" = '{target_date_id}' and "org_type" = '一般';"""
# sql = f""" SELECT * FROM "public"."holo_ads_dataservice_saas_org_health_warning" """
# 执行语句并获取结果集
cursor.execute(sql)
rows = cursor.fetchall()
all_fields = cursor.description
# 执行结果转化为dataframe
col = [i[0] for i in all_fields]
data_yichang = pd.DataFrame(rows, columns=col)
# print(data_yichang.head(10))
# 尝试自动解析日期时间字符串
time_format = "%Y-%m-%d %H:%M:%S"
if 'saas_create_time' in data_yichang.columns:
data_yichang['saas_create_time'] = pd.to_datetime(data_yichang['saas_create_time'], format=time_format,
errors='coerce')
data_yichang['saas_create_time'] = data_yichang['saas_create_time'].dt.strftime('%Y-%m-%d')
# 关闭游标和连接
cursor.close()
conn.close()
return data_yichang
except Exception as e:
error_task_logger.error(f"获取异常明细时出错: {e}")
return None
def get_renewal_details(self, ):
"""
从固定的数据库中获取续约待办数据
"""
try:
# 获得连接
conn = psycopg2.connect(**self.conn)
cursor = conn.cursor()
# 获取指定天数前的日期
now_time = datetime.now()
yes_time = now_time + timedelta(days=-2)
yes_time_nyr = int(yes_time.strftime('%Y%m%d')) # 获取前两天日期
# 获取指定天数前的日期
today = date.today()
days_to_add = 120
future_date = str(today + timedelta(days=days_to_add))
print("距离今天还有{}天的日期是:{}".format(days_to_add, future_date))
sql = f"""SELECT * FROM "public"."holo_ads_report_saas_profile_ngv_detail_d" WHERE "date_id" = '{yes_time_nyr}' and "expiry_time" like '%{future_date}%';"""
# 执行语句并获取结果集
cursor.execute(sql)
rows = cursor.fetchall()
all_fields = cursor.description # 获取所有字段名
# 执行结果转化为dataframe
col = [i[0] for i in all_fields]
data_NGV = pd.DataFrame(list(rows), columns=col)
# 关闭数据库连接
cursor.close()
conn.close()
return data_NGV
except Exception as e:
error_task_logger.error(f"获取续约待办数据时出错: {e}")
return None
def get_jcb_details(self, ):
"""
从固定的数据库中获取前几天的借车宝。
参数 `days_back` 表示相对于今天的天数偏移量,默认为1(即前一天)。
返回包含NGV明细的pandas DataFrame。
"""
# 保存为CSV文件
output_dir = "output" # 设置输出目录
# 创建输出目录(如果不存在)
import os
os.makedirs(output_dir, exist_ok=True)
try:
# 获得连接并创建游标
conn = pymysql.connect(
host=Config.BI_CONN_host,
database=Config.BI_CONN_INFO_database,
user=Config.BI_CONN_INFO_user,
password=Config.BI_CONN_INFO_password,
# charset='utf8mb4', # 设置字符集以避免编码问题
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
)
cursor = conn.cursor()
# 获取指定天数前的日期
# now_time = datetime.now()
# target_time = now_time + timedelta(days=-days_back)
target_date_id = "接车宝" # 获取目标日期
# SQL 查询语句
sql = f"""
SELECT * FROM jdy_hs_holo_dws_sales_magic_box_ngv_d WHERE 产品名称 = '{target_date_id}';
"""
# 执行查询并获取结果
cursor.execute(sql)
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
# 将结果转换为 DataFrame
if rows: # 如果有数据
data_NGV = pd.DataFrame(rows)
if data_NGV.shape[1] > 16:
data_NGV = data_NGV.iloc[:, :16]
else: # 如果没有数据,返回空 DataFrame
data_NGV = pd.DataFrame()
# 尝试自动解析日期时间字符串
time_format = "%Y-%m-%d %H:%M:%S"
if 'saas_create_time' in data_NGV.columns:
data_NGV['saas_create_time'] = pd.to_datetime(data_NGV['saas_create_time'], format=time_format,
errors='coerce')
data_NGV['saas_create_time'] = data_NGV['saas_create_time'].dt.strftime('%Y-%m-%d')
# 关闭游标
cursor.close()
headers = [
"日期",
"产品名称",
"账号", # 唯一值
"联系手机号",
"开户日",
"使用时长",
"续约日",
"到期日",
"客户状态",
"近一周开单量",
"近一周是否活跃",
"G状态:近30天开单大于等于10天",
"当月开单天数",
"近30天开单天数",
"当月G天数",
"日分区",
]
data_NGV.columns = headers
cols_to_str = ["日期", "开户日", "续约日", "到期日"]
data_NGV[cols_to_str] = data_NGV[cols_to_str].apply(lambda x: x.astype(str))
data_NGV.to_csv(os.path.join(output_dir, f"{target_date_id}.csv"), index=False)
return data_NGV
except Exception as e:
error_task_logger.error(f"获取借车宝NGV明细时出错: {e}")
return None
def get_syxcx_details(self, ):
"""
从f6operation_data_relay数据库中获取私域小程序数据。
返回pandas DataFrame。
"""
try:
# 获得连接并创建游标
conn = pymysql.connect(
host=Config.BI_CONN_host,
database=Config.BI_CONN_INFO_database,
user=Config.BI_CONN_INFO_user,
password=Config.BI_CONN_INFO_password,
# charset='utf8mb4', # 设置字符集以避免编码问题
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
)
cursor = conn.cursor()
# 获取指定天数前的日期
# now_time = datetime.now()
# target_time = now_time + timedelta(days=-days_back)
# SQL 查询语句
sql = f"""
SELECT * FROM jiandaoyun_dailyvisit_privateminiprogram;
"""
# 执行查询并获取结果
cursor.execute(sql)
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
# 将结果转换为 DataFrame
if rows: # 如果有数据
data_SY = pd.DataFrame(rows)
else: # 如果没有数据,返回空 DataFrame
data_SY = pd.DataFrame()
# 关闭游标
cursor.close()
headers = [
"公司ID", "公众号名称", "小程序模板类型", "是否开通微信商城", "上架商品数",
"上架套餐卡数", "上架实体商品数", "上架项目数", "上架套餐数", "微信公众号粉丝数",
"微信公众号绑定手机粉丝数", "是否开通微信小程序", "微信小程序绑定用户数", "微信小程序绑定手机用户量",
"微信订单数", "微信支付订单量", "线下订单数"
]
data_SY.columns = headers
return data_SY
except Exception as e:
error_task_logger.error(f"获取私域小程序数据时出错: {e}")
return None
def get_commission_details(self, ):
"""
从f6operation_data_relay数据库中获取小六提成数据。
返回pandas DataFrame。
"""
try:
# 获得连接并创建游标
conn = pymysql.connect(
host=Config.BI_CONN_host,
database=Config.BI_CONN_INFO_database,
user=Config.BI_CONN_INFO_user,
password=Config.BI_CONN_INFO_password,
# charset='utf8mb4', # 设置字符集以避免编码问题
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
)
cursor = conn.cursor()
# 获取指定天数前的日期
# now_time = datetime.now()
# target_time = now_time + timedelta(days=-days_back)
# SQL 查询语句
sql = f"""
SELECT * FROM JianDaoYun_DailyVisit_Commission;
"""
# 执行查询并获取结果
cursor.execute(sql)
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
# 将结果转换为 DataFrame
if rows: # 如果有数据
data_commission = pd.DataFrame(rows)
else: # 如果没有数据,返回空 DataFrame
data_commission = pd.DataFrame()
# 关闭游标
cursor.close()
headers = [
"门店id", "提成类型_二级分类", "提成基数(本月)", "提成基数(上月)", "公司id", "门店编码", "门店名称"
]
data_commission.columns = headers
return data_commission
except Exception as e:
error_task_logger.error(f"获取小六提成数据时出错: {e}")
return None
def get_differentindustries_details(self, ):
"""
从f6operation_data_relay数据库中获取异业合作数据。
返回pandas DataFrame。
"""
try:
# 获得连接并创建游标
conn = pymysql.connect(
host=Config.BI_CONN_host,
database=Config.BI_CONN_INFO_database,
user=Config.BI_CONN_INFO_user,
password=Config.BI_CONN_INFO_password,
# charset='utf8mb4', # 设置字符集以避免编码问题
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
)
cursor = conn.cursor()
# 获取指定天数前的日期
# now_time = datetime.now()
# target_time = now_time + timedelta(days=-days_back)
# SQL 查询语句
sql = f"""
SELECT * FROM JianDaoYun_DailyVisit_DifferentIndustries;
"""
# 执行查询并获取结果
cursor.execute(sql)
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
# 将结果转换为 DataFrame
if rows: # 如果有数据
data_commission = pd.DataFrame(rows)
else: # 如果没有数据,返回空 DataFrame
data_commission = pd.DataFrame()
# 关闭游标
cursor.close()
headers = [
"门店id", "商品名称", "服务期结束时间", "门店名称", "公司id", "门店编码"
]
data_commission.columns = headers
return data_commission
except Exception as e:
error_task_logger.error(f"获取异业合作数据时出错: {e}")
return None
def get_perforamnce_details(self, ):
"""
从f6operation_data_relay数据库中获取履约表数据
返回pandas DataFrame。
"""
try:
# 获得连接并创建游标
conn = pymysql.connect(
host=Config.BI_CONN_host,
database=Config.BI_CONN_INFO_database,
user=Config.BI_CONN_INFO_user,
password=Config.BI_CONN_INFO_password,
# charset='utf8mb4', # 设置字符集以避免编码问题
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
)
cursor = conn.cursor()
# SQL 查询语句
sql = f"""
SELECT * FROM SAAS_Fulfillment_Details;
"""
# 执行查询并获取结果
cursor.execute(sql)
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
# 将结果转换为 DataFrame
if rows: # 如果有数据
data_commission = pd.DataFrame(rows)
else: # 如果没有数据,返回空 DataFrame
data_commission = pd.DataFrame()
# 关闭游标
cursor.close()
headers = ["订单id", "f6订单编号", "宜搭的实例id", "商品id", "商品名称", "发布商品类型", "发布商品类型描述",
"门店id", "门店名称", "门店编码", "运营负责人", "区域经理", "商户中心id", "saas开户时间",
"公司id", "公司名称", "产生来源", "产生来源描述", "类型", "类型描述", "服务年份",
"订单服务期第几年", "服务期起始时间", "服务期结束时间", "提成业务类型", "提成类别",
"下单支付成功时间", "实付金额", "系统成本价", "版本费", "服务费", "介绍人员工ID",
"介绍业绩归属人员工ID", "处理人ID employee_id", "业绩归属人员工ID", "操作时间",
"处理人是否跟进,0: 未跟进,1: 已跟进", "满意度评分", "评价完成时间", "介绍人用户类型",
"下单支付成功日期", "培训完成时间", "订单所处阶段", "日分区"]
data_commission.columns = headers
float_columns = ["实付金额", "系统成本价", "版本费", "服务费"]
for col in float_columns:
data_commission[col] = data_commission[col].astype(float)
return data_commission
except Exception as e:
error_task_logger.error(f"获取履约表数据时出错: {e}")
return None
def get_GroupNotification_details(self, ):
"""
从f6operation_data_relay数据库中获取短信数据支撑数据。
返回pandas DataFrame。
"""
try:
# 获得连接并创建游标
conn = pymysql.connect(
host=Config.BI_CONN_host,
database=Config.BI_CONN_INFO_database,
user=Config.BI_CONN_INFO_user,
password=Config.BI_CONN_INFO_password,
# charset='utf8mb4', # 设置字符集以避免编码问题
# cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
)
cursor = conn.cursor()
# 获取指定天数前的日期
# now_time = datetime.now()
# target_time = now_time + timedelta(days=-days_back)
# SQL 查询语句
sql = f"""
SELECT * FROM jiandaoyun_dailyvisit_groupnotification;
"""
# 执行查询并获取结果
cursor.execute(sql)
rows = cursor.fetchall() # pymysql 的 DictCursor 会返回字典列表
# 将结果转换为 DataFrame
if rows: # 如果有数据
data_commission = pd.DataFrame(rows)
else: # 如果没有数据,返回空 DataFrame
data_commission = pd.DataFrame()
# 关闭游标
cursor.close()
headers = [
"公司id", "是否启动短信功能", "是否购买短信包", "累计购买总数", "累计发送成功总人数", "剩余短信条数",
"实付总金额", "短信剩余量是否小于20%", "第一次短信购买时间", "最近一次短信购买时间"
]
data_commission.columns = headers
return data_commission
except Exception as e:
error_task_logger.error(f"获取短信数据支撑数据时出错: {e}")
return None
from datetime import datetime, timedelta, UTC, timezone
def send_task_status(self, task_start_time: str, task_name: str) -> None:
"""
将任务状态发送到简道云(开始时间为北京时间,需转换到 UTC)
:param task_start_time: 任务开始时间(字符串格式:"%Y-%m-%d %H:%M:%S",表示北京时间 UTC+8
:param task_name: 任务名称
"""
try:
# 1. 获取当前 UTC 时间(时区感知对象)
end_time_utc = datetime.now(UTC) # ✅ 替代 utcnow()
# 2. 解析传入的北京时间(UTC+8)
task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S")
# 3. 转换为 UTC 时间(减去 8 小时,并附加 UTC 时区)
task_start_utc = task_start_naive - timedelta(hours=8)
task_start_utc = task_start_utc.replace(tzinfo=timezone.utc) # 显式标记为 UTC
# 4. 计算运行时间(时区感知对象可直接相减)
run_time = end_time_utc - task_start_utc
run_time_sec = int(run_time.total_seconds())
# 5. 格式化时间为 UTC 的 ISO 8601 格式(带 "Z"
today_utc = end_time_utc.strftime("%Y-%m-%d")
task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
# 6. 构造请求数据(所有时间以 UTC 格式发送)
payload = {
"api_key": Config.SCHEDULED_TASKS_APP_ID,
"entry_id": Config.JDY_TASKS_ENTRY_ID,
"data": {
"_widget_1744873387500": {"value": today_utc}, # UTC 日期
"_widget_1743644977694": {"value": task_name},
"_widget_1744873387501": {"value": task_start_iso}, # UTC 开始时间
"_widget_1744873387502": {"value": task_end_iso}, # UTC 结束时间
"_widget_1744873387504": {"value": run_time_sec},
}
}
# 7. 发送请求
response = api_instance.data_batch_create(payload)
logger.info(f"任务状态发送成功: {response}")
except Exception as e:
error_task_logger.error(f"任务状态发送失败: {e}")
task_start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
CommonModule.send_task_error(task_start_time, "发送任务状态", e)
def send_task_error(self, task_start_time: str, task_name: str, error_message: str,
df: pd.DataFrame = None) -> None:
"""
将任务失败情况发送到简道云(影响业务数据时调用)
:param df: 失败文件
:param task_start_time: 任务开始时间(字符串格式:"%Y-%m-%d %H:%M:%S",表示北京时间 UTC+8
:param task_name: 任务名称
:param error_message: 失败详情
"""
try:
# 1. 获取当前 UTC 时间(时区感知对象)
end_time_utc = datetime.now(UTC) # ✅ 替代 utcnow()
# 2. 解析传入的北京时间(UTC+8)
task_start_naive = datetime.strptime(task_start_time, "%Y-%m-%d %H:%M:%S")
# 3. 转换为 UTC 时间(减去 8 小时,并附加 UTC 时区)
task_start_utc = task_start_naive - timedelta(hours=8)
task_start_utc = task_start_utc.replace(tzinfo=timezone.utc) # 显式标记为 UTC
# 4. 计算运行时间(时区感知对象可直接相减)
run_time = end_time_utc - task_start_utc
run_time_sec = int(run_time.total_seconds())
# 5. 格式化时间为 UTC 的 ISO 8601 格式(带 "Z"
today_utc = end_time_utc.strftime("%Y-%m-%d")
task_end_iso = end_time_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
task_start_iso = task_start_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
# 6.上传附件
UUid = time.strftime("%Y%m%d%H%M%S", time.localtime())
if df is not None:
df.to_excel("upload_file.xlsx", index=False)
file_path = "upload_file.xlsx"
up_data = api_instance.get_upload_token(
{"api_key": "6694d3c4fcb69ca9a111a6c4", "entry_id": "689ae65da00c17578e27cd74",
"transaction_id": UUid})
upload_url = up_data.get("upload_url")
upload_token = up_data.get("upload_token")
upload_result = api_instance.upload_file(
{"upload_url": upload_url, "upload_token": upload_token, "file_path": file_path})
upload_key = upload_result.get("key")
else:
upload_key = ""
# 7. 构造请求数据(所有时间以 UTC 格式发送)
payload = {
"api_key": Config.SCHEDULED_TASKS_APP_ID,
"entry_id": Config.JDY_TASKS_ERROR_ENTRY_ID,
"data": {
"_widget_1744873387500": {"value": today_utc}, # UTC 日期
"_widget_1743644977694": {"value": task_name},
"_widget_1744873387501": {"value": task_start_iso}, # UTC 开始时间
"_widget_1744873387502": {"value": task_end_iso}, # UTC 结束时间
"_widget_1744873387504": {"value": run_time_sec},
"_widget_1754981992215": {"value": error_message}, # 错误信息
"_widget_1764830825356": {"value": [upload_key]}
},
"transaction_id": UUid
}
# 8. 发送请求
response = api_instance.data_batch_create(payload)
logger.info(f"任务错误发生成功: {response}")
except Exception as e:
error_task_logger.error(f"任务错误发送失败: {e}")