135 lines
4.3 KiB
Python
135 lines
4.3 KiB
Python
|
|
import pymysql
|
|||
|
|
from pymysql import Error
|
|||
|
|
from typing import List, Dict, Union, Optional, Tuple
|
|||
|
|
|
|||
|
|
class MySQLHelper:
|
|||
|
|
def __init__(self, host: str, user: str, password: str, database: str,
|
|||
|
|
port: int = 3306, charset: str = 'utf8mb4'):
|
|||
|
|
"""
|
|||
|
|
初始化MySQL连接参数
|
|||
|
|
:param host: 数据库地址
|
|||
|
|
:param user: 用户名
|
|||
|
|
:param password: 密码
|
|||
|
|
:param database: 数据库名
|
|||
|
|
:param port: 端口,默认3306
|
|||
|
|
:param charset: 字符集,默认utf8mb4
|
|||
|
|
"""
|
|||
|
|
self.host = host
|
|||
|
|
self.user = user
|
|||
|
|
self.password = password
|
|||
|
|
self.database = database
|
|||
|
|
self.port = port
|
|||
|
|
self.charset = charset
|
|||
|
|
self.connection = None
|
|||
|
|
self.cursor = None
|
|||
|
|
|
|||
|
|
def connect(self) -> bool:
|
|||
|
|
"""
|
|||
|
|
连接到MySQL数据库
|
|||
|
|
:return: 连接成功返回True,失败返回False
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
self.connection = pymysql.connect(
|
|||
|
|
host=self.host,
|
|||
|
|
user=self.user,
|
|||
|
|
password=self.password,
|
|||
|
|
database=self.database,
|
|||
|
|
port=self.port,
|
|||
|
|
charset=self.charset,
|
|||
|
|
cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果
|
|||
|
|
)
|
|||
|
|
self.cursor = self.connection.cursor()
|
|||
|
|
print("MySQL数据库连接成功")
|
|||
|
|
return True
|
|||
|
|
except Error as e:
|
|||
|
|
print(f"连接MySQL数据库失败: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def close(self) -> None:
|
|||
|
|
"""
|
|||
|
|
关闭数据库连接
|
|||
|
|
"""
|
|||
|
|
if self.cursor:
|
|||
|
|
self.cursor.close()
|
|||
|
|
if self.connection:
|
|||
|
|
self.connection.close()
|
|||
|
|
print("MySQL数据库连接已关闭")
|
|||
|
|
|
|||
|
|
def execute_query(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> List[Dict]:
|
|||
|
|
"""
|
|||
|
|
执行查询操作
|
|||
|
|
:param sql: SQL语句
|
|||
|
|
:param params: 参数,可以是元组、列表或字典
|
|||
|
|
:return: 查询结果列表
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
self.cursor.execute(sql, params)
|
|||
|
|
return self.cursor.fetchall()
|
|||
|
|
except Error as e:
|
|||
|
|
print(f"查询执行失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def execute_update(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> int:
|
|||
|
|
"""
|
|||
|
|
执行更新操作(INSERT/UPDATE/DELETE)
|
|||
|
|
:param sql: SQL语句
|
|||
|
|
:param params: 参数,可以是元组、列表或字典
|
|||
|
|
:return: 影响的行数
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
affected_rows = self.cursor.execute(sql, params)
|
|||
|
|
self.connection.commit()
|
|||
|
|
return affected_rows
|
|||
|
|
except Error as e:
|
|||
|
|
self.connection.rollback()
|
|||
|
|
print(f"更新执行失败: {e}")
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
def execute_many(self, sql: str, params_list: List[Union[Tuple, List, Dict]]) -> int:
|
|||
|
|
"""
|
|||
|
|
批量执行操作
|
|||
|
|
:param sql: SQL语句
|
|||
|
|
:param params_list: 参数列表
|
|||
|
|
:return: 影响的行数
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
affected_rows = self.cursor.executemany(sql, params_list)
|
|||
|
|
self.connection.commit()
|
|||
|
|
return affected_rows
|
|||
|
|
except Error as e:
|
|||
|
|
self.connection.rollback()
|
|||
|
|
print(f"批量执行失败: {e}")
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
def get_one(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> Optional[Dict]:
|
|||
|
|
"""
|
|||
|
|
获取单条记录
|
|||
|
|
:param sql: SQL语句
|
|||
|
|
:param params: 参数,可以是元组、列表或字典
|
|||
|
|
:return: 单条记录或None
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
self.cursor.execute(sql, params)
|
|||
|
|
return self.cursor.fetchone()
|
|||
|
|
except Error as e:
|
|||
|
|
print(f"获取单条记录失败: {e}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def table_exists(self, table_name: str) -> bool:
|
|||
|
|
"""
|
|||
|
|
检查表是否存在
|
|||
|
|
:param table_name: 表名
|
|||
|
|
:return: 存在返回True,否则返回False
|
|||
|
|
"""
|
|||
|
|
sql = "SHOW TABLES LIKE %s"
|
|||
|
|
result = self.execute_query(sql, (table_name,))
|
|||
|
|
return len(result) > 0
|
|||
|
|
|
|||
|
|
def __enter__(self):
|
|||
|
|
"""支持with上下文管理"""
|
|||
|
|
self.connect()
|
|||
|
|
return self
|
|||
|
|
|
|||
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|||
|
|
"""支持with上下文管理"""
|
|||
|
|
self.close()
|