update log file

This commit is contained in:
2025-08-18 14:05:59 +08:00
parent d38303698b
commit e51d6ee653
6 changed files with 220 additions and 221 deletions

View File

@@ -1,119 +1,37 @@
"""
读取上海证券交易所官网股票列表数据写入数据库
存储上海证券交易股票列表数据
上海和深圳拿到的数据表头不一样,所以分开解析和存储
不确定其数据爬取规则,防止 IP 被封
暂时使用该方案,获取股票列表数据
—— 下载excel,收到导入到数据库
"""
import pandas as pd
import pymysql
from pymysql import Error
from pathlib import Path
import os
import logging
from datetime import datetime
import sys
import csv
import chardet # 用于检测文件编码
from typing import List, Dict, Union, Tuple, Optional
from pathlib import Path
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('Debug.log', encoding='utf-8'), # 关键在这里
logging.StreamHandler()
]
from MySQLHelper import MySQLHelper
from LogHelper import LogHelper
# 创建配置实例
logHelper = LogHelper(
level=logging.DEBUG, # 设置日志级别为 DEBUG
format='%(asctime)s [%(levelname)s] %(message)s' # 自定义格式
)
# # 添加处理器
logHelper.add_console_handler() # 默认输出到 stdout
logHelper.add_file_handler('Debug.log') # 添加文件日志
# # 应用配置
logHelper.setup()
logger = logging.getLogger('StockDataImporter')
class MySQLHelper:
"""MySQL 数据库操作助手类"""
def __init__(self, host: str, user: str, password: str, database: str,
port: int = 3306, charset: str = 'utf8mb4'):
self.host = host
self.user = user
self.password = password
self.database = database
self.port = port
self.charset = charset
self.connection = None
self.cursor = None
def connect(self) -> bool:
try:
self.connection = pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
port=self.port,
charset=self.charset,
cursorclass=pymysql.cursors.DictCursor
)
self.cursor = self.connection.cursor()
logger.info("MySQL数据库连接成功")
return True
except Error as e:
logger.error(f"连接MySQL数据库失败: {e}")
return False
def close(self) -> None:
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
logger.info("MySQL数据库连接已关闭")
def execute_query(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> List[Dict]:
try:
self.cursor.execute(sql, params)
return self.cursor.fetchall()
except Error as e:
logger.error(f"查询执行失败: {e}")
return []
def execute_update(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> int:
try:
affected_rows = self.cursor.execute(sql, params)
self.connection.commit()
return affected_rows
except Error as e:
self.connection.rollback()
logger.error(f"更新执行失败: {e}")
return 0
def execute_many(self, sql: str, params_list: List[Union[Tuple, List, Dict]]) -> int:
try:
affected_rows = self.cursor.executemany(sql, params_list)
self.connection.commit()
return affected_rows
except Error as e:
self.connection.rollback()
logger.error(f"批量执行失败: {e}")
return 0
def get_one(self, sql: str, params: Union[Tuple, List, Dict, None] = None) -> Optional[Dict]:
try:
self.cursor.execute(sql, params)
return self.cursor.fetchone()
except Error as e:
logger.error(f"获取单条记录失败: {e}")
return None
def table_exists(self, table_name: str) -> bool:
sql = "SHOW TABLES LIKE %s"
result = self.execute_query(sql, (table_name,))
return len(result) > 0
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class StockDataImporter:
"""股票数据导入工具支持CSV"""