142 lines
5.0 KiB
Python
142 lines
5.0 KiB
Python
"""
|
||
获取市场快照
|
||
"""
|
||
from futu import *
|
||
from base.MySQLHelper import MySQLHelper # MySQLHelper类保存为单独文件
|
||
from base.LogHelper import LogHelper
|
||
from datetime import datetime
|
||
from typing import Optional, List, Dict
|
||
from ConditionalSelection import FutuStockFilter
|
||
from tqdm import tqdm
|
||
import pandas as pd
|
||
import time
|
||
import csv
|
||
import pandas as pd
|
||
|
||
# 基本用法(自动创建日期日志+控制台输出)
|
||
logger = LogHelper(logger_name = 'snapshot').setup()
|
||
|
||
|
||
def read_single_account_stock_codes(file_path='data\missing_tables_小航富途.txt'):
|
||
"""基础读取方法 - 按行读取所有内容"""
|
||
try:
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
lines = f.readlines()
|
||
# 去除每行末尾的换行符,并过滤空行
|
||
codes = [line.strip() for line in lines if line.strip()]
|
||
return codes
|
||
except FileNotFoundError:
|
||
print(f"文件 {file_path} 不存在")
|
||
return []
|
||
except Exception as e:
|
||
print(f"读取文件失败: {str(e)}")
|
||
return []
|
||
|
||
|
||
def get_stock_codes() -> List[str]:
|
||
"""从conditionalselection表获取所有股票代码"""
|
||
try:
|
||
with MySQLHelper(**db_config) as db:
|
||
sql = f"SELECT DISTINCT stock_code,stock_name FROM stock_filter"
|
||
results = db.execute_query(sql)
|
||
return [row['stock_code'] for row in results if row['stock_code'] and row['stock_name'][-1] != 'R']
|
||
except Exception as e:
|
||
logger.error(f"获取股票代码失败: {str(e)}")
|
||
return []
|
||
|
||
def export_to_csv(data: List[Dict], output_file: str, mode: str = 'a') -> bool:
|
||
"""
|
||
将数据导出到CSV文件,支持追加模式
|
||
|
||
Args:
|
||
data: 要导出的数据集
|
||
output_file: 输出的CSV文件路径
|
||
mode: 写入模式,'w'为覆盖写入,'a'为追加写入
|
||
|
||
Returns:
|
||
bool: 是否导出成功
|
||
"""
|
||
if not data:
|
||
logger.warning("没有数据可导出")
|
||
return False
|
||
|
||
try:
|
||
# 获取字段名(使用第一个数据的键)
|
||
field_names = list(data[0].keys())
|
||
|
||
# 字段名到中文的映射
|
||
header_map = {
|
||
'stock_code': '股票代码',
|
||
'stock_name': '股票名称',
|
||
'last_price': '最新价',
|
||
'outstanding_shares': '流通股本',
|
||
'circular_market_val': '流通市值'
|
||
}
|
||
|
||
# 检查文件是否存在,决定是否需要写入表头
|
||
file_exists = os.path.isfile(output_file)
|
||
|
||
with open(output_file, mode=mode, newline='', encoding='utf-8-sig') as csvfile:
|
||
writer = csv.DictWriter(csvfile, fieldnames=field_names)
|
||
|
||
# 如果是新文件或覆盖模式,写入表头
|
||
if not file_exists or mode == 'w':
|
||
# 写入中文表头
|
||
writer.writerow({col: header_map.get(col, col) for col in field_names})
|
||
|
||
# 写入数据
|
||
writer.writerows(data)
|
||
|
||
logger.info(f"成功{'追加' if mode == 'a' and file_exists else '写入'} {len(data)} 条记录到CSV文件: {output_file}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"导出到CSV失败: {str(e)}")
|
||
return False
|
||
|
||
if __name__ == "__main__":
|
||
# 数据库配置
|
||
db_config = {
|
||
'host': 'localhost',
|
||
'user': 'root',
|
||
'password': 'bzskmysql',
|
||
'database': 'hk_kline_1d'
|
||
}
|
||
|
||
# 每个账号获取的数据独立开来 -> 操作见面可以选择
|
||
market_data_all = get_stock_codes()
|
||
# market_data_hang = read_single_account_stock_codes('config\hang_futu.txt')
|
||
# market_data_kevin= read_single_account_stock_codes('config\kevin_futu.txt')
|
||
# market_data_HK= read_single_account_stock_codes('config\HK_futu.txt')
|
||
# market_data_new = list(set(market_data_all) - set(market_data_hang) - set(market_data_kevin) - set(market_data_HK))
|
||
|
||
# 动态调整
|
||
"""*********************************************"""
|
||
market_data = market_data_all
|
||
|
||
|
||
# 使用tqdm创建进度条
|
||
for code in tqdm(market_data, desc="下载股票数据", unit="支"):
|
||
quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
|
||
|
||
processed_data = []
|
||
ret, data = quote_ctx.get_market_snapshot(code)
|
||
if ret == RET_OK:
|
||
for _, row in data.iterrows():
|
||
item = {
|
||
'stock_code': row['code'],
|
||
'stock_name': row['name'],
|
||
'last_price': row['last_price'],
|
||
'outstanding_shares': row['outstanding_shares'],
|
||
'circular_market_val': float(row['circular_market_val'])
|
||
}
|
||
processed_data.append(item)
|
||
else:
|
||
print('error:', data)
|
||
|
||
if processed_data:
|
||
filePath = 'data/circular_market_val.csv'
|
||
csv_success = export_to_csv(processed_data, filePath)
|
||
|
||
quote_ctx.close() # 结束后记得关闭当条连接,防止连接条数用尽
|
||
time.sleep(0.7) |