# Source file: saas/test/coupon_data_to_BI.py
# Last modified: 2025-09-23 16:35:34 +08:00 (256 lines, 9.7 KiB, Python)
#
# Note: the code-hosting page this file was copied from warned that it contains
# ambiguous Unicode characters, i.e. characters that might be confused with
# other characters. If that is intentional, the warning can be safely ignored.
import pandas as pd
import requests
import json
from time import sleep
from module import F6_module
import mysql.connector
from mysql.connector import Error
from datetime import datetime
class CouponDataProcessor:
    """Fetch coupon usage records from the F6 web API and load them into MySQL.

    ``execute_pipeline`` runs the whole flow: log in through ``F6_module``,
    page through the coupon usage endpoint, flatten the records with pandas
    and replace the contents of the target table with the result.
    """

    def __init__(self):
        """Create the API client and store request headers, DB settings and login account."""
        self.f6_module = F6_module()
        self.base_url = "https://yunxiu.f6car.cn/macan/coupon/info/pagingCouponUsageRecord"
        self.headers = {
            'accept': 'application/json, text/plain, */*',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'referer': 'https://yunxiu.f6car.cn/erp/view/index.html'
        }
        # NOTE(review): database and account credentials are hard-coded below.
        # They should be moved to environment variables or a secret store and
        # rotated, since they are visible to anyone who can read this file.
        self.db_config = {
            'host': "f6-public.rwlb.rds.aliyuncs.com",
            'user': "rw_operation_data_relay",
            'password': "m+q5Z4%IVuF9bf",
            'database': "f6operation_data_relay"
        }  # MySQL connection settings for the target database
        self.username = "15222738424"
        self.password = "cw25966929@"

    @staticmethod
    def _quote_identifier(name):
        """Return *name* wrapped in backticks, with embedded backticks doubled."""
        return "`" + str(name).replace("`", "``") + "`"

    @staticmethod
    def _to_json_or_none(value):
        """Serialize *value* to a JSON string, mapping None/NaN to None.

        ``pd.isna`` is elementwise on lists and arrays, so it is only
        consulted for scalar values.
        """
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        return json.dumps(value, ensure_ascii=False)

    @staticmethod
    def _first_car_field(cars, key):
        """Return ``cars[0][key]`` when *cars* is a non-empty list of dicts, else None."""
        if isinstance(cars, list) and cars and isinstance(cars[0], dict):
            return cars[0].get(key)
        return None

    def drop_column(self, cursor, table_name, column_name):
        """Drop *column_name* from *table_name* if the column exists.

        Args:
            cursor: An open DB-API cursor on the target database.
            table_name: Name of the table to alter.
            column_name: Name of the column to remove.

        Database errors are printed, not raised.
        """
        try:
            quoted_table = self._quote_identifier(table_name)
            # Escape LIKE wildcards so that e.g. "car_id" only matches itself,
            # and pass the pattern as a parameter instead of splicing it in.
            pattern = (
                column_name.replace('\\', '\\\\')
                .replace('_', '\\_')
                .replace('%', '\\%')
            )
            cursor.execute(f"SHOW COLUMNS FROM {quoted_table} LIKE %s", (pattern,))
            # fetchall() drains the result set; leaving rows unread would make
            # the following statement fail on an unbuffered cursor.
            if cursor.fetchall():
                drop_query = (
                    f"ALTER TABLE {quoted_table} "
                    f"DROP COLUMN {self._quote_identifier(column_name)}"
                )
                cursor.execute(drop_query)
                print(f"成功从表 {table_name} 中删除列 {column_name}")
            else:
                print(f"{table_name} 中不存在列 {column_name}")
        except Error as e:
            print(f"删除列失败: {e}")

    def _fetch_all_coupons(self, page_size=100):
        """Download every page of coupon usage records.

        Args:
            page_size: Number of records requested per page.

        Returns:
            A list with the records of all pages that could be fetched, or
            None when the first page fails or reports no records. Pages that
            still fail after the retries are reported and skipped.
        """
        cookies = self._login()
        params = {
            'keyword': '',
            'couponName': '',
            'currentPage': '1',
            'pageSize': str(page_size),
            'sorts': ''
        }
        # The first page tells us how many records (and thus pages) exist.
        first_page = self._fetch_page(params, cookies)
        if not first_page:
            return None
        # "or {}" also covers a JSON null, which .get(key, {}) would not.
        first_info = first_page.get('info') or {}
        total_records = int(first_info.get('total') or 0)
        if total_records <= 0:
            return None
        total_pages = (total_records + page_size - 1) // page_size
        print(f"共发现 {total_records} 条记录,共 {total_pages} 页")

        all_data = list(first_info.get('list') or [])
        for page in range(2, total_pages + 1):
            sleep(0.5)  # polite delay between consecutive requests
            params['currentPage'] = str(page)
            print(f"正在获取第 {page}/{total_pages} 页...")
            page_data = self._fetch_page(params, cookies)
            if not page_data:
                # Make the gap visible instead of silently returning partial data.
                print(f"第 {page} 页获取失败,已跳过")
                continue
            all_data.extend((page_data.get('info') or {}).get('list') or [])
        return all_data

    def _login(self):
        """Log in through ``F6_module`` and return the session cookies as a dict."""
        res = self.f6_module.login_in(self.username, self.password)
        return requests.utils.dict_from_cookiejar(res.cookies)

    def _fetch_page(self, params, cookies, max_retries=3):
        """GET one page of the endpoint, retrying on failure.

        Args:
            params: Query parameters for the request.
            cookies: Session cookies returned by ``_login``.
            max_retries: Maximum number of attempts.

        Returns:
            The decoded JSON body, or None if every attempt failed.
        """
        for attempt in range(max_retries):
            try:
                response = requests.get(
                    self.base_url,
                    params=params,
                    cookies=cookies,
                    headers=self.headers,
                    timeout=10
                )
                response.raise_for_status()
                return response.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError covers a body that is not valid JSON.
                print(f"请求失败(尝试 {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    sleep(2)
        return None

    def _process_data(self, raw_data) -> pd.DataFrame:
        """Turn the raw API records into a flat DataFrame.

        * ``couponCarList`` is stored as a JSON string; ``carId`` / ``carNo``
          are taken from its first entry (None when there is no entry).
        * ``couponInfo`` is stored as a JSON string and its keys are also
          expanded into ``couponInfo.<key>`` columns.
        * ``takeTime`` / ``useTime`` are parsed as millisecond epoch values.
        * ``id`` is renamed to ``id1``.

        Args:
            raw_data: List of record dicts as returned by the API.

        Returns:
            The processed DataFrame (empty when *raw_data* is empty).
        """
        df = pd.DataFrame(raw_data)
        if df.empty:
            return df

        if 'couponCarList' in df.columns:
            # Read the car fields from the original objects first; testing a
            # list with pd.notna() is elementwise and fails for 2+ entries.
            car_lists = df['couponCarList']
            car_ids = car_lists.apply(lambda cars: self._first_car_field(cars, 'carId'))
            car_nos = car_lists.apply(lambda cars: self._first_car_field(cars, 'carNo'))
            df['couponCarList'] = car_lists.apply(self._to_json_or_none)
            df['carId'] = car_ids
            df['carNo'] = car_nos

        if 'couponInfo' in df.columns:
            info_dicts = [
                info if isinstance(info, dict) else {}
                for info in df['couponInfo']
            ]
            df['couponInfo'] = df['couponInfo'].apply(self._to_json_or_none)
            # Expanding is best-effort: on failure only the JSON column is kept.
            try:
                expanded = pd.json_normalize(info_dicts)
                expanded.index = df.index  # keep rows aligned for the concat
                df = pd.concat([df, expanded.add_prefix('couponInfo.')], axis=1)
            except Exception as e:
                print(f"展开couponInfo时出错: {str(e)}")

        # NOTE(review): unit='ms' yields naive timestamps in UTC; confirm that
        # this is the time zone the target table expects.
        if 'takeTime' in df.columns:
            df['takeTime'] = pd.to_datetime(df['takeTime'], unit='ms')
        if 'useTime' in df.columns:
            df['useTime'] = pd.to_datetime(df['useTime'], unit='ms')

        if 'id' in df.columns:
            df = df.rename(columns={'id': 'id1'})
        return df

    @classmethod
    def _build_records(cls, df, db_columns):
        """Convert *df* into a list of row tuples ordered like *db_columns*.

        NaN/NaT become None and pandas timestamps become ``datetime`` objects.
        The input frame is not modified.

        Raises:
            ValueError: If *df* lacks one of *db_columns*.
        """
        missing = [col for col in db_columns if col not in df.columns]
        if missing:
            raise ValueError(f"DataFrame 缺少数据库列: {missing}")
        ordered = df[list(db_columns)]
        return [
            tuple(cls._convert_datetime(value) for value in row)
            for row in ordered.itertuples(index=False, name=None)
        ]

    def _import_to_database(self, df, table_name="coupon_usage_record_details", batch_size=1000):
        """Replace the contents of *table_name* with the rows of *df*.

        The frame is validated against the table's columns (all except
        ``id``) before anything is deleted, so a mismatch no longer leaves
        the table empty. Rows are inserted in batches; when a batch fails it
        is retried row by row so that the offending rows can be reported.

        Args:
            df: Processed DataFrame; it is not modified.
            table_name: Target table.
            batch_size: Number of rows per ``executemany`` call.

        Returns:
            The number of rows that were inserted. Errors are printed, not
            raised.
        """
        conn = None
        cursor = None
        imported = 0
        try:
            conn = mysql.connector.connect(**self.db_config)
            cursor = conn.cursor()
            quoted_table = self._quote_identifier(table_name)

            # Validate everything that can fail locally before wiping the table.
            if batch_size <= 0:
                raise ValueError(f"batch_size must be positive, got {batch_size}")
            cursor.execute(f"SHOW COLUMNS FROM {quoted_table}")
            db_columns = [col[0] for col in cursor.fetchall() if col[0] != 'id']
            records = self._build_records(df, db_columns)

            print(f"正在清空表 {table_name} 中的数据...")
            cursor.execute(f"DELETE FROM {quoted_table}")
            cursor.execute(f"ALTER TABLE {quoted_table} AUTO_INCREMENT = 1")
            conn.commit()
            print(f"已成功清空表 {table_name} 中的所有数据")

            columns = ', '.join(self._quote_identifier(col) for col in db_columns)
            placeholders = ', '.join(['%s'] * len(db_columns))
            insert_query = f"INSERT INTO {quoted_table} ({columns}) VALUES ({placeholders})"

            print("开始导入数据...")
            total_rows = len(records)
            for start in range(0, total_rows, batch_size):
                batch = records[start:start + batch_size]
                try:
                    cursor.executemany(insert_query, batch)
                    conn.commit()
                    imported += len(batch)
                    print(f"已导入 {min(start + batch_size, total_rows)}/{total_rows} 条记录")
                except Error as e:
                    conn.rollback()
                    print(f"批量导入失败: {e}")
                    # Retry row by row to find the offending rows.
                    for offset, record in enumerate(batch):
                        try:
                            cursor.execute(insert_query, record)
                            conn.commit()
                            imported += 1
                        except Error as row_error:
                            print(f"第 {start + offset + 1} 行导入失败: {row_error}")
                            print(f"问题数据: {record}")
                            conn.rollback()
            # Report what was actually inserted, not just what was attempted.
            print(f"成功导入 {imported}/{total_rows} 条记录到 {table_name}")
        except Error as e:
            print(f"数据库操作失败: {e}")
        except Exception as e:
            print(f"发生错误: {e}")
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()
        return imported

    @staticmethod
    def _convert_datetime(value):
        """Convert a pandas/NumPy value into something MySQL can store.

        Missing scalars (None, NaN, NaT) become None, ``pd.Timestamp``
        becomes ``datetime``; every other value, including lists and dicts,
        is returned unchanged.
        """
        # pd.isna() is elementwise on containers, so only test scalars.
        if not pd.api.types.is_scalar(value):
            return value
        if pd.isna(value):
            return None
        if isinstance(value, pd.Timestamp):
            return value.to_pydatetime()
        return value

    def execute_pipeline(self):
        """Run fetch -> process -> import; any failure is printed, not raised."""
        try:
            # 1. Fetch
            print("开始获取优惠券数据...")
            raw_data = self._fetch_all_coupons()
            if not raw_data:
                raise RuntimeError("未能获取有效数据")
            # 2. Process
            print("处理数据中...")
            processed_df = self._process_data(raw_data)
            # 3. Load into the database
            self._import_to_database(processed_df)
            print("数据处理流程完成!")
        except Exception as e:
            print(f"流程执行失败: {e}")
if __name__ == "__main__":
    # Script entry point: build the processor and run the whole pipeline once.
    CouponDataProcessor().execute_pipeline()