数据库操作说明

This commit is contained in:
2025-08-06 18:01:26 +08:00
parent c8456ce3f6
commit b33d61c774
+398
View File
@@ -0,0 +1,398 @@
目录
1.
类概述
2.
初始化配置
3.
基础CRUD操作
4.
表结构管理
5.
事务管理
6.
高级功能
7.
注意事项
8.
示例代码
9.
性能优化
10.
错误处理
类概述
MySQLAgent 是一个全平台兼容的MySQL数据库操作类,支持Windows/macOS/Linux系统,提供连接池管理、数据操作和事务处理等功能。
​核心特性:​
线程安全的连接池管理
自动适配各平台配置
支持DataFrame直接交互
完善的事务处理机制
详细的日志记录
初始化配置
基本配置参数
python
下载
复制
运行
{
'host': 'localhost', # 数据库主机
'port': 3306, # 端口
'user': 'root', # 用户名
'password': '123123', # 密码
'database': 'test_db', # 数据库名
'charset': 'utf8mb4', # 字符集(默认utf8mb4)
'max_connections': 5, # 最大连接数(默认5)
'connect_timeout': 10, # 连接超时(秒)
'read_timeout': 30, # 读取超时(秒)
'write_timeout': 30, # 写入超时(秒)
'ssl': None # SSL配置
}
获取平台默认配置
python
下载
复制
运行
from mysql_agent import get_default_config
# 自动根据当前操作系统返回优化配置
config = get_default_config()
# 可覆盖默认值
config.update({
'host': '192.168.1.100',
'database': 'production_db'
})
db = MySQLAgent(config)
各平台特殊配置
平台 默认超时 SSL配置 批处理优化
Windows 10/30/30秒 禁用 小批次(100-500)
macOS 15/60/60秒 自动检测证书 中批次(500-1000)
Linux 15/60/60秒 禁用 大批次(1000+)
基础CRUD操作
查询数据
python
下载
复制
运行
# 返回DataFrame
df = db.query_to_df(
"SELECT * FROM users WHERE age > %s",
params=(18,), # 参数可以是元组或字典
parse_dates=['create_time'] # 自动解析日期字段
)
# 直接执行SQL返回原始结果
result = db.execute_sql(
"SELECT name, email FROM users WHERE status = %s",
params={'status': 1}, # 使用字典参数
fetch=True # 设为True返回查询结果
)
插入数据
python
下载
复制
运行
# 单条插入
data = {'name': '张三', 'age': 25}
db.execute_sql(
"INSERT INTO users (name, age) VALUES (%(name)s, %(age)s)",
params=data
)
# 批量插入DataFrame
import pandas as pd
new_users = pd.DataFrame({
'name': ['李四', '王五'],
'age': [28, 32]
})
inserted_rows = db.insert_from_df(
'users',
new_users,
chunk_size=500 # 分批插入大小
)
更新数据
python
下载
复制
运行
# 条件更新
db.execute_sql(
"UPDATE users SET status = %s WHERE last_login < %s",
params=(0, '2023-01-01')
)
# 使用DataFrame更新
update_df = pd.DataFrame({
'id': [1, 2],
'status': [1, 0]
})
affected_rows = db.update_from_df(
'users',
update_df,
key_columns='id' # 用于匹配记录的关键列
)
删除数据
python
下载
复制
运行
# 条件删除
db.execute_sql(
"DELETE FROM logs WHERE created_at < %s",
params=('2022-01-01',)
)
表结构管理
创建表
python
下载
复制
运行
# 根据DataFrame自动创建表
sample_data = pd.DataFrame({
'id': pd.Series(dtype='int'),
'name': pd.Series(dtype='str'),
'created_at': pd.Series(dtype='datetime64[ns]')
})
db.create_table_from_df(
'new_table',
sample_data,
primary_key='id' # 指定主键
)
# 手动创建表
db.execute_sql("""
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
price DECIMAL(10,2),
stock INT DEFAULT 0
)
""")
表操作
python
下载
复制
运行
# 检查表是否存在
if db.table_exists('users'):
print("用户表已存在")
# 删除表
db.drop_table('temp_table')
# 获取表结构
schema = db._get_table_info('products')
事务管理
基本事务
python
下载
复制
运行
conn = db.begin_transaction()
try:
cursor = conn.cursor()
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2")
db.commit_transaction(conn)
except Exception as e:
db.rollback_transaction(conn)
raise
上下文管理器
python
下载
复制
运行
with db.begin_transaction() as conn:
conn.cursor().execute("INSERT INTO logs (message) VALUES ('Transaction start')")
# 其他操作...
# 无需显式commit/rollback
高级功能
大数据量处理
python
下载
复制
运行
# 分块读取大数据
chunk_size = 10000
for chunk in pd.read_sql_query(
"SELECT * FROM large_table",
con=db.get_connection(),
chunksize=chunk_size
):
process_chunk(chunk)
# 批量插入优化
large_df = generate_large_data() # 假设返回10万行数据
db.insert_from_df(
'target_table',
large_df,
chunk_size=2000 # 根据平台自动调整
)
并发查询
python
下载
复制
运行
from concurrent.futures import ThreadPoolExecutor
def fetch_user(user_id):
return db.query_to_df(
"SELECT * FROM users WHERE id = %s",
params=(user_id,)
)
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(fetch_user, range(1, 1001)))
注意事项
1.
​连接管理​
获取连接后必须确保关闭
推荐使用with语句或try/finally
2.
​事务隔离​
长时间事务会占用连接池资源
复杂事务考虑使用存储过程
3.
​性能要点​
Windows平台减少批次大小
macOS注意SSL证书路径
Linux可增大连接池大小
4.
​类型映射​
Pandas类型 MySQL类型
int64 BIGINT
float64 DOUBLE
datetime64 DATETIME
object TEXT
示例代码
完整业务场景
python
下载
复制
运行
class OrderSystem:
def __init__(self):
self.db = MySQLAgent(get_default_config())
def create_order(self, user_id, items):
"""创建订单(完整事务示例)"""
conn = self.db.begin_transaction()
try:
# 1. 插入订单主表
cursor = conn.cursor()
cursor.execute(
"INSERT INTO orders (user_id, total) VALUES (%s, %s)",
(user_id, sum(item['price']*item['quantity'] for item in items))
)
order_id = cursor.lastrowid
# 2. 插入订单明细
order_items = pd.DataFrame([{
'order_id': order_id,
'product_id': item['product_id'],
'quantity': item['quantity'],
'price': item['price']
} for item in items])
self.db.insert_from_df('order_items', order_items, conn=conn)
# 3. 更新库存
for item in items:
cursor.execute(
"UPDATE products SET stock = stock - %s WHERE id = %s",
(item['quantity'], item['product_id'])
)
self.db.commit_transaction(conn)
return order_id
except Exception as e:
self.db.rollback_transaction(conn)
raise
性能优化
1.
​连接池调优​
python
下载
复制
运行
# 生产环境推荐配置
config = {
**get_default_config(),
'max_connections': 20, # 根据服务器配置调整
'maxcached': 15, # 最大空闲连接
'ping': 2 # 连接检查级别
}
2.
​查询优化​
使用EXPLAIN分析慢查询
添加适当索引
避免SELECT *
3.
​批处理建议​
操作类型 Windows macOS/Linux
插入批次 100-500 1000-5000
更新批次 50-200 500-2000
错误处理
常见错误码处理
python
下载
复制
运行
try:
db.execute_sql("INSERT INTO...")
except pymysql.err.IntegrityError as e:
# 唯一键冲突
if e.args[0] == 1062:
handle_duplicate_entry()
except pymysql.err.OperationalError as e:
# 连接超时
if "timed out" in str(e):
retry_connection()
except Exception as e:
logger.error(f"Database error: {str(e)}")
raise
连接重试机制
python
下载
复制
运行
def safe_query(sql, params=None, max_retries=3):
for attempt in range(max_retries):
try:
return db.query_to_df(sql, params)
except (pymysql.err.OperationalError, pymysql.err.InterfaceError) as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # 指数退避
​提示​:本文档对应代码版本1.2.0,最后更新于2023-08-06。使用前请确保您的环境满足:
Python ≥ 3.8
PyMySQL ≥ 1.0.2
pandas ≥ 1.3.0