Files
HKDataManagment/PyCode/updatekline.py

364 lines
15 KiB
Python
Raw Normal View History

2025-08-15 13:22:58 +08:00
"""
2025-08-20 00:00:14 +08:00
本代码用于更新日 K 数据
由于行情数据限制至少需要3个账号进行数据更新
更新内容每个交易日日K数据以及当日流通股数量(用于计算流通和市值)
程序流程
更新 hk_stock_list table 存放全部静态数据这是一个总表
读取股票列表三个OpenD账号的数据存放在三个 TXT 文件中
根据股票代码获取股票历史数据
根据股票代码获取流通股数量
根据股票代码新建/更新数据表
2025-08-20 14:09:35 +08:00
更新完成需检查 数据是否完整 -> checktable.py 操作界面做好了之后再融合到整体代码中
2025-08-15 13:22:58 +08:00
"""
from futu import *
from MySQLHelper import MySQLHelper # MySQLHelper类保存为单独文件
2025-08-20 00:00:14 +08:00
from LogHelper import LogHelper
2025-08-15 13:22:58 +08:00
from datetime import datetime
2025-08-20 14:09:35 +08:00
from typing import Optional, List, Dict
2025-08-20 00:00:14 +08:00
from ConditionalSelection import FutuStockFilter
from tqdm import tqdm
import pandas as pd
2025-08-20 14:09:35 +08:00
import time
# 基本用法(自动创建日期日志+控制台输出)
logger = LogHelper(logger_name = 'KLine').setup()
2025-08-15 13:22:58 +08:00
def get_market_data(market: Market) -> List[str]:
"""
从Futu API获取指定市场的股票代码列表
Args:
market (Market): 市场枚举值 Market.SH, Market.SZ
Returns:
List[str]: 股票代码列表
"""
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
try:
ret, data = quote_ctx.get_stock_basicinfo(market, SecurityType.STOCK)
if ret == RET_OK:
# 提取code列并转换为列表
codes = data['code'].astype(str).tolist()
2025-08-20 14:09:35 +08:00
logger.info(f"获取到 {market} 市场 {len(codes)} 个股票代码")
2025-08-15 13:22:58 +08:00
return codes
else:
2025-08-20 14:09:35 +08:00
logger.error(f"获取股票代码失败: {data}")
2025-08-15 13:22:58 +08:00
return []
except Exception as e:
2025-08-20 14:09:35 +08:00
logger.error(f"获取股票代码时发生异常: {str(e)}")
2025-08-15 13:22:58 +08:00
return []
finally:
quote_ctx.close()
2025-08-20 00:00:14 +08:00
def preprocess_quote_data(df: pd.DataFrame, float_share: Optional[int] = None) -> List[Dict]:
2025-08-15 13:22:58 +08:00
"""
预处理行情数据转换为适合数据库存储的格式
Args:
df (pd.DataFrame): 原始行情数据DataFrame
2025-08-20 00:00:14 +08:00
float_share (Optional[int]): 流通股数量
2025-08-15 13:22:58 +08:00
Returns:
List[Dict]: 处理后的数据列表
"""
processed_data = []
for _, row in df.iterrows():
try:
# 提取市场标识
market = row['code'].split('.')[0] if '.' in row['code'] else 'UNKNOWN'
# 转换时间格式
trade_time = datetime.strptime(row['time_key'], '%Y-%m-%d %H:%M:%S')
item = {
'stock_code': row['code'],
'stock_name': row['name'],
'trade_date': trade_time,
'open_price': float(row['open']),
'close_price': float(row['close']),
'high_price': float(row['high']),
'low_price': float(row['low']),
'pe_ratio': float(row['pe_ratio']) if pd.notna(row['pe_ratio']) else None,
'turnover_rate': float(row['turnover_rate']) if pd.notna(row['turnover_rate']) else None,
'volume': int(row['volume']) if pd.notna(row['volume']) else None,
'turnover': float(row['turnover']) if pd.notna(row['turnover']) else None,
'change_rate': float(row['change_rate']) if pd.notna(row['change_rate']) else None,
'last_close': float(row['last_close']) if pd.notna(row['last_close']) else None,
2025-08-20 00:00:14 +08:00
'market': market,
'float_share': float_share # 添加流通股数量字段
2025-08-15 13:22:58 +08:00
}
processed_data.append(item)
except Exception as e:
2025-08-20 14:09:35 +08:00
logger.warning(f"处理行情数据时跳过异常行 {row.get('code', '未知')}: {str(e)}")
2025-08-15 13:22:58 +08:00
continue
return processed_data
2025-08-20 00:00:14 +08:00
def get_float_share(quote_ctx: OpenQuoteContext, code: str) -> Optional[int]:
"""
获取股票的流通股数量
Args:
quote_ctx (OpenQuoteContext): Futu API连接上下文
code (str): 股票代码
Returns:
Optional[int]: 流通股数量获取失败返回None
"""
try:
# 获取股票快照
ret, snapshot = quote_ctx.get_market_snapshot([code])
if ret == RET_OK and not snapshot.empty:
# 提取流通股数量
float_share = snapshot.iloc[0].get('float_share')
if pd.notna(float_share):
return int(float_share)
return None
except Exception as e:
2025-08-20 14:09:35 +08:00
logger.error(f"获取股票 {code} 的流通股数量失败: {str(e)}")
2025-08-20 00:00:14 +08:00
return None
def get_float_share_data(db_config: dict, table_name: str) -> Optional[Dict[str, int]]:
"""
从指定表读取流通股本数据并构建字典
Args:
db_config: 数据库配置
table_name: 源数据表名
Returns:
Dict[str, int]: 股票代码到流通股数量的映射字典失败返回None
"""
try:
with MySQLHelper(**db_config) as db:
# 查询流通股本数据
data = db.execute_query(f"""
SELECT stock_code, float_share
FROM {table_name}
WHERE float_share IS NOT NULL
""")
if not data:
2025-08-20 14:09:35 +08:00
logger.error(f"{table_name} 中没有流通股本数据")
2025-08-20 00:00:14 +08:00
return None
# 构建股票代码到流通股数量的映射字典
float_share_dict = {}
for row in data:
stock_code = row['stock_code']
float_share = row['float_share']
if stock_code and float_share is not None:
float_share_dict[stock_code] = int(float_share)
2025-08-20 14:09:35 +08:00
logger.info(f"成功从 {table_name} 表加载 {len(float_share_dict)} 条流通股数据")
2025-08-20 00:00:14 +08:00
return float_share_dict
except Exception as e:
2025-08-20 14:09:35 +08:00
logger.error(f"从数据库读取流通股本数据失败: {str(e)}")
2025-08-20 00:00:14 +08:00
return None
def save_quotes_to_db(db_config: dict, quote_data: pd.DataFrame, table_name: str = 'stock_quotes', float_share: Optional[int] = None) -> bool:
2025-08-15 13:22:58 +08:00
"""
将行情数据保存到数据库
Args:
db_config (dict): 数据库配置
quote_data (pd.DataFrame): 行情数据DataFrame
table_name (str): 目标表名(默认为'stock_quotes')
2025-08-20 00:00:14 +08:00
float_share (Optional[int]): 流通股数量
2025-08-15 13:22:58 +08:00
Returns:
bool: 是否成功保存
"""
# 预处理数据
2025-08-20 00:00:14 +08:00
processed_data = preprocess_quote_data(quote_data, float_share)
2025-08-15 13:22:58 +08:00
if not processed_data:
2025-08-20 14:09:35 +08:00
logger.error("没有有效数据需要保存")
2025-08-15 13:22:58 +08:00
return False
# 动态生成SQL插入语句
insert_sql = f"""
INSERT INTO {table_name} (
stock_code, stock_name, trade_date, open_price, close_price, high_price, low_price,
2025-08-20 00:00:14 +08:00
pe_ratio, turnover_rate, volume, turnover, change_rate, last_close, market, float_share
2025-08-15 13:22:58 +08:00
) VALUES (
%(stock_code)s, %(stock_name)s, %(trade_date)s, %(open_price)s, %(close_price)s, %(high_price)s, %(low_price)s,
2025-08-20 00:00:14 +08:00
%(pe_ratio)s, %(turnover_rate)s, %(volume)s, %(turnover)s, %(change_rate)s, %(last_close)s, %(market)s, %(float_share)s
2025-08-15 13:22:58 +08:00
)
ON DUPLICATE KEY UPDATE
stock_name = VALUES(stock_name),
open_price = VALUES(open_price),
close_price = VALUES(close_price),
high_price = VALUES(high_price),
low_price = VALUES(low_price),
pe_ratio = VALUES(pe_ratio),
turnover_rate = VALUES(turnover_rate),
volume = VALUES(volume),
turnover = VALUES(turnover),
change_rate = VALUES(change_rate),
2025-08-20 00:00:14 +08:00
last_close = VALUES(last_close),
float_share = VALUES(float_share)
2025-08-15 13:22:58 +08:00
"""
try:
with MySQLHelper(**db_config) as db:
# 检查表是否存在,不存在则创建
if not db.table_exists(table_name):
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id INT AUTO_INCREMENT PRIMARY KEY,
stock_code VARCHAR(20) NOT NULL COMMENT '股票代码',
stock_name VARCHAR(50) COMMENT '股票名称',
trade_date DATETIME NOT NULL COMMENT '交易日期时间',
open_price DECIMAL(10, 3) COMMENT '开盘价',
close_price DECIMAL(10, 3) COMMENT '收盘价',
high_price DECIMAL(10, 3) COMMENT '最高价',
low_price DECIMAL(10, 3) COMMENT '最低价',
pe_ratio DECIMAL(10, 3) COMMENT '市盈率',
turnover_rate DECIMAL(10, 6) COMMENT '换手率(%)',
volume BIGINT COMMENT '成交量(股)',
turnover DECIMAL(20, 2) COMMENT '成交额(元)',
change_rate DECIMAL(10, 6) COMMENT '涨跌幅(%)',
last_close DECIMAL(10, 3) COMMENT '昨收价',
market VARCHAR(10) COMMENT '市场标识',
2025-08-20 00:00:14 +08:00
float_share BIGINT COMMENT '流通股数量',
2025-08-15 13:22:58 +08:00
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
UNIQUE KEY uk_code_date (stock_code, trade_date),
KEY idx_trade_date (trade_date),
KEY idx_stock_code (stock_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='股票行情数据表'
"""
db.execute_update(create_table_sql)
2025-08-20 14:09:35 +08:00
logger.info(f"创建了新表: {table_name}")
2025-08-15 13:22:58 +08:00
affected_rows = db.execute_many(insert_sql, processed_data)
2025-08-20 14:09:35 +08:00
logger.info(f"成功插入/更新 {affected_rows} 条行情记录到表 {table_name}")
2025-08-15 13:22:58 +08:00
return True
except Exception as e:
2025-08-20 14:09:35 +08:00
logger.error(f"保存行情数据到表 {table_name} 失败: {str(e)}")
2025-08-15 13:22:58 +08:00
return False
2025-08-20 00:00:14 +08:00
def read_single_account_stock_codes(file_path='data\missing_tables_小航富途.txt'):
2025-08-15 13:22:58 +08:00
"""基础读取方法 - 按行读取所有内容"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 去除每行末尾的换行符,并过滤空行
codes = [line.strip() for line in lines if line.strip()]
return codes
except FileNotFoundError:
print(f"文件 {file_path} 不存在")
return []
except Exception as e:
print(f"读取文件失败: {str(e)}")
return []
2025-08-20 00:00:14 +08:00
def get_stock_codes() -> List[str]:
"""从conditionalselection表获取所有股票代码"""
try:
with MySQLHelper(**db_config) as db:
sql = f"SELECT DISTINCT stock_code FROM stock_filter"
results = db.execute_query(sql)
return [row['stock_code'] for row in results if row['stock_code']]
except Exception as e:
2025-08-20 14:09:35 +08:00
logger.error(f"获取股票代码失败: {str(e)}")
2025-08-20 00:00:14 +08:00
return []
2025-08-20 14:09:35 +08:00
def write_missing_codes_to_txt(missing_codes: list, filename: str = "config\HK_futu.txt"):
"""将缺失的股票代码追加到TXT文件如果文件不存在则创建"""
try:
# 确保目录存在
directory = os.path.dirname(filename)
if directory and not os.path.exists(directory):
os.makedirs(directory)
logger.info(f"创建目录: {directory}")
# # 检查文件是否存在
# file_exists = os.path.exists(filename)
with open(filename, 'a', encoding='utf-8') as f:
for code in missing_codes:
f.write(f"\n{code}")
logger.info(f"已将 {len(missing_codes)} 个缺失表对应的股票代码写入 {filename}")
except Exception as e:
logger.error(f"写入TXT文件失败: {str(e)}")
2025-08-20 00:00:14 +08:00
2025-08-15 13:22:58 +08:00
if __name__ == "__main__":
# 数据库配置
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'bzskmysql',
2025-08-20 00:00:14 +08:00
'database': 'hk_kline_1d'
2025-08-15 13:22:58 +08:00
}
2025-08-20 14:09:35 +08:00
# 创建导入器并运行, 用于每日更新流通股数量 -> 操作界面中增加一个开关,每天只需要更新一次
2025-08-20 00:00:14 +08:00
futuStockFilter = FutuStockFilter(db_config)
futuStockFilter.run_direct_import()
2025-08-15 13:22:58 +08:00
2025-08-20 14:09:35 +08:00
# 每个账号获取的数据独立开来 -> 操作见面可以选择
2025-08-20 00:00:14 +08:00
market_data_all = get_stock_codes()
market_data_hang = read_single_account_stock_codes('config\hang_futu.txt')
market_data_kevin= read_single_account_stock_codes('config\kevin_futu.txt')
2025-08-20 14:09:35 +08:00
market_data_HK= read_single_account_stock_codes('config\HK_futu.txt')
market_data_new = list(set(market_data_all) - set(market_data_hang) - set(market_data_kevin) - set(market_data_HK))
# write_missing_codes_to_txt(market_data_new) # 新股票添加到文件中 -> 暂时手动设置
2025-08-15 13:22:58 +08:00
2025-08-20 14:09:35 +08:00
# 动态调整
"""*********************************************"""
market_data = market_data_new
2025-08-20 00:00:14 +08:00
2025-08-20 14:09:35 +08:00
# 每天收盘后更新数据 -> 操作界面中,这个参数需要放出来
start_date = (datetime.now() - timedelta(days = 5)).strftime("%Y-%m-%d")
2025-08-20 00:00:14 +08:00
end_date = (datetime.now() + timedelta(days = 1)).strftime("%Y-%m-%d")
# 获取流通股数据字典
float_share_dict = get_float_share_data(db_config, 'stock_filter') # 假设数据在stock_filter表中
# 使用tqdm创建进度条
for code in tqdm(market_data, desc="下载股票数据", unit=""):
full_code = f"{code}"
2025-08-15 13:22:58 +08:00
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
2025-08-20 00:00:14 +08:00
# 从字典中获取流通股数量
float_share = float_share_dict.get(code) if float_share_dict else None
# 如果字典中没有找到尝试从API获取
if float_share is None:
float_share = get_float_share(quote_ctx, code)
if float_share is not None:
logger.info(f"从API获取股票 {code} 的流通股数量: {float_share}")
else:
2025-08-20 14:09:35 +08:00
logger.warning(f"无法获取股票 {code} 的流通股数量")
2025-08-20 00:00:14 +08:00
# 获取历史K线数据
ret, data, page_req_key = quote_ctx.request_history_kline(code, start=start_date, end=end_date, max_count=100)
# 保存数据到自定义表
2025-08-15 13:22:58 +08:00
custom_table_name = 'hk_' + code[3:] # 自定义表名
if ret == RET_OK:
2025-08-20 00:00:14 +08:00
success = save_quotes_to_db(db_config, data, table_name=custom_table_name, float_share=float_share)
2025-08-15 13:22:58 +08:00
else:
2025-08-20 00:00:14 +08:00
logger.error(f'error:{data}')
2025-08-15 13:22:58 +08:00
2025-08-20 00:00:14 +08:00
isWhile = False # 只返回一页时增加请求延迟
2025-08-15 13:22:58 +08:00
while page_req_key != None: # 请求后面的所有结果
2025-08-20 00:00:14 +08:00
isWhile = True
ret, data, page_req_key = quote_ctx.request_history_kline(code, start=start_date, end=end_date, max_count=100, page_req_key=page_req_key) # 请求翻页后的数据
2025-08-15 13:22:58 +08:00
if ret == RET_OK:
2025-08-20 00:00:14 +08:00
success = save_quotes_to_db(db_config, data, table_name=custom_table_name, float_share=float_share)
2025-08-15 13:22:58 +08:00
else:
2025-08-20 00:00:14 +08:00
logger.error(f'error:{ data}')
2025-08-15 13:22:58 +08:00
2025-08-20 14:09:35 +08:00
time.sleep(0.7)
2025-08-15 13:22:58 +08:00
2025-08-20 00:00:14 +08:00
if isWhile == False:
2025-08-20 14:09:35 +08:00
time.sleep(0.7)
2025-08-15 13:22:58 +08:00
quote_ctx.close() # 结束后记得关闭当条连接,防止连接条数用尽