Files
HKDataManagment/UpdateFutuData/fullmarketdata.py
2025-08-22 11:20:41 +08:00

526 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# """
# 全市场数据更新
# - 每日更新一次全市场数据
# - 重点关注:新发、停盘、退市
# """
# from futu import *
# from pymysql import Error
# from MySQLHelper import MySQLHelper # MySQLHelper类保存为单独文件
# from datetime import datetime
# import logging
from typing import Optional, List, Dict, Union, Tuple
# # 配置日志
# logging.basicConfig(
# level=logging.INFO,
# format='%(asctime)s - %(levelname)s - %(message)s',
# handlers=[
# logging.FileHandler('Debug.log', encoding='utf-8'), # 关键在这里
# logging.StreamHandler()
# ]
# )
# Safe conversion helpers
def safe_float(v) -> Optional[float]:
    """
    Convert ``v`` to float, or return None when it is missing or unusable.

    NaN/None (per ``pd.notna``) and the string 'N/A' (any case) give None, as
    does any value ``float()`` rejects.

    NOTE: this module defines ``safe_float`` again further down; that later
    definition replaces this one at import time.
    """
    try:
        if not pd.notna(v) or str(v).upper() == 'N/A':
            return None
        return float(v)
    except (ValueError, TypeError):
        return None
def safe_int(v) -> Optional[int]:
    """
    Convert ``v`` to int, or return None when it is missing or unusable.

    NaN/None (per ``pd.notna``) and the string 'N/A' (any case) give None, as
    does any value ``int()`` rejects (e.g. the string '1.5'); floats are
    truncated by ``int()``.

    NOTE: this module defines ``safe_int`` again further down; that later
    definition replaces this one at import time.
    """
    try:
        usable = pd.notna(v) and str(v).upper() != 'N/A'
        return int(v) if usable else None
    except (ValueError, TypeError):
        return None
def safe_parse_date(date_str, date_format='%Y-%m-%d'):
    """
    Parse ``date_str`` with ``date_format``, tolerating missing values.

    :param date_str: value to parse; falsy values, NaN (per ``pd.isna``) and
                     whitespace-only strings yield None
    :param date_format: ``datetime.strptime`` format string
    :return: the parsed ``datetime`` object, or None when the value is missing
             or does not match the format (a warning is logged in that case)

    NOTE: this module defines ``safe_parse_date`` again further down; that later
    definition replaces this one at import time.
    """
    is_blank = not date_str or pd.isna(date_str) or str(date_str).strip() == ''
    if is_blank:
        return None
    try:
        parsed = datetime.strptime(str(date_str), date_format)
    except ValueError:
        logging.warning(f"无法解析日期字符串: {date_str}")
        return None
    return parsed
