# MySQLAgent 使用文档
**最后更新于:2023-08-06**

**代码版本:1.2.0**

> **环境要求:**
>
> - Python ≥ 3.8
> - PyMySQL ≥ 1.0.2
> - pandas ≥ 1.3.0

---
## 1. 类概述

`MySQLAgent` 是一个全平台兼容的 MySQL 数据库操作类,支持 Windows/macOS/Linux 系统,提供连接池管理、数据操作和事务处理等功能。

### 核心特性

- ✅ 线程安全的连接池管理
- ✅ 自动适配各平台配置
- ✅ 支持 DataFrame 直接交互
- ✅ 完善的事务处理机制
- ✅ 详细的日志记录

---
## 2. 初始化配置

### 基本配置参数

```python
config = {
    'host': 'localhost',       # 数据库主机
    'port': 3306,              # 端口
    'user': 'root',            # 用户名
    'password': '123123',      # 密码(仅为示例,请勿在代码中硬编码真实密码)
    'database': 'test_db',     # 数据库名
    'charset': 'utf8mb4',      # 字符集(默认 utf8mb4)
    'max_connections': 5,      # 最大连接数(默认 5)
    'connect_timeout': 10,     # 连接超时(秒)
    'read_timeout': 30,        # 读取超时(秒)
    'write_timeout': 30,       # 写入超时(秒)
    'ssl': None,               # SSL 配置
}
```
### 获取平台默认配置

```python
from mysql_agent import MySQLAgent, get_default_config

# 根据当前操作系统自动返回优化后的配置
config = get_default_config()

# 可按需覆盖默认值
config.update({
    'host': '192.168.1.100',
    'database': 'production_db',
})

db = MySQLAgent(config)
```
### 各平台特殊配置

| 平台    | 默认超时(连接/读/写) | SSL 配置     | 批处理优化        |
|---------|------------------------|--------------|-------------------|
| Windows | 10/30/30 秒            | 禁用         | 小批次 (100-500)  |
| macOS   | 15/60/60 秒            | 自动检测证书 | 中批次 (500-1000) |
| Linux   | 15/60/60 秒            | 禁用         | 大批次 (1000+)    |
## 3. 基础CRUD操作

### 查询数据

```python
# 返回 DataFrame
df = db.query_to_df(
    "SELECT * FROM users WHERE age > %s",
    params=(18,),
    parse_dates=['create_time'],  # 自动解析日期字段
)

# 直接执行 SQL 返回原始结果
# 注意:params 传字典时,SQL 中必须使用 %(键名)s 形式的命名占位符;
#       使用位置占位符 %s 时,params 应传元组或列表。
result = db.execute_sql(
    "SELECT name, email FROM users WHERE status = %(status)s",
    params={'status': 1},
    fetch=True,  # 设为 True 返回查询结果
)
```
### 插入数据

```python
# 单条插入
data = {'name': '张三', 'age': 25}
db.execute_sql(
    "INSERT INTO users (name, age) VALUES (%(name)s, %(age)s)",
    params=data,
)

# 批量插入 DataFrame
import pandas as pd

new_users = pd.DataFrame({
    'name': ['李四', '王五'],
    'age': [28, 32],
})
inserted_rows = db.insert_from_df(
    'users',
    new_users,
    chunk_size=500,  # 每批插入的行数
)
```
### 更新数据

```python
# 条件更新
db.execute_sql(
    "UPDATE users SET status = %s WHERE last_login < %s",
    params=(0, '2023-01-01'),
)

# 使用 DataFrame 更新
update_df = pd.DataFrame({
    'id': [1, 2],
    'status': [1, 0],
})
affected_rows = db.update_from_df(
    'users',
    update_df,
    key_columns='id',  # 用于匹配记录的关键列
)
```
### 删除数据

```python
# 条件删除
db.execute_sql(
    "DELETE FROM logs WHERE created_at < %s",
    params=('2022-01-01',),
)
```
## 4. 表结构管理

### 创建表

```python
# 根据 DataFrame 自动创建表
sample_data = pd.DataFrame({
    'id': pd.Series(dtype='int'),
    'name': pd.Series(dtype='str'),
    'created_at': pd.Series(dtype='datetime64[ns]'),
})
db.create_table_from_df(
    'new_table',
    sample_data,
    primary_key='id',  # 指定主键
)

# 手动创建表
db.execute_sql("""
    CREATE TABLE IF NOT EXISTS products (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        price DECIMAL(10,2),
        stock INT DEFAULT 0
    )
""")
```
### 表操作

```python
# 检查表是否存在
if db.table_exists('users'):
    print("用户表已存在")

# 删除表
db.drop_table('temp_table')

# 获取表结构
# 注意:_get_table_info 以下划线开头,属于内部方法,其接口可能在后续版本中变化
schema = db._get_table_info('products')
```
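### 字段修改

```python
import pymysql

# 将表 a 的字段 b 重命名为 c,并把类型改为 DATETIME
# 注意:ALTER TABLE 在 MySQL 中会隐式提交,无法在事务中回滚;
#       严格 SQL 模式下,若已有数据无法转换为 DATETIME,语句会直接报错
try:
    db.execute_sql("ALTER TABLE a CHANGE COLUMN b c DATETIME")
except pymysql.err.MySQLError as e:
    # 假设 execute_sql 会向上抛出 PyMySQL 原生异常。
    # 应捕获基类 MySQLError:ALTER 失败在 PyMySQL 中通常映射为
    # OperationalError / ProgrammingError / DataError,而不是 InternalError
    print(f"修改失败: {e}")
```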
## 5. 事务管理

### 基本事务

```python
conn = db.begin_transaction()
try:
    cursor = conn.cursor()
    cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    db.commit_transaction(conn)
except Exception:
    db.rollback_transaction(conn)
    raise
```
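### 上下文管理器

```python
with db.begin_transaction() as conn:
    conn.cursor().execute("INSERT INTO logs (message) VALUES ('Transaction start')")
    # 其他操作...
# 无需显式 commit/rollback
```

> ⚠️ 上述写法要求 `begin_transaction()` 的返回值自行实现"正常退出时提交、异常时回滚"的上下文管理协议。
> 若它返回的是原生 PyMySQL(≥ 1.0)`Connection`,`with` 退出时只会关闭连接而不会提交,未提交的修改将被丢弃;
> 此时请改用上一节的显式 commit/rollback 写法。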
## 6. 高级功能

### 大数据量处理

```python
# 分块读取大数据
chunk_size = 10000
conn = db.get_connection()
try:
    for chunk in pd.read_sql_query(
        "SELECT * FROM large_table",
        con=conn,
        chunksize=chunk_size,
    ):
        process_chunk(chunk)
finally:
    # 用完务必释放连接,否则可能一直占用连接池名额;
    # 此处假设 close() 即可释放,若连接池提供专门的归还方法请以其为准
    conn.close()

# 批量插入优化
large_df = generate_large_data()  # 假设返回 10 万行数据
db.insert_from_df(
    'target_table',
    large_df,
    chunk_size=2000,  # 显式指定每批行数,可参考上文"各平台特殊配置"中的批处理建议调整
)
```

> 注意:PyMySQL 默认游标会先把完整结果集缓存到客户端,`chunksize` 只决定 DataFrame 的分块方式,并不能降低峰值内存。
> 如需真正的流式读取,需要使用非缓冲游标 `pymysql.cursors.SSCursor`(取决于 `get_connection()` 是否支持指定游标类型)。
### 并发查询

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_user(user_id):
    return db.query_to_df(
        "SELECT * FROM users WHERE id = %s",
        params=(user_id,),
    )


# 线程数建议不超过连接池的 max_connections(默认 5):多出的线程只能等待空闲连接,
# 或因取不到连接而报错(取决于连接池实现)。
# 仅作演示:按主键批量取数时,一条 WHERE id IN (...) 查询通常比逐行查询高效得多。
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fetch_user, range(1, 1001)))
```