Files
MADataManagment/MySQLHelper.py

135 lines
4.3 KiB
Python
Raw Normal View History

2025-08-15 23:52:12 +08:00
import pymysql
from pymysql import Error
from typing import List, Dict, Union, Optional, Tuple
class MySQLHelper:
def __init__(self, host: str, user: str, password: str, database: str,
port: int = 3306, charset: str = 'utf8mb4'):
"""
初始化MySQL连接参数
:param host: 数据库地址
:param user: 用户名
:param password: 密码
:param database: 数据库名
:param port: 端口默认3306
:param charset: 字符集默认utf8mb4
"""
self.host = host
self.user = user
self.password = password
self.database = database
self.port = port
self.charset = charset
self.connection = None
self.cursor = None
def connect(self) -> bool:
"""
连接到MySQL数据库
:return: 连接成功返回True失败返回False
"""
try:
self.connection = pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
port=self.port,
charset=self.charset,
cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
)
self.cursor = self.connection.cursor()
print("MySQL数据库连接成功")
return True
except Error as e:
print(f"连接MySQL数据库失败: {e}")
return False
def close(self) -> None:
"""
关闭数据库连接
"""
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
print("MySQL数据库连接已关闭")
def execute_query(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> List[Dict]:
"""
执行查询操作
:param sql: SQL语句
:param params: 参数可以是元组列表或字典
:return: 查询结果列表
"""
try:
self.cursor.execute(sql, params)
return self.cursor.fetchall()
except Error as e:
print(f"查询执行失败: {e}")
return []
def execute_update(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> int:
"""
执行更新操作INSERT/UPDATE/DELETE
:param sql: SQL语句
:param params: 参数可以是元组列表或字典
:return: 影响的行数
"""
try:
affected_rows = self.cursor.execute(sql, params)
self.connection.commit()
return affected_rows
except Error as e:
self.connection.rollback()
print(f"更新执行失败: {e}")
return 0
def execute_many(self, sql: str, params_list: List[Union[Tuple, List, Dict]]) -> int:
"""
批量执行操作
:param sql: SQL语句
:param params_list: 参数列表
:return: 影响的行数
"""
try:
affected_rows = self.cursor.executemany(sql, params_list)
self.connection.commit()
return affected_rows
except Error as e:
self.connection.rollback()
print(f"批量执行失败: {e}")
return 0
def get_one(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> Optional[Dict]:
"""
获取单条记录
:param sql: SQL语句
:param params: 参数可以是元组列表或字典
:return: 单条记录或None
"""
try:
self.cursor.execute(sql, params)
return self.cursor.fetchone()
except Error as e:
print(f"获取单条记录失败: {e}")
return None
def table_exists(self, table_name: str) -> bool:
"""
检查表是否存在
:param table_name: 表名
:return: 存在返回True否则返回False
"""
sql = "SHOW TABLES LIKE %s"
result = self.execute_query(sql, (table_name,))
return len(result) > 0
def __enter__(self):
"""支持with上下文管理"""
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""支持with上下文管理"""
self.close()