Files
HKDataManagment/DataAnalysis/exportExcelToDB_HK.py
2025-08-22 11:20:41 +08:00

478 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
香港证券交易所下载的过票数据,导入数据库
包含所有的港股主板股票:
正在交易的、停牌的、人民币交易的
后续-> 优化日志输出
"""
import pandas as pd
import os
import sys
import csv
import chardet # 用于检测文件编码
from pathlib import Path
from datetime import datetime
import re
# 获取当前文件的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 获取项目根目录假设base文件夹在项目根目录下
project_root = os.path.dirname(current_dir)
# 将项目根目录添加到Python路径
sys.path.append(project_root)
from base.LogHelper import LogHelper
from base.MySQLHelper import MySQLHelper
logger = LogHelper(logger_name = 'HK_Import').setup()
class StockDataImporter:
    """Import the HKEX main-board stock list (CSV) into a MySQL table.

    Pipeline: locate the CSV, detect its encoding and delimiter, parse it
    with pandas, clean/format the codes, then upsert into ``stocks_hk``.
    """

    # CSV header (Chinese) -> database column name
    COLUMN_MAPPING = {
        '股票代码': 'stock_code',
        '股票': 'stock_name',
        '证券简称': 'short_name',
        '更新时间': 'updated_at'
    }

    def __init__(self, data_dir: Path, db_config: dict):
        """
        Args:
            data_dir: directory that contains the downloaded CSV file.
            db_config: keyword arguments forwarded to MySQLHelper.
        """
        self.data_dir = data_dir
        self.db_config = db_config
        self.df = None            # parsed DataFrame, set by read_csv_data()
        self.csv_file = None
        self.encoding = 'utf-8'   # default encoding
        self.delimiter = ','      # default delimiter

    def find_csv_file(self) -> Path:
        """Locate the stock-list CSV inside the data directory.

        Returns the newest match, or None when no file is found.
        """
        # NOTE(review): the glob pattern is an exact file name, so more than
        # one match only occurs if the pattern is later widened.
        csv_files = list(self.data_dir.glob("香港股票列表.csv"))
        if not csv_files:
            logger.error(f"{self.data_dir} 中没有找到CSV文件")
            return None
        # With several candidates, prefer the most recently modified one.
        if len(csv_files) > 1:
            csv_files.sort(key=os.path.getmtime, reverse=True)
            logger.info(f"找到多个CSV文件选择最新的: {csv_files[0].name}")
        return csv_files[0]

    def validate_file(self, file_path: Path) -> bool:
        """Check that the CSV file exists and is non-empty."""
        try:
            if not file_path.exists():
                logger.error(f"CSV文件不存在: {file_path}")
                return False
            if file_path.stat().st_size == 0:
                logger.error(f"CSV文件为空: {file_path}")
                return False
            return True
        except Exception as e:
            logger.error(f"文件验证失败: {e}")
            return False

    def detect_file_encoding(self, file_path: Path) -> str:
        """Detect the file encoding with chardet; fall back to UTF-8."""
        try:
            # Sample only the first 10 KB for detection.
            with open(file_path, 'rb') as f:
                raw_data = f.read(10000)
            result = chardet.detect(raw_data)
            encoding = result['encoding']
            confidence = result['confidence']
            # Map detected encodings to safer supersets/aliases.
            encoding_map = {
                'GB2312': 'GBK',
                'gb2312': 'GBK',
                'ISO-8859-1': 'latin1',
                'ascii': 'utf-8'
            }
            encoding = encoding_map.get(encoding, encoding)
            logger.info(f"检测到编码: {encoding} (置信度: {confidence:.2f})")
            # chardet may return None for encoding; default to UTF-8 then.
            return encoding or 'utf-8'
        except Exception as e:
            logger.error(f"编码检测失败: {e}, 使用默认UTF-8")
            return 'utf-8'

    def detect_csv_delimiter(self, file_path: Path) -> str:
        """Guess the delimiter by counting candidates in the first lines."""
        try:
            with open(file_path, 'r', encoding=self.encoding) as f:
                # FIX: the original comprehension evaluated f.readline() twice
                # per iteration (once in the condition, once as the element),
                # silently skipping every other line of the sample.
                lines = []
                for _ in range(5):
                    line = f.readline()
                    if not line:
                        break
                    lines.append(line)
            # Count occurrences of each candidate delimiter in the sample.
            delimiter_counts = {}
            for delim in [',', '\t', ';', '|']:
                delimiter_counts[delim] = sum(line.count(delim) for line in lines)
            best_delim = max(delimiter_counts, key=delimiter_counts.get)
            # No delimiter observed at all -> assume plain comma-separated.
            if delimiter_counts[best_delim] == 0:
                logger.warning(f"无法检测到有效的分隔符,使用默认逗号分隔符")
                return ','
            logger.info(f"检测到分隔符: {repr(best_delim)}")
            return best_delim
        except Exception as e:
            logger.error(f"检测分隔符失败: {e}, 使用默认逗号分隔符")
            return ','

    def _normalize_frame(self, df):
        """Rename CSV headers to DB column names and drop all-empty rows."""
        return df.rename(columns=self.COLUMN_MAPPING).dropna(how='all')

    def read_csv_data(self, file_path: Path) -> bool:
        """Read the CSV into self.df, normalizing columns on every path."""
        try:
            # 1) detect encoding, 2) detect delimiter, 3) parse.
            self.encoding = self.detect_file_encoding(file_path)
            self.delimiter = self.detect_csv_delimiter(file_path)
            logger.info(f"使用编码 '{self.encoding}' 和分隔符 '{self.delimiter}' 读取文件")
            self.df = pd.read_csv(
                file_path,
                delimiter=self.delimiter,
                dtype=str,               # keep codes as strings (leading zeros)
                encoding=self.encoding,
                on_bad_lines='warn',
                quoting=csv.QUOTE_MINIMAL,
                engine='python'          # more tolerant of malformed rows
            )
            if self.df.empty:
                logger.error("CSV文件没有包含有效数据")
                return False
            self.df = self._normalize_frame(self.df)
            logger.info(f"成功读取CSV数据{len(self.df)} 条记录")
            return True
        except UnicodeDecodeError:
            # Detection missed the real encoding: retry with common ones.
            for enc in ['GBK', 'latin1', 'ISO-8859-1', 'utf-16']:
                try:
                    logger.warning(f"尝试使用 {enc} 编码读取文件")
                    self.df = pd.read_csv(
                        file_path,
                        delimiter=self.delimiter,
                        dtype=str,
                        encoding=enc
                    )
                    self.encoding = enc
                    # FIX: the original returned here without renaming the
                    # columns or dropping empty rows, which later broke
                    # clean_stock_data() (no 'stock_code' column).
                    self.df = self._normalize_frame(self.df)
                    logger.info(f"成功使用 {enc} 编码读取文件")
                    return True
                except Exception:  # FIX: was a bare except
                    continue
            logger.error("所有编码尝试均失败")
            return False
        except PermissionError:
            logger.error(f"文件被占用,请关闭后重试: {file_path}")
            return False
        except Exception as e:
            logger.error(f"读取CSV文件失败: {e}")
            return False

    def format_stock_code(self, code):
        """Format one stock code as "HK.NNNNN" (zero-padded to 5 digits).

        Non-numeric codes (e.g. suspension markers) are returned unchanged.

        FIX: the original signature was (name, code) with no explicit self;
        it only worked because the bound method passed the instance into
        "name" when used via self.format_stock_code in .apply().
        """
        code_str = str(code)
        if re.match(r'^\d+$', code_str):
            # Pure digits: pad to five places and add the market prefix.
            return f"HK.{int(code_str):05d}"
        # Not purely numeric: keep as-is.
        return code_str

    def clean_stock_data(self) -> bool:
        """Clean the frame: drop empty/suspended rows, normalize codes."""
        try:
            # Remove rows without a code, then rows whose code carries a
            # "停牌" (suspended) marker, then format the remaining codes.
            self.df = self.df.dropna(subset=['stock_code'])
            self.df = self.df[~self.df['stock_code'].astype(str).str.contains('停牌')]
            self.df['stock_code'] = self.df['stock_code'].apply(self.format_stock_code)
            logger.info("数据清洗完成")
            return True
        except Exception as e:
            logger.error(f"数据清洗失败: {e}")
            return False

    def create_stocks_table(self, db: "MySQLHelper") -> bool:
        """Create the stocks_hk table if it does not exist yet."""
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS stocks_hk (
            stock_code VARCHAR(128) PRIMARY KEY COMMENT '股票代码',
            stock_name VARCHAR(128) COMMENT '股票名称',
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间'
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='港股主板股票列表';
        """
        try:
            db.execute_update(create_table_sql)
            # NOTE(review): message says "opened"; it probably should read
            # "table created" — kept unchanged to preserve log output.
            logger.info("股票信息打开成功")
            return True
        except Exception as e:
            logger.error(f"创建表失败: {e}")
            return False

    def insert_data_to_db(self, db: "MySQLHelper") -> bool:
        """Bulk-upsert the cleaned rows; existing codes get their name updated."""
        if self.df is None or self.df.empty:
            logger.error("没有有效数据可插入")
            return False
        # Upsert: ON DUPLICATE KEY refreshes the name for known codes.
        insert_sql = """
        INSERT INTO stocks_hk (
            stock_code, stock_name
        ) VALUES (
            %s, %s
        )
        ON DUPLICATE KEY UPDATE
            stock_name = VALUES(stock_name)
        """
        # Collect (code, name) tuples, skipping rows without a code.
        params_list = []
        for _, row in self.df.iterrows():
            if pd.notna(row['stock_code']):
                params_list.append((row['stock_code'], row['stock_name']))
        try:
            total_rows = len(params_list)
            if total_rows == 0:
                logger.error("没有有效数据可插入")
                return False
            batch_size = 1000  # rows per batch; keeps transactions small
            logger.info(f"开始插入数据,共 {total_rows} 条记录")
            for i in range(0, total_rows, batch_size):
                db.execute_many(insert_sql, params_list[i:i + batch_size])
                logger.info(f"已处理 {min(i+batch_size, total_rows)}/{total_rows} 条记录")
            logger.info(f"成功插入/更新 {total_rows} 条记录")
            return True
        except Exception as e:
            logger.error(f"插入数据失败: {e}")
            # Log a small parameter sample to help debugging.
            if params_list:
                logger.debug(f"前5个参数示例: {params_list[:5]}")
            return False

    def verify_data_in_db(self, db: "MySQLHelper", sample_size: int = 5) -> bool:
        """Sanity-check the table: total row count plus a random sample."""
        try:
            count_sql = "SELECT COUNT(*) AS total FROM stocks_hk"
            result = db.execute_query(count_sql)
            db_count = result[0]['total'] if result else 0
            logger.info(f"数据库中共有 {db_count} 条记录")
            # sample_size is an int parameter with a default, not untrusted
            # input, so interpolating it into LIMIT is acceptable here.
            sample_sql = f"""
            SELECT stock_code, stock_name
            FROM stocks_hk
            ORDER BY RAND()
            LIMIT {sample_size}
            """
            samples = db.execute_query(sample_sql)
            logger.info("\n随机抽样记录:")
            for idx, sample in enumerate(samples, 1):
                logger.info(f"{idx}. {sample['stock_code']}: {sample['stock_name']}")
            return True
        except Exception as e:
            logger.error(f"数据验证失败: {e}")
            return False

    def _log_statistics(self) -> None:
        """Log suspended / R-suffix counts and a small data preview.

        NOTE(review): clean_stock_data() already removed rows containing
        '停牌', so suspended_count is always 0 here; to be meaningful the
        count would have to be taken before cleaning.
        """
        # Force string dtype so .str accessors are safe.
        self.df['stock_code'] = self.df['stock_code'].astype(str)
        suspended_count = self.df[self.df['stock_code'].str.contains('停牌')].shape[0]
        self.df['stock_name'] = self.df['stock_name'].astype(str)
        # Names ending in 'R' mark RMB-counter listings per the module docstring.
        r_ending_count = self.df[self.df['stock_name'].str.endswith('R')].shape[0]
        logger.info(f"股票代码中包含'停牌'的数量: {suspended_count}")
        logger.info(f"股票名称以'R'结尾的数量: {r_ending_count}")
        suspended_stocks = self.df[self.df['stock_code'].str.contains('停牌')]
        r_ending_stocks = self.df[self.df['stock_name'].str.endswith('R')]
        logger.info("\n股票代码中包含'停牌'的记录:")
        logger.info(suspended_stocks)
        logger.info("\n股票名称以'R'结尾的记录:")
        logger.info(r_ending_stocks)
        logger.info("\n前5条股票数据:")
        for _, row in self.df.head().iterrows():
            logger.info(f"{row['stock_code']}: {row['stock_name']}")

    def run_import(self) -> bool:
        """Run the full pipeline: find -> validate -> read -> clean -> load."""
        logger.info(f"开始导入股票数据,数据目录: {self.data_dir}")
        start_time = datetime.now()
        # 1. Locate the CSV file.
        csv_file = self.find_csv_file()
        if not csv_file:
            return False
        # 2. Validate it.
        if not self.validate_file(csv_file):
            return False
        # 3. Parse it.
        if not self.read_csv_data(csv_file):
            return False
        # 4. Clean it.
        if not self.clean_stock_data():
            return False
        self._log_statistics()
        # 5. Connect and load into MySQL.
        try:
            with MySQLHelper(**self.db_config) as db:
                if not self.create_stocks_table(db):
                    return False
                if not self.insert_data_to_db(db):
                    return False
                if not self.verify_data_in_db(db):
                    return False
        except Exception as e:
            logger.error(f"数据库操作异常: {e}")
            return False
        duration = datetime.now() - start_time
        logger.info(f"数据处理成功完成! 总耗时: {duration.total_seconds():.2f}")
        return True
