Files
2025-10-17 17:59:28 +08:00

5.5 KiB
Raw Permalink Blame History

MySQLAgent 使用文档

最后更新于:2023-08-06
代码版本:1.2.0

环境要求:

  • Python ≥ 3.8
  • PyMySQL ≥ 1.0.2
  • pandas ≥ 1.3.0

1. 类概述

MySQLAgent 是一个全平台兼容的 MySQL 数据库操作类,支持 Windows/macOS/Linux 系统,提供连接池管理、数据操作和事务处理等功能。

核心特性:

  • 线程安全的连接池管理
  • 自动适配各平台配置
  • 支持 DataFrame 直接交互
  • 完善的事务处理机制
  • 详细的日志记录

2. 初始化配置

基本配置参数

Config = {
    'host': 'localhost',           # 数据库主机
    'port': 3306,                  # 端口
    'user': 'root',                # 用户名
    'password': '123123',          # 密码
    'database': 'test_db',         # 数据库名
    'charset': 'utf8mb4',          # 字符集(默认 utf8mb4
    'max_connections': 5,          # 最大连接数(默认 5
    'connect_timeout': 10,         # 连接超时(秒)
    'read_timeout': 30,            # 读取超时(秒)
    'write_timeout': 30,           # 写入超时(秒)
    'ssl': None                    # SSL 配置
}

获取平台默认配置

from mysql_agent import get_default_config

# 自动根据当前操作系统返回优化配置
config = get_default_config()

# 可覆盖默认值
config.update({
    'host': '192.168.1.100',
    'database': 'production_db'
})

db = MySQLAgent(config)

各平台特殊配置

平台 默认超时(连接/读/写) SSL 配置 批处理优化
Windows 10/30/30 秒 禁用 小批次 (100-500)
macOS 15/60/60 秒 自动检测证书 中批次 (500-1000)
Linux 15/60/60 秒 禁用 大批次 (1000+)

3. 基础CRUD操作

查询数据

# 返回 DataFrame
df = db.query_to_df(
    "SELECT * FROM users WHERE age > %s",
    params=(18,),
    parse_dates=['create_time']  # 自动解析日期字段
)

# 直接执行 SQL 返回原始结果
result = db.execute_sql(
    "SELECT name, email FROM users WHERE status = %s",
    params={'status': 1},
    fetch=True  # 设为 True 返回查询结果
)

插入数据

# 单条插入
data = {'name': '张三', 'age': 25}
db.execute_sql(
    "INSERT INTO users (name, age) VALUES (%(name)s, %(age)s)",
    params=data
)

# 批量插入 DataFrame
import pandas as pd
new_users = pd.DataFrame({
    'name': ['李四', '王五'],
    'age': [28, 32]
})
inserted_rows = db.insert_from_df(
    'users',
    new_users,
    chunk_size=500  # 分批插入大小
)

更新数据

# 条件更新
db.execute_sql(
    "UPDATE users SET status = %s WHERE last_login < %s",
    params=(0, '2023-01-01')
)

# 使用 DataFrame 更新
update_df = pd.DataFrame({
    'id': [1, 2],
    'status': [1, 0]
})
affected_rows = db.update_from_df(
    'users',
    update_df,
    key_columns='id'  # 用于匹配记录的关键列
)

删除数据

# 条件删除
db.execute_sql(
    "DELETE FROM logs WHERE created_at < %s",
    params=('2022-01-01',)
)

4. 表结构管理

创建表

# 根据 DataFrame 自动创建表
sample_data = pd.DataFrame({
    'id': pd.Series(dtype='int'),
    'name': pd.Series(dtype='str'),
    'created_at': pd.Series(dtype='datetime64[ns]')
})
db.create_table_from_df(
    'new_table',
    sample_data,
    primary_key='id'  # 指定主键
)

# 手动创建表
db.execute_sql("""
CREATE TABLE IF NOT EXISTS products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    price DECIMAL(10,2),
    stock INT DEFAULT 0
)
""")

表操作

# 检查表是否存在
if db.table_exists('users'):
    print("用户表已存在")

# 删除表
db.drop_table('temp_table')

# 获取表结构
schema = db._get_table_info('products')

字段修改

# 字段b修改为c并转换数据类型为datetime
try:
    db.execute_sql("ALTER TABLE a CHANGE COLUMN b c DATETIME")
except pymysql.err.InternalError as e:
    print(f"修改失败: {str(e)}")

5. 事务管理

基本事务

conn = db.begin_transaction()
try:
    cursor = conn.cursor()
    cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
    db.commit_transaction(conn)
except Exception as e:
    db.rollback_transaction(conn)
raise

上下文管理器

with db.begin_transaction() as conn:
    conn.cursor().execute("INSERT INTO logs (message) VALUES ('Transaction start')")
    # 其他操作...
    # 无需显式 commit/rollback

6. 高级功能

大数据量处理

# 分块读取大数据
chunk_size = 10000
for chunk in pd.read_sql_query(
    "SELECT * FROM large_table",
    con=db.get_connection(),
    chunksize=chunk_size
):
    process_chunk(chunk)

# 批量插入优化
large_df = generate_large_data()  # 假设返回 10 万行数据
db.insert_from_df(
    'target_table',
    large_df,
    chunk_size=2000  # 根据平台自动调整
)

并发查询

from concurrent.futures import ThreadPoolExecutor

def fetch_user(user_id):
    return db.query_to_df(
    "SELECT * FROM users WHERE id = %s",
    params=(user_id,)
    )

with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(fetch_user, range(1, 1001)))