import json
from datetime import datetime
from time import sleep

import mysql.connector
import pandas as pd
import requests
from mysql.connector import Error

from module import F6_module


class CouponDataProcessor:
    """Fetch coupon usage records from the F6 web API and load them into MySQL.

    ``execute_pipeline`` logs in through ``F6_module``, pages through the
    coupon usage endpoint, flattens the records into a DataFrame and replaces
    the contents of the target MySQL table with them.
    """

    def __init__(self):
        """Create the login helper and store endpoint, headers and credentials."""
        self.f6_module = F6_module()
        self.base_url = "https://yunxiu.f6car.cn/macan/coupon/info/pagingCouponUsageRecord"
        self.headers = {
            'accept': 'application/json, text/plain, */*',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'referer': 'https://yunxiu.f6car.cn/erp/view/index.html'
        }
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to environment variables or a secret store.
        # HENGSHI database connection settings (MySQL).
        self.db_config = {
            'host': "f6-public.rwlb.rds.aliyuncs.com",
            'user': "rw_operation_data_relay",
            'password': "m+q5Z4%IVuF9bf",
            'database': "f6operation_data_relay"
        }
        self.username = "15222738424"
        self.password = "cw25966929@"

    def drop_column(self, cursor, table_name, column_name):
        """Drop ``column_name`` from ``table_name`` if the column exists.

        Args:
            cursor: Open MySQL cursor used to run the statements.
            table_name: Table to alter.
            column_name: Column to remove.

        Database errors are printed, not raised.
        """
        try:
            # Identifiers cannot be bound as parameters, so the table name is
            # interpolated; the LIKE pattern is a value and is bound instead.
            cursor.execute(f"SHOW COLUMNS FROM {table_name} LIKE %s", (column_name,))
            if cursor.fetchone():
                drop_query = f"ALTER TABLE {table_name} DROP COLUMN {column_name}"
                cursor.execute(drop_query)
                print(f"成功从表 {table_name} 中删除列 {column_name}")
            else:
                print(f"表 {table_name} 中不存在列 {column_name}")
        except Error as e:
            print(f"删除列失败: {e}")

    def _fetch_all_coupons(self, page_size=100):
        """Download every page of coupon usage records.

        Args:
            page_size: Number of records requested per page.

        Returns:
            A list with the records of all pages that could be fetched, or
            ``None`` when the first page cannot be fetched or the API reports
            zero records. Pages that fail after all retries are skipped and
            reported in a warning message.
        """
        cookies = self._login()
        params = {
            'keyword': '',
            'couponName': '',
            'currentPage': '1',
            'pageSize': str(page_size),
            'sorts': ''
        }

        # The first page tells us how many records (and thus pages) exist.
        first_page = self._fetch_page(params, cookies)
        if not first_page:
            return None

        # Guard against a missing or null ``info`` / ``total`` / ``list``.
        info = first_page.get('info') or {}
        total_records = info.get('total') or 0
        if total_records == 0:
            return None

        total_pages = (total_records + page_size - 1) // page_size  # ceil division
        print(f"共发现 {total_records} 条记录,{total_pages} 页")

        all_data = list(info.get('list') or [])
        failed_pages = []
        for page in range(2, total_pages + 1):
            params['currentPage'] = str(page)
            print(f"正在获取第 {page}/{total_pages} 页...")
            page_data = self._fetch_page(params, cookies)
            if page_data:
                page_info = page_data.get('info') or {}
                all_data.extend(page_info.get('list') or [])
            else:
                failed_pages.append(page)
            sleep(0.5)  # polite delay between requests

        if failed_pages:
            # Make partial downloads visible instead of dropping pages silently.
            print(f"警告: 以下页面获取失败,数据可能不完整: {failed_pages}")
        return all_data

    def _login(self):
        """Log in through ``F6_module`` and return the response cookies as a dict."""
        res = self.f6_module.login_in(self.username, self.password)
        return requests.utils.dict_from_cookiejar(res.cookies)

    def _fetch_page(self, params, cookies, max_retries=3):
        """GET one page from ``base_url``, retrying on failure.

        Args:
            params: Query-string parameters for the request.
            cookies: Session cookies obtained from ``_login``.
            max_retries: Maximum number of attempts.

        Returns:
            The decoded JSON body, or ``None`` if every attempt failed.
        """
        for attempt in range(max_retries):
            try:
                response = requests.get(
                    self.base_url,
                    params=params,
                    cookies=cookies,
                    headers=self.headers,
                    timeout=10
                )
                response.raise_for_status()
                return response.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError covers JSON decoding errors of the response body.
                print(f"请求失败(尝试 {attempt + 1}/{max_retries}): {str(e)}")
                if attempt < max_retries - 1:
                    sleep(2)
        return None

    @staticmethod
    def _is_missing(value):
        """Return True for None/NaN-like scalars; containers are never missing."""
        # pd.isna() on a list returns an array, whose truth value is ambiguous,
        # so only scalars are passed to it.
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    @classmethod
    def _to_json_or_none(cls, value):
        """Serialize ``value`` to JSON; missing values and empty lists become None."""
        if cls._is_missing(value) or (isinstance(value, list) and not value):
            return None
        return json.dumps(value, ensure_ascii=False)

    @staticmethod
    def _first_car_field(cars, field):
        """Return ``field`` of the first dict in ``cars``, or None if unavailable."""
        if isinstance(cars, list) and cars and isinstance(cars[0], dict):
            return cars[0].get(field)
        return None

    def _process_data(self, raw_data):
        """Turn the raw API records into a DataFrame ready for import.

        * ``couponCarList`` is stored as a JSON string, and ``carId`` /
          ``carNo`` are taken from its first entry.
        * ``couponInfo`` is stored as a JSON string and additionally flattened
          into ``couponInfo.<key>`` columns (best effort).
        * ``takeTime`` / ``useTime`` are parsed as epoch milliseconds.
        * ``id`` is renamed to ``id1``.

        Args:
            raw_data: List of record dicts as returned by the API.

        Returns:
            The processed DataFrame (empty if ``raw_data`` is empty).
        """
        df = pd.DataFrame(raw_data)
        if df.empty:
            return df

        if 'couponCarList' in df.columns:
            raw_cars = df['couponCarList']
            # Derive everything from the raw values before overwriting the column.
            serialized_cars = raw_cars.apply(self._to_json_or_none)
            car_ids = raw_cars.apply(lambda cars: self._first_car_field(cars, 'carId'))
            car_nos = raw_cars.apply(lambda cars: self._first_car_field(cars, 'carNo'))
            df['couponCarList'] = serialized_cars
            df['carId'] = car_ids
            df['carNo'] = car_nos

        if 'couponInfo' in df.columns:
            raw_info = df['couponInfo']
            info_dicts = [{} if self._is_missing(item) else item for item in raw_info]
            df['couponInfo'] = raw_info.apply(self._to_json_or_none)
            try:
                # Flattening is a convenience; on failure the JSON column remains.
                coupon_info = pd.json_normalize(info_dicts)
                df = pd.concat([df, coupon_info.add_prefix('couponInfo.')], axis=1)
            except Exception as e:
                print(f"展开couponInfo时出错: {str(e)}")

        # NOTE(review): timestamps are parsed as epoch milliseconds and yield
        # naive datetimes; confirm the time zone expected by the database.
        for time_col in ('takeTime', 'useTime'):
            if time_col in df.columns:
                df[time_col] = pd.to_datetime(df[time_col], unit='ms')

        if 'id' in df.columns:
            df = df.rename(columns={'id': 'id1'})

        return df

    def _import_to_database(self, df, table_name="coupon_usage_record_details", batch_size=1000):
        """Replace the contents of ``table_name`` with the rows of ``df``.

        The DataFrame is validated against the table's columns *before* the
        existing rows are deleted, so a schema mismatch cannot leave the table
        empty. Rows are inserted in batches; when a batch fails, its rows are
        retried one by one so the offending rows can be reported.

        Args:
            df: Processed DataFrame; must contain every table column except
                ``id``. It is not modified.
            table_name: Target MySQL table.
            batch_size: Number of rows per ``executemany`` call (>= 1).

        Errors are printed, not raised.
        """
        conn = None
        cursor = None
        try:
            if batch_size < 1:
                raise ValueError(f"batch_size 必须大于 0: {batch_size}")

            conn = mysql.connector.connect(**self.db_config)
            cursor = conn.cursor()

            # Read the schema first and fail early, before any data is deleted.
            cursor.execute(f"SHOW COLUMNS FROM `{table_name}`")
            db_columns = [col[0] for col in cursor.fetchall() if col[0] != 'id']
            missing = [col for col in db_columns if col not in df.columns]
            if missing:
                raise ValueError(f"DataFrame 缺少数据库列: {missing}")

            # Selecting columns creates a new frame, so the caller's df is left
            # untouched. Cast to object first: on numeric columns
            # ``where(..., None)`` would otherwise keep NaN instead of None.
            data = df[db_columns].astype(object)
            data = data.where(pd.notna(data), None)

            print(f"正在清空表 {table_name} 中的数据...")
            cursor.execute(f"DELETE FROM `{table_name}`")
            cursor.execute(f"ALTER TABLE `{table_name}` AUTO_INCREMENT = 1")
            conn.commit()
            print(f"已成功清空表 {table_name} 中的所有数据")

            columns = ', '.join(f"`{col}`" for col in data.columns)
            placeholders = ', '.join(['%s'] * len(data.columns))
            insert_query = f"INSERT INTO `{table_name}` ({columns}) VALUES ({placeholders})"

            print("开始导入数据...")
            total_rows = len(data)
            imported = 0
            for start in range(0, total_rows, batch_size):
                batch = data.iloc[start:start + batch_size]
                # Convert pandas timestamps to plain datetimes for the driver.
                records = [
                    tuple(
                        self._convert_datetime(val)
                        if isinstance(val, (pd.Timestamp, datetime)) else val
                        for val in row
                    )
                    for row in batch.itertuples(index=False, name=None)
                ]

                try:
                    cursor.executemany(insert_query, records)
                    conn.commit()
                    imported += len(records)
                    print(f"已导入 {min(start + batch_size, total_rows)}/{total_rows} 条记录")
                except Error as e:
                    conn.rollback()
                    print(f"批量导入失败: {e}")

                    # Retry row by row to salvage good rows and pinpoint bad ones.
                    for idx, record in enumerate(records):
                        try:
                            cursor.execute(insert_query, record)
                            conn.commit()
                            imported += 1
                        except Error as row_error:
                            print(f"第 {start + idx + 1} 行导入失败: {row_error}")
                            print(f"问题数据: {record}")
                            conn.rollback()

            # Report the number of rows actually written, not the number attempted.
            print(f"成功导入 {imported} 条记录到 {table_name} 表")
            if imported < total_rows:
                print(f"有 {total_rows - imported} 条记录导入失败")

        except Error as e:
            print(f"数据库操作失败: {e}")
        except Exception as e:
            print(f"发生错误: {e}")
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()

    @staticmethod
    def _convert_datetime(value):
        """Convert pandas time values to MySQL-compatible Python datetimes.

        Returns ``None`` for missing scalars (None, NaN, NaT), a
        ``datetime.datetime`` for ``pd.Timestamp``, and any other value
        unchanged.
        """
        # Only scalars go through pd.isna(); for containers it returns an array.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        if isinstance(value, pd.Timestamp):
            return value.to_pydatetime()
        return value

    def execute_pipeline(self):
        """Run the full flow: fetch records, process them, import into MySQL.

        Any failure is printed; nothing is raised to the caller.
        """
        try:
            # 1. Fetch
            print("开始获取优惠券数据...")
            raw_data = self._fetch_all_coupons()
            if not raw_data:
                raise RuntimeError("未能获取有效数据")

            # 2. Process
            print("处理数据中...")
            processed_df = self._process_data(raw_data)

            # 3. Import
            self._import_to_database(processed_df)

            print("数据处理流程完成!")

        except Exception as e:
            print(f"流程执行失败: {e}")


if __name__ == "__main__":
    processor = CouponDataProcessor()
    processor.execute_pipeline()