# def get_market_data(market: Market) -> Optional[List[Dict]]:
# """
# 获取指定市场股票基础数据
# Args:
# market (Market): 市场枚举值,如 Market.SH, Market.SZ 等
# Returns:
# Optional[List[Dict]]: 处理后的数据集或None(获取失败时)
# """
# quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
# try:
# ret, data = quote_ctx.get_stock_basicinfo(market, SecurityType.STOCK)
# if ret == RET_OK:
# logging.info(f"成功获取 {market} 市场 {len(data)} 条股票数据")
# processed_data = []
# for _, row in data.iterrows():
# try:
# item = {
# 'code': str(row['code']),
# 'name': str(row['name']),
# 'lot_size': safe_int(row['lot_size']),
# 'stock_type': str(row['stock_type']),
# 'stock_child_type': str(row['stock_child_type']),
# 'stock_owner': str(row['stock_owner']) if pd.notna(row['stock_owner']) else None,
# 'option_type': str(row['option_type']) if pd.notna(row['option_type']) else None,
# 'strike_time': str(row['strike_time']) if pd.notna(row['strike_time']) else None,
# 'strike_price': safe_float(row['strike_price']),
# 'suspension': safe_int(row['suspension']),
# 'listing_date': safe_parse_date(row['listing_date']),
# 'stock_id': safe_int(row['stock_id']),
# 'delisting': safe_int(row['delisting']),
# 'index_option_type': str(row['index_option_type']) if pd.notna(row['index_option_type']) else None,
# 'main_contract': safe_int(row['main_contract']),
# 'last_trade_time': safe_parse_date(row['last_trade_time']),
# 'exchange_type': safe_int(row['exchange_type']),
# # 'market': market.name # 添加市场标识
# }
# processed_data.append(item)
# except Exception as e:
# logging.warning(f"处理股票数据时跳过异常数据 {row.get('code', '未知')}: {str(e)}")
# continue
# return processed_data
# else:
# logging.error(f"获取 {market} 市场数据失败: {data}")
# return None
# except Exception as e:
# logging.error(f"获取 {market} 市场数据时发生异常: {str(e)}")
# return None
# finally:
# quote_ctx.close()
# def update_market_data(db_config: dict, markets: List[Market], truncate_table: bool = False) -> bool:
# """
# 更新全市场数据到数据库
# Args:
# db_config (dict): 数据库配置
# markets (List[Market]): 要更新的市场列表
# truncate_table (bool): 是否在插入前清空表
# Returns:
# bool: 操作是否成功
# """
# # 设置默认值
# db_config.setdefault('port', 3306)
# db_config.setdefault('charset', 'utf8mb4')
# try:
# all_data = []
# # 获取所有指定市场的数据
# for market in markets:
# market_data = get_market_data(market)
# if market_data:
# all_data.extend(market_data)
# if not all_data:
# logging.error("未获取到任何有效数据,终止更新")
# return False
# # 数据验证
# validated_data = validate_market_data(all_data)
# if not validated_data:
# logging.error("没有通过验证的有效数据,终止更新")
# return False
# logging.info(f"准备插入 {len(validated_data)} 条已验证数据")
# with MySQLHelper(**db_config) as db:
# try:
# if truncate_table:
# db.execute_update("TRUNCATE TABLE staticdata_hk")
# logging.info("已清空数据表")
# insert_sql = """
# INSERT INTO staticdata_hk (
# code, name, lot_size, stock_type, stock_child_type, stock_owner,
# option_type, strike_time, strike_price, suspension, listing_date,
# stock_id, delisting, index_option_type, main_contract, last_trade_time,
# exchange_type
# ) VALUES (
# %(code)s, %(name)s, %(lot_size)s, %(stock_type)s, %(stock_child_type)s, %(stock_owner)s,
# %(option_type)s, %(strike_time)s, %(strike_price)s, %(suspension)s, %(listing_date)s,
# %(stock_id)s, %(delisting)s, %(index_option_type)s, %(main_contract)s, %(last_trade_time)s,
# %(exchange_type)s
# )
# """
# affected_rows = db.execute_many(insert_sql, validated_data)
# logging.info(f"成功插入 {affected_rows} 条记录")
# check_special_cases(db)
# generate_data_quality_report(validated_data)
# return True
# except Error as e:
# logging.error(f"数据库操作失败: {str(e)}")
# return False
# except Exception as e:
# logging.error(f"更新过程中发生未预期异常: {str(e)}")
# return False
# def validate_market_data(dataset: list) -> list:
# """
# 验证市场数据有效性
# Args:
# dataset (list): 原始数据集
# Returns:
# list: 通过验证的数据集
# """
# validated_data = []
# for item in dataset:
# try:
# # 必要字段检查
# if not item.get('code') or not item.get('name'):
# logging.warning(f"跳过无效数据: 缺少必要字段 code或name")
# continue
# # 数值范围验证
# if item.get('lot_size') is not None and item['lot_size'] < 0:
# logging.warning(f"股票 {item['code']} 的lot_size为负值: {item['lot_size']}")
# item['lot_size'] = None
# validated_data.append(item)
# except Exception as e:
# logging.warning(f"数据验证失败,跳过记录 {item.get('code')}: {str(e)}")
# continue
# return validated_data
# def generate_data_quality_report(dataset: list) -> None:
# """
# 生成数据质量报告
# Args:
# dataset (list): 数据集
# """
# if not dataset:
# logging.warning("无法生成数据质量报告: 数据集为空")
# return
# total = len(dataset)
# stats = {
# 'missing_last_trade': sum(1 for x in dataset if x.get('last_trade_time') is None),
# 'missing_listing': sum(1 for x in dataset if x.get('listing_date') is None),
# 'missing_strike_price': sum(1 for x in dataset if x.get('strike_price') is None),
# 'suspended': sum(1 for x in dataset if x.get('suspension') == 1),
# 'delisted': sum(1 for x in dataset if x.get('delisting') == 1)
# }
# logging.info("\n=== 数据质量报告 ===")
# logging.info(f"总记录数: {total}")
# logging.info(f"缺失最后交易时间: {stats['missing_last_trade']} ({stats['missing_last_trade']/total:.1%})")
# logging.info(f"缺失上市日期: {stats['missing_listing']} ({stats['missing_listing']/total:.1%})")
# logging.info(f"缺失执行价格: {stats['missing_strike_price']} ({stats['missing_strike_price']/total:.1%})")
# logging.info(f"停牌股票数: {stats['suspended']} ({stats['suspended']/total:.1%})")
# logging.info(f"退市股票数: {stats['delisted']} ({stats['delisted']/total:.1%})")
# def check_special_cases(db: MySQLHelper):
# """
# 检查重点关注项:新发、停盘、退市股票
# """
# try:
# # 1. 检查新上市股票最近30天内上市
# new_listings = db.execute_query("""
# SELECT code, name, listing_date
# FROM staticdata_hk
# WHERE listing_date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)
# ORDER BY listing_date DESC
# """)
# if new_listings:
# logging.info("\n=== 新上市股票 ===")
# for stock in new_listings:
# logging.info(f"{stock['code']} {stock['name']} 上市日期: {stock['listing_date']}")
# # 2. 检查停牌股票
# suspended = db.execute_query("""
# SELECT code, name, suspension
# FROM staticdata_hk
# WHERE suspension = 1
# ORDER BY code
# """)
# if suspended:
# logging.info("\n=== 停牌股票 ===")
# for stock in suspended:
# logging.info(f"{stock['code']} {stock['name']}")
# # 3. 检查退市股票
# delisted = db.execute_query("""
# SELECT code, name, delisting
# FROM staticdata_hk
# WHERE delisting = 1
# ORDER BY code
# """)
# if delisted:
# logging.info("\n=== 退市股票 ===")
# for stock in delisted:
# logging.info(f"{stock['code']} {stock['name']}")
# except Error as e:
# logging.error(f"检查重点关注项时出错: {str(e)}")
# if __name__ == "__main__":
# # 配置要更新的市场
# # target_markets = [Market.SH, Market.SZ, Market.HK] # 上证、深证、港股
# target_markets = [Market.HK] # 上证、深证、港股
# # 数据库配置
# db_config = {
# 'host': 'localhost',
# 'user': 'root',
# 'password': 'bzskmysql',
# 'database': 'fullmarketdata_hk'
# }
# logging.info("=== 开始全市场数据更新 ===")
# # 执行更新(首次运行可设置truncate_table=True)
# success = update_market_data(
# db_config=db_config,
# markets=target_markets,
# truncate_table=True
# )
# if success:
# logging.info("市场数据更新成功完成")
# else:
# logging.error("市场数据更新失败")
"""
全市场数据更新 (港股专用)
- 每日更新一次港股市场数据
- 重点关注:新发、停盘、退市
"""
from futu import *
from pymysql import Error
from base.MySQLHelper import MySQLHelper
from datetime import datetime, timedelta
import logging
import pandas as pd
import time
# Configure root logging at import time. When the root logger has no handlers
# yet, records at INFO and above go both to 'market_data_update.log' (UTF-8,
# opened relative to the current working directory) and to the console.
logging.basicConfig(
    handlers=[
        logging.FileHandler('market_data_update.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Safe conversion helpers
def safe_float(v) -> Optional[float]:
    """
    Convert ``v`` to float, returning None for missing or unparseable values.

    None, NaN/NA (per ``pd.isna``), the string 'N/A' (any case, surrounding
    whitespace ignored) and anything ``float()`` rejects map to None.
    Zero is a legitimate value and is returned as 0.0, not None.
    """
    try:
        # Explicit missing-value checks instead of a truthiness test, which
        # would wrongly discard 0 / 0.0.
        if v is None or pd.isna(v) or str(v).strip().upper() == 'N/A':
            return None
        return float(v)
    except (ValueError, TypeError):
        # ValueError: unparseable text, or array-like input whose NA mask has
        # no single truth value; TypeError: objects float() cannot handle.
        return None
def safe_int(v) -> Optional[int]:
    """
    Convert ``v`` to int, returning None for missing or unparseable values.

    None, NaN/NA (per ``pd.isna``), the string 'N/A' (any case, surrounding
    whitespace ignored) and anything ``int()`` rejects (e.g. '1.5') map to
    None. Zero is a legitimate value (e.g. a lot size) and is returned as 0.
    """
    try:
        # Explicit missing-value checks instead of a truthiness test, which
        # would wrongly discard 0.
        if v is None or pd.isna(v) or str(v).strip().upper() == 'N/A':
            return None
        return int(v)
    except (ValueError, TypeError):
        # ValueError: unparseable text, or array-like input whose NA mask has
        # no single truth value; TypeError: objects int() cannot handle.
        return None
def safe_parse_date(date_str, date_format='%Y-%m-%d'):
    """
    Safely turn ``date_str`` into a ``datetime.date``.

    :param date_str: a date string, a ``datetime`` (incl. ``pd.Timestamp``),
                     or a missing value (None / NaN / NA / NaT / blank string)
    :param date_format: ``datetime.strptime`` format used for string input
    :return: the parsed ``date``, or None when the value is missing or does
             not match ``date_format``
    """
    if date_str is None:
        return None
    try:
        # Checked before any truthiness test: ``not pd.NA`` raises TypeError.
        if pd.isna(date_str):
            return None
    except (TypeError, ValueError):
        # Array-like input: its NA mask has no single truth value.
        return None
    if isinstance(date_str, datetime):
        # Already a timestamp; str() would add a time part the format rejects.
        return date_str.date()
    text = str(date_str).strip()
    if not text:
        return None
    try:
        return datetime.strptime(text, date_format).date()
    except ValueError:
        return None
def _flag_to_int(value) -> int:
    """
    Normalize a yes/no status field from futu into 1 or 0.

    futu's get_stock_basicinfo documents ``suspension``/``delisting`` as bool.
    Comparing ``str(value).upper()`` with 'Y' never matches a bool ('TRUE'),
    so every flagged stock was stored as 0. This accepts bools (incl. numpy
    bools), numbers (non-zero -> 1) and the strings 'Y'/'YES'/'TRUE'/'1'
    (case-insensitive, whitespace ignored). Missing values (None/NaN/NA) and
    any other string map to 0.
    """
    if value is None:
        return 0
    if isinstance(value, str):
        return 1 if value.strip().upper() in ('Y', 'YES', 'TRUE', '1') else 0
    try:
        if pd.isna(value):
            return 0
        return 1 if bool(value) else 0
    except (TypeError, ValueError):
        # e.g. array-like values whose truth value is ambiguous
        return 0


def _build_hk_item(row) -> Dict:
    """Map one row of futu's basic-info DataFrame to the column dict inserted into staticdata_hk."""
    return {
        'code': str(row['code']),
        'name': str(row['name']),
        'lot_size': safe_int(row['lot_size']),
        'stock_type': str(row['stock_type']),
        'stock_child_type': str(row['stock_child_type']),
        'stock_owner': str(row['stock_owner']) if pd.notna(row['stock_owner']) else None,
        # Watch-list fields (suspended / delisted) stored as 1/0
        'suspension': _flag_to_int(row['suspension']),
        'listing_date': safe_parse_date(row['listing_date']),
        'delisting': _flag_to_int(row['delisting']),
        'last_trade_time': safe_parse_date(row['last_trade_time']),
    }


def get_hk_market_data(retry_count=3, retry_delay=5, host='127.0.0.1', port=11111) -> Optional[List[Dict]]:
    """
    Fetch basic info for all HK stocks from a futu OpenD gateway, with retries.

    Args:
        retry_count: maximum number of fetch attempts.
        retry_delay: seconds to sleep between failed attempts.
        host: OpenD host (previously hard-coded to 127.0.0.1).
        port: OpenD port (previously hard-coded to 11111).

    Returns:
        Optional[List[Dict]]: one dict per stock (keys match the staticdata_hk
        insert columns), or None if every attempt failed. An attempt fails when
        the API returns a non-OK code or anything raises. Rows that cannot be
        converted are skipped with a warning instead of failing the attempt.
    """
    for attempt in range(retry_count):
        quote_ctx = None
        try:
            logging.info(f"尝试获取港股市场数据 (尝试 {attempt+1}/{retry_count})")
            # Created inside the try so a construction failure is retried too.
            quote_ctx = OpenQuoteContext(host=host, port=port)
            ret, data = quote_ctx.get_stock_basicinfo(Market.HK, SecurityType.STOCK)
            if ret == RET_OK:
                logging.info(f"成功获取港股市场 {len(data)} 条股票数据")
                processed_data = []
                for _, row in data.iterrows():
                    try:
                        processed_data.append(_build_hk_item(row))
                    except (KeyError, TypeError, ValueError) as e:
                        # One bad row must not discard (and refetch) the whole market.
                        logging.warning(f"处理股票数据时跳过异常数据 {row.get('code', '未知')}: {str(e)}")
                return processed_data
            logging.warning(f"获取港股市场数据失败: {data}")
        except Exception as e:
            logging.error(f"获取数据时发生异常: {str(e)}")
        finally:
            if quote_ctx is not None:
                quote_ctx.close()
        if attempt < retry_count - 1:
            logging.info(f"{retry_delay}秒后重试...")
            time.sleep(retry_delay)
    logging.error(f"经过{retry_count}次尝试后仍未能获取数据")
    return None
def update_hk_market_data(db_config: dict) -> bool:
    """
    Reload table ``staticdata_hk`` with the current HK stock list.

    Fetches data via ``get_hk_market_data``, empties the table, bulk-inserts
    the rows and then logs the watch-list via ``check_special_cases``.

    Args:
        db_config: keyword arguments forwarded to ``MySQLHelper`` (host, port,
            user, password, database, charset, ...).

    Returns:
        bool: True when the rows were inserted. False when no data was fetched
        or an error occurred; errors are logged, never raised.
    """
    try:
        hk_data = get_hk_market_data()
        if not hk_data:
            logging.error("未获取到港股有效数据,终止更新")
            return False
        logging.info(f"获取到 {len(hk_data)} 条港股数据,开始更新数据库")
        with MySQLHelper(**db_config) as db:
            try:
                # NOTE(review): in MySQL, TRUNCATE TABLE causes an implicit commit
                # and cannot be rolled back. If the INSERT below fails, the table
                # stays empty until the next successful run. Consider DELETE inside
                # one transaction, or a staging table + RENAME TABLE swap, depending
                # on what MySQLHelper supports.
                db.execute_update("TRUNCATE TABLE staticdata_hk")
                insert_sql = """
                    INSERT INTO staticdata_hk (
                        code, name, lot_size, stock_type, stock_child_type, stock_owner,
                        suspension, listing_date, delisting, last_trade_time
                    ) VALUES (
                        %(code)s, %(name)s, %(lot_size)s, %(stock_type)s, %(stock_child_type)s,
                        %(stock_owner)s, %(suspension)s, %(listing_date)s, %(delisting)s, %(last_trade_time)s
                    )
                """
                affected_rows = db.execute_many(insert_sql, hk_data)
                logging.info(f"成功插入 {affected_rows} 条记录到数据库")
                # Log new listings / suspensions / delistings
                check_special_cases(db)
                return True
            except Error as e:
                logging.error(f"数据库操作失败: {str(e)}")
                return False
    except Exception as e:
        # Top-level boundary: keep the bool contract, but record the traceback
        # so an unexpected failure can actually be diagnosed.
        logging.exception(f"更新过程中发生未预期异常: {str(e)}")
        return False
def _log_section(rows, header, empty_message, describe):
    """Log ``header`` plus one ``describe(row)`` line per row, or ``empty_message`` when ``rows`` is empty/None."""
    if rows:
        logging.info(header)
        for row in rows:
            logging.info(describe(row))
    else:
        logging.info(empty_message)


def check_special_cases(db: MySQLHelper, new_listing_days: int = 7):
    """
    Log the watch-list items in ``staticdata_hk``: new listings, suspended
    stocks and delisted stocks.

    Args:
        db: open MySQLHelper. Rows returned by ``execute_query`` are indexed by
            column name here (presumably dict cursors — confirm against MySQLHelper).
        new_listing_days: look-back window, in days, for "new listing"
            (default 7, the previously hard-coded value).

    pymysql ``Error``s are logged and swallowed, so a reporting failure does
    not abort the caller.
    """
    try:
        cutoff = datetime.now().date() - timedelta(days=new_listing_days)
        # 1. Recently listed stocks. ``cutoff`` is computed locally and renders
        # as 'YYYY-MM-DD', so embedding it in the SQL text cannot inject anything.
        new_listings = db.execute_query(f"""
            SELECT code, name, listing_date
            FROM staticdata_hk
            WHERE listing_date >= '{cutoff}'
            ORDER BY listing_date DESC
        """)
        _log_section(
            new_listings,
            "\n=== 新上市股票 ===",
            "近期无新上市股票",
            lambda s: f"{s['code']} {s['name']} 上市日期: {s['listing_date']}",
        )

        # 2. Suspended stocks
        suspended = db.execute_query("""
            SELECT code, name
            FROM staticdata_hk
            WHERE suspension = 1
            ORDER BY code
        """)
        _log_section(
            suspended,
            "\n=== 停牌股票 ===",
            "当前无停牌股票",
            lambda s: f"{s['code']} {s['name']}",
        )

        # 3. Delisted stocks, most recent last-trade date first
        delisted = db.execute_query("""
            SELECT code, name, last_trade_time
            FROM staticdata_hk
            WHERE delisting = 1
            ORDER BY last_trade_time DESC
        """)
        _log_section(
            delisted,
            "\n=== 退市股票 ===",
            "当前无退市股票",
            lambda s: f"{s['code']} {s['name']} 最后交易日: {s['last_trade_time'] or '未知时间'}",
        )
    except Error as e:
        logging.error(f"检查重点关注项时出错: {str(e)}")
if __name__ == "__main__":
    import os
    import sys

    # Database connection settings. Each value can be overridden through an
    # environment variable so credentials need not live in source control; the
    # defaults are the previously hard-coded values.
    # NOTE(review): drop the plaintext default password once deployments set
    # HK_DB_PASSWORD.
    db_config = {
        'host': os.environ.get('HK_DB_HOST', 'localhost'),
        'port': int(os.environ.get('HK_DB_PORT', '3306')),
        'user': os.environ.get('HK_DB_USER', 'root'),
        'password': os.environ.get('HK_DB_PASSWORD', 'bzskmysql'),
        'database': os.environ.get('HK_DB_NAME', 'fullmarketdata_hk'),
        'charset': 'utf8mb4',
    }
    logging.info("=== 开始港股市场数据更新 ===")
    # perf_counter is monotonic, unlike time.time(), so wall-clock adjustments
    # cannot distort the measured duration.
    start_time = time.perf_counter()
    success = update_hk_market_data(db_config=db_config)
    elapsed = time.perf_counter() - start_time
    if success:
        logging.info(f"港股市场数据更新成功完成! 耗时: {elapsed:.2f}秒")
    else:
        logging.error("港股市场数据更新失败")
        # Non-zero exit status so a scheduler (cron etc.) can detect a failed daily run.
        sys.exit(1)