Files
HKDataManagment/UpdateFutuData/KLineUpdater.py
2025-09-12 10:25:31 +08:00

416 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
KLineUpdater 类用于更新日 K 数据
封装了 Futu API 操作和数据库存储功能,提供类接口供其他模块调用
"""
from futu import *
from base.MySQLHelper import MySQLHelper
from base.LogHelper import LogHelper
from datetime import datetime
from typing import Optional, List, Dict
from UpdateFutuData.ConditionalSelection import FutuStockFilter
from tqdm import tqdm
import pandas as pd
import time
import os
class KLineUpdater:
    """
    K-line (candlestick) data updater.

    Responsibilities:
    - fetch stock K-line data from the Futu OpenD API
    - convert the data and save it to MySQL, one table per stock
    - attach float-share counts loaded from the database
    - drive the complete per-stock update loop
    """

    # Default Futu OpenD gateway address; can be overridden per instance
    # through ``__init__``.
    FUTU_HOST = '127.0.0.1'
    FUTU_PORT = 11111
    # Pause (seconds) after every request_history_kline call, presumably to
    # stay under Futu's request-frequency limit for historical K-lines.
    REQUEST_INTERVAL = 0.7
    # Page size (max_count) used for request_history_kline.
    PAGE_SIZE = 100

    def __init__(self, db_config: dict = None, logger_name: str = 'KLine',
                 futu_host: Optional[str] = None, futu_port: Optional[int] = None):
        """
        Initialise the updater.

        Args:
            db_config (dict): MySQL connection settings passed to MySQLHelper;
                a local default configuration is used when omitted.
            logger_name (str): logger name passed to LogHelper.
            futu_host (Optional[str]): OpenD host; FUTU_HOST when omitted.
            futu_port (Optional[int]): OpenD port; FUTU_PORT when omitted.
        """
        # NOTE(review): the default credentials are hard-coded; consider
        # loading them from the environment or a config file instead.
        self.db_config = db_config or {
            'host': 'localhost',
            'user': 'root',
            'password': 'bzskmysql',
            'database': 'hk_kline_1d'
        }
        # Only shadow the class-level defaults when an override is given.
        if futu_host is not None:
            self.FUTU_HOST = futu_host
        if futu_port is not None:
            self.FUTU_PORT = futu_port
        self.logger = LogHelper(logger_name=logger_name).setup()
        self.logger.info("KLineUpdater 初始化完成")

    def _open_quote_context(self) -> 'OpenQuoteContext':
        """Open a new Futu quote connection; the caller must close() it."""
        return OpenQuoteContext(host=self.FUTU_HOST, port=self.FUTU_PORT)

    @staticmethod
    def _optional_float(value) -> Optional[float]:
        """Return ``float(value)``, or None when *value* is NaN/None."""
        return float(value) if pd.notna(value) else None

    @staticmethod
    def _kline_table_name(code: str) -> str:
        """
        Return the per-stock table name for *code*.

        Drops the first three characters, so this assumes codes shaped like
        'HK.00700' (-> 'hk_00700').
        """
        return 'hk_' + code[3:]

    def get_market_data(self, market: 'Market') -> List[str]:
        """
        Fetch the stock codes of one market from the Futu API.

        Args:
            market (Market): market enum value, e.g. Market.HK.

        Returns:
            List[str]: stock codes; an empty list on API error or exception.
        """
        quote_ctx = self._open_quote_context()
        try:
            ret, data = quote_ctx.get_stock_basicinfo(market, SecurityType.STOCK)
            if ret != RET_OK:
                self.logger.error(f"获取股票代码失败: {data}")
                return []
            codes = data['code'].astype(str).tolist()
            self.logger.info(f"获取到 {market} 市场 {len(codes)} 个股票代码")
            return codes
        except Exception as e:
            self.logger.error(f"获取股票代码时发生异常: {str(e)}")
            return []
        finally:
            quote_ctx.close()

    def preprocess_quote_data(self, df: pd.DataFrame, float_share: Optional[int] = None) -> List[Dict]:
        """
        Convert raw K-line rows into dicts matching the INSERT statement used
        by save_quotes_to_db.

        Rows that fail to convert (missing column, unparsable time_key, ...)
        are logged as warnings and skipped.

        Args:
            df (pd.DataFrame): raw K-line data, e.g. from request_history_kline.
            float_share (Optional[int]): float-share count attached to every row.

        Returns:
            List[Dict]: one dict per successfully converted row.
        """
        to_float = self._optional_float
        processed_data = []
        for _, row in df.iterrows():
            try:
                code = row['code']
                # Prefix before the first dot, e.g. 'HK' for 'HK.00700'.
                market = code.split('.')[0] if '.' in code else 'UNKNOWN'
                processed_data.append({
                    'stock_code': code,
                    'stock_name': row['name'],
                    'trade_date': datetime.strptime(row['time_key'], '%Y-%m-%d %H:%M:%S'),
                    'open_price': float(row['open']),
                    'close_price': float(row['close']),
                    'high_price': float(row['high']),
                    'low_price': float(row['low']),
                    'pe_ratio': to_float(row['pe_ratio']),
                    'turnover_rate': to_float(row['turnover_rate']),
                    'volume': int(row['volume']) if pd.notna(row['volume']) else None,
                    'turnover': to_float(row['turnover']),
                    'change_rate': to_float(row['change_rate']),
                    'last_close': to_float(row['last_close']),
                    'market': market,
                    'float_share': float_share,
                })
            except Exception as e:
                self.logger.warning(f"处理行情数据时跳过异常行 {row.get('code', '未知')}: {str(e)}")
        return processed_data

    def get_float_share(self, quote_ctx: 'OpenQuoteContext', code: str) -> Optional[int]:
        """
        Fetch the float-share count of one stock from a market snapshot.

        Args:
            quote_ctx (OpenQuoteContext): open Futu quote connection.
            code (str): stock code.

        Returns:
            Optional[int]: float-share count, or None if unavailable or on error.
        """
        try:
            ret, snapshot = quote_ctx.get_market_snapshot([code])
            if ret == RET_OK and not snapshot.empty:
                float_share = snapshot.iloc[0].get('float_share')
                if pd.notna(float_share):
                    return int(float_share)
            return None
        except Exception as e:
            self.logger.error(f"获取股票 {code} 的流通股数量失败: {str(e)}")
            return None

    def get_float_share_data(self, table_name: str) -> Optional[Dict[str, int]]:
        """
        Load float-share counts from a database table.

        Args:
            table_name (str): source table; it is interpolated into the SQL,
                so it must come from trusted code, not user input.

        Returns:
            Optional[Dict[str, int]]: stock code -> float-share count, or None
            when the table has no data or the query fails.
        """
        try:
            with MySQLHelper(**self.db_config) as db:
                data = db.execute_query(f"""
                    SELECT stock_code, float_share
                    FROM {table_name}
                    WHERE float_share IS NOT NULL
                """)
            if not data:
                self.logger.error(f"{table_name} 中没有流通股本数据")
                return None
            float_share_dict = {
                row['stock_code']: int(row['float_share'])
                for row in data
                if row['stock_code'] and row['float_share'] is not None
            }
            self.logger.info(f"成功从 {table_name} 表加载 {len(float_share_dict)} 条流通股数据")
            return float_share_dict
        except Exception as e:
            self.logger.error(f"从数据库读取流通股本数据失败: {str(e)}")
            return None

    def save_quotes_to_db(self, quote_data: pd.DataFrame, table_name: str = 'stock_quotes', float_share: Optional[int] = None) -> bool:
        """
        Upsert K-line rows into *table_name*, creating the table if missing.

        Rows are keyed on (stock_code, trade_date); existing rows are updated.

        Args:
            quote_data (pd.DataFrame): raw K-line data.
            table_name (str): target table (interpolated into the SQL).
            float_share (Optional[int]): float-share count stored on every row.

        Returns:
            bool: True if the rows were written, False otherwise.
        """
        processed_data = self.preprocess_quote_data(quote_data, float_share)
        if not processed_data:
            self.logger.error(f"没有有效数据需要保存,表:{table_name}")
            return False

        insert_sql = f"""
            INSERT INTO {table_name} (
                stock_code, stock_name, trade_date, open_price, close_price, high_price, low_price,
                pe_ratio, turnover_rate, volume, turnover, change_rate, last_close, market, float_share
            ) VALUES (
                %(stock_code)s, %(stock_name)s, %(trade_date)s, %(open_price)s, %(close_price)s, %(high_price)s, %(low_price)s,
                %(pe_ratio)s, %(turnover_rate)s, %(volume)s, %(turnover)s, %(change_rate)s, %(last_close)s, %(market)s, %(float_share)s
            )
            ON DUPLICATE KEY UPDATE
                stock_name = VALUES(stock_name),
                open_price = VALUES(open_price),
                close_price = VALUES(close_price),
                high_price = VALUES(high_price),
                low_price = VALUES(low_price),
                pe_ratio = VALUES(pe_ratio),
                turnover_rate = VALUES(turnover_rate),
                volume = VALUES(volume),
                turnover = VALUES(turnover),
                change_rate = VALUES(change_rate),
                last_close = VALUES(last_close),
                float_share = VALUES(float_share)
        """
        try:
            with MySQLHelper(**self.db_config) as db:
                if not db.table_exists(table_name):
                    create_table_sql = f"""
                        CREATE TABLE IF NOT EXISTS {table_name} (
                            id INT AUTO_INCREMENT PRIMARY KEY,
                            stock_code VARCHAR(20) NOT NULL COMMENT '股票代码',
                            stock_name VARCHAR(50) COMMENT '股票名称',
                            trade_date DATETIME NOT NULL COMMENT '交易日期时间',
                            open_price DECIMAL(10, 3) COMMENT '开盘价',
                            close_price DECIMAL(10, 3) COMMENT '收盘价',
                            high_price DECIMAL(10, 3) COMMENT '最高价',
                            low_price DECIMAL(10, 3) COMMENT '最低价',
                            pe_ratio DECIMAL(10, 3) COMMENT '市盈率',
                            turnover_rate DECIMAL(10, 6) COMMENT '换手率(%)',
                            volume BIGINT COMMENT '成交量(股)',
                            turnover DECIMAL(20, 2) COMMENT '成交额(元)',
                            change_rate DECIMAL(10, 6) COMMENT '涨跌幅(%)',
                            last_close DECIMAL(10, 3) COMMENT '昨收价',
                            market VARCHAR(10) COMMENT '市场标识',
                            float_share BIGINT COMMENT '流通股数量',
                            create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
                            UNIQUE KEY uk_code_date (stock_code, trade_date),
                            KEY idx_trade_date (trade_date),
                            KEY idx_stock_code (stock_code)
                        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='股票行情数据表'
                    """
                    db.execute_update(create_table_sql)
                    self.logger.info(f"创建了新表: {table_name}")
                db.execute_many(insert_sql, processed_data)
            return True
        except Exception as e:
            self.logger.error(f"保存行情数据到表 {table_name} 失败: {str(e)}")
            return False

    def _read_codes_file(self, file_path: str) -> List[str]:
        """Return the stripped, non-blank lines of *file_path*; [] on any error."""
        try:
            # utf-8-sig drops a leading BOM (e.g. written by Windows Notepad)
            # that would otherwise stick to the first code; BOM-less files are
            # read exactly as with plain utf-8.
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                return [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            self.logger.error(f"文件 {file_path} 不存在")
            return []
        except Exception as e:
            self.logger.error(f"读取文件失败: {str(e)}")
            return []

    def read_stock_codes(self, file_path=os.path.join('config', 'hang_futu.txt')):
        """
        Read stock codes from a text file, one code per line.

        Args:
            file_path (str): path of the codes file.

        Returns:
            List[str]: non-blank, stripped lines; empty on error.
        """
        return self._read_codes_file(file_path)

    def read_single_account_stock_codes(self, file_path=os.path.join('data', 'missing_tables_小航富途.txt')):
        """
        Read the stock codes listed for a single account, one code per line.

        Args:
            file_path (str): path of the codes file.

        Returns:
            List[str]: non-blank, stripped lines; empty on error.
        """
        return self._read_codes_file(file_path)

    def get_stock_codes(self) -> List[str]:
        """
        Return the distinct, non-empty stock codes stored in the stock_filter table.

        Returns:
            List[str]: stock codes; empty on error.
        """
        try:
            with MySQLHelper(**self.db_config) as db:
                results = db.execute_query("SELECT DISTINCT stock_code FROM stock_filter")
                return [row['stock_code'] for row in (results or []) if row['stock_code']]
        except Exception as e:
            self.logger.error(f"获取股票代码失败: {str(e)}")
            return []

    def write_missing_codes_to_txt(self, missing_codes: list, filename: str = os.path.join('config', 'missing_codes.txt')):
        """
        Append stock codes to a text file, creating the file and its directory
        if needed.

        Each code is written as ``"\\n" + code``, so entries stay on separate
        lines even if the existing file has no trailing newline; readers in
        this class skip the resulting blank first line.

        Args:
            missing_codes (list): codes to append.
            filename (str): target file.
        """
        try:
            directory = os.path.dirname(filename)
            if directory and not os.path.exists(directory):
                os.makedirs(directory, exist_ok=True)
                self.logger.info(f"创建目录: {directory}")
            with open(filename, 'a', encoding='utf-8') as f:
                f.writelines(f"\n{code}" for code in missing_codes)
            self.logger.info(f"已将 {len(missing_codes)} 个缺失表对应的股票代码写入 {filename}")
        except Exception as e:
            self.logger.error(f"写入TXT文件失败: {str(e)}")

    def _fetch_and_save_kline(self, quote_ctx: 'OpenQuoteContext', code: str,
                              start_date: str, end_date: str,
                              float_share: Optional[int]) -> None:
        """Download every page of K-line data for *code* and save each page."""
        table = self._kline_table_name(code)
        page_req_key = None
        while True:
            ret, data, page_req_key = quote_ctx.request_history_kline(
                code, start=start_date, end=end_date,
                max_count=self.PAGE_SIZE, page_req_key=page_req_key)
            if ret == RET_OK:
                self.save_quotes_to_db(data, table, float_share)
            else:
                self.logger.error(f"请求股票 {code} 的K线数据失败: {data}")
            # Throttle after every request, including between pages.
            time.sleep(self.REQUEST_INTERVAL)
            if page_req_key is None:
                break

    def update_kline_data(self, stock_codes: List[str] = None, table_name: str = 'stock_quotes',
                          days_back: int = 3):
        """
        Fetch recent K-line data for each stock and save it to its own table.

        The date window runs from *days_back* days ago to tomorrow. Failures
        for one stock are logged and do not stop the others.

        Args:
            stock_codes (List[str]): codes to update; loaded from the
                stock_filter table when None.
            table_name (str): NOTE(review): currently unused; each stock is
                saved to its own 'hk_<code>' table. Kept for compatibility.
            days_back (int): how many days before today the window starts.
        """
        # The module header only imports ``datetime`` from the datetime module.
        from datetime import timedelta

        if stock_codes is None:
            stock_codes = self.get_stock_codes()
        if not stock_codes:
            self.logger.warning("没有股票代码需要更新")
            return
        self.logger.info(f"开始更新 {len(stock_codes)} 个股票的K线数据")

        now = datetime.now()
        start_date = (now - timedelta(days=days_back)).strftime("%Y-%m-%d")
        end_date = (now + timedelta(days=1)).strftime("%Y-%m-%d")

        # Float-share counts come from the database only; falling back to the
        # API (get_float_share) per stock is intentionally not done here.
        float_share_dict = self.get_float_share_data('stock_filter') or {}

        quote_ctx = self._open_quote_context()
        try:
            for code in tqdm(stock_codes, desc="更新K线数据"):
                try:
                    self._fetch_and_save_kline(quote_ctx, code, start_date, end_date,
                                               float_share_dict.get(code))
                except Exception as e:
                    self.logger.error(f"更新股票 {code} 数据失败: {str(e)}")
        finally:
            # Always release the OpenD connection so connection slots are not exhausted.
            quote_ctx.close()
        self.logger.info("K线数据更新完成")