256 lines
9.7 KiB
Python
256 lines
9.7 KiB
Python
import pandas as pd
|
||
import requests
|
||
import json
|
||
from time import sleep
|
||
from module import F6_module
|
||
import mysql.connector
|
||
from mysql.connector import Error
|
||
from datetime import datetime
|
||
|
||
|
||
class CouponDataProcessor:
|
||
def __init__(self):
|
||
self.f6_module = F6_module()
|
||
self.base_url = "https://yunxiu.f6car.cn/macan/coupon/info/pagingCouponUsageRecord"
|
||
self.headers = {
|
||
'accept': 'application/json, text/plain, */*',
|
||
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
|
||
'referer': 'https://yunxiu.f6car.cn/erp/view/index.html'
|
||
}
|
||
self.db_config = {
|
||
'host': "f6-public.rwlb.rds.aliyuncs.com",
|
||
'user': "rw_operation_data_relay",
|
||
'password': "m+q5Z4%IVuF9bf",
|
||
'database': "f6operation_data_relay"
|
||
} # 衡时数据库链接配置-mysql
|
||
self.username = "15222738424"
|
||
self.password = "cw25966929@"
|
||
|
||
def drop_column(self, cursor, table_name, column_name):
|
||
"""删除表中的指定列"""
|
||
try:
|
||
# 检查列是否存在
|
||
cursor.execute(f"SHOW COLUMNS FROM {table_name} LIKE '{column_name}'")
|
||
if cursor.fetchone():
|
||
# 如果列存在,则删除
|
||
drop_query = f"ALTER TABLE {table_name} DROP COLUMN {column_name}"
|
||
cursor.execute(drop_query)
|
||
print(f"成功从表 {table_name} 中删除列 {column_name}")
|
||
else:
|
||
print(f"表 {table_name} 中不存在列 {column_name}")
|
||
except Error as e:
|
||
print(f"删除列失败: {e}")
|
||
|
||
def _fetch_all_coupons(self, page_size=100):
|
||
"""获取所有分页数据"""
|
||
cookies = self._login()
|
||
params = {
|
||
'keyword': '',
|
||
'couponName': '',
|
||
'currentPage': '1',
|
||
'pageSize': str(page_size),
|
||
'sorts': ''
|
||
}
|
||
|
||
# 获取第一页确定总页数
|
||
first_page = self._fetch_page(params, cookies)
|
||
if not first_page:
|
||
return None
|
||
|
||
total_records = first_page.get('info', {}).get('total', 0)
|
||
if total_records == 0:
|
||
return None
|
||
|
||
total_pages = (total_records + page_size - 1) // page_size
|
||
print(f"共发现 {total_records} 条记录,{total_pages} 页")
|
||
|
||
# 收集所有数据
|
||
all_data = first_page.get('info', {}).get('list', [])
|
||
for page in range(2, total_pages + 1):
|
||
params['currentPage'] = str(page)
|
||
print(f"正在获取第 {page}/{total_pages} 页...")
|
||
page_data = self._fetch_page(params, cookies)
|
||
if page_data:
|
||
all_data.extend(page_data.get('info', {}).get('list', []))
|
||
sleep(0.5) # 礼貌延迟
|
||
|
||
return all_data
|
||
|
||
def _login(self):
|
||
"""登录获取cookies"""
|
||
res = self.f6_module.login_in(self.username, self.password)
|
||
return requests.utils.dict_from_cookiejar(res.cookies)
|
||
|
||
def _fetch_page(self, params, cookies, max_retries=3):
|
||
"""带重试机制的页面请求"""
|
||
for attempt in range(max_retries):
|
||
try:
|
||
response = requests.get(
|
||
self.base_url,
|
||
params=params,
|
||
cookies=cookies,
|
||
headers=self.headers,
|
||
timeout=10
|
||
)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
except Exception as e:
|
||
print(f"请求失败(尝试 {attempt + 1}/{max_retries}): {str(e)}")
|
||
if attempt < max_retries - 1:
|
||
sleep(2)
|
||
return None
|
||
|
||
def _process_data(self, raw_data):
|
||
"""处理原始数据"""
|
||
df = pd.DataFrame(raw_data)
|
||
|
||
if not df.empty:
|
||
# 处理couponCarList字段(列表/字典转为JSON字符串)
|
||
if 'couponCarList' in df.columns:
|
||
df['couponCarList'] = df['couponCarList'].apply(
|
||
lambda x: json.dumps(x, ensure_ascii=False) if pd.notna(x) else None
|
||
)
|
||
# 同时提取carId和carNo
|
||
df['carId'] = df['couponCarList'].apply(
|
||
lambda x: json.loads(x)[0].get('carId') if pd.notna(x) else None
|
||
)
|
||
df['carNo'] = df['couponCarList'].apply(
|
||
lambda x: json.loads(x)[0].get('carNo') if pd.notna(x) else None
|
||
)
|
||
|
||
# 处理couponInfo字段(字典转为JSON字符串)
|
||
if 'couponInfo' in df.columns:
|
||
df['couponInfo'] = df['couponInfo'].apply(
|
||
lambda x: json.dumps(x, ensure_ascii=False) if pd.notna(x) else None
|
||
)
|
||
# 同时展开部分常用字段
|
||
try:
|
||
coupon_info = pd.json_normalize(df['couponInfo'].apply(
|
||
lambda x: json.loads(x) if pd.notna(x) else {}
|
||
))
|
||
df = pd.concat([df, coupon_info.add_prefix('couponInfo.')], axis=1)
|
||
except Exception as e:
|
||
print(f"展开couponInfo时出错: {str(e)}")
|
||
|
||
# 处理时间字段
|
||
if 'takeTime' in df.columns:
|
||
df['takeTime'] = pd.to_datetime(df['takeTime'], unit='ms')
|
||
if 'useTime' in df.columns:
|
||
df['useTime'] = pd.to_datetime(df['useTime'], unit='ms')
|
||
|
||
# 重命名列
|
||
if 'id' in df.columns:
|
||
df = df.rename(columns={'id': 'id1'})
|
||
|
||
return df
|
||
|
||
def _import_to_database(self, df, table_name="coupon_usage_record_details", batch_size=1000):
|
||
"""直接将处理后的DataFrame导入MySQL"""
|
||
conn = None
|
||
cursor = None
|
||
try:
|
||
# 连接数据库
|
||
conn = mysql.connector.connect(**self.db_config)
|
||
cursor = conn.cursor()
|
||
|
||
# 删除表中的所有数据
|
||
print(f"正在清空表 {table_name} 中的数据...")
|
||
cursor.execute(f"DELETE FROM {table_name}")
|
||
cursor.execute(f"ALTER TABLE {table_name} AUTO_INCREMENT = 1")
|
||
conn.commit()
|
||
print(f"已成功清空表 {table_name} 中的所有数据")
|
||
|
||
# 处理时间类型数据
|
||
datetime_columns = [col for col in df.columns if df[col].dtype == 'datetime64[ns]']
|
||
for col in datetime_columns:
|
||
df[col] = df[col].apply(self._convert_datetime)
|
||
|
||
# 处理所有数据,将NaN转为None
|
||
df = df.where(pd.notna(df), None)
|
||
|
||
# 获取数据库列信息
|
||
cursor.execute(f"SHOW COLUMNS FROM {table_name}")
|
||
db_columns = [col[0] for col in cursor.fetchall() if col[0] != 'id']
|
||
|
||
# 确保DataFrame列与数据库列一致
|
||
df = df[db_columns]
|
||
|
||
# 生成插入语句
|
||
columns = ', '.join([f"`{col}`" for col in df.columns])
|
||
placeholders = ', '.join(['%s'] * len(df.columns))
|
||
insert_query = f"INSERT INTO `{table_name}` ({columns}) VALUES ({placeholders})"
|
||
|
||
# 分批插入数据
|
||
print("开始导入数据...")
|
||
total_rows = len(df)
|
||
for i in range(0, total_rows, batch_size):
|
||
batch = df.iloc[i:i + batch_size]
|
||
# 将DataFrame转换为元组列表,并处理所有数据类型
|
||
records = [tuple(self._convert_datetime(val) if isinstance(val, (pd.Timestamp, datetime)) else val
|
||
for val in row)
|
||
for row in batch.values]
|
||
try:
|
||
cursor.executemany(insert_query, records)
|
||
conn.commit()
|
||
print(f"已导入 {min(i + batch_size, total_rows)}/{total_rows} 条记录")
|
||
except Error as e:
|
||
conn.rollback()
|
||
print(f"批量导入失败: {e}")
|
||
# 尝试逐条导入以找出问题行
|
||
for idx, record in enumerate(records):
|
||
try:
|
||
cursor.execute(insert_query, record)
|
||
conn.commit()
|
||
except Error as e:
|
||
print(f"第 {i + idx + 1} 行导入失败: {e}")
|
||
print(f"问题数据: {record}")
|
||
conn.rollback()
|
||
|
||
print(f"成功导入 {total_rows} 条记录到 {table_name} 表")
|
||
|
||
except Error as e:
|
||
print(f"数据库操作失败: {e}")
|
||
except Exception as e:
|
||
print(f"发生错误: {e}")
|
||
finally:
|
||
if cursor:
|
||
cursor.close()
|
||
if conn:
|
||
conn.close()
|
||
|
||
@staticmethod
|
||
def _convert_datetime(value):
|
||
"""将Pandas/NumPy时间类型转换为MySQL兼容的datetime"""
|
||
if pd.isna(value):
|
||
return None
|
||
if isinstance(value, pd.Timestamp):
|
||
return value.to_pydatetime()
|
||
if isinstance(value, datetime):
|
||
return value
|
||
return value
|
||
|
||
def execute_pipeline(self):
|
||
"""执行完整数据处理流程"""
|
||
try:
|
||
# 1. 获取数据
|
||
print("开始获取优惠券数据...")
|
||
raw_data = self._fetch_all_coupons()
|
||
if not raw_data:
|
||
raise Exception("未能获取有效数据")
|
||
|
||
# 2. 处理数据
|
||
print("处理数据中...")
|
||
processed_df = self._process_data(raw_data)
|
||
|
||
# 3. 直接导入数据库
|
||
self._import_to_database(processed_df)
|
||
print("数据处理流程完成!")
|
||
|
||
except Exception as e:
|
||
print(f"流程执行失败: {e}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
processor = CouponDataProcessor()
|
||
processor.execute_pipeline()
|