146 lines
5.6 KiB
Python
146 lines
5.6 KiB
Python
"""
|
|
检查K线数据是否下载完成
|
|
|
|
检测表格中的股票是否都有对应的表格
|
|
"""
|
|
from typing import List
|
|
from base import MySQLHelper
|
|
from base import LogHelper
|
|
|
|
# 基本用法(自动创建日期日志+控制台输出)
|
|
logger = LogHelper(logger_name = 'checkTable').setup()
|
|
|
|
class StockTableChecker:
|
|
def __init__(self, db_config: dict):
|
|
self.db_config = db_config
|
|
self.stock_list_table = "stock_filter"
|
|
|
|
def get_stock_codes(self) -> List[str]:
|
|
"""从conditionalselection表获取所有股票代码"""
|
|
try:
|
|
with MySQLHelper(**self.db_config) as db:
|
|
sql = f"SELECT DISTINCT stock_code FROM {self.stock_list_table}"
|
|
results = db.execute_query(sql)
|
|
return [row['stock_code'] for row in results if row['stock_code']]
|
|
except Exception as e:
|
|
logger.error(f"获取股票代码失败: {str(e)}")
|
|
return []
|
|
|
|
def get_stock_hk(self) -> List[str]:
|
|
"""从conditionalselection表获取所有股票代码"""
|
|
try:
|
|
with MySQLHelper(**self.db_config) as db:
|
|
sql = f"SELECT DISTINCT stock_code FROM stocks_hk"
|
|
results = db.execute_query(sql)
|
|
return [row['stock_code'] for row in results if row['stock_code']]
|
|
except Exception as e:
|
|
logger.error(f"获取股票代码失败: {str(e)}")
|
|
return []
|
|
|
|
def check_tables_exist(self, stock_codes: List[str]) -> dict:
|
|
"""
|
|
检查数据库中是否存在与股票代码同名的表
|
|
返回: {'exists': [], 'not_exists': []}
|
|
"""
|
|
if not stock_codes:
|
|
return {'exists': [], 'not_exists': []}
|
|
|
|
try:
|
|
with MySQLHelper(**self.db_config) as db:
|
|
# 获取数据库中所有表名
|
|
sql = "SHOW TABLES"
|
|
tables = [list(row.values())[0] for row in db.execute_query(sql)]
|
|
|
|
# 检查每个股票代码对应的表是否存在
|
|
result = {'exists': [], 'not_exists': []}
|
|
for code in stock_codes:
|
|
# 处理股票代码中的特殊字符,例如将'.'替换为'_'
|
|
table_name = 'hk_' + code[3:]
|
|
if table_name in tables:
|
|
result['exists'].append(code)
|
|
else:
|
|
result['not_exists'].append(code)
|
|
|
|
return result
|
|
except Exception as e:
|
|
logger.error(f"检查表存在性失败: {str(e)}")
|
|
return {'exists': [], 'not_exists': []}
|
|
|
|
def write_missing_codes_to_txt(self, missing_codes: list, filename: str = "data\missing_tables.txt"):
|
|
"""将缺失的股票代码写入TXT文件"""
|
|
try:
|
|
with open(filename, 'w', encoding='utf-8') as f:
|
|
for code in missing_codes:
|
|
f.write(f"{code}\n")
|
|
logger.info(f"已将 {len(missing_codes)} 个缺失表对应的股票代码写入 {filename}")
|
|
except Exception as e:
|
|
logger.error(f"写入TXT文件失败: {str(e)}")
|
|
|
|
def read_missing_codes_basic(file_path='missing_tables.txt'):
|
|
"""基础读取方法 - 按行读取所有内容"""
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
lines = f.readlines()
|
|
# 去除每行末尾的换行符,并过滤空行
|
|
codes = [line.strip() for line in lines if line.strip()]
|
|
return codes
|
|
except FileNotFoundError:
|
|
print(f"文件 {file_path} 不存在")
|
|
return []
|
|
except Exception as e:
|
|
print(f"读取文件失败: {str(e)}")
|
|
return []
|
|
|
|
|
|
def run_check(self):
|
|
"""执行完整的检查流程"""
|
|
logger.info("开始检查股票代码对应表...")
|
|
|
|
# 1. 获取所有股票代码
|
|
stock_codes = self.get_stock_codes()
|
|
if not stock_codes:
|
|
logger.error("没有获取到任何股票代码")
|
|
return
|
|
|
|
logger.info(f"共获取到 {len(stock_codes)} 个股票代码")
|
|
|
|
stockhk = self.get_stock_hk()
|
|
# stockhk_ = stockhk[~stockhk['stock_name'].str[-1].eq('R')]
|
|
market_data_new = list(set(stockhk) - set(stock_codes))
|
|
market_data_new2= list(set(stock_codes) - set(stockhk))
|
|
|
|
# 2. 检查表存在性
|
|
check_result = self.check_tables_exist(stock_codes)
|
|
exists_count = len(check_result['exists'])
|
|
not_exists_count = len(check_result['not_exists'])
|
|
|
|
# 3. 输出结果
|
|
logger.info("\n检查结果:")
|
|
logger.info(f"存在的表数量: {exists_count}")
|
|
logger.info(f"不存在的表数量: {not_exists_count}")
|
|
|
|
if not_exists_count > 0:
|
|
logger.info("\n不存在的表对应的股票代码:")
|
|
for code in check_result['not_exists']:
|
|
logger.info(code)
|
|
|
|
# 4. 统计信息
|
|
logger.info("\n统计摘要:")
|
|
logger.info(f"总股票代码数: {len(stock_codes)}")
|
|
logger.info(f"存在对应表的比例: {exists_count/len(stock_codes):.2%}")
|
|
logger.info(f"缺失对应表的比例: {not_exists_count/len(stock_codes):.2%}")
|
|
|
|
# 4. 将缺失的股票代码写入TXT文件
|
|
if check_result['not_exists']:
|
|
self.write_missing_codes_to_txt(check_result['not_exists'])
|
|
|
|
|
|
# 数据库配置
|
|
db_config = {
|
|
'host': 'localhost',
|
|
'user': 'root',
|
|
'password': 'bzskmysql',
|
|
'database': 'hk_kline_1d'
|
|
}
|
|
|