Files
MADataManagment/MySQLHelper.py
2025-08-20 17:30:14 +08:00

236 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MySqlHelper 增强版
—— 增加事务管理
—— 增加ID获取
—— 增加表操作等使用功能
—— 数据使用过程中出现异常时,才输出日志
"""
import pymysql
from pymysql import Error
from typing import List, Dict, Union, Optional, Tuple
from contextlib import contextmanager
from LogHelper import LogHelper
# 基本用法(自动创建日期日志+控制台输出)
logger = LogHelper(logger_name = 'database').setup()
# # 高级用法(自定义配置)
# logger = LogHelper(
# level=logging.DEBUG,
# log_dir="databaselogs",
# format='%(levelname)s - %(message)s'
# ).setup()
class MySQLHelper:
def __init__(self, host: str, user: str, password: str, database: str,
port: int = 3306, charset: str = 'utf8mb4'):
"""
初始化MySQL连接参数
:param host: 数据库地址
:param user: 用户名
:param password: 密码
:param database: 数据库名
:param port: 端口默认3306
:param charset: 字符集默认utf8mb4
"""
self.host = host
self.user = user
self.password = password
self.database = database
self.port = port
self.charset = charset
self.connection = None
self.cursor = None
def connect(self) -> bool:
"""
连接到MySQL数据库
:return: 连接成功返回True失败返回False
"""
try:
self.connection = pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
port=self.port,
charset=self.charset,
cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
)
self.cursor = self.connection.cursor()
# logger.info("MySQL数据库连接成功")
return True
except Error as e:
logger.error(f"连接MySQL数据库失败: {e}")
return False
def close(self) -> None:
"""
关闭数据库连接
"""
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
# logger.info("MySQL数据库连接已关闭")
def execute_query(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> List[Dict]:
"""
执行查询操作
:param sql: SQL语句
:param params: 参数,可以是元组、列表或字典
:return: 查询结果列表
"""
try:
self.cursor.execute(sql, params)
return self.cursor.fetchall()
except Error as e:
logger.error(f"查询执行失败: {e}")
return []
def execute_update(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> int:
"""
执行更新操作INSERT/UPDATE/DELETE
:param sql: SQL语句
:param params: 参数,可以是元组、列表或字典
:return: 影响的行数
"""
try:
affected_rows = self.cursor.execute(sql, params)
self.connection.commit()
return affected_rows
except Error as e:
self.connection.rollback()
logger.error(f"更新执行失败: {e}")
return 0
def execute_many(self, sql: str, params_list: List[Union[Tuple, List, Dict]]) -> int:
"""
批量执行操作
:param sql: SQL语句
:param params_list: 参数列表
:return: 影响的行数
"""
try:
affected_rows = self.cursor.executemany(sql, params_list)
self.connection.commit()
return affected_rows
except Error as e:
self.connection.rollback()
logger.error(f"批量执行失败: {e}")
return 0
# ================== 新增功能方法 ==================
def get_last_insert_id(self) -> int:
"""
获取最后插入行的自增ID
:return: 自增ID值
"""
return self.cursor.lastrowid if self.cursor else 0
def execute_insert(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> int:
"""
执行插入操作并返回自增ID
:return: 自增ID值
"""
self.execute_update(sql, params)
return self.get_last_insert_id()
@contextmanager
def transaction(self):
"""
事务上下文管理器,确保操作原子性
用法:
with db.transaction():
db.execute_update(...)
db.execute_many(...)
"""
try:
yield
self.connection.commit()
except Exception as e:
self.connection.rollback()
raise e
def get_one(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> Optional[Dict]:
"""
获取单条记录
:param sql: SQL语句
:param params: 参数,可以是元组、列表或字典
:return: 单条记录或None
"""
try:
self.cursor.execute(sql, params)
return self.cursor.fetchone()
except Error as e:
logger.error(f"获取单条记录失败: {e}")
return None
def table_exists(self, table_name: str) -> bool:
"""
检查表是否存在
:param table_name: 表名
:return: 存在返回True否则返回False
"""
sql = "SHOW TABLES LIKE %s"
result = self.execute_query(sql, (table_name,))
return len(result) > 0
def create_table(self, sql: str) -> bool:
"""
执行建表语句
:param sql: CREATE TABLE语句
:return: 是否成功
"""
try:
self.cursor.execute(sql)
self.connection.commit()
return True
except Error as e:
logger.error(f"创建表失败: {e}")
return False
def drop_table(self, table_name: str) -> bool:
"""
删除表
:param table_name: 表名
:return: 是否成功
"""
try:
self.cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
self.connection.commit()
return True
except Error as e:
logger.error(f"删除表失败: {e}")
return False
def get_columns(self, table_name: str) -> List[Dict]:
"""
获取表的列信息
:param table_name: 表名
:return: 列信息字典列表
"""
return self.execute_query(f"DESCRIBE {table_name}")
# ================== 事务控制方法 ==================
def start_transaction(self):
"""显式开始事务"""
self.connection.begin()
def commit(self):
"""提交事务"""
self.connection.commit()
def rollback(self):
"""回滚事务"""
self.connection.rollback()
# ================== 上下文管理器 ==================
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()