"""
Full-market static data update (Hong Kong market only).

- Refreshes the HK stock basic-info table once per run (intended to run daily).
- Highlights the cases worth watching: new listings, suspensions, delistings.
"""
from typing import Optional, List, Dict, Union, Tuple

# NOTE: the wildcard import is intentionally kept ahead of the explicit imports
# below, so that any name it happens to re-export cannot shadow ``datetime``,
# ``logging``, ``pd`` or ``time`` as used in this module.
from futu import *
from pymysql import Error
from base.MySQLHelper import MySQLHelper
from datetime import date, datetime, timedelta
import logging
import pandas as pd
import time

# Logging configuration: write to a UTF-8 log file and mirror to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('market_data_update.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)

# String tokens (compared after strip + upper) that mean "no value".
_NA_TOKENS = frozenset({'', 'N/A'})
# String tokens (compared after strip + upper) that mean "flag is set".
_TRUE_TOKENS = frozenset({'Y', 'YES', 'TRUE', '1'})


def _is_missing(v) -> bool:
    """Return True when *v* carries no usable value (None, NaN/NaT, '' or 'N/A')."""
    if v is None:
        return True
    if isinstance(v, str):
        return v.strip().upper() in _NA_TOKENS
    try:
        return bool(pd.isna(v))
    except (TypeError, ValueError):
        # pd.isna() on a container returns an array whose truth value is
        # ambiguous; such a value is not "missing", it is simply unconvertible.
        return False


def _to_flag(value) -> int:
    """Normalise a boolean-like cell to 0/1.

    Accepts real booleans (including numpy booleans), numbers, and the string
    spellings listed in ``_TRUE_TOKENS``. Anything missing or unrecognised
    yields 0.
    """
    if isinstance(value, str):
        return int(value.strip().upper() in _TRUE_TOKENS)
    if _is_missing(value):
        return 0
    try:
        return int(bool(value))
    except (TypeError, ValueError):
        return 0


def safe_float(v) -> Optional[float]:
    """Convert *v* to ``float``; return None for missing or unconvertible input.

    A genuine zero is preserved (it is a value, not a missing marker).
    """
    if _is_missing(v):
        return None
    try:
        return float(v)
    except (ValueError, TypeError, OverflowError):
        return None


def safe_int(v) -> Optional[int]:
    """Convert *v* to ``int``; return None for missing or unconvertible input.

    A genuine zero is preserved (it is a value, not a missing marker).
    """
    if _is_missing(v):
        return None
    try:
        return int(v)
    except (ValueError, TypeError, OverflowError):
        return None


def safe_parse_date(date_str, date_format='%Y-%m-%d') -> Optional[date]:
    """Parse *date_str* into a ``datetime.date``.

    Args:
        date_str: A date string in *date_format*; ``date``/``datetime`` objects
            are accepted as well and passed through.
        date_format: ``strptime`` format used for string input.

    Returns:
        The parsed date, or None when the input is missing or does not match
        *date_format*.
    """
    if _is_missing(date_str):
        return None
    # ``datetime`` is a subclass of ``date``, so test it first.
    if isinstance(date_str, datetime):
        return date_str.date()
    if isinstance(date_str, date):
        return date_str
    try:
        return datetime.strptime(str(date_str).strip(), date_format).date()
    except ValueError:
        return None


def _build_stock_item(row) -> Dict:
    """Map one row of the basic-info result to the dict inserted into the DB."""
    return {
        'code': str(row['code']),
        'name': str(row['name']),
        'lot_size': safe_int(row['lot_size']),
        'stock_type': str(row['stock_type']),
        'stock_child_type': str(row['stock_child_type']),
        'stock_owner': str(row['stock_owner']) if pd.notna(row['stock_owner']) else None,
        # Watched fields: stored as 0/1 regardless of how the source spells them.
        'suspension': _to_flag(row['suspension']),
        'listing_date': safe_parse_date(row['listing_date']),
        'delisting': _to_flag(row['delisting']),
        'last_trade_time': safe_parse_date(row['last_trade_time']),
    }


def get_hk_market_data(retry_count=3, retry_delay=5) -> Optional[List[Dict]]:
    """Fetch basic info for all HK stocks, retrying on failure.

    Args:
        retry_count: Maximum number of attempts.
        retry_delay: Seconds to sleep between attempts.

    Returns:
        The list of processed rows, or None when every attempt failed.
    """
    for attempt in range(retry_count):
        quote_ctx = None
        try:
            # Created inside the try so a connection failure is retried too.
            quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
            logging.info(f"尝试获取港股市场数据 (尝试 {attempt+1}/{retry_count})")
            ret, data = quote_ctx.get_stock_basicinfo(Market.HK, SecurityType.STOCK)

            if ret == RET_OK:
                logging.info(f"成功获取港股市场 {len(data)} 条股票数据")
                return [_build_stock_item(row) for _, row in data.iterrows()]

            # On failure the second element carries the error description.
            logging.warning(f"获取港股市场数据失败: {data}")
        except Exception as e:
            logging.error(f"获取数据时发生异常: {str(e)}")
        finally:
            if quote_ctx is not None:
                quote_ctx.close()

        # Only wait when another attempt will follow.
        if attempt < retry_count - 1:
            logging.info(f"{retry_delay}秒后重试...")
            time.sleep(retry_delay)

    logging.error(f"经过{retry_count}次尝试后仍未能获取数据")
    return None


def update_hk_market_data(db_config: dict) -> bool:
    """Reload the ``staticdata_hk`` table with freshly fetched HK stock data.

    Args:
        db_config: Keyword arguments forwarded to ``MySQLHelper``.

    Returns:
        True when the table was reloaded, False on any failure.
    """
    try:
        # Fetch first: the table is only touched once data is in hand.
        hk_data = get_hk_market_data()
        if not hk_data:
            logging.error("未获取到港股有效数据,终止更新")
            return False

        logging.info(f"获取到 {len(hk_data)} 条港股数据,开始更新数据库")

        with MySQLHelper(**db_config) as db:
            try:
                # Full reload: empty the table, then insert the new snapshot.
                # NOTE(review): in MySQL, TRUNCATE TABLE commits implicitly and
                # cannot be rolled back, so a failed insert leaves the table
                # empty - consider loading into a staging table and swapping.
                db.execute_update("TRUNCATE TABLE staticdata_hk")

                insert_sql = """
                INSERT INTO staticdata_hk (
                    code, name, lot_size, stock_type, stock_child_type, stock_owner,
                    suspension, listing_date, delisting, last_trade_time
                ) VALUES (
                    %(code)s, %(name)s, %(lot_size)s, %(stock_type)s, %(stock_child_type)s, %(stock_owner)s,
                    %(suspension)s, %(listing_date)s, %(delisting)s, %(last_trade_time)s
                )
                """

                affected_rows = db.execute_many(insert_sql, hk_data)
                logging.info(f"成功插入 {affected_rows} 条记录到数据库")

                # Report the watched cases from the freshly loaded table.
                check_special_cases(db)

                return True
            except Error as e:
                logging.error(f"数据库操作失败: {str(e)}")
                return False
    except Exception as e:
        logging.error(f"更新过程中发生未预期异常: {str(e)}")
        return False


def check_special_cases(db: MySQLHelper):
    """Log the watched cases: new listings, suspended stocks, delisted stocks.

    Database errors are logged and swallowed so that a reporting failure does
    not undo an otherwise successful update.
    """
    try:
        today = datetime.now().date()

        # 1. New listings (listed within the last 7 days). The interpolated
        #    value is a locally computed date, not external input.
        new_listings = db.execute_query(f"""
            SELECT code, name, listing_date
            FROM staticdata_hk
            WHERE listing_date >= '{today - timedelta(days=7)}'
            ORDER BY listing_date DESC
        """)

        if new_listings:
            logging.info("\n=== 新上市股票 ===")
            for stock in new_listings:
                logging.info(f"{stock['code']} {stock['name']} 上市日期: {stock['listing_date']}")
        else:
            logging.info("近期无新上市股票")

        # 2. Suspended stocks.
        suspended = db.execute_query("""
            SELECT code, name
            FROM staticdata_hk
            WHERE suspension = 1
            ORDER BY code
        """)

        if suspended:
            logging.info("\n=== 停牌股票 ===")
            for stock in suspended:
                logging.info(f"{stock['code']} {stock['name']}")
        else:
            logging.info("当前无停牌股票")

        # 3. Delisted stocks, most recently traded first.
        delisted = db.execute_query("""
            SELECT code, name, last_trade_time
            FROM staticdata_hk
            WHERE delisting = 1
            ORDER BY last_trade_time DESC
        """)

        if delisted:
            logging.info("\n=== 退市股票 ===")
            for stock in delisted:
                trade_time = stock['last_trade_time'] or "未知时间"
                logging.info(f"{stock['code']} {stock['name']} 最后交易日: {trade_time}")
        else:
            logging.info("当前无退市股票")

    except Error as e:
        logging.error(f"检查重点关注项时出错: {str(e)}")


def main() -> None:
    """Script entry point: run one HK market update and log the outcome."""
    # Database configuration.
    # NOTE(review): credentials are hard-coded here; consider loading them from
    # the environment or a config file kept out of version control.
    db_config = {
        'host': 'localhost',
        'port': 3306,
        'user': 'root',
        'password': 'bzskmysql',
        'database': 'fullmarketdata_hk',
        'charset': 'utf8mb4'
    }

    logging.info("=== 开始港股市场数据更新 ===")

    # Run the update and time it.
    start_time = time.time()
    success = update_hk_market_data(db_config=db_config)
    elapsed = time.time() - start_time

    if success:
        logging.info(f"港股市场数据更新成功完成! 耗时: {elapsed:.2f}秒")
    else:
        logging.error("港股市场数据更新失败")


if __name__ == "__main__":
    main()