diff --git a/.gitignore b/.gitignore index e695a2e..0656e59 100644 --- a/.gitignore +++ b/.gitignore @@ -266,3 +266,4 @@ compile_commands.json data/ +log/ diff --git a/LogHelper.py b/LogHelper.py new file mode 100644 index 0000000..31037a0 --- /dev/null +++ b/LogHelper.py @@ -0,0 +1,63 @@ +import logging +import sys + +class LogHelper: + def __init__(self, + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=None): + """ + 初始化日志配置 + + :param level: 日志级别,默认为 logging.INFO + :param format: 日志格式字符串 + :param handlers: 日志处理器列表,默认为空(会自动添加控制台处理器) + """ + self.level = level + self.format = format + self.handlers = handlers if handlers is not None else [] + + def add_console_handler(self, stream=sys.stdout, encoding='utf-8'): + """添加控制台处理器""" + console_handler = logging.StreamHandler(stream) + console_handler.setFormatter(logging.Formatter(self.format)) + # 设置控制台编码 + if encoding: + console_handler.encoding = encoding + self.handlers.append(console_handler) + + def add_file_handler(self, filename, encoding='utf-8'): + """添加文件处理器(解决中文乱码问题)""" + # 使用支持UTF-8编码的文件处理器 + filepath = "log/" + filename + file_handler = logging.FileHandler(filepath, encoding=encoding) + file_handler.setFormatter(logging.Formatter(self.format)) + self.handlers.append(file_handler) + + def setup(self): + """应用日志配置""" + logging.basicConfig( + level=self.level, + format=self.format, + handlers=self.handlers + ) + +# # 使用示例 +# if __name__ == "__main__": +# # 创建配置实例 +# logger_config = LoggerConfig( +# level=logging.DEBUG, # 设置日志级别为 DEBUG +# format='%(asctime)s [%(levelname)s] %(message)s' # 自定义格式 +# ) + +# # # 添加处理器 +# logger_config.add_console_handler() # 默认输出到 stdout +# logger_config.add_file_handler('Debug.log') # 添加文件日志 + +# # # 应用配置 +# logger_config.setup() + +# # 测试日志 +# logging.debug("Debug 信息") +# logging.info("Info 信息") +# logging.warning("警告信息") \ No newline at end of file diff --git a/MySQLHelper.py b/MySQLHelper.py index 8055872..10d5c70 100644 --- a/MySQLHelper.py +++ b/MySQLHelper.py @@ -1,6 +1,29 @@ +""" + MySqlHelper 增强版 + —— 增加事务管理 + —— 增加ID获取 + —— 增加表操作等使用功能 +""" import pymysql from pymysql import Error from typing import List, Dict, Union, Optional, Tuple +from contextlib import contextmanager +from LogHelper import LogHelper +import logging + +# 创建配置实例 +logHelper = LogHelper( + level=logging.DEBUG, # 设置日志级别为 DEBUG + format='%(asctime)s [%(levelname)s] %(message)s' # 自定义格式 +) + +# # 添加处理器 +logHelper.add_console_handler() # 默认输出到 stdout +logHelper.add_file_handler('Debug.log') # 添加文件日志 + +# # 应用配置 +logHelper.setup() +logger = logging.getLogger('StockDataImporter') class MySQLHelper: def __init__(self, host: str, user: str, password: str, database: str, @@ -39,10 +62,10 @@ class MySQLHelper: cursorclass=pymysql.cursors.DictCursor # 返回字典形式的结果 ) self.cursor = self.connection.cursor() - print("MySQL数据库连接成功") + logger.info("MySQL数据库连接成功") return True except Error as e: - print(f"连接MySQL数据库失败: {e}") + logger.error(f"连接MySQL数据库失败: {e}") return False def close(self) -> None: @@ -53,7 +76,7 @@ class MySQLHelper: self.cursor.close() if self.connection: self.connection.close() - print("MySQL数据库连接已关闭") + logger.info("MySQL数据库连接已关闭") def execute_query(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> List[Dict]: """ @@ -66,7 +89,7 @@ class MySQLHelper: self.cursor.execute(sql, params) return self.cursor.fetchall() except Error as e: - print(f"查询执行失败: {e}") + logger.error(f"查询执行失败: {e}") return [] def execute_update(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> int: @@ -82,7 +105,7 @@ class MySQLHelper: return affected_rows except Error as e: self.connection.rollback() - print(f"更新执行失败: {e}") + logger.error(f"更新执行失败: {e}") return 0 def execute_many(self, sql: str, params_list: List[Union[Tuple, List, Dict]]) -> int: @@ -98,9 +121,41 @@ class MySQLHelper: return affected_rows except Error as e: self.connection.rollback() - print(f"批量执行失败: {e}") + logger.error(f"批量执行失败: {e}") return 0 + # ================== 新增功能方法 ================== + def get_last_insert_id(self) -> int: + """ + 获取最后插入行的自增ID + :return: 自增ID值 + """ + return self.cursor.lastrowid if self.cursor else 0 + + def execute_insert(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> int: + """ + 执行插入操作并返回自增ID + :return: 自增ID值 + """ + self.execute_update(sql, params) + return self.get_last_insert_id() + + @contextmanager + def transaction(self): + """ + 事务上下文管理器,确保操作原子性 + 用法: + with db.transaction(): + db.execute_update(...) + db.execute_many(...) + """ + try: + yield + self.connection.commit() + except Exception as e: + self.connection.rollback() + raise e + def get_one(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> Optional[Dict]: """ 获取单条记录 @@ -112,7 +167,7 @@ class MySQLHelper: self.cursor.execute(sql, params) return self.cursor.fetchone() except Error as e: - print(f"获取单条记录失败: {e}") + logger.error(f"获取单条记录失败: {e}") return None def table_exists(self, table_name: str) -> bool: @@ -124,12 +179,61 @@ class MySQLHelper: sql = "SHOW TABLES LIKE %s" result = self.execute_query(sql, (table_name,)) return len(result) > 0 + + def create_table(self, sql: str) -> bool: + """ + 执行建表语句 + :param sql: CREATE TABLE语句 + :return: 是否成功 + """ + try: + self.cursor.execute(sql) + self.connection.commit() + return True + except Error as e: + logger.error(f"创建表失败: {e}") + return False + + def drop_table(self, table_name: str) -> bool: + """ + 删除表 + :param table_name: 表名 + :return: 是否成功 + """ + try: + self.cursor.execute(f"DROP TABLE IF EXISTS {table_name}") + self.connection.commit() + return True + except Error as e: + logger.error(f"删除表失败: {e}") + return False + def get_columns(self, table_name: str) -> List[Dict]: + """ + 获取表的列信息 + :param table_name: 表名 + :return: 列信息字典列表 + """ + return self.execute_query(f"DESCRIBE {table_name}") + + +# ================== 事务控制方法 ================== + def start_transaction(self): + """显式开始事务""" + self.connection.begin() + + def commit(self): + """提交事务""" + self.connection.commit() + + def rollback(self): + """回滚事务""" + self.connection.rollback() + + # ================== 上下文管理器 ================== def __enter__(self): - """支持with上下文管理""" self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): - """支持with上下文管理""" self.close() \ No newline at end of file diff --git a/exportExcelToDB_SH.py b/exportExcelToDB_SH.py index abd8703..06484a2 100644 --- a/exportExcelToDB_SH.py +++ b/exportExcelToDB_SH.py @@ -1,119 +1,37 @@ """ - 读取上海证券交易所官网股票列表数据写入数据库 + 存储上海证券交易股票列表数据 - 上海和深圳拿到的数据表头不一样,所以分开解析和存储 + 不确定其数据爬取规则,防止 IP 被封 + 暂时使用该方案,获取股票列表数据 + —— 下载excel,收到导入到数据库 """ import pandas as pd -import pymysql -from pymysql import Error -from pathlib import Path import os import logging -from datetime import datetime import sys import csv import chardet # 用于检测文件编码 -from typing import List, Dict, Union, Tuple, Optional +from pathlib import Path +from datetime import datetime -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[ - logging.FileHandler('Debug.log', encoding='utf-8'), # 关键在这里 - logging.StreamHandler() - ] +from MySQLHelper import MySQLHelper +from LogHelper import LogHelper + +# 创建配置实例 +logHelper = LogHelper( + level=logging.DEBUG, # 设置日志级别为 DEBUG + format='%(asctime)s [%(levelname)s] %(message)s' # 自定义格式 ) +# # 添加处理器 +logHelper.add_console_handler() # 默认输出到 stdout +logHelper.add_file_handler('Debug.log') # 添加文件日志 + +# # 应用配置 +logHelper.setup() logger = logging.getLogger('StockDataImporter') -class MySQLHelper: - """MySQL 数据库操作助手类""" - def __init__(self, host: str, user: str, password: str, database: str, - port: int = 3306, charset: str = 'utf8mb4'): - self.host = host - self.user = user - self.password = password - self.database = database - self.port = port - self.charset = charset - self.connection = None - self.cursor = None - - def connect(self) -> bool: - try: - self.connection = pymysql.connect( - host=self.host, - user=self.user, - password=self.password, - database=self.database, - port=self.port, - charset=self.charset, - cursorclass=pymysql.cursors.DictCursor - ) - self.cursor = self.connection.cursor() - logger.info("MySQL数据库连接成功") - return True - except Error as e: - logger.error(f"连接MySQL数据库失败: {e}") - return False - - def close(self) -> None: - if self.cursor: - self.cursor.close() - if self.connection: - self.connection.close() - logger.info("MySQL数据库连接已关闭") - - def execute_query(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> List[Dict]: - try: - self.cursor.execute(sql, params) - return self.cursor.fetchall() - except Error as e: - logger.error(f"查询执行失败: {e}") - return [] - - def execute_update(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> int: - try: - affected_rows = self.cursor.execute(sql, params) - self.connection.commit() - return affected_rows - except Error as e: - self.connection.rollback() - logger.error(f"更新执行失败: {e}") - return 0 - - def execute_many(self, sql: str, params_list: List[Union[Tuple, List, Dict]]) -> int: - try: - affected_rows = self.cursor.executemany(sql, params_list) - self.connection.commit() - return affected_rows - except Error as e: - self.connection.rollback() - logger.error(f"批量执行失败: {e}") - return 0 - - def get_one(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> Optional[Dict]: - try: - self.cursor.execute(sql, params) - return self.cursor.fetchone() - except Error as e: - logger.error(f"获取单条记录失败: {e}") - return None - - def table_exists(self, table_name: str) -> bool: - sql = "SHOW TABLES LIKE %s" - result = self.execute_query(sql, (table_name,)) - return len(result) > 0 - - def __enter__(self): - self.connect() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - class StockDataImporter: """股票数据导入工具(支持CSV)""" diff --git a/exportExcelToDB_SZ.py b/exportExcelToDB_SZ.py index 138cb88..cf84173 100644 --- a/exportExcelToDB_SZ.py +++ b/exportExcelToDB_SZ.py @@ -1,118 +1,36 @@ """ - 下载的深圳交易所的数据 + 存储深圳交易所股票列表数据 - 表头和上海交易所的数据格式不一致,所以分开存储 + 不确定其数据爬取规则,防止 IP 被封 + 暂时使用该方案,获取股票列表数据 + —— 下载excel,收到导入到数据库 """ -import pandas as pd -import pymysql -from pymysql import Error from pathlib import Path +from datetime import datetime +from MySQLHelper import MySQLHelper +from LogHelper import LogHelper +import pandas as pd import os import logging -from datetime import datetime import sys import csv import chardet -import re -from typing import List, Dict, Union, Tuple, Optional -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - handlers=[ - logging.StreamHandler(sys.stdout), - logging.FileHandler('stock_data_import.log') - ] + +# 创建配置实例 +logHelper = LogHelper( + level=logging.DEBUG, # 设置日志级别为 DEBUG + format='%(asctime)s [%(levelname)s] %(message)s' # 自定义格式 ) + +# # 添加处理器 +logHelper.add_console_handler() # 默认输出到 stdout +logHelper.add_file_handler('Debug.log') # 添加文件日志 + +# # 应用配置 +logHelper.setup() logger = logging.getLogger('StockDataImporter') -class MySQLHelper: - """MySQL 数据库操作助手类""" - def __init__(self, host: str, user: str, password: str, database: str, - port: int = 3306, charset: str = 'utf8mb4'): - self.host = host - self.user = user - self.password = password - self.database = database - self.port = port - self.charset = charset - self.connection = None - self.cursor = None - - def connect(self) -> bool: - try: - self.connection = pymysql.connect( - host=self.host, - user=self.user, - password=self.password, - database=self.database, - port=self.port, - charset=self.charset, - cursorclass=pymysql.cursors.DictCursor - ) - self.cursor = self.connection.cursor() - logger.info("MySQL数据库连接成功") - return True - except Error as e: - logger.error(f"连接MySQL数据库失败: {e}") - return False - - def close(self) -> None: - if self.cursor: - self.cursor.close() - if self.connection: - self.connection.close() - logger.info("MySQL数据库连接已关闭") - - def execute_query(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> List[Dict]: - try: - self.cursor.execute(sql, params) - return self.cursor.fetchall() - except Error as e: - logger.error(f"查询执行失败: {e}") - return [] - - def execute_update(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> int: - try: - affected_rows = self.cursor.execute(sql, params) - self.connection.commit() - return affected_rows - except Error as e: - self.connection.rollback() - logger.error(f"更新执行失败: {e}") - return 0 - - def execute_many(self, sql: str, params_list: List[Union[Tuple, List, Dict]]) -> int: - try: - affected_rows = self.cursor.executemany(sql, params_list) - self.connection.commit() - return affected_rows - except Error as e: - self.connection.rollback() - logger.error(f"批量执行失败: {e}") - return 0 - - def get_one(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> Optional[Dict]: - try: - self.cursor.execute(sql, params) - return self.cursor.fetchone() - except Error as e: - logger.error(f"获取单条记录失败: {e}") - return None - - def table_exists(self, table_name: str) -> bool: - sql = "SHOW TABLES LIKE %s" - result = self.execute_query(sql, (table_name,)) - return len(result) > 0 - - def __enter__(self): - self.connect() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - class StockDataImporter: """股票数据导入工具(支持新版CSV格式)""" @@ -128,13 +46,13 @@ class StockDataImporter: 'A股总股本': 'a_total_shares', 'A股流通股本': 'a_circulating_shares', 'B股代码': 'b_stock_code', - 'B股简称': 'b_stock_short_name', + 'B股 简 称': 'b_stock_short_name', 'B股上市日期': 'b_listing_date', 'B股总股本': 'b_total_shares', 'B股流通股本': 'b_circulating_shares', - '地区': 'region', - '省份': 'province', - '城市': 'city', + '地 区': 'region', + '省 份': 'province', + '城 市': 'city', '所属行业': 'industry', '公司网址': 'website', '未盈利': 'unprofitable', diff --git a/getStockList.py b/getStockList.py deleted file mode 100644 index 2f4ec78..0000000 --- a/getStockList.py +++ /dev/null @@ -1,5 +0,0 @@ -import akshare as ak - -# 获取沪深 A 股股票列表 -stock_list = ak.stock_info_a_code_name() -print(stock_list) \ No newline at end of file