From b33d61c774cc14c9ec38ba08d6b5000552796391 Mon Sep 17 00:00:00 2001 From: Administrator <1415243231@qq.com> Date: Wed, 6 Aug 2025 18:01:26 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E6=93=8D=E4=BD=9C?= =?UTF-8?q?=E8=AF=B4=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/数据库操作.md | 398 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 398 insertions(+) create mode 100644 doc/数据库操作.md diff --git a/doc/数据库操作.md b/doc/数据库操作.md new file mode 100644 index 0000000..2e1da29 --- /dev/null +++ b/doc/数据库操作.md @@ -0,0 +1,398 @@ +目录 +1. +类概述 +2. +初始化配置 +3. +基础CRUD操作 +4. +表结构管理 +5. +事务管理 +6. +高级功能 +7. +注意事项 +8. +示例代码 +9. +性能优化 +10. +错误处理 +类概述 +MySQLAgent 是一个全平台兼容的MySQL数据库操作类,支持Windows/macOS/Linux系统,提供连接池管理、数据操作和事务处理等功能。 + +​核心特性:​​ + +• +线程安全的连接池管理 +• +自动适配各平台配置 +• +支持DataFrame直接交互 +• +完善的事务处理机制 +• +详细的日志记录 +初始化配置 +基本配置参数 +python +下载 +复制 +运行 +{ +'host': 'localhost', # 数据库主机 +'port': 3306, # 端口 +'user': 'root', # 用户名 +'password': '123123', # 密码 +'database': 'test_db', # 数据库名 +'charset': 'utf8mb4', # 字符集(默认utf8mb4) +'max_connections': 5, # 最大连接数(默认5) +'connect_timeout': 10, # 连接超时(秒) +'read_timeout': 30, # 读取超时(秒) +'write_timeout': 30, # 写入超时(秒) +'ssl': None # SSL配置 +} +获取平台默认配置 +python +下载 +复制 +运行 +from mysql_agent import get_default_config + +# 自动根据当前操作系统返回优化配置 +config = get_default_config() + +# 可覆盖默认值 +config.update({ +'host': '192.168.1.100', +'database': 'production_db' +}) + +db = MySQLAgent(config) +各平台特殊配置 +平台 默认超时 SSL配置 批处理优化 +Windows 10/30/30秒 禁用 小批次(100-500) +macOS 15/60/60秒 自动检测证书 中批次(500-1000) +Linux 15/60/60秒 禁用 大批次(1000+) +基础CRUD操作 +查询数据 +python +下载 +复制 +运行 +# 返回DataFrame +df = db.query_to_df( +"SELECT * FROM users WHERE age > %s", +params=(18,), # 参数可以是元组或字典 +parse_dates=['create_time'] # 自动解析日期字段 +) + +# 直接执行SQL返回原始结果 +result = db.execute_sql( +"SELECT name, email FROM users WHERE status = %s", +params={'status': 1}, # 使用字典参数 +fetch=True # 设为True返回查询结果 +) +插入数据 +python +下载 +复制 +运行 +# 单条插入 +data = {'name': '张三', 'age': 25} +db.execute_sql( +"INSERT INTO users (name, age) VALUES (%(name)s, %(age)s)", +params=data +) + +# 批量插入DataFrame +import pandas as pd +new_users = pd.DataFrame({ +'name': ['李四', '王五'], +'age': [28, 32] +}) +inserted_rows = db.insert_from_df( +'users', +new_users, +chunk_size=500 # 分批插入大小 +) +更新数据 +python +下载 +复制 +运行 +# 条件更新 +db.execute_sql( +"UPDATE users SET status = %s WHERE last_login < %s", +params=(0, '2023-01-01') +) + +# 使用DataFrame更新 +update_df = pd.DataFrame({ +'id': [1, 2], +'status': [1, 0] +}) +affected_rows = db.update_from_df( +'users', +update_df, +key_columns='id' # 用于匹配记录的关键列 +) +删除数据 +python +下载 +复制 +运行 +# 条件删除 +db.execute_sql( +"DELETE FROM logs WHERE created_at < %s", +params=('2022-01-01',) +) +表结构管理 +创建表 +python +下载 +复制 +运行 +# 根据DataFrame自动创建表 +sample_data = pd.DataFrame({ +'id': pd.Series(dtype='int'), +'name': pd.Series(dtype='str'), +'created_at': pd.Series(dtype='datetime64[ns]') +}) +db.create_table_from_df( +'new_table', +sample_data, +primary_key='id' # 指定主键 +) + +# 手动创建表 +db.execute_sql(""" +CREATE TABLE IF NOT EXISTS products ( +id INT AUTO_INCREMENT PRIMARY KEY, +name VARCHAR(100) NOT NULL, +price DECIMAL(10,2), +stock INT DEFAULT 0 +) +""") +表操作 +python +下载 +复制 +运行 +# 检查表是否存在 +if db.table_exists('users'): +print("用户表已存在") + +# 删除表 +db.drop_table('temp_table') + +# 获取表结构 +schema = db._get_table_info('products') +事务管理 +基本事务 +python +下载 +复制 +运行 +conn = db.begin_transaction() +try: +cursor = conn.cursor() +cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1") +cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE id = 2") +db.commit_transaction(conn) +except Exception as e: +db.rollback_transaction(conn) +raise +上下文管理器 +python +下载 +复制 +运行 +with db.begin_transaction() as conn: +conn.cursor().execute("INSERT INTO logs (message) VALUES ('Transaction start')") +# 其他操作... +# 无需显式commit/rollback +高级功能 +大数据量处理 +python +下载 +复制 +运行 +# 分块读取大数据 +chunk_size = 10000 +for chunk in pd.read_sql_query( +"SELECT * FROM large_table", +con=db.get_connection(), +chunksize=chunk_size +): +process_chunk(chunk) + +# 批量插入优化 +large_df = generate_large_data() # 假设返回10万行数据 +db.insert_from_df( +'target_table', +large_df, +chunk_size=2000 # 根据平台自动调整 +) +并发查询 +python +下载 +复制 +运行 +from concurrent.futures import ThreadPoolExecutor + +def fetch_user(user_id): +return db.query_to_df( +"SELECT * FROM users WHERE id = %s", +params=(user_id,) +) + +with ThreadPoolExecutor(max_workers=10) as executor: +results = list(executor.map(fetch_user, range(1, 1001))) +注意事项 +1. +​连接管理​ + +• +获取连接后必须确保关闭 +• +推荐使用with语句或try/finally +2. +​事务隔离​ + +• +长时间事务会占用连接池资源 +• +复杂事务考虑使用存储过程 +3. +​性能要点​ + +• +Windows平台减少批次大小 +• +macOS注意SSL证书路径 +• +Linux可增大连接池大小 +4. +​类型映射​ + +Pandas类型 MySQL类型 +int64 BIGINT +float64 DOUBLE +datetime64 DATETIME +object TEXT +示例代码 +完整业务场景 +python +下载 +复制 +运行 +class OrderSystem: +def __init__(self): +self.db = MySQLAgent(get_default_config()) + + def create_order(self, user_id, items): + """创建订单(完整事务示例)""" + conn = self.db.begin_transaction() + try: + # 1. 插入订单主表 + cursor = conn.cursor() + cursor.execute( + "INSERT INTO orders (user_id, total) VALUES (%s, %s)", + (user_id, sum(item['price']*item['quantity'] for item in items)) + ) + order_id = cursor.lastrowid + + # 2. 插入订单明细 + order_items = pd.DataFrame([{ + 'order_id': order_id, + 'product_id': item['product_id'], + 'quantity': item['quantity'], + 'price': item['price'] + } for item in items]) + + self.db.insert_from_df('order_items', order_items, conn=conn) + + # 3. 更新库存 + for item in items: + cursor.execute( + "UPDATE products SET stock = stock - %s WHERE id = %s", + (item['quantity'], item['product_id']) + ) + + self.db.commit_transaction(conn) + return order_id + except Exception as e: + self.db.rollback_transaction(conn) + raise +性能优化 +1. +​连接池调优​ + +python +下载 +复制 +运行 +# 生产环境推荐配置 +config = { +**get_default_config(), +'max_connections': 20, # 根据服务器配置调整 +'maxcached': 15, # 最大空闲连接 +'ping': 2 # 连接检查级别 +} +2. +​查询优化​ + +• +使用EXPLAIN分析慢查询 +• +添加适当索引 +• +避免SELECT * +3. +​批处理建议​ + +操作类型 Windows macOS/Linux +插入批次 100-500 1000-5000 +更新批次 50-200 500-2000 +错误处理 +常见错误码处理 +python +下载 +复制 +运行 +try: +db.execute_sql("INSERT INTO...") +except pymysql.err.IntegrityError as e: +# 唯一键冲突 +if e.args[0] == 1062: +handle_duplicate_entry() +except pymysql.err.OperationalError as e: +# 连接超时 +if "timed out" in str(e): +retry_connection() +except Exception as e: +logger.error(f"Database error: {str(e)}") +raise +连接重试机制 +python +下载 +复制 +运行 +def safe_query(sql, params=None, max_retries=3): +for attempt in range(max_retries): +try: +return db.query_to_df(sql, params) +except (pymysql.err.OperationalError, pymysql.err.InterfaceError) as e: +if attempt == max_retries - 1: +raise +time.sleep(2 ** attempt) # 指数退避 +​提示​:本文档对应代码版本1.2.0,最后更新于2023-08-06。使用前请确保您的环境满足: + +• +Python ≥ 3.8 +• +PyMySQL ≥ 1.0.2 +• +pandas ≥ 1.3.0