""" 检查K线数据是否下载完成 """ import logging from typing import List from MySQLHelper import MySQLHelper # 假设您已有MySQLHelper类 # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('Debug.log', encoding='utf-8'), # 关键在这里 logging.StreamHandler() ] ) class StockTableChecker: def __init__(self, db_config: dict): self.db_config = db_config self.conditional_selection_table = "conditionalselection" def get_stock_codes(self) -> List[str]: """从conditionalselection表获取所有股票代码""" try: with MySQLHelper(**self.db_config) as db: sql = f"SELECT DISTINCT stock_code FROM {self.conditional_selection_table}" results = db.execute_query(sql) return [row['stock_code'] for row in results if row['stock_code']] except Exception as e: logging.error(f"获取股票代码失败: {str(e)}") return [] def check_tables_exist(self, stock_codes: List[str]) -> dict: """ 检查数据库中是否存在与股票代码同名的表 返回: {'exists': [], 'not_exists': []} """ if not stock_codes: return {'exists': [], 'not_exists': []} try: with MySQLHelper(**self.db_config) as db: # 获取数据库中所有表名 sql = "SHOW TABLES" tables = [list(row.values())[0] for row in db.execute_query(sql)] # 检查每个股票代码对应的表是否存在 result = {'exists': [], 'not_exists': []} for code in stock_codes: # 处理股票代码中的特殊字符,例如将'.'替换为'_' table_name = 'hk_' + code[3:] if table_name in tables: result['exists'].append(code) else: result['not_exists'].append(code) return result except Exception as e: logging.error(f"检查表存在性失败: {str(e)}") return {'exists': [], 'not_exists': []} def write_missing_codes_to_txt(self, missing_codes: list, filename: str = "missing_tables.txt"): """将缺失的股票代码写入TXT文件""" try: with open(filename, 'w', encoding='utf-8') as f: for code in missing_codes: f.write(f"{code}\n") logging.info(f"已将 {len(missing_codes)} 个缺失表对应的股票代码写入 {filename}") except Exception as e: logging.error(f"写入TXT文件失败: {str(e)}") def read_missing_codes_basic(file_path='missing_tables.txt'): """基础读取方法 - 按行读取所有内容""" try: with open(file_path, 'r', encoding='utf-8') as f: lines = f.readlines() # 去除每行末尾的换行符,并过滤空行 codes = [line.strip() for line in lines if line.strip()] return codes except FileNotFoundError: print(f"文件 {file_path} 不存在") return [] except Exception as e: print(f"读取文件失败: {str(e)}") return [] def run_check(self): """执行完整的检查流程""" logging.info("开始检查股票代码对应表...") # 1. 获取所有股票代码 stock_codes = self.get_stock_codes() if not stock_codes: logging.error("没有获取到任何股票代码") return logging.info(f"共获取到 {len(stock_codes)} 个股票代码") # 2. 检查表存在性 check_result = self.check_tables_exist(stock_codes) exists_count = len(check_result['exists']) not_exists_count = len(check_result['not_exists']) # 3. 输出结果 logging.info("\n检查结果:") logging.info(f"存在的表数量: {exists_count}") logging.info(f"不存在的表数量: {not_exists_count}") if not_exists_count > 0: logging.info("\n不存在的表对应的股票代码:") for code in check_result['not_exists']: logging.info(code) # 4. 统计信息 logging.info("\n统计摘要:") logging.info(f"总股票代码数: {len(stock_codes)}") logging.info(f"存在对应表的比例: {exists_count/len(stock_codes):.2%}") logging.info(f"缺失对应表的比例: {not_exists_count/len(stock_codes):.2%}") # 4. 将缺失的股票代码写入TXT文件 if check_result['not_exists']: self.write_missing_codes_to_txt(check_result['not_exists']) # 使用示例 if __name__ == "__main__": # 数据库配置 db_config = { 'host': 'localhost', 'user': 'root', 'password': 'bzskmysql', 'database': 'klinedata_1d_hk' } # 创建检查器并运行 checker = StockTableChecker(db_config) checker.run_check()