diff --git a/DataAnalysis/DataExporter.py b/DataAnalysis/DataExporter.py index 88c9151..01f9b8c 100644 --- a/DataAnalysis/DataExporter.py +++ b/DataAnalysis/DataExporter.py @@ -1,195 +1,10 @@ - -# import csv -# from typing import List, Dict, Optional -# from datetime import datetime -# from base import LogHelper,MySQLHelper - -# logger = LogHelper(logger_name = 'export').setup() - -# def get_monthly_avg_data(db_config: dict, table_name: str) -> Optional[List[Dict]]: -# """ -# 从数据库读取月度均值数据 - -# Args: -# db_config: 数据库配置 -# table_name: 源数据表名 - -# Returns: -# List[Dict]: 查询结果数据集,失败返回None -# """ -# try: -# with MySQLHelper(**db_config) as db: -# # 获取表结构信息 -# columns = db.execute_query(f""" -# SELECT COLUMN_NAME -# FROM INFORMATION_SCHEMA.COLUMNS -# WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s -# ORDER BY ORDINAL_POSITION -# """, (db_config['database'], table_name)) - -# if not columns: -# logger.error(f"表 {table_name} 不存在或没有列") -# return None - -# # 获取列名列表(排除id和update_time) -# field_names = [col['COLUMN_NAME'] for col in columns -# if col['COLUMN_NAME'] not in ('id', 'update_time')] - -# # 查询数据 -# data = db.execute_query(f""" -# SELECT {', '.join(field_names)} -# FROM {table_name} -# ORDER BY stock_code -# """) - -# if not data: -# logger.error(f"表 {table_name} 中没有数据") -# return None - -# return data - -# except Exception as e: -# logger.error(f"从数据库读取月度均值数据失败: {str(e)}") -# return None - -# def get_float_share_data(db_config: dict, table_name: str) -> Optional[List[Dict]]: -# """ -# 从conditionalselection表读取流通股本数据 - -# Args: -# db_config: 数据库配置 -# table_name: 源数据表名 - -# Returns: -# List[Dict]: 查询结果数据集,失败返回None -# """ -# try: -# with MySQLHelper(**db_config) as db: -# # 查询流通股本数据 -# data = db.execute_query(f""" -# SELECT stock_code, stock_name, float_share -# FROM {table_name} -# ORDER BY stock_code -# """) - -# if not data: -# logger.error(f"表 {table_name} 中没有流通股本数据") -# return None - -# return data - -# except Exception as e: -# logger.error(f"从数据库读取流通股本数据失败: {str(e)}") -# return None - -# def export_to_csv(data: List[Dict], output_file: str) -> bool: -# """ -# 将合并后的数据导出到CSV文件 - -# Args: -# data: 要导出的数据集 -# output_file: 输出的CSV文件路径 - -# Returns: -# bool: 是否导出成功 -# """ -# if not data: -# return False - -# try: -# # 获取字段名(使用第一个数据的键) -# field_names = list(data[0].keys()) - -# # 字段名到中文的映射 -# header_map = { -# 'stock_code': '股票代码', -# 'stock_name': '股票名称', -# 'ym_2410': '2024年10月', -# 'ym_2411': '2024年11月', -# 'ym_2412': '2024年12月', -# 'ym_2501': '2025年01月', -# 'ym_2502': '2025年02月', -# 'ym_2503': '2025年03月', -# 'ym_2504': '2025年04月', -# 'ym_2505': '2025年05月', -# 'ym_2506': '2025年06月', -# 'ym_2507': '2025年07月', -# 'ym_2508': '2025年08月', -# 'avg_all': '月度均值' -# } - -# with open(output_file, mode='w', newline='', encoding='utf-8-sig') as csvfile: -# writer = csv.DictWriter(csvfile, fieldnames=field_names) - -# # 写入中文表头 -# writer.writerow({col: header_map.get(col, col) for col in field_names}) - -# # 写入数据 -# writer.writerows(data) - -# logger.info(f"成功导出 {len(data)} 条记录到CSV文件: {output_file}") -# return True - -# except Exception as e: -# logger.error(f"导出到CSV失败: {str(e)}") -# return False - - -# def export_data(db_config: dict, -# monthly_table: str, -# csv_file: str = None) -> bool: -# """ -# 导出合并后的数据到CSV和/或Excel - -# Args: -# db_config: 数据库配置 -# monthly_table: 月度均价表名 -# float_share_table: 流通股本表名 -# csv_file: CSV输出路径(可选) -# excel_file: Excel输出路径(可选) - -# Returns: -# bool: 是否至少有一种格式导出成功 -# """ - -# # 从数据库获取数据 -# monthly_data = get_monthly_avg_data(db_config, monthly_table) -# if not monthly_data: -# logger.error("无法获取月度均价数据") -# return False - -# # 导出结果 -# filePath = 'data/' + csv_file -# csv_success = True -# if csv_file: -# csv_success = export_to_csv(monthly_data, filePath) - -# return csv_success import csv from typing import List, Dict, Optional -from base import LogHelper, MySQLHelper +from base import LogHelper, MySQLHelper, Config class DataExporter: """数据导出器类,用于从数据库提取数据并导出到CSV文件""" - - # 表头映射配置 - HEADER_MAP = { - 'stock_code': '股票代码', - 'stock_name': '股票名称', - 'ym_2410': '2024年10月', - 'ym_2411': '2024年11月', - 'ym_2412': '2024年12月', - 'ym_2501': '2025年01月', - 'ym_2502': '2025年02月', - 'ym_2503': '2025年03月', - 'ym_2504': '2025年04月', - 'ym_2505': '2025年05月', - 'ym_2506': '2025年06月', - 'ym_2507': '2025年07月', - 'ym_2508': '2025年08月', - 'avg_all': '月度均值' - } - + def __init__(self, db_config: dict, logger_name: str = 'export'): """ 初始化数据导出器 @@ -200,6 +15,7 @@ class DataExporter: """ self.db_config = db_config self.logger = LogHelper(logger_name=logger_name).setup() + self.head_map = Config.ConfigInfo.HEADER_MAP def get_monthly_avg_data(self, table_name: str) -> Optional[List[Dict]]: """ @@ -298,7 +114,7 @@ class DataExporter: writer = csv.DictWriter(csvfile, fieldnames=field_names) # 写入中文表头 - writer.writerow({col: self.HEADER_MAP.get(col, col) for col in field_names}) + writer.writerow({col: self.head_map.get(col, col) for col in field_names}) # 写入数据 writer.writerows(data) diff --git a/DataAnalysis/MarketDataCalculator.py b/DataAnalysis/MarketDataCalculator.py index 47fdd12..e9d943f 100644 --- a/DataAnalysis/MarketDataCalculator.py +++ b/DataAnalysis/MarketDataCalculator.py @@ -7,319 +7,16 @@ # """ -# import pandas as pd -# from datetime import datetime -# from futu import * -# from tqdm import tqdm -# from pathlib import Path -# from base.MySQLHelper import MySQLHelper -# from typing import Optional, List, Dict, Union, Tuple -# from base.LogHelper import LogHelper - - -# # 基本用法(自动创建日期日志+控制台输出) -# logger = LogHelper(logger_name = 'Calculate').setup() - -# def create_monthly_avg_table(db_config: dict, target_table: str = "monthly_close_avg") -> bool: -# """ -# 创建专门存储2024年10月至2025年8月月度均值的表结构 -> 后面再根据实际需要,设计通用表格 - -# Args: -# db_config: 数据库配置 -# target_table: 目标表名 - -# Returns: -# bool: 是否创建成功 -# """ -# try: -# with MySQLHelper(**db_config) as db: -# create_sql = f""" -# CREATE TABLE IF NOT EXISTS {target_table} ( -# id INT AUTO_INCREMENT PRIMARY KEY, -# stock_code VARCHAR(20) NOT NULL COMMENT '股票代码', -# stock_name VARCHAR(50) COMMENT '股票名称', -# ym_2410 DECIMAL(20, 5) COMMENT '2024年10月', -# ym_2411 DECIMAL(20, 5) COMMENT '2024年11月', -# ym_2412 DECIMAL(20, 5) COMMENT '2024年12月', -# ym_2501 DECIMAL(20, 5) COMMENT '2025年01月', -# ym_2502 DECIMAL(20, 5) COMMENT '2025年02月', -# ym_2503 DECIMAL(20, 5) COMMENT '2025年03月', -# ym_2504 DECIMAL(20, 5) COMMENT '2025年04月', -# ym_2505 DECIMAL(20, 5) COMMENT '2025年05月', -# ym_2506 DECIMAL(20, 5) COMMENT '2025年06月', -# ym_2507 DECIMAL(20, 5) COMMENT '2025年07月', -# ym_2508 DECIMAL(20, 5) COMMENT '2025年08月', -# avg_all DECIMAL(20, 5) COMMENT '月间均值', -# update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', -# UNIQUE KEY uk_stock_code (stock_code) -# ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='月均流通市值表(2024.10-2025.08)' -# """ -# db.execute_update(create_sql) -# logger.info(f"创建/确认表 {target_table} 结构成功") -# return True -# except Exception as e: -# logger.error(f"创建表失败: {str(e)}") -# return False - -# def calculate_and_save_monthly_avg(db_config: dict, -# source_table: str = "stock_quotes", -# target_table: str = "monthly_close_avg") -> bool: -# """ -# 计算并保存2024年10月至2025年8月的月均流通市值 -> 后面修改为,指定时间间隔 - -# Args: -# db_config: 数据库配置 -# source_table: 源数据表名 -# target_table: 目标表名 - -# Returns: -# bool: 是否成功 -# """ -# # 定义分析的时间范围 -> 根据实际需要进行调整时间范围 -# month_ranges = { -# 'ym_2410': ('2024-10-01', '2024-10-31'), -# 'ym_2411': ('2024-11-01', '2024-11-30'), -# 'ym_2412': ('2024-12-01', '2024-12-31'), -# 'ym_2501': ('2025-01-01', '2025-01-31'), -# 'ym_2502': ('2025-02-01', '2025-02-28'), -# 'ym_2503': ('2025-03-01', '2025-03-31'), -# 'ym_2504': ('2025-04-01', '2025-04-30'), -# 'ym_2505': ('2025-05-01', '2025-05-31'), -# 'ym_2506': ('2025-06-01', '2025-06-30'), -# 'ym_2507': ('2025-07-01', '2025-07-31'), -# 'ym_2508': ('2025-08-01', '2025-08-31') -# } - -# try: -# # 确保表结构存在 -# if not create_monthly_avg_table(db_config, target_table): -# return False - -# with MySQLHelper(**db_config) as db: -# # 获取所有股票代码和名称 -# stock_info = db.execute_query( -# f"SELECT DISTINCT stock_code, stock_name FROM {source_table}" -# ) - -# if not stock_info: -# logging.error("没有获取到股票基本信息") -# return False - -# # 为每只股票计算各月均值 -# for stock in stock_info: -# stock_code = stock['stock_code'] -# stock_name = stock['stock_name'] - -# monthly_data = {'stock_code': stock_code, 'stock_name': stock_name} - -# # 计算每个月的均值 -# for month_col, (start_date, end_date) in month_ranges.items(): -# sql = """ -# SELECT AVG(close_price * float_share) as avg_close -# FROM {} -# WHERE stock_code = %s -# AND trade_date BETWEEN %s AND %s -# AND close_price IS NOT NULL -# AND float_share IS NOT NULL -# """.format(source_table) - -# result = db.execute_query(sql, (stock_code, start_date, end_date)) -# monthly_data[month_col] = float(result[0]['avg_close']) * 1000 if result and result[0]['avg_close'] else None # 流通股数量单位:1000 -> 可以考虑直接按照亿为单位存储 - -# # 提取所有以 'ym_' 开头的键的值 -# ym_values = [value for key, value in monthly_data.items() if key.startswith('ym_')] -# valid_ym_values = [value for value in ym_values if value is not None] - -# # 计算全部月的均值 -# if valid_ym_values: -# average = sum(valid_ym_values) / len(valid_ym_values) -# monthly_data['avg_all'] = average -# logger.info(f"月间流通市值平均值为: {average}") -# else: -# logger.error("没有找到以 'ym_' 开头的键") - -# # 插入或更新数据 -# upsert_sql = f""" -# INSERT INTO {target_table} ( -# stock_code, stock_name, -# ym_2410, ym_2411, ym_2412, -# ym_2501, ym_2502, ym_2503, ym_2504, -# ym_2505, ym_2506, ym_2507, ym_2508, -# avg_all -# ) VALUES ( -# %(stock_code)s, %(stock_name)s, -# %(ym_2410)s, %(ym_2411)s, %(ym_2412)s, -# %(ym_2501)s, %(ym_2502)s, %(ym_2503)s, %(ym_2504)s, -# %(ym_2505)s, %(ym_2506)s, %(ym_2507)s, %(ym_2508)s, -# %(avg_all)s -# ) -# ON DUPLICATE KEY UPDATE -# stock_name = VALUES(stock_name), -# ym_2410 = VALUES(ym_2410), -# ym_2411 = VALUES(ym_2411), -# ym_2412 = VALUES(ym_2412), -# ym_2501 = VALUES(ym_2501), -# ym_2502 = VALUES(ym_2502), -# ym_2503 = VALUES(ym_2503), -# ym_2504 = VALUES(ym_2504), -# ym_2505 = VALUES(ym_2505), -# ym_2506 = VALUES(ym_2506), -# ym_2507 = VALUES(ym_2507), -# ym_2508 = VALUES(ym_2508), -# avg_all = VALUES(avg_all), -# update_time = CURRENT_TIMESTAMP -# """ -# db.execute_update(upsert_sql, monthly_data) - -# logger.info("月度均值计算和保存完成") -# return True -# except Exception as e: -# logger.error(f"计算和保存月度均值失败: {str(e)}") -# return False - -# # 安全转换函数 -# def safe_float(v) -> Optional[float]: -# """安全转换为float,处理N/A和空值""" -# try: -# return float(v) if pd.notna(v) and str(v).upper() != 'N/A' else None -# except (ValueError, TypeError): -# return None - -# def safe_int(v) -> Optional[int]: -# """安全转换为int,处理N/A和空值""" -# try: -# return int(v) if pd.notna(v) and str(v).upper() != 'N/A' else None -# except (ValueError, TypeError): -# return None - -# def safe_parse_date(date_str, date_format='%Y-%m-%d'): -# """ -# 安全解析日期字符串 -# :param date_str: 日期字符串 -# :param date_format: 日期格式 -# :return: 解析后的datetime对象或None -# """ -# if not date_str or pd.isna(date_str) or str(date_str).strip() == '': -# return None -# try: -# return datetime.strptime(str(date_str), date_format) -# except ValueError: -# logger.warning(f"无法解析日期字符串: {date_str}") -# return None - -# def validate_market_data(dataset: list) -> list: -# """ -# 验证市场数据有效性 - -# Args: -# dataset (list): 原始数据集 - -# Returns: -# list: 通过验证的数据集 -# """ -# validated_data = [] -# for item in dataset: -# try: -# # 必要字段检查 -# if not item.get('code') or not item.get('name'): -# logger.warning(f"跳过无效数据: 缺少必要字段 code或name") -# continue - -# # 筛选股票名称 -# if item.get('name')[-1] == 'R': -# continue - -# # 数值范围验证 -# if item.get('lot_size') is not None and item['lot_size'] < 0: -# logger.warning(f"股票 {item['code']} 的lot_size为负值: {item['lot_size']}") -# item['lot_size'] = None - -# validated_data.append(item) -# except Exception as e: -# logger.warning(f"数据验证失败,跳过记录 {item.get('code')}: {str(e)}") -# continue - -# return validated_data - -# def get_market_data(market: Market) -> List[str]: -# """ -# 从Futu API获取指定市场的股票代码列表 - -# Args: -# market (Market): 市场枚举值,如 Market.SH, Market.SZ - -# Returns: -# List[str]: 股票代码列表 -# """ -# quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111) -# try: -# ret, data = quote_ctx.get_stock_basicinfo(market, SecurityType.STOCK) -# if ret == RET_OK: -# # 提取code列并转换为列表 -# codes = data['code'].astype(str).tolist() -# logger.info(f"获取到 {market} 市场 {len(codes)} 个股票代码") -# return codes -# else: -# logger.error(f"获取股票代码失败: {data}") -# return [] -# except Exception as e: -# logger.error(f"获取股票代码时发生异常: {str(e)}") -# return [] -# finally: -# quote_ctx.close() - -# def get_stock_codes() -> List[str]: -# """从conditionalselection表获取所有股票代码""" -# try: -# with MySQLHelper(**db_config) as db: -# sql = f"SELECT DISTINCT stock_code,stock_name FROM stock_filter" -# results = db.execute_query(sql) -# return [ -# row['stock_code'] -# for row in results -# if row['stock_code'] and (row.get('stock_name', '') and not (row.get('stock_name') and str(row['stock_name'])[-1] == 'R')) # 排除 name,上一个版本的排除了R结尾的股票,实际上多排除了3个,这里改成全部计算,导出的时候进行筛选处理 -# ] -# except Exception as e: -# logger.error(f"获取股票代码失败: {str(e)}") -# return [] - -# def read_stock_codes_list(file_path='Reservedcode.txt'): -# """基础读取方法 - 按行读取所有内容""" -# try: -# with open(file_path, 'r', encoding='utf-8') as f: -# lines = f.readlines() -# # 去除每行末尾的换行符,并过滤空行 -# codes = [line.strip() for line in lines if line.strip()] -# return codes -# except FileNotFoundError: -# print(f"文件 {file_path} 不存在") -# return [] -# except Exception as e: -# print(f"读取文件失败: {str(e)}") -# return [] - - -# # 数据库配置 -# db_config = { -# 'host': 'localhost', -# 'user': 'root', -# 'password': 'bzskmysql', -# 'database': 'hk_kline_1d' -# } - import pandas as pd from datetime import datetime from futu import * from tqdm import tqdm from pathlib import Path -from base.MySQLHelper import MySQLHelper from typing import Optional, List, Dict, Union, Tuple -from base.LogHelper import LogHelper import csv from typing import List, Dict, Optional from datetime import datetime -from base import LogHelper, MySQLHelper - +from base import LogHelper, MySQLHelper, Config class MarketDataCalculator: """ @@ -334,38 +31,32 @@ class MarketDataCalculator: - 导出数据到CSV文件 """ - # 表头映射配置 - HEADER_MAP = { - 'stock_code': '股票代码', - 'stock_name': '股票名称', - 'ym_2410': '2024年10月', - 'ym_2411': '2024年11月', - 'ym_2412': '2024年12月', - 'ym_2501': '2025年01月', - 'ym_2502': '2025年02月', - 'ym_2503': '2025年03月', - 'ym_2504': '2025年04月', - 'ym_2505': '2025年05月', - 'ym_2506': '2025年06月', - 'ym_2507': '2025年07月', - 'ym_2508': '2025年08月', - 'avg_all': '月度均值' - } + # # 表头映射配置 + # HEADER_MAP = { + # 'stock_code': '股票代码', + # 'stock_name': '股票名称', + # 'ym_2501': '2025年01月', + # 'ym_2502': '2025年02月', + # 'ym_2503': '2025年03月', + # 'ym_2504': '2025年04月', + # 'ym_2505': '2025年05月', + # 'ym_2506': '2025年06月', + # 'ym_2507': '2025年07月', + # 'ym_2508': '2025年08月', + # 'avg_all': '月度均值' + # } - # 月份范围配置 - MONTH_RANGES = { - 'ym_2410': ('2024-10-01', '2024-10-31'), - 'ym_2411': ('2024-11-01', '2024-11-30'), - 'ym_2412': ('2024-12-01', '2024-12-31'), - 'ym_2501': ('2025-01-01', '2025-01-31'), - 'ym_2502': ('2025-02-01', '2025-02-28'), - 'ym_2503': ('2025-03-01', '2025-03-31'), - 'ym_2504': ('2025-04-01', '2025-04-30'), - 'ym_2505': ('2025-05-01', '2025-05-31'), - 'ym_2506': ('2025-06-01', '2025-06-30'), - 'ym_2507': ('2025-07-01', '2025-07-31'), - 'ym_2508': ('2025-08-01', '2025-08-31') - } + # # 月份范围配置 + # MONTH_RANGES = { + # 'ym_2501': ('2025-01-01', '2025-01-31'), + # 'ym_2502': ('2025-02-01', '2025-02-28'), + # 'ym_2503': ('2025-03-01', '2025-03-31'), + # 'ym_2504': ('2025-04-01', '2025-04-30'), + # 'ym_2505': ('2025-05-01', '2025-05-31'), + # 'ym_2506': ('2025-06-01', '2025-06-30'), + # 'ym_2507': ('2025-07-01', '2025-07-31'), + # 'ym_2508': ('2025-08-01', '2025-08-31') + # } def __init__(self, db_config: dict, logger_name: str = 'Calculate'): """ @@ -377,6 +68,8 @@ class MarketDataCalculator: """ self.db_config = db_config self.logger = LogHelper(logger_name=logger_name).setup() + self.month_ranges = Config.ConfigInfo.MONTH_RANGES + self.head_map = Config.ConfigInfo.HEADER_MAP def create_monthly_avg_table(self, target_table: str = "monthly_close_avg") -> bool: """ @@ -395,9 +88,6 @@ class MarketDataCalculator: id INT AUTO_INCREMENT PRIMARY KEY, stock_code VARCHAR(20) NOT NULL COMMENT '股票代码', stock_name VARCHAR(50) COMMENT '股票名称', - ym_2410 DECIMAL(20, 5) COMMENT '2024年10月', - ym_2411 DECIMAL(20, 5) COMMENT '2024年11月', - ym_2412 DECIMAL(20, 5) COMMENT '2024年12月', ym_2501 DECIMAL(20, 5) COMMENT '2025年01月', ym_2502 DECIMAL(20, 5) COMMENT '2025年02月', ym_2503 DECIMAL(20, 5) COMMENT '2025年03月', @@ -412,7 +102,7 @@ class MarketDataCalculator: ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='月均流通市值表(2024.10-2025.08)' """ db.execute_update(create_sql) - self.logger.info(f"创建/确认表 {target_table} 结构成功") + # self.logger.info(f"创建/确认表 {target_table} 结构成功") return True except Exception as e: self.logger.error(f"创建表失败: {str(e)}") @@ -454,7 +144,7 @@ class MarketDataCalculator: monthly_data = {'stock_code': stock_code, 'stock_name': stock_name} # 计算每个月的均值 - for month_col, (start_date, end_date) in self.MONTH_RANGES.items(): + for month_col, (start_date, end_date) in self.month_ranges.items(): sql = """ SELECT AVG(close_price * float_share) as avg_close FROM {} @@ -483,22 +173,17 @@ class MarketDataCalculator: upsert_sql = f""" INSERT INTO {target_table} ( stock_code, stock_name, - ym_2410, ym_2411, ym_2412, ym_2501, ym_2502, ym_2503, ym_2504, - ym_2505, ym_2506, ym_2507, ym_2508, + ym_2505, ym_2506,ym_2507, ym_2508, avg_all ) VALUES ( %(stock_code)s, %(stock_name)s, - %(ym_2410)s, %(ym_2411)s, %(ym_2412)s, %(ym_2501)s, %(ym_2502)s, %(ym_2503)s, %(ym_2504)s, %(ym_2505)s, %(ym_2506)s, %(ym_2507)s, %(ym_2508)s, %(avg_all)s ) ON DUPLICATE KEY UPDATE stock_name = VALUES(stock_name), - ym_2410 = VALUES(ym_2410), - ym_2411 = VALUES(ym_2411), - ym_2412 = VALUES(ym_2412), ym_2501 = VALUES(ym_2501), ym_2502 = VALUES(ym_2502), ym_2503 = VALUES(ym_2503), @@ -512,7 +197,7 @@ class MarketDataCalculator: """ db.execute_update(upsert_sql, monthly_data) - self.logger.info("月度均值计算和保存完成") + # self.logger.info("月度均值计算和保存完成") return True except Exception as e: self.logger.error(f"计算和保存月度均值失败: {str(e)}") @@ -610,11 +295,12 @@ class MarketDataCalculator: finally: quote_ctx.close() + def get_stock_codes(self) -> List[str]: - """从conditionalselection表获取所有股票代码""" + """从 stock_filter 表获取所有股票代码,使用筛选接口得到的股票列表数据""" try: with MySQLHelper(**self.db_config) as db: - sql = f"SELECT DISTINCT stock_code,stock_name FROM stocks_hk" + sql = f"SELECT DISTINCT stock_code,stock_name FROM stock_filter" results = db.execute_query(sql) return [ row['stock_code'] @@ -738,7 +424,7 @@ class MarketDataCalculator: writer = csv.DictWriter(csvfile, fieldnames=field_names) # 写入中文表头 - writer.writerow({col: self.HEADER_MAP.get(col, col) for col in field_names}) + writer.writerow({col: self.head_map.get(col, col) for col in field_names}) # 写入数据 writer.writerows(data) diff --git a/UpdateFutuData/KLineUpdater.py b/UpdateFutuData/KLineUpdater.py new file mode 100644 index 0000000..2dfdf45 --- /dev/null +++ b/UpdateFutuData/KLineUpdater.py @@ -0,0 +1,415 @@ +""" +KLineUpdater 类用于更新日 K 数据 + +封装了 Futu API 操作和数据库存储功能,提供类接口供其他模块调用 +""" +from futu import * +from base.MySQLHelper import MySQLHelper +from base.LogHelper import LogHelper +from datetime import datetime +from typing import Optional, List, Dict +from UpdateFutuData.ConditionalSelection import FutuStockFilter +from tqdm import tqdm +import pandas as pd +import time +import os + +class KLineUpdater: + """ + K线数据更新器类 + + 功能: + - 从Futu API获取股票K线数据 + - 处理数据并保存到MySQL数据库 + - 管理流通股数量信息 + - 提供完整的股票数据更新流程 + """ + + def __init__(self, db_config: dict = None, logger_name: str = 'KLine'): + """ + 初始化KLineUpdater + + Args: + db_config (dict): 数据库配置字典 + logger_name (str): 日志器名称 + """ + # 默认数据库配置 + self.db_config = db_config or { + 'host': 'localhost', + 'user': 'root', + 'password': 'bzskmysql', + 'database': 'hk_kline_1d' + } + + # 初始化日志 + self.logger = LogHelper(logger_name=logger_name).setup() + self.logger.info("KLineUpdater 初始化完成") + + def get_market_data(self, market: Market) -> List[str]: + """ + 从Futu API获取指定市场的股票代码列表 + + Args: + market (Market): 市场枚举值,如 Market.SH, Market.SZ + + Returns: + List[str]: 股票代码列表 + """ + quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111) + try: + ret, data = quote_ctx.get_stock_basicinfo(market, SecurityType.STOCK) + if ret == RET_OK: + # 提取code列并转换为列表 + codes = data['code'].astype(str).tolist() + self.logger.info(f"获取到 {market} 市场 {len(codes)} 个股票代码") + return codes + else: + self.logger.error(f"获取股票代码失败: {data}") + return [] + except Exception as e: + self.logger.error(f"获取股票代码时发生异常: {str(e)}") + return [] + finally: + quote_ctx.close() + + def preprocess_quote_data(self, df: pd.DataFrame, float_share: Optional[int] = None) -> List[Dict]: + """ + 预处理行情数据,转换为适合数据库存储的格式 + + Args: + df (pd.DataFrame): 原始行情数据DataFrame + float_share (Optional[int]): 流通股数量 + + Returns: + List[Dict]: 处理后的数据列表 + """ + processed_data = [] + for _, row in df.iterrows(): + try: + # 提取市场标识 + market = row['code'].split('.')[0] if '.' in row['code'] else 'UNKNOWN' + + # 转换时间格式 + trade_time = datetime.strptime(row['time_key'], '%Y-%m-%d %H:%M:%S') + + item = { + 'stock_code': row['code'], + 'stock_name': row['name'], + 'trade_date': trade_time, + 'open_price': float(row['open']), + 'close_price': float(row['close']), + 'high_price': float(row['high']), + 'low_price': float(row['low']), + 'pe_ratio': float(row['pe_ratio']) if pd.notna(row['pe_ratio']) else None, + 'turnover_rate': float(row['turnover_rate']) if pd.notna(row['turnover_rate']) else None, + 'volume': int(row['volume']) if pd.notna(row['volume']) else None, + 'turnover': float(row['turnover']) if pd.notna(row['turnover']) else None, + 'change_rate': float(row['change_rate']) if pd.notna(row['change_rate']) else None, + 'last_close': float(row['last_close']) if pd.notna(row['last_close']) else None, + 'market': market, + 'float_share': float_share # 添加流通股数量字段 + } + processed_data.append(item) + except Exception as e: + self.logger.warning(f"处理行情数据时跳过异常行 {row.get('code', '未知')}: {str(e)}") + continue + + return processed_data + + def get_float_share(self, quote_ctx: OpenQuoteContext, code: str) -> Optional[int]: + """ + 获取股票的流通股数量 + + Args: + quote_ctx (OpenQuoteContext): Futu API连接上下文 + code (str): 股票代码 + + Returns: + Optional[int]: 流通股数量,获取失败返回None + """ + try: + # 获取股票快照 + ret, snapshot = quote_ctx.get_market_snapshot([code]) + if ret == RET_OK and not snapshot.empty: + # 提取流通股数量 + float_share = snapshot.iloc[0].get('float_share') + if pd.notna(float_share): + return int(float_share) + return None + except Exception as e: + self.logger.error(f"获取股票 {code} 的流通股数量失败: {str(e)}") + return None + + def get_float_share_data(self, table_name: str) -> Optional[Dict[str, int]]: + """ + 从指定表读取流通股本数据并构建字典 + + Args: + table_name: 源数据表名 + + Returns: + Dict[str, int]: 股票代码到流通股数量的映射字典,失败返回None + """ + try: + with MySQLHelper(**self.db_config) as db: + # 查询流通股本数据 + data = db.execute_query(f""" + SELECT stock_code, float_share + FROM {table_name} + WHERE float_share IS NOT NULL + """) + + if not data: + self.logger.error(f"表 {table_name} 中没有流通股本数据") + return None + + # 构建股票代码到流通股数量的映射字典 + float_share_dict = {} + for row in data: + stock_code = row['stock_code'] + float_share = row['float_share'] + if stock_code and float_share is not None: + float_share_dict[stock_code] = int(float_share) + + self.logger.info(f"成功从 {table_name} 表加载 {len(float_share_dict)} 条流通股数据") + return float_share_dict + + except Exception as e: + self.logger.error(f"从数据库读取流通股本数据失败: {str(e)}") + return None + + def save_quotes_to_db(self, quote_data: pd.DataFrame, table_name: str = 'stock_quotes', float_share: Optional[int] = None) -> bool: + """ + 将行情数据保存到数据库 + + Args: + quote_data (pd.DataFrame): 行情数据DataFrame + table_name (str): 目标表名(默认为'stock_quotes') + float_share (Optional[int]): 流通股数量 + + Returns: + bool: 是否成功保存 + """ + # 预处理数据 + processed_data = self.preprocess_quote_data(quote_data, float_share) + if not processed_data: + self.logger.error("没有有效数据需要保存") + return False + + # 动态生成SQL插入语句 + insert_sql = f""" + INSERT INTO {table_name} ( + stock_code, stock_name, trade_date, open_price, close_price, high_price, low_price, + pe_ratio, turnover_rate, volume, turnover, change_rate, last_close, market, float_share + ) VALUES ( + %(stock_code)s, %(stock_name)s, %(trade_date)s, %(open_price)s, %(close_price)s, %(high_price)s, %(low_price)s, + %(pe_ratio)s, %(turnover_rate)s, %(volume)s, %(turnover)s, %(change_rate)s, %(last_close)s, %(market)s, %(float_share)s + ) + ON DUPLICATE KEY UPDATE + stock_name = VALUES(stock_name), + open_price = VALUES(open_price), + close_price = VALUES(close_price), + high_price = VALUES(high_price), + low_price = VALUES(low_price), + pe_ratio = VALUES(pe_ratio), + turnover_rate = VALUES(turnover_rate), + volume = VALUES(volume), + turnover = VALUES(turnover), + change_rate = VALUES(change_rate), + last_close = VALUES(last_close), + float_share = VALUES(float_share) + """ + try: + with MySQLHelper(**self.db_config) as db: + # 检查表是否存在,不存在则创建 + if not db.table_exists(table_name): + create_table_sql = f""" + CREATE TABLE IF NOT EXISTS {table_name} ( + id INT AUTO_INCREMENT PRIMARY KEY, + stock_code VARCHAR(20) NOT NULL COMMENT '股票代码', + stock_name VARCHAR(50) COMMENT '股票名称', + trade_date DATETIME NOT NULL COMMENT '交易日期时间', + open_price DECIMAL(10, 3) COMMENT '开盘价', + close_price DECIMAL(10, 3) COMMENT '收盘价', + high_price DECIMAL(10, 3) COMMENT '最高价', + low_price DECIMAL(10, 3) COMMENT '最低价', + pe_ratio DECIMAL(10, 3) COMMENT '市盈率', + turnover_rate DECIMAL(10, 6) COMMENT '换手率(%)', + volume BIGINT COMMENT '成交量(股)', + turnover DECIMAL(20, 2) COMMENT '成交额(元)', + change_rate DECIMAL(10, 6) COMMENT '涨跌幅(%)', + last_close DECIMAL(10, 3) COMMENT '昨收价', + market VARCHAR(10) COMMENT '市场标识', + float_share BIGINT COMMENT '流通股数量', + create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + + UNIQUE KEY uk_code_date (stock_code, trade_date), + KEY idx_trade_date (trade_date), + KEY idx_stock_code (stock_code) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='股票行情数据表' + """ + db.execute_update(create_table_sql) + self.logger.info(f"创建了新表: {table_name}") + + affected_rows = db.execute_many(insert_sql, processed_data) + self.logger.info(f"成功插入/更新 {affected_rows} 条行情记录到表 {table_name}") + return True + except Exception as e: + self.logger.error(f"保存行情数据到表 {table_name} 失败: {str(e)}") + return False + + def read_stock_codes(self, file_path='config\\hang_futu.txt'): + """ + 基础读取方法 - 按行读取所有内容 + + Args: + file_path (str): 文件路径 + + Returns: + List[str]: 股票代码列表 + """ + try: + with open(file_path, 'r', encoding='utf-8') as f: + lines = f.readlines() + # 去除每行末尾的换行符,并过滤空行 + codes = [line.strip() for line in lines if line.strip()] + return codes + except FileNotFoundError: + self.logger.error(f"文件 {file_path} 不存在") + return [] + except Exception as e: + self.logger.error(f"读取文件失败: {str(e)}") + return [] + + def read_single_account_stock_codes(self, file_path='data\\missing_tables_小航富途.txt'): + """ + 基础读取方法 - 按行读取所有内容 + + Args: + file_path (str): 文件路径 + + Returns: + List[str]: 股票代码列表 + """ + try: + with open(file_path, 'r', encoding='utf-8') as f: + lines = f.readlines() + # 去除每行末尾的换行符,并过滤空行 + codes = [line.strip() for line in lines if line.strip()] + return codes + except FileNotFoundError: + self.logger.error(f"文件 {file_path} 不存在") + return [] + except Exception as e: + self.logger.error(f"读取文件失败: {str(e)}") + return [] + + def get_stock_codes(self) -> List[str]: + """ + 从conditionalselection表获取所有股票代码 + + Returns: + List[str]: 股票代码列表 + """ + try: + with MySQLHelper(**self.db_config) as db: + sql = f"SELECT DISTINCT stock_code FROM stock_filter" + results = db.execute_query(sql) + return [row['stock_code'] for row in results if row['stock_code']] + except Exception as e: + self.logger.error(f"获取股票代码失败: {str(e)}") + return [] + + def write_missing_codes_to_txt(self, missing_codes: list, filename: str = "config\\missing_codes.txt"): + """ + 将缺失的股票代码追加到TXT文件,如果文件不存在则创建 + + Args: + missing_codes (list): 缺失的股票代码列表 + filename (str): 目标文件名 + """ + try: + # 确保目录存在 + directory = os.path.dirname(filename) + if directory and not os.path.exists(directory): + os.makedirs(directory) + self.logger.info(f"创建目录: {directory}") + + with open(filename, 'a', encoding='utf-8') as f: + for code in missing_codes: + f.write(f"\n{code}") + self.logger.info(f"已将 {len(missing_codes)} 个缺失表对应的股票代码写入 {filename}") + except Exception as e: + self.logger.error(f"写入TXT文件失败: {str(e)}") + + def update_kline_data(self, stock_codes: List[str] = None, table_name: str = 'stock_quotes'): + """ + 主更新方法 - 更新指定股票代码的K线数据 + + Args: + stock_codes (List[str]): 要更新的股票代码列表,如果为None则从数据库获取 + table_name (str): 目标表名 + """ + if stock_codes is None: + stock_codes = self.get_stock_codes() + + if not stock_codes: + self.logger.warning("没有股票代码需要更新") + return + + self.logger.info(f"开始更新 {len(stock_codes)} 个股票的K线数据") + + # 这里可以添加具体的更新逻辑 + # 例如:遍历股票代码,获取数据并保存到数据库 + # 每天收盘后更新数据 -> 操作界面中,这个参数需要放出来 + start_date = (datetime.now() - timedelta(days = 3 * 356)).strftime("%Y-%m-%d") + end_date = (datetime.now() + timedelta(days = 1)).strftime("%Y-%m-%d") + + # 获取流通股数据字典 + quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111) + float_share_dict = self.get_float_share_data('stock_filter') # 假设数据在stock_filter表中 + for code in tqdm(stock_codes, desc="更新K线数据"): + try: + + # 从字典中获取流通股数量 + float_share = float_share_dict.get(code) if float_share_dict else None + + # # 如果字典中没有找到,尝试从API获取 + # if float_share is None: + # float_share = get_float_share(quote_ctx, code) + # if float_share is not None: + # logger.info(f"从API获取股票 {code} 的流通股数量: {float_share}") + # else: + # logger.warning(f"无法获取股票 {code} 的流通股数量") + + # 获取历史K线数据 + ret, data, page_req_key = quote_ctx.request_history_kline(code, start=start_date, end=end_date, max_count=100) + + # 保存数据到自定义表 + custom_table_name = 'hk_' + code[3:] # 自定义表名 + + if ret == RET_OK: + success = self.save_quotes_to_db( data, custom_table_name, float_share) + else: + logger.error(f'error:{data}') + + isWhile = False # 只返回一页时增加请求延迟 + while page_req_key != None: # 请求后面的所有结果 + isWhile = True + ret, data, page_req_key = quote_ctx.request_history_kline(code, start=start_date, end=end_date, max_count=100, page_req_key=page_req_key) # 请求翻页后的数据 + if ret == RET_OK: + success = self.save_quotes_to_db(data, custom_table_name, float_share) + else: + logger.error(f'error:{ data}') + + time.sleep(0.7) + + if isWhile == False: + time.sleep(0.7) + + except Exception as e: + self.logger.error(f"更新股票 {code} 数据失败: {str(e)}") + continue + quote_ctx.close() # 结束后记得关闭当条连接,防止连接条数用尽 + + self.logger.info("K线数据更新完成") diff --git a/UpdateFutuData/__init__.py b/UpdateFutuData/__init__.py index 26fc52e..9a9530f 100644 --- a/UpdateFutuData/__init__.py +++ b/UpdateFutuData/__init__.py @@ -1,2 +1,3 @@ -from .updatekline import * -from .checktable import * \ No newline at end of file +from .KLineUpdater import * +from .checktable import * +from .KLineUpdater import KLineUpdater \ No newline at end of file diff --git a/UpdateFutuData/updateExecl.py b/UpdateFutuData/updateExcel.py similarity index 100% rename from UpdateFutuData/updateExecl.py rename to UpdateFutuData/updateExcel.py diff --git a/UpdateFutuData/updatekline.py b/UpdateFutuData/updatekline.py deleted file mode 100644 index 8ec2f44..0000000 --- a/UpdateFutuData/updatekline.py +++ /dev/null @@ -1,294 +0,0 @@ -""" - 本代码用于更新日 K 数据 - - —— 由于行情数据限制,至少需要3个账号,进行数据更新 - —— 更新内容:每个交易日日K数据,以及当日流通股数量(用于计算流通和市值) - - 程序流程: - —— 更新 hk_stock_list table 存放全部静态数据(这是一个总表) - —— 读取股票列表:三个OpenD账号的数据存放在三个 TXT 文件中 - —— 根据股票代码,获取股票历史数据 - —— 根据股票代码,获取流通股数量 - —— 根据股票代码,新建/更新数据表 - —— 更新完成需检查 ,数据是否完整 -> checktable.py 操作界面做好了之后,再融合到整体代码中 -""" -from futu import * -from base.MySQLHelper import MySQLHelper # MySQLHelper类保存为单独文件 -from base.LogHelper import LogHelper -from datetime import datetime -from typing import Optional, List, Dict -from UpdateFutuData.ConditionalSelection import FutuStockFilter -from tqdm import tqdm -import pandas as pd -import time - -# 基本用法(自动创建日期日志+控制台输出) -logger = LogHelper(logger_name = 'KLine').setup() - -def get_market_data(market: Market) -> List[str]: - """ - 从Futu API获取指定市场的股票代码列表 - - Args: - market (Market): 市场枚举值,如 Market.SH, Market.SZ - - Returns: - List[str]: 股票代码列表 - """ - quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111) - try: - ret, data = quote_ctx.get_stock_basicinfo(market, SecurityType.STOCK) - if ret == RET_OK: - # 提取code列并转换为列表 - codes = data['code'].astype(str).tolist() - logger.info(f"获取到 {market} 市场 {len(codes)} 个股票代码") - return codes - else: - logger.error(f"获取股票代码失败: {data}") - return [] - except Exception as e: - logger.error(f"获取股票代码时发生异常: {str(e)}") - return [] - finally: - quote_ctx.close() - -def preprocess_quote_data(df: pd.DataFrame, float_share: Optional[int] = None) -> List[Dict]: - """ - 预处理行情数据,转换为适合数据库存储的格式 - - Args: - df (pd.DataFrame): 原始行情数据DataFrame - float_share (Optional[int]): 流通股数量 - - Returns: - List[Dict]: 处理后的数据列表 - """ - processed_data = [] - for _, row in df.iterrows(): - try: - # 提取市场标识 - market = row['code'].split('.')[0] if '.' in row['code'] else 'UNKNOWN' - - # 转换时间格式 - trade_time = datetime.strptime(row['time_key'], '%Y-%m-%d %H:%M:%S') - - item = { - 'stock_code': row['code'], - 'stock_name': row['name'], - 'trade_date': trade_time, - 'open_price': float(row['open']), - 'close_price': float(row['close']), - 'high_price': float(row['high']), - 'low_price': float(row['low']), - 'pe_ratio': float(row['pe_ratio']) if pd.notna(row['pe_ratio']) else None, - 'turnover_rate': float(row['turnover_rate']) if pd.notna(row['turnover_rate']) else None, - 'volume': int(row['volume']) if pd.notna(row['volume']) else None, - 'turnover': float(row['turnover']) if pd.notna(row['turnover']) else None, - 'change_rate': float(row['change_rate']) if pd.notna(row['change_rate']) else None, - 'last_close': float(row['last_close']) if pd.notna(row['last_close']) else None, - 'market': market, - 'float_share': float_share # 添加流通股数量字段 - } - processed_data.append(item) - except Exception as e: - logger.warning(f"处理行情数据时跳过异常行 {row.get('code', '未知')}: {str(e)}") - continue - - return processed_data - -def get_float_share(quote_ctx: OpenQuoteContext, code: str) -> Optional[int]: - """ - 获取股票的流通股数量 - - Args: - quote_ctx (OpenQuoteContext): Futu API连接上下文 - code (str): 股票代码 - - Returns: - Optional[int]: 流通股数量,获取失败返回None - """ - try: - # 获取股票快照 - ret, snapshot = quote_ctx.get_market_snapshot([code]) - if ret == RET_OK and not snapshot.empty: - # 提取流通股数量 - float_share = snapshot.iloc[0].get('float_share') - if pd.notna(float_share): - return int(float_share) - return None - except Exception as e: - logger.error(f"获取股票 {code} 的流通股数量失败: {str(e)}") - return None - -def get_float_share_data(db_config: dict, table_name: str) -> Optional[Dict[str, int]]: - """ - 从指定表读取流通股本数据并构建字典 - - Args: - db_config: 数据库配置 - table_name: 源数据表名 - - Returns: - Dict[str, int]: 股票代码到流通股数量的映射字典,失败返回None - """ - try: - with MySQLHelper(**db_config) as db: - # 查询流通股本数据 - data = db.execute_query(f""" - SELECT stock_code, float_share - FROM {table_name} - WHERE float_share IS NOT NULL - """) - - if not data: - logger.error(f"表 {table_name} 中没有流通股本数据") - return None - - # 构建股票代码到流通股数量的映射字典 - float_share_dict = {} - for row in data: - stock_code = row['stock_code'] - float_share = row['float_share'] - if stock_code and float_share is not None: - float_share_dict[stock_code] = int(float_share) - - logger.info(f"成功从 {table_name} 表加载 {len(float_share_dict)} 条流通股数据") - return float_share_dict - - except Exception as e: - logger.error(f"从数据库读取流通股本数据失败: {str(e)}") - return None - -def save_quotes_to_db(db_config: dict, quote_data: pd.DataFrame, table_name: str = 'stock_quotes', float_share: Optional[int] = None) -> bool: - """ - 将行情数据保存到数据库 - - Args: - db_config (dict): 数据库配置 - quote_data (pd.DataFrame): 行情数据DataFrame - table_name (str): 目标表名(默认为'stock_quotes') - float_share (Optional[int]): 流通股数量 - - Returns: - bool: 是否成功保存 - """ - # 预处理数据 - processed_data = preprocess_quote_data(quote_data, float_share) - if not processed_data: - logger.error("没有有效数据需要保存") - return False - - # 动态生成SQL插入语句 - insert_sql = f""" - INSERT INTO {table_name} ( - stock_code, stock_name, trade_date, open_price, close_price, high_price, low_price, - pe_ratio, turnover_rate, volume, turnover, change_rate, last_close, market, float_share - ) VALUES ( - %(stock_code)s, %(stock_name)s, %(trade_date)s, %(open_price)s, %(close_price)s, %(high_price)s, %(low_price)s, - %(pe_ratio)s, %(turnover_rate)s, %(volume)s, %(turnover)s, %(change_rate)s, %(last_close)s, %(market)s, %(float_share)s - ) - ON DUPLICATE KEY UPDATE - stock_name = VALUES(stock_name), - open_price = VALUES(open_price), - close_price = VALUES(close_price), - high_price = VALUES(high_price), - low_price = VALUES(low_price), - pe_ratio = VALUES(pe_ratio), - turnover_rate = VALUES(turnover_rate), - volume = VALUES(volume), - turnover = VALUES(turnover), - change_rate = VALUES(change_rate), - last_close = VALUES(last_close), - float_share = VALUES(float_share) - """ - - try: - with MySQLHelper(**db_config) as db: - # 检查表是否存在,不存在则创建 - if not db.table_exists(table_name): - create_table_sql = f""" - CREATE TABLE IF NOT EXISTS {table_name} ( - id INT AUTO_INCREMENT PRIMARY KEY, - stock_code VARCHAR(20) NOT NULL COMMENT '股票代码', - stock_name VARCHAR(50) COMMENT '股票名称', - trade_date DATETIME NOT NULL COMMENT '交易日期时间', - open_price DECIMAL(10, 3) COMMENT '开盘价', - close_price DECIMAL(10, 3) COMMENT '收盘价', - high_price DECIMAL(10, 3) COMMENT '最高价', - low_price DECIMAL(10, 3) COMMENT '最低价', - pe_ratio DECIMAL(10, 3) COMMENT '市盈率', - turnover_rate DECIMAL(10, 6) COMMENT '换手率(%)', - volume BIGINT COMMENT '成交量(股)', - turnover DECIMAL(20, 2) COMMENT '成交额(元)', - change_rate DECIMAL(10, 6) COMMENT '涨跌幅(%)', - last_close DECIMAL(10, 3) COMMENT '昨收价', - market VARCHAR(10) COMMENT '市场标识', - float_share BIGINT COMMENT '流通股数量', - create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', - - UNIQUE KEY uk_code_date (stock_code, trade_date), - KEY idx_trade_date (trade_date), - KEY idx_stock_code (stock_code) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='股票行情数据表' - """ - db.execute_update(create_table_sql) - logger.info(f"创建了新表: {table_name}") - - affected_rows = db.execute_many(insert_sql, processed_data) - logger.info(f"成功插入/更新 {affected_rows} 条行情记录到表 {table_name}") - return True - except Exception as e: - logger.error(f"保存行情数据到表 {table_name} 失败: {str(e)}") - return False - -def read_single_account_stock_codes(file_path='data\missing_tables_小航富途.txt'): - """基础读取方法 - 按行读取所有内容""" - try: - with open(file_path, 'r', encoding='utf-8') as f: - lines = f.readlines() - # 去除每行末尾的换行符,并过滤空行 - codes = [line.strip() for line in lines if line.strip()] - return codes - except FileNotFoundError: - print(f"文件 {file_path} 不存在") - return [] - except Exception as e: - print(f"读取文件失败: {str(e)}") - return [] - -def get_stock_codes() -> List[str]: - """从conditionalselection表获取所有股票代码""" - try: - with MySQLHelper(**db_config) as db: - sql = f"SELECT DISTINCT stock_code FROM stock_filter" - results = db.execute_query(sql) - return [row['stock_code'] for row in results if row['stock_code']] - except Exception as e: - logger.error(f"获取股票代码失败: {str(e)}") - return [] - -def write_missing_codes_to_txt(missing_codes: list, filename: str = "config\missing_codes.txt"): - """将缺失的股票代码追加到TXT文件,如果文件不存在则创建""" - try: - # 确保目录存在 - directory = os.path.dirname(filename) - if directory and not os.path.exists(directory): - os.makedirs(directory) - logger.info(f"创建目录: {directory}") - - # # 检查文件是否存在 - # file_exists = os.path.exists(filename) - with open(filename, 'a', encoding='utf-8') as f: - for code in missing_codes: - f.write(f"\n{code}") - logger.info(f"已将 {len(missing_codes)} 个缺失表对应的股票代码写入 {filename}") - except Exception as e: - logger.error(f"写入TXT文件失败: {str(e)}") - -# 数据库配置 -db_config = { - 'host': 'localhost', - 'user': 'root', - 'password': 'bzskmysql', - 'database': 'hk_kline_1d' -} diff --git a/base/Config.py b/base/Config.py index e38723d..4b1be2b 100644 --- a/base/Config.py +++ b/base/Config.py @@ -7,3 +7,34 @@ class ConfigInfo: 'database': 'hk_kline_1d' } + + """ + 映射表发生变化时,流通值计算的 sql 语句同样需要进行修改 + """ + # 表头映射配置 + HEADER_MAP = { + 'stock_code': '股票代码', + 'stock_name': '股票名称', + 'ym_2501': '2025年01月', + 'ym_2502': '2025年02月', + 'ym_2503': '2025年03月', + 'ym_2504': '2025年04月', + 'ym_2505': '2025年05月', + 'ym_2506': '2025年06月', + 'ym_2507': '2025年07月', + 'ym_2508': '2025年08月', + 'avg_all': '月度均值' + } + + # 月份范围配置 + MONTH_RANGES = { + 'ym_2501': ('2025-01-01', '2025-01-31'), + 'ym_2502': ('2025-02-01', '2025-02-28'), + 'ym_2503': ('2025-03-01', '2025-03-31'), + 'ym_2504': ('2025-04-01', '2025-04-30'), + 'ym_2505': ('2025-05-01', '2025-05-31'), + 'ym_2506': ('2025-06-01', '2025-06-30'), + 'ym_2507': ('2025-07-01', '2025-07-31'), + 'ym_2508': ('2025-08-01', '2025-08-31') + } + diff --git a/config/HK_futu.txt b/config/HK_futu.txt index 79dafa6..7b7700a 100644 --- a/config/HK_futu.txt +++ b/config/HK_futu.txt @@ -889,4 +889,7 @@ HK.01417 HK.00300 HK.02563 HK.01591 -HK.01715 \ No newline at end of file +HK.01715 +HK.00489 +HK.08218 +HK.06960 \ No newline at end of file diff --git a/config/hang_futu.txt b/config/hang_futu.txt index 8839f0d..291ebf9 100644 --- a/config/hang_futu.txt +++ b/config/hang_futu.txt @@ -700,4 +700,5 @@ HK.02926 HK.02927 HK.02929 HK.02631 -HK.02930 \ No newline at end of file +HK.02930 +HK.08566 \ No newline at end of file diff --git a/config/missing_codes.txt b/config/missing_codes.txt index a67995e..e69de29 100644 --- a/config/missing_codes.txt +++ b/config/missing_codes.txt @@ -1,12 +0,0 @@ -HK.02461 -HK.02478 -HK.02572 -HK.02612 -HK.04332 -HK.04335 -HK.04336 -HK.04337 -HK.04338 -HK.04620 -HK.04621 -HK.04855 \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index 9d768eb..0000000 --- a/main.py +++ /dev/null @@ -1,154 +0,0 @@ -import config - -from DataAnalysis import DataExporter -from base import LogHelper, Config -from datetime import datetime -from DataAnalysis import MarketDataCalculator -from tqdm import tqdm -from pathlib import Path -from UpdateFutuData.updatekline import * -from DataAnalysis.checktable import * - -# 导出数据 -def exportMonthlyAvgData() -> bool: - exporter = DataExporter(Config.ConfigInfo.db_hk_kline_1d) - - # 根据导出时间 -> 一般是当天计算,当天导出。(根据实际情况进行调整) - target_table_name = 'hk_monthly_avg_' + datetime.now().strftime("%Y%m%d") - target_file_name = 'hk_monthly_avg_' + datetime.now().strftime("%Y%m%d") + ".csv" - - return exporter.export_data( - monthly_table = target_table_name, - csv_file = target_file_name - ) - - -def calculate_update_monthly_avg_table() -> bool: - calculator = MarketDataCalculator(Config.ConfigInfo.db_hk_kline_1d) - try: - # 移除人民币交易的股票:股票名称最后一个字符为R,误删除的从配置文件读回来 - Reservedcode = calculator.read_stock_codes_list(Path.cwd().parent/"HKDataManagment" / "config"/"Reservedcode.txt") - market_data_ll = calculator.get_stock_codes() # 使用按照价格和流通股数量筛选的那个表格 - market_data = market_data_ll + Reservedcode - - # 根据统计时间进行命名 - target_table_name = 'hk_monthly_avg_' + datetime.now().strftime("%Y%m%d") - - # 使用tqdm创建进度条 - for code in tqdm(market_data, desc="处理股票数据", unit="支"): - tablename = 'hk_' + code[3:] - # 计算并保存月度均值 - calculator.calculate_and_save_monthly_avg( - source_table = tablename, - target_table=target_table_name - ) - return True - except Exception as e: - # logger.error(f"更新月均流通市值失败: {str(e)}") - return False - -if __name__ == "__main__": - # # 应用配置 - logger = LogHelper(logger_name = 'main').setup() - - # if True: - # logger.info("开始统计月度均值") - # success = calculate_update_monthly_avg_table() - # if success: - # logger.info("月度均值统计成功完成") - # else: - # logger.error("处理过程中出现错误") - - # if True: - # logger.info("开始导出数据") - # success = exportMonthlyAvgData() - # if success: - # logger.info("数据导出成功完成") - # else: - # logger.error("数据导出过程中出现错误") - - - # 数据库配置 - db_config = { - 'host': 'localhost', - 'user': 'root', - 'password': 'bzskmysql', - 'database': 'hk_kline_1d' - } - - # 创建检查器并运行 - checker = StockTableChecker(db_config) - checker.run_check() - - - futuStockFilter = FutuStockFilter(db_config) - futuStockFilter.run_direct_import() - - - # 创建导入器并运行, 用于每日更新流通股数量 -> 操作界面中增加一个开关,每天只需要更新一次 - futuStockFilter = FutuStockFilter(db_config) - futuStockFilter.run_direct_import() - - # 每个账号获取的数据独立开来 -> 操作见面可以选择 - market_data_all = get_stock_codes() - market_data_hang = read_single_account_stock_codes('config\hang_futu.txt') - market_data_kevin= read_single_account_stock_codes('config\kevin_futu.txt') - market_data_HK= read_single_account_stock_codes('config\HK_futu.txt') - market_data_new = list(set(market_data_all) - set(market_data_hang) - set(market_data_kevin) - set(market_data_HK)) - - write_missing_codes_to_txt(market_data_new) # 新股票添加到文件中 -> 暂时手动设置 - - # 动态调整 - """*********************************************""" - market_data = read_single_account_stock_codes('config\missing_codes.txt') - - # 每天收盘后更新数据 -> 操作界面中,这个参数需要放出来 - start_date = (datetime.now() - timedelta(days = 1)).strftime("%Y-%m-%d") - end_date = (datetime.now() + timedelta(days = 1)).strftime("%Y-%m-%d") - - # 获取流通股数据字典 - float_share_dict = get_float_share_data(db_config, 'stock_filter') # 假设数据在stock_filter表中 - - # 使用tqdm创建进度条 - for code in tqdm(market_data, desc="下载股票数据", unit="支"): - full_code = f"{code}" - - quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111) - - # 从字典中获取流通股数量 - float_share = float_share_dict.get(code) if float_share_dict else None - - # 如果字典中没有找到,尝试从API获取 - if float_share is None: - float_share = get_float_share(quote_ctx, code) - if float_share is not None: - logger.info(f"从API获取股票 {code} 的流通股数量: {float_share}") - else: - logger.warning(f"无法获取股票 {code} 的流通股数量") - - # 获取历史K线数据 - ret, data, page_req_key = quote_ctx.request_history_kline(code, start=start_date, end=end_date, max_count=100) - - # 保存数据到自定义表 - custom_table_name = 'hk_' + code[3:] # 自定义表名 - - if ret == RET_OK: - success = save_quotes_to_db(db_config, data, table_name=custom_table_name, float_share=float_share) - else: - logger.error(f'error:{data}') - - isWhile = False # 只返回一页时增加请求延迟 - while page_req_key != None: # 请求后面的所有结果 - isWhile = True - ret, data, page_req_key = quote_ctx.request_history_kline(code, start=start_date, end=end_date, max_count=100, page_req_key=page_req_key) # 请求翻页后的数据 - if ret == RET_OK: - success = save_quotes_to_db(db_config, data, table_name=custom_table_name, float_share=float_share) - else: - logger.error(f'error:{ data}') - - time.sleep(0.7) - - if isWhile == False: - time.sleep(0.7) - - quote_ctx.close() # 结束后记得关闭当条连接,防止连接条数用尽 diff --git a/main_gui.py b/main_gui.py new file mode 100644 index 0000000..c7e8f45 --- /dev/null +++ b/main_gui.py @@ -0,0 +1,555 @@ +""" +香港股票数据管理系统 - GUI界面 + +基于PyQt5的图形用户界面,通过按钮控制各个功能模块的执行 +""" +import sys +import os +from PyQt5.QtWidgets import (QApplication, QMainWindow, QPushButton, + QTextEdit, QVBoxLayout, QWidget, QProgressBar, + QHBoxLayout, QGroupBox, QLabel, QMessageBox, QComboBox) +from PyQt5.QtCore import QThread, pyqtSignal, Qt +from PyQt5.QtGui import QFont + +# 添加项目根目录到Python路径 +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +from UpdateFutuData.KLineUpdater import KLineUpdater +from base.LogHelper import LogHelper +from base import Config +from DataAnalysis import DataExporter, MarketDataCalculator +from DataAnalysis.checktable import StockTableChecker +from UpdateFutuData.ConditionalSelection import FutuStockFilter +from datetime import datetime, timedelta +from tqdm import tqdm +from pathlib import Path +import time + +class LogHandler: + """日志处理器,用于在GUI中显示日志""" + def __init__(self, text_widget): + self.text_widget = text_widget + + def write(self, message): + if message.strip(): + self.text_widget.append(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {message}") + + def flush(self): + pass + +class WorkerThread(QThread): + """工作线程类,用于执行耗时操作""" + log_signal = pyqtSignal(str) + progress_signal = pyqtSignal(int) + finished_signal = pyqtSignal(bool, str) + + def __init__(self, task_func, *args, **kwargs): + super().__init__() + self.task_func = task_func + self.args = args + self.kwargs = kwargs + + def run(self): + try: + self.log_signal.emit("开始执行任务...") + result = self.task_func(*self.args, **self.kwargs) + self.finished_signal.emit(True, "任务执行成功") + except Exception as e: + self.log_signal.emit(f"执行失败: {str(e)}") + self.finished_signal.emit(False, f"任务执行失败: {str(e)}") + +class MainWindow(QMainWindow): + """主窗口类""" + + def __init__(self): + super().__init__() + self.worker_threads = [] + self.initUI() + + def initUI(self): + """初始化用户界面""" + self.setWindowTitle('白泽数科数据管理平台') + self.setGeometry(100, 100, 1000, 800) + + # 设置字体 + font = QFont("Microsoft YaHei", 10) + self.setFont(font) + + # 应用白色主题样式 + self.apply_light_theme() + + # 创建中央部件和布局 + central_widget = QWidget() + main_layout = QVBoxLayout() + + # 创建标题 + title_label = QLabel("港股数据管理系统") + title_label.setAlignment(Qt.AlignCenter) + title_label.setStyleSheet(""" + font-size: 18px; + font-weight: bold; + margin: 10px; + color: #333333; + background-color: #e0e0e0; + padding: 10px; + border-radius: 5px; + """) + main_layout.addWidget(title_label) + + # 创建配置选择组 + self.create_config_selection_group(main_layout) + + # 创建功能按钮组 + self.create_button_group(main_layout) + + # 创建进度条 + self.progress_bar = QProgressBar() + # self.progress_bar.setVisible(False) + main_layout.addWidget(self.progress_bar) + self.progress_bar.setStyleSheet(""" + QProgressBar { + border: 2px solid grey; + border-radius: 5px; + text-align: center; /* 文本水平居中 */ + color: black; /* 设置文本颜色,确保与背景对比明显 */ + background-color: #FFFFFF; /* 进度条背景色 */ + } + QProgressBar::chunk { + background-color: #05B8CC; /* 进度块颜色 */ + width: 20px; /* 进度块宽度 */ + margin: 0.5px; /* 进度块间隔 */ + } + """) + + # 创建日志显示区域 + self.create_log_area(main_layout) + + # 设置状态栏 + self.statusBar().showMessage('就绪') + + central_widget.setLayout(main_layout) + self.setCentralWidget(central_widget) + + def create_config_selection_group(self, layout): + """创建配置选择组""" + config_group = QGroupBox("读取股票列表") + config_layout = QHBoxLayout() + + # 配置文件选择标签 + config_label = QLabel("选择股票列表配置文件:") + config_layout.addWidget(config_label) + + # 配置文件选择下拉框 + self.config_combo = QComboBox() + self.config_combo.setToolTip("选择要使用的股票列表配置文件") + + # 获取config目录下的所有txt文件 + config_dir = Path("config") + if config_dir.exists(): + txt_files = [f for f in config_dir.glob("*.txt") if f.is_file()] + for file in txt_files: + self.config_combo.addItem(file.name, str(file)) + + # 如果没有找到文件,添加默认选项 + if self.config_combo.count() == 0: + self.config_combo.addItem("未找到配置文件", "") + + config_layout.addWidget(self.config_combo) + + # 当前选择显示 + self.selected_config_label = QLabel("当前选择: 无") + config_layout.addWidget(self.selected_config_label) + + # 连接选择变化信号 + self.config_combo.currentIndexChanged.connect(self.on_config_changed) + + config_group.setLayout(config_layout) + layout.addWidget(config_group) + + def on_config_changed(self, index): + """配置文件选择变化事件""" + if index >= 0: + file_path = self.config_combo.itemData(index) + if file_path: + self.selected_config_label.setText(f"当前选择: {self.config_combo.itemText(index)}") + self.log_message(f"已选择配置文件: {self.config_combo.itemText(index)}") + else: + self.selected_config_label.setText("当前选择: 无") + + def apply_light_theme(self): + """应用白色主题样式""" + light_stylesheet = """ + QMainWindow { + background-color: #f5f5f5; + color: #333333; + } + QWidget { + background-color: #f5f5f5; + color: #333333; + } + QPushButton { + background-color: #007acc; + color: #ffffff; + border: none; + padding: 8px 16px; + border-radius: 4px; + font-weight: bold; + } + QPushButton:hover { + background-color: #005a9e; + } + QPushButton:pressed { + background-color: #004275; + } + QPushButton:disabled { + background-color: #cccccc; + color: #666666; + } + QGroupBox { + font-weight: bold; + border: 1px solid #cccccc; + border-radius: 5px; + margin-top: 1ex; + padding-top: 10px; + background-color: #ffffff; + } + QGroupBox::title { + subcontrol-origin: margin; + subcontrol-position: top left; + padding: 0 8px; + color: #333333; + } + QTextEdit { + background-color: #ffffff; + color: #333333; + border: 1px solid #cccccc; + border-radius: 3px; + selection-background-color: #007acc; + selection-color: #ffffff; + } + QLabel { + color: #333333; + } + QComboBox { + background-color: #ffffff; + color: #333333; + border: 1px solid #cccccc; + border-radius: 3px; + padding: 5px; + } + QComboBox:drop-down { + subcontrol-origin: padding; + subcontrol-position: top right; + width: 20px; + border-left-width: 1px; + border-left-color: #cccccc; + border-left-style: solid; + } + QComboBox QAbstractItemView { + background-color: #ffffff; + color: #333333; + selection-background-color: #007acc; + selection-color: #ffffff; + } + QProgressBar { + border: 1px solid #cccccc; + border-radius: 3px; + text-align: center; + background-color: #ffffff; + color: #333333; + } + QProgressBar::chunk { + background-color: #007acc; + width: 10px; + } + QStatusBar { + background-color: #ffffff; + color: #333333; + border-top: 1px solid #cccccc; + } + """ + self.setStyleSheet(light_stylesheet) + + def create_button_group(self, layout): + """创建功能按钮组""" + button_group = QGroupBox("功能操作") + button_layout = QHBoxLayout() + + # 更新流通股数量更新流通股数量 + self.btn_float_share = QPushButton('更新流通股数量') + self.btn_float_share.clicked.connect(self.on_share_clicked) + self.btn_float_share.setToolTip('更新流通股数量') + button_layout.addWidget(self.btn_float_share) + + # 检查数据按钮,检测列表中的股票是不是全部下载好了k线数据 + self.btn_check = QPushButton('检查数据完整性') + self.btn_check.clicked.connect(self.on_check_clicked) + self.btn_check.setToolTip('检查数据表的完整性') + button_layout.addWidget(self.btn_check) + + # 更新数据按钮,根据登录OpenD账号的不同,加载不同的股票列表,用于更新K线数据 + self.btn_update = QPushButton('更新K线数据') + self.btn_update.clicked.connect(self.on_update_clicked) + self.btn_update.setToolTip('从Futu API获取并更新股票K线数据') + button_layout.addWidget(self.btn_update) + + # 计算月度平均按钮 + self.btn_calculate = QPushButton('计算月度平均') + self.btn_calculate.clicked.connect(self.on_calculate_clicked) + self.btn_calculate.setToolTip('计算股票的月度平均数据') + button_layout.addWidget(self.btn_calculate) + + # 导出数据按钮 + self.btn_export = QPushButton('导出数据') + self.btn_export.clicked.connect(self.on_export_clicked) + self.btn_export.setToolTip('导出月度平均数据到CSV文件') + button_layout.addWidget(self.btn_export) + + button_group.setLayout(button_layout) + layout.addWidget(button_group) + + def create_log_area(self, layout): + """创建日志显示区域""" + log_group = QGroupBox("操作日志") + log_layout = QVBoxLayout() + + self.log_text = QTextEdit() + self.log_text.setReadOnly(True) + # self.log_text.setMaximumHeight(400) + log_layout.addWidget(self.log_text) + + # 清空日志按钮 + btn_clear = QPushButton('清空日志') + btn_clear.clicked.connect(self.on_clear_log) + log_layout.addWidget(btn_clear) + + log_group.setLayout(log_layout) + layout.addWidget(log_group) + + def log_message(self, message): + """添加日志消息""" + self.log_text.append(f"{datetime.now().strftime('%H:%M:%S')} - {message}") + + def on_clear_log(self): + """清空日志""" + self.log_text.clear() + self.log_message("日志已清空") + + def set_buttons_enabled(self, enabled): + """设置按钮启用状态""" + self.btn_update.setEnabled(enabled) + self.btn_export.setEnabled(enabled) + self.btn_calculate.setEnabled(enabled) + self.btn_check.setEnabled(enabled) + + def on_update_clicked(self): + """更新数据按钮点击事件""" + self.log_message("开始更新K线数据...") + self.set_buttons_enabled(False) + self.progress_bar.setVisible(True) + self.progress_bar.setRange(0, 0) # 不确定进度 + + self.statusBar().showMessage('正在更新数据') + + # 创建工作线程执行更新任务 + worker = WorkerThread(self.update_data) + worker.log_signal.connect(self.log_message) + worker.finished_signal.connect(self.on_task_finished) + worker.start() + self.worker_threads.append(worker) + + def on_export_clicked(self): + """导出数据按钮点击事件""" + self.log_message("开始导出数据...") + self.set_buttons_enabled(False) + + worker = WorkerThread(self.export_data) + worker.log_signal.connect(self.log_message) + worker.finished_signal.connect(self.on_task_finished) + worker.start() + self.worker_threads.append(worker) + + def on_calculate_clicked(self): + """计算月度平均按钮点击事件""" + self.log_message("开始计算月度平均数据...") + self.set_buttons_enabled(False) + + worker = WorkerThread(self.calculate_monthly_avg) + worker.log_signal.connect(self.log_message) + worker.finished_signal.connect(self.on_task_finished) + worker.start() + self.worker_threads.append(worker) + + def on_share_clicked(self): + """更新流通股数量""" + self.log_message("开始更新流通股数量...") + self.set_buttons_enabled(False) + + worker = WorkerThread(self.update_float_share) + worker.log_signal.connect(self.log_message) + worker.finished_signal.connect(self.on_task_finished) + worker.start() + self.worker_threads.append(worker) + + def on_check_clicked(self): + """检查数据按钮点击事件""" + self.log_message("开始检查数据完整性...") + self.set_buttons_enabled(False) + + worker = WorkerThread(self.check_data) + worker.log_signal.connect(self.log_message) + worker.finished_signal.connect(self.on_task_finished) + worker.start() + self.worker_threads.append(worker) + + def on_task_finished(self, success, message): + """任务完成回调""" + self.set_buttons_enabled(True) + # self.progress_bar.setVisible(False) + + if success: + self.log_message(message) + self.statusBar().showMessage('任务完成') + else: + self.log_message(message) + self.statusBar().showMessage('任务失败') + QMessageBox.warning(self, "警告", message) + + def update_data(self): + """更新数据任务""" + try: + # 获取选择的配置文件路径 + current_index = self.config_combo.currentIndex() + if current_index < 0: + self.log_message("错误: 未选择配置文件") + return False + + config_file_path = self.config_combo.itemData(current_index) + if not config_file_path: + self.log_message("错误: 配置文件路径无效") + return False + + updater = KLineUpdater() + self.log_message(f"使用配置文件: {Path(config_file_path).name}") + + # 从选择的配置文件读取股票代码 + stock_codes = updater.read_single_account_stock_codes(config_file_path) + if not stock_codes: + self.log_message("错误: 无法从配置文件读取股票代码或文件为空") + return False + + self.log_message(f"从配置文件获取到 {len(stock_codes)} 个股票代码") + + # # 显示前几个代码作为示例 + # if stock_codes: + # sample_codes = stock_codes[:5] + # self.log_message(f"示例代码: {', '.join(sample_codes)}") + + # 更新数据 + updater.update_kline_data(stock_codes=stock_codes) + return True + except Exception as e: + self.log_message(f"更新数据失败: {str(e)}") + raise + + def export_data(self): + """导出数据任务""" + try: + self.log_message("开始导出月度平均数据...") + # 使用现有的导出函数 + exporter = DataExporter(Config.ConfigInfo.db_hk_kline_1d) + + # 根据导出时间命名 + target_table_name = 'hk_monthly_avg_' + datetime.now().strftime("%Y%m%d") + target_file_name = 'hk_monthly_avg_' + datetime.now().strftime("%Y%m%d") + ".csv" + + success = exporter.export_data( + monthly_table=target_table_name, + csv_file=target_file_name + ) + + if success: + self.log_message(f"数据导出成功: {target_file_name}") + return True + else: + self.log_message("数据导出失败") + return False + except Exception as e: + self.log_message(f"导出数据失败: {str(e)}") + raise + + def calculate_monthly_avg(self): + """计算月度平均任务""" + try: + self.log_message("开始计算月度平均数据...") + calculator = MarketDataCalculator(Config.ConfigInfo.db_hk_kline_1d) + + # 移除人民币交易的股票:股票名称最后一个字符为R,误删除的从配置文件读回来 + reserved_codes = calculator.read_stock_codes_list(Path.cwd() / "config" / "Reservedcode.txt") + market_data_ll = calculator.get_stock_codes() # 使用按照价格和流通股数量筛选的那个表格 + market_data = market_data_ll + reserved_codes + + # 根据统计时间进行命名 + target_table_name = 'hk_monthly_avg_' + datetime.now().strftime("%Y%m%d") + + self.log_message(f"开始处理 {len(market_data)} 支股票...") + + # 使用tqdm创建进度条 + for code in tqdm(market_data, desc="处理股票数据", unit="支"): + tablename = 'hk_' + code[3:] + # 计算并保存月度均值 + calculator.calculate_and_save_monthly_avg( + source_table=tablename, + target_table=target_table_name + ) + + # self.log_message("月度平均计算完成") + return True + except Exception as e: + self.log_message(f"计算月度平均失败: {str(e)}") + raise + + def update_float_share(self): + """更新流通股数量任务""" + try: + futuStockFilter = FutuStockFilter(Config.ConfigInfo.db_hk_kline_1d) + futuStockFilter.run_direct_import() + except Exception as e: + self.log_message(f"流通股数量更新失败: {str(e)}") + raise + + def check_data(self): + """检查数据任务""" + try: + self.log_message("开始检查数据完整性...") + checker = StockTableChecker(Config.ConfigInfo.db_hk_kline_1d) + checker.run_check() + self.log_message("数据检查完成") + return True + except Exception as e: + self.log_message(f"数据检查失败: {str(e)}") + raise + + def closeEvent(self, event): + """窗口关闭事件""" + # 确保所有工作线程都已完成 + for thread in self.worker_threads: + if thread.isRunning(): + thread.terminate() + thread.wait() + event.accept() + +def main(): + """主函数""" + app = QApplication(sys.argv) + + # 设置应用程序样式 + app.setStyle('Fusion') + + window = MainWindow() + window.show() + + sys.exit(app.exec_()) + +if __name__ == '__main__': + main() diff --git a/程序调用关系分析.md b/程序调用关系分析.md new file mode 100644 index 0000000..1dc25fb --- /dev/null +++ b/程序调用关系分析.md @@ -0,0 +1,179 @@ +# 香港股票数据管理系统 - 程序结构分析与调用关系 + +## 项目整体结构 + +``` +HKDataManagment/ +├── main_gui.py # PyQt5 GUI主界面 (主要入口点) +├── main.py # 命令行演示程序 +├── UpdateFutuData/ +│ ├── KLineUpdater.py # K线数据更新器类 (核心业务逻辑) +│ ├── ConditionalSelection.py # 股票筛选器 +│ ├── checktable.py # 数据表检查器 +│ └── ...其他Futu相关模块 +├── DataAnalysis/ +│ ├── DataExporter.py # 数据导出器 +│ ├── MarketDataCalculator.py # 市场数据计算器 +│ ├── checktable.py # 数据表检查器 +│ └── ...其他数据分析模块 +├── base/ +│ ├── MySQLHelper.py # MySQL数据库操作助手 +│ ├── LogHelper.py # 日志记录助手 +│ ├── Config.py # 配置管理 +│ └── ...其他基础工具类 +├── export.py # 数据导出脚本 +├── getKline.py # K线数据获取脚本 +└── config/ # 配置文件目录 +``` + +## 主要模块功能说明 + +### 1. GUI层 (main_gui.py) +- **功能**: 提供图形用户界面,通过按钮控制功能执行 +- **核心类**: `MainWindow`, `WorkerThread` +- **依赖**: PyQt5, 所有业务模块 + +### 2. 业务逻辑层 (UpdateFutuData/) +- **KLineUpdater.py**: 封装K线数据更新功能 +- **ConditionalSelection.py**: 股票条件筛选 +- **checktable.py**: 数据表完整性检查 + +### 3. 数据分析层 (DataAnalysis/) +- **DataExporter.py**: 数据导出功能 +- **MarketDataCalculator.py**: 市场数据计算 +- **checktable.py**: 数据表检查 + +### 4. 基础工具层 (base/) +- **MySQLHelper.py**: 数据库操作封装 +- **LogHelper.py**: 日志记录管理 +- **Config.py**: 配置信息管理 + +## 详细调用关系 + +### 核心调用链 + +#### 1. 更新K线数据流程 +``` +main_gui.py (用户点击更新按钮) + → WorkerThread.run() + → MainWindow.update_data() + → KLineUpdater.update_kline_data() + → KLineUpdater.get_stock_codes() + → MySQLHelper.execute_query() + → KLineUpdater.save_quotes_to_db() + → MySQLHelper.execute_many() + → Futu API调用 (通过get_market_data) +``` + +#### 2. 导出数据流程 +``` +main_gui.py (用户点击导出按钮) + → WorkerThread.run() + → MainWindow.export_data() + → DataExporter.export_data() + → MySQLHelper.execute_query() + → 文件系统操作 (生成CSV) +``` + +#### 3. 计算月度平均流程 +``` +main_gui.py (用户点击计算按钮) + → WorkerThread.run() + → MainWindow.calculate_monthly_avg() + → MarketDataCalculator.calculate_and_save_monthly_avg() + → MySQLHelper多次调用 + → 文件系统操作 (读取配置文件) +``` + +#### 4. 检查数据完整性流程 +``` +main_gui.py (用户点击检查按钮) + → WorkerThread.run() + → MainWindow.check_data() + → StockTableChecker.run_check() + → MySQLHelper.execute_query() +``` + +### 模块间依赖关系图 + +``` +main_gui.py (GUI层) + │ + ├── 依赖于 UpdateFutuData.KLineUpdater + ├── 依赖于 DataAnalysis.DataExporter + ├── 依赖于 DataAnalysis.MarketDataCalculator + ├── 依赖于 DataAnalysis.checktable.StockTableChecker + └── 依赖于 base.Config + │ + └── 所有业务模块依赖 base.MySQLHelper 和 base.LogHelper + │ + └── MySQLHelper 依赖 MySQL连接库 + └── LogHelper 依赖 logging模块 +``` + +### 数据流动方向 + +1. **数据输入流**: + ``` + Futu API → KLineUpdater → MySQL数据库 + ``` + +2. **数据处理流**: + ``` + MySQL数据库 → MarketDataCalculator → MySQL数据库 (计算结果) + ``` + +3. **数据输出流**: + ``` + MySQL数据库 → DataExporter → CSV文件 + ``` + +4. **数据验证流**: + ``` + MySQL数据库 → StockTableChecker → 日志输出 + ``` + +## 关键类与方法说明 + +### KLineUpdater 类主要方法 +- `get_market_data()`: 从Futu API获取市场数据 +- `preprocess_quote_data()`: 预处理行情数据 +- `save_quotes_to_db()`: 保存数据到数据库 +- `update_kline_data()`: 主更新方法 + +### MySQLHelper 类主要方法 +- `execute_query()`: 执行查询语句 +- `execute_update()`: 执行更新语句 +- `execute_many()`: 批量执行操作 +- `table_exists()`: 检查表是否存在 + +### DataExporter 类主要方法 +- `export_data()`: 导出数据到CSV文件 +- 其他数据格式化方法 + +### MarketDataCalculator 类主要方法 +- `calculate_and_save_monthly_avg()`: 计算并保存月度平均值 +- `get_stock_codes()`: 获取股票代码列表 + +## 架构特点总结 + +1. **分层架构设计**: GUI层、业务逻辑层、数据访问层清晰分离 +2. **依赖注入模式**: 通过配置类统一管理依赖项 +3. **多线程处理**: 使用QThread避免阻塞GUI界面 +4. **模块化组织**: 功能模块职责单一,便于维护扩展 +5. **错误处理机制**: 统一的日志记录和异常处理 + +## 使用示例 + +```python +# 初始化KLineUpdater +updater = KLineUpdater() + +# 更新指定股票数据 +updater.update_kline_data(stock_codes=['00700.HK', '00941.HK']) + +# 从GUI调用 +# 用户点击按钮 → 后台线程执行 → 更新完成反馈 +``` + +这个调用关系分析帮助理解系统如何从用户界面操作到底层数据处理的完整流程,便于后续维护和功能扩展。 diff --git a/项目改进建议.md b/项目改进建议.md new file mode 100644 index 0000000..4ea4470 --- /dev/null +++ b/项目改进建议.md @@ -0,0 +1,218 @@ +# 香港股票数据管理系统 - 项目分析与改进建议 + +## 项目概述 + +本项目是一个香港股票数据管理系统,主要功能包括: +- 从Futu API获取股票K线数据 +- 处理和分析股票数据 +- 将数据存储到MySQL数据库 +- 导出数据到CSV文件 +- 计算月度平均数据 + +## 当前项目结构分析 + +### 主要文件结构 +``` +HKDataManagment/ +├── base/ # 基础工具类 +│ ├── Config.py # 配置管理 +│ ├── LogHelper.py # 日志工具 +│ └── MySQLHelper.py # 数据库操作工具 +├── config/ # 配置文件目录 +├── DataAnalysis/ # 数据分析模块 +├── UpdateFutuData/ # Futu数据更新模块 +├── export.py # 数据导出脚本 +├── getKline.py # K线数据获取脚本 +├── main.py # 主演示文件 +└── updatekline.py # K线更新类(已重构) +``` + +## 主要问题识别 + +### 1. 代码重复和功能重叠 +- `export.py` 和 `getKline.py` 存在大量重复代码(导入语句、函数定义) +- 两个文件都包含相同的 `exportMonthlyAvgData` 和 `calculate_update_monthly_avg_table` 函数 + +### 2. 缺乏统一的入口点 +- 多个独立的脚本文件(export.py, getKline.py, main.py) +- 没有统一的主程序来协调各个功能模块 + +### 3. 配置管理分散 +- 数据库配置在多个文件中硬编码 +- 缺少统一的配置管理系统 + +### 4. 错误处理不完善 +- 部分异常处理不够详细 +- 日志记录可以更加规范化 + +### 5. 模块化程度不足 +- 功能模块之间的耦合度较高 +- 可复用性有待提高 + +## 改进建议 + +### 1. 代码重构和去重 +**建议措施:** +- 将 `export.py` 和 `getKline.py` 中的重复函数提取到公共模块 +- 创建统一的工具类或模块来共享通用功能 + +**示例代码结构:** +```python +# common/utils.py +def exportMonthlyAvgData(db_config, table_suffix): + # 通用导出函数 + pass + +def calculate_update_monthly_avg_table(db_config, reserved_codes_path): + # 通用计算函数 + pass +``` + +### 2. 创建统一的主程序 +**建议措施:** +- 开发一个主命令行界面(CLI)应用程序 +- 使用 argparse 或 click 库创建命令子系统 + +**示例结构:** +```python +# main_cli.py +import argparse +from UpdateFutuData.KLineUpdater import KLineUpdater +from common.utils import exportMonthlyAvgData, calculate_update_monthly_avg_table + +def main(): + parser = argparse.ArgumentParser(description='香港股票数据管理系统') + subparsers = parser.add_subparsers(dest='command') + + # 更新数据命令 + update_parser = subparsers.add_parser('update', help='更新K线数据') + update_parser.add_argument('--codes', nargs='+', help='股票代码列表') + + # 导出数据命令 + export_parser = subparsers.add_parser('export', help='导出数据') + + args = parser.parse_args() + + if args.command == 'update': + updater = KLineUpdater() + updater.update_kline_data(stock_codes=args.codes) + elif args.command == 'export': + exportMonthlyAvgData() +``` + +### 3. 改进配置管理 +**建议措施:** +- 创建统一的配置文件(如 config.yaml 或 config.ini) +- 使用环境变量进行敏感信息管理 +- 实现配置的热重载功能 + +**示例配置:** +```yaml +# config/config.yaml +database: + host: localhost + user: root + password: ${DB_PASSWORD} + database: hk_kline_1d + +futu: + host: 127.0.0.1 + port: 11111 + +paths: + data: ./data + config: ./config + logs: ./logs +``` + +### 4. 增强错误处理和日志 +**建议措施:** +- 实现更详细的异常分类和处理 +- 添加重试机制对于API调用 +- 完善日志记录,包括操作审计日志 + +**示例改进:** +```python +def get_market_data_with_retry(self, market, max_retries=3): + for attempt in range(max_retries): + try: + return self.get_market_data(market) + except Exception as e: + self.logger.warning(f"获取市场数据失败,尝试 {attempt + 1}/{max_retries}: {e}") + time.sleep(2 ** attempt) # 指数退避 + raise Exception(f"无法获取 {market} 市场数据,已达到最大重试次数") +``` + +### 5. 提高模块化和可测试性 +**建议措施:** +- 使用依赖注入模式 +- 增加单元测试和集成测试 +- 创建接口抽象层 + +**示例改进:** +```python +# 定义数据访问接口 +class IDataProvider(ABC): + @abstractmethod + def get_market_data(self, market) -> List[str]: + pass + + @abstractmethod + def get_kline_data(self, code, start_date, end_date) -> pd.DataFrame: + pass + +# Futu实现 +class FutuDataProvider(IDataProvider): + def __init__(self, host, port): + self.host = host + self.port = port + + def get_market_data(self, market): + # 实现具体逻辑 + pass +``` + +### 6. 性能优化建议 +**建议措施:** +- 实现批量操作减少数据库IO +- 添加缓存机制减少API调用 +- 使用异步IO提高并发性能 + +**示例改进:** +```python +async def update_multiple_stocks(self, stock_codes): + tasks = [] + for code in stock_codes: + task = asyncio.create_task(self.update_single_stock(code)) + tasks.append(task) + + results = await asyncio.gather(*tasks, return_exceptions=True) + return results +``` + +### 7. 文档和维护建议 +**建议措施:** +- 编写详细的API文档 +- 创建项目README和使用指南 +- 设置自动化测试和CI/CD流水线 + +## 实施优先级建议 + +1. **高优先级**: + - 代码去重和重构 + - 统一配置管理 + - 增强错误处理 + +2. **中优先级**: + - 创建统一CLI界面 + - 提高模块化程度 + - 添加基本测试 + +3. **低优先级**: + - 性能优化 + - 高级功能开发 + - 自动化部署 + +## 总结 + +本项目有良好的基础架构,但需要通过重构提高代码质量、可维护性和可扩展性。建议按照上述改进措施逐步优化,首先解决代码重复和配置管理问题,然后逐步实现更高级的功能和优化。