Files
MADataManagment/MySQLHelper.py

236 lines
7.1 KiB
Python
Raw Permalink Normal View History

2025-08-18 14:05:59 +08:00
"""
MySqlHelper 增强版
增加事务管理
增加ID获取
增加表操作等使用功能
2025-08-20 17:30:14 +08:00
数据使用过程中出现异常时才输出日志
2025-08-18 14:05:59 +08:00
"""
2025-08-15 23:52:12 +08:00
import pymysql
from pymysql import Error
from typing import List, Dict, Union, Optional, Tuple
2025-08-18 14:05:59 +08:00
from contextlib import contextmanager
from LogHelper import LogHelper
2025-08-20 17:30:14 +08:00
# 基本用法(自动创建日期日志+控制台输出)
logger = LogHelper(logger_name = 'database').setup()
2025-08-18 14:05:59 +08:00
2025-08-20 17:30:14 +08:00
# # 高级用法(自定义配置)
# logger = LogHelper(
# level=logging.DEBUG,
# log_dir="databaselogs",
# format='%(levelname)s - %(message)s'
# ).setup()
2025-08-18 14:05:59 +08:00
2025-08-15 23:52:12 +08:00
class MySQLHelper:
def __init__(self, host: str, user: str, password: str, database: str,
port: int = 3306, charset: str = 'utf8mb4'):
"""
初始化MySQL连接参数
:param host: 数据库地址
:param user: 用户名
:param password: 密码
:param database: 数据库名
:param port: 端口默认3306
:param charset: 字符集默认utf8mb4
"""
self.host = host
self.user = user
self.password = password
self.database = database
self.port = port
self.charset = charset
self.connection = None
self.cursor = None
def connect(self) -> bool:
"""
连接到MySQL数据库
:return: 连接成功返回True失败返回False
"""
try:
self.connection = pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
port=self.port,
charset=self.charset,
cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
)
self.cursor = self.connection.cursor()
2025-08-20 17:30:14 +08:00
# logger.info("MySQL数据库连接成功")
2025-08-15 23:52:12 +08:00
return True
except Error as e:
2025-08-18 14:05:59 +08:00
logger.error(f"连接MySQL数据库失败: {e}")
2025-08-15 23:52:12 +08:00
return False
def close(self) -> None:
"""
关闭数据库连接
"""
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
2025-08-20 17:30:14 +08:00
# logger.info("MySQL数据库连接已关闭")
2025-08-15 23:52:12 +08:00
def execute_query(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> List[Dict]:
"""
执行查询操作
:param sql: SQL语句
:param params: 参数可以是元组列表或字典
:return: 查询结果列表
"""
try:
self.cursor.execute(sql, params)
return self.cursor.fetchall()
except Error as e:
2025-08-18 14:05:59 +08:00
logger.error(f"查询执行失败: {e}")
2025-08-15 23:52:12 +08:00
return []
def execute_update(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> int:
"""
执行更新操作INSERT/UPDATE/DELETE
:param sql: SQL语句
:param params: 参数可以是元组列表或字典
:return: 影响的行数
"""
try:
affected_rows = self.cursor.execute(sql, params)
self.connection.commit()
return affected_rows
except Error as e:
self.connection.rollback()
2025-08-18 14:05:59 +08:00
logger.error(f"更新执行失败: {e}")
2025-08-15 23:52:12 +08:00
return 0
def execute_many(self, sql: str, params_list: List[Union[Tuple, List, Dict]]) -> int:
"""
批量执行操作
:param sql: SQL语句
:param params_list: 参数列表
:return: 影响的行数
"""
try:
affected_rows = self.cursor.executemany(sql, params_list)
self.connection.commit()
return affected_rows
except Error as e:
self.connection.rollback()
2025-08-18 14:05:59 +08:00
logger.error(f"批量执行失败: {e}")
2025-08-15 23:52:12 +08:00
return 0
2025-08-18 14:05:59 +08:00
# ================== 新增功能方法 ==================
def get_last_insert_id(self) -> int:
"""
获取最后插入行的自增ID
:return: 自增ID值
"""
return self.cursor.lastrowid if self.cursor else 0
def execute_insert(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> int:
"""
执行插入操作并返回自增ID
:return: 自增ID值
"""
self.execute_update(sql, params)
return self.get_last_insert_id()
@contextmanager
def transaction(self):
"""
事务上下文管理器确保操作原子性
用法
with db.transaction():
db.execute_update(...)
db.execute_many(...)
"""
try:
yield
self.connection.commit()
except Exception as e:
self.connection.rollback()
raise e
2025-08-15 23:52:12 +08:00
def get_one(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> Optional[Dict]:
"""
获取单条记录
:param sql: SQL语句
:param params: 参数可以是元组列表或字典
:return: 单条记录或None
"""
try:
self.cursor.execute(sql, params)
return self.cursor.fetchone()
except Error as e:
2025-08-18 14:05:59 +08:00
logger.error(f"获取单条记录失败: {e}")
2025-08-15 23:52:12 +08:00
return None
def table_exists(self, table_name: str) -> bool:
"""
检查表是否存在
:param table_name: 表名
:return: 存在返回True否则返回False
"""
sql = "SHOW TABLES LIKE %s"
result = self.execute_query(sql, (table_name,))
return len(result) > 0
2025-08-18 14:05:59 +08:00
def create_table(self, sql: str) -> bool:
"""
执行建表语句
:param sql: CREATE TABLE语句
:return: 是否成功
"""
try:
self.cursor.execute(sql)
self.connection.commit()
return True
except Error as e:
logger.error(f"创建表失败: {e}")
return False
def drop_table(self, table_name: str) -> bool:
"""
删除表
:param table_name: 表名
:return: 是否成功
"""
try:
self.cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
self.connection.commit()
return True
except Error as e:
logger.error(f"删除表失败: {e}")
return False
def get_columns(self, table_name: str) -> List[Dict]:
"""
获取表的列信息
:param table_name: 表名
:return: 列信息字典列表
"""
return self.execute_query(f"DESCRIBE {table_name}")
# ================== 事务控制方法 ==================
def start_transaction(self):
"""显式开始事务"""
self.connection.begin()
def commit(self):
"""提交事务"""
self.connection.commit()
def rollback(self):
"""回滚事务"""
self.connection.rollback()
2025-08-15 23:52:12 +08:00
2025-08-18 14:05:59 +08:00
# ================== 上下文管理器 ==================
2025-08-15 23:52:12 +08:00
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()