高德匹配手机号

This commit is contained in:
z66
2025-09-23 16:35:34 +08:00
parent 30cacc7da2
commit 6321bb4cf8
15 changed files with 5343 additions and 5 deletions
+255
View File
@@ -0,0 +1,255 @@
import pandas as pd
import requests
import json
from time import sleep
from module import F6_module
import mysql.connector
from mysql.connector import Error
from datetime import datetime
class CouponDataProcessor:
def __init__(self):
self.f6_module = F6_module()
self.base_url = "https://yunxiu.f6car.cn/macan/coupon/info/pagingCouponUsageRecord"
self.headers = {
'accept': 'application/json, text/plain, */*',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'referer': 'https://yunxiu.f6car.cn/erp/view/index.html'
}
self.db_config = {
'host': "f6-public.rwlb.rds.aliyuncs.com",
'user': "rw_operation_data_relay",
'password': "m+q5Z4%IVuF9bf",
'database': "f6operation_data_relay"
} # 衡时数据库链接配置-mysql
self.username = "15222738424"
self.password = "cw25966929@"
def drop_column(self, cursor, table_name, column_name):
"""删除表中的指定列"""
try:
# 检查列是否存在
cursor.execute(f"SHOW COLUMNS FROM {table_name} LIKE '{column_name}'")
if cursor.fetchone():
# 如果列存在,则删除
drop_query = f"ALTER TABLE {table_name} DROP COLUMN {column_name}"
cursor.execute(drop_query)
print(f"成功从表 {table_name} 中删除列 {column_name}")
else:
print(f"{table_name} 中不存在列 {column_name}")
except Error as e:
print(f"删除列失败: {e}")
def _fetch_all_coupons(self, page_size=100):
"""获取所有分页数据"""
cookies = self._login()
params = {
'keyword': '',
'couponName': '',
'currentPage': '1',
'pageSize': str(page_size),
'sorts': ''
}
# 获取第一页确定总页数
first_page = self._fetch_page(params, cookies)
if not first_page:
return None
total_records = first_page.get('info', {}).get('total', 0)
if total_records == 0:
return None
total_pages = (total_records + page_size - 1) // page_size
print(f"共发现 {total_records} 条记录,{total_pages}")
# 收集所有数据
all_data = first_page.get('info', {}).get('list', [])
for page in range(2, total_pages + 1):
params['currentPage'] = str(page)
print(f"正在获取第 {page}/{total_pages} 页...")
page_data = self._fetch_page(params, cookies)
if page_data:
all_data.extend(page_data.get('info', {}).get('list', []))
sleep(0.5) # 礼貌延迟
return all_data
def _login(self):
"""登录获取cookies"""
res = self.f6_module.login_in(self.username, self.password)
return requests.utils.dict_from_cookiejar(res.cookies)
def _fetch_page(self, params, cookies, max_retries=3):
"""带重试机制的页面请求"""
for attempt in range(max_retries):
try:
response = requests.get(
self.base_url,
params=params,
cookies=cookies,
headers=self.headers,
timeout=10
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"请求失败(尝试 {attempt + 1}/{max_retries}: {str(e)}")
if attempt < max_retries - 1:
sleep(2)
return None
def _process_data(self, raw_data):
"""处理原始数据"""
df = pd.DataFrame(raw_data)
if not df.empty:
# 处理couponCarList字段(列表/字典转为JSON字符串)
if 'couponCarList' in df.columns:
df['couponCarList'] = df['couponCarList'].apply(
lambda x: json.dumps(x, ensure_ascii=False) if pd.notna(x) else None
)
# 同时提取carId和carNo
df['carId'] = df['couponCarList'].apply(
lambda x: json.loads(x)[0].get('carId') if pd.notna(x) else None
)
df['carNo'] = df['couponCarList'].apply(
lambda x: json.loads(x)[0].get('carNo') if pd.notna(x) else None
)
# 处理couponInfo字段(字典转为JSON字符串)
if 'couponInfo' in df.columns:
df['couponInfo'] = df['couponInfo'].apply(
lambda x: json.dumps(x, ensure_ascii=False) if pd.notna(x) else None
)
# 同时展开部分常用字段
try:
coupon_info = pd.json_normalize(df['couponInfo'].apply(
lambda x: json.loads(x) if pd.notna(x) else {}
))
df = pd.concat([df, coupon_info.add_prefix('couponInfo.')], axis=1)
except Exception as e:
print(f"展开couponInfo时出错: {str(e)}")
# 处理时间字段
if 'takeTime' in df.columns:
df['takeTime'] = pd.to_datetime(df['takeTime'], unit='ms')
if 'useTime' in df.columns:
df['useTime'] = pd.to_datetime(df['useTime'], unit='ms')
# 重命名列
if 'id' in df.columns:
df = df.rename(columns={'id': 'id1'})
return df
def _import_to_database(self, df, table_name="coupon_usage_record_details", batch_size=1000):
"""直接将处理后的DataFrame导入MySQL"""
conn = None
cursor = None
try:
# 连接数据库
conn = mysql.connector.connect(**self.db_config)
cursor = conn.cursor()
# 删除表中的所有数据
print(f"正在清空表 {table_name} 中的数据...")
cursor.execute(f"DELETE FROM {table_name}")
cursor.execute(f"ALTER TABLE {table_name} AUTO_INCREMENT = 1")
conn.commit()
print(f"已成功清空表 {table_name} 中的所有数据")
# 处理时间类型数据
datetime_columns = [col for col in df.columns if df[col].dtype == 'datetime64[ns]']
for col in datetime_columns:
df[col] = df[col].apply(self._convert_datetime)
# 处理所有数据,将NaN转为None
df = df.where(pd.notna(df), None)
# 获取数据库列信息
cursor.execute(f"SHOW COLUMNS FROM {table_name}")
db_columns = [col[0] for col in cursor.fetchall() if col[0] != 'id']
# 确保DataFrame列与数据库列一致
df = df[db_columns]
# 生成插入语句
columns = ', '.join([f"`{col}`" for col in df.columns])
placeholders = ', '.join(['%s'] * len(df.columns))
insert_query = f"INSERT INTO `{table_name}` ({columns}) VALUES ({placeholders})"
# 分批插入数据
print("开始导入数据...")
total_rows = len(df)
for i in range(0, total_rows, batch_size):
batch = df.iloc[i:i + batch_size]
# 将DataFrame转换为元组列表,并处理所有数据类型
records = [tuple(self._convert_datetime(val) if isinstance(val, (pd.Timestamp, datetime)) else val
for val in row)
for row in batch.values]
try:
cursor.executemany(insert_query, records)
conn.commit()
print(f"已导入 {min(i + batch_size, total_rows)}/{total_rows} 条记录")
except Error as e:
conn.rollback()
print(f"批量导入失败: {e}")
# 尝试逐条导入以找出问题行
for idx, record in enumerate(records):
try:
cursor.execute(insert_query, record)
conn.commit()
except Error as e:
print(f"{i + idx + 1} 行导入失败: {e}")
print(f"问题数据: {record}")
conn.rollback()
print(f"成功导入 {total_rows} 条记录到 {table_name}")
except Error as e:
print(f"数据库操作失败: {e}")
except Exception as e:
print(f"发生错误: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
@staticmethod
def _convert_datetime(value):
"""将Pandas/NumPy时间类型转换为MySQL兼容的datetime"""
if pd.isna(value):
return None
if isinstance(value, pd.Timestamp):
return value.to_pydatetime()
if isinstance(value, datetime):
return value
return value
def execute_pipeline(self):
"""执行完整数据处理流程"""
try:
# 1. 获取数据
print("开始获取优惠券数据...")
raw_data = self._fetch_all_coupons()
if not raw_data:
raise Exception("未能获取有效数据")
# 2. 处理数据
print("处理数据中...")
processed_df = self._process_data(raw_data)
# 3. 直接导入数据库
self._import_to_database(processed_df)
print("数据处理流程完成!")
except Exception as e:
print(f"流程执行失败: {e}")
if __name__ == "__main__":
processor = CouponDataProcessor()
processor.execute_pipeline()