def read_stock_codes_list(file_path='Reservedcode.txt'):
    """Read one stock code per line from *file_path*.

    Surrounding whitespace is stripped and blank lines are discarded.

    Args:
        file_path: path to a UTF-8 text file, one code per line.

    Returns:
        list[str]: codes in file order; [] when the file is missing or
        cannot be read.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            # Strip newlines/whitespace and skip empty lines in one pass.
            return [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        # Consistency fix: report through the module logger instead of print(),
        # matching the rest of this file.
        logger.error(f"文件 {file_path} 不存在")
        return []
    except Exception as e:
        logger.error(f"读取文件失败: {str(e)}")
        return []
if __name__ == "__main__":
# 数据库配置
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'bzskmysql',
'database': 'hk_kline_1d'
}
# 获取当前脚本所在目录
current_dir = Path.cwd().parent
# 设置数据目录
DATA_DIR = current_dir /"HKDataManagment" / "data"
# 确保data目录存在
DATA_DIR.mkdir(exist_ok=True, parents=True)
# 安装依赖 (如果chardet未安装)
try:
import chardet
except ImportError:
logger.info("安装chardet库以支持编码检测...")
import subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "chardet"])
import chardet
# 创建导入器并执行导入
importer = StockDataImporter(DATA_DIR, db_config)
if importer.run_import():
logger.info("股票数据导入成功!")
else:
logger.error("股票数据导入失败,请检查日志了解详情")