Files
HKDataManagment/UpdateFutuData/getmarketsnapshot.py
2025-08-22 11:20:41 +08:00

142 lines
5.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
获取市场快照
"""
from futu import *
from base.MySQLHelper import MySQLHelper # MySQLHelper类保存为单独文件
from base.LogHelper import LogHelper
from datetime import datetime
from typing import Optional, List, Dict
from ConditionalSelection import FutuStockFilter
from tqdm import tqdm
import pandas as pd
import time
import csv
import pandas as pd
# 基本用法(自动创建日期日志+控制台输出)
logger = LogHelper(logger_name = 'snapshot').setup()
def read_single_account_stock_codes(file_path='data\missing_tables_小航富途.txt'):
"""基础读取方法 - 按行读取所有内容"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 去除每行末尾的换行符,并过滤空行
codes = [line.strip() for line in lines if line.strip()]
return codes
except FileNotFoundError:
print(f"文件 {file_path} 不存在")
return []
except Exception as e:
print(f"读取文件失败: {str(e)}")
return []
def get_stock_codes() -> List[str]:
"""从conditionalselection表获取所有股票代码"""
try:
with MySQLHelper(**db_config) as db:
sql = f"SELECT DISTINCT stock_code,stock_name FROM stock_filter"
results = db.execute_query(sql)
return [row['stock_code'] for row in results if row['stock_code'] and row['stock_name'][-1] != 'R']
except Exception as e:
logger.error(f"获取股票代码失败: {str(e)}")
return []
def export_to_csv(data: List[Dict], output_file: str, mode: str = 'a') -> bool:
"""
将数据导出到CSV文件支持追加模式
Args:
data: 要导出的数据集
output_file: 输出的CSV文件路径
mode: 写入模式,'w'为覆盖写入,'a'为追加写入
Returns:
bool: 是否导出成功
"""
if not data:
logger.warning("没有数据可导出")
return False
try:
# 获取字段名(使用第一个数据的键)
field_names = list(data[0].keys())
# 字段名到中文的映射
header_map = {
'stock_code': '股票代码',
'stock_name': '股票名称',
'last_price': '最新价',
'outstanding_shares': '流通股本',
'circular_market_val': '流通市值'
}
# 检查文件是否存在,决定是否需要写入表头
file_exists = os.path.isfile(output_file)
with open(output_file, mode=mode, newline='', encoding='utf-8-sig') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=field_names)
# 如果是新文件或覆盖模式,写入表头
if not file_exists or mode == 'w':
# 写入中文表头
writer.writerow({col: header_map.get(col, col) for col in field_names})
# 写入数据
writer.writerows(data)
logger.info(f"成功{'追加' if mode == 'a' and file_exists else '写入'} {len(data)} 条记录到CSV文件: {output_file}")
return True
except Exception as e:
logger.error(f"导出到CSV失败: {str(e)}")
return False
if __name__ == "__main__":
# 数据库配置
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'bzskmysql',
'database': 'hk_kline_1d'
}
# 每个账号获取的数据独立开来 -> 操作见面可以选择
market_data_all = get_stock_codes()
# market_data_hang = read_single_account_stock_codes('config\hang_futu.txt')
# market_data_kevin= read_single_account_stock_codes('config\kevin_futu.txt')
# market_data_HK= read_single_account_stock_codes('config\HK_futu.txt')
# market_data_new = list(set(market_data_all) - set(market_data_hang) - set(market_data_kevin) - set(market_data_HK))
# 动态调整
"""*********************************************"""
market_data = market_data_all
# 使用tqdm创建进度条
for code in tqdm(market_data, desc="下载股票数据", unit=""):
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
processed_data = []
ret, data = quote_ctx.get_market_snapshot(code)
if ret == RET_OK:
for _, row in data.iterrows():
item = {
'stock_code': row['code'],
'stock_name': row['name'],
'last_price': row['last_price'],
'outstanding_shares': row['outstanding_shares'],
'circular_market_val': float(row['circular_market_val'])
}
processed_data.append(item)
else:
print('error:', data)
if processed_data:
filePath = 'data/circular_market_val.csv'
csv_success = export_to_csv(processed_data, filePath)
quote_ctx.close() # 结束后记得关闭当条连接,防止连接条数用尽
time.sleep(0.7)