"""
使用筛选器来更新股票列表 <临时使用,功能是完整的,后面继续优化>
"""
|
|
|
|
from futu import *
|
|
import time
|
|
from typing import List, Dict
|
|
from base.MySQLHelper import MySQLHelper # 假设您已有MySQLHelper类
|
|
from base.LogHelper import LogHelper
|
|
|
|
# Module-level logger: LogHelper.setup() creates a dated log file and also
# echoes to the console (basic usage).
logger = LogHelper(logger_name = 'floatShare').setup()
|
|
|
|
class FutuStockFilter:
    """Fetch HK-market stocks via Futu's stock-filter API and persist them to MySQL.

    Rows are upserted into the ``stock_filter`` table keyed on ``stock_code``;
    the table is refreshed daily by re-running :meth:`run_direct_import`.
    """

    def __init__(self, db_config: dict):
        """Store configuration only — no network or DB work happens here.

        Args:
            db_config: keyword arguments accepted by ``MySQLHelper``.
        """
        self.db_config = db_config
        self.table_name = "stock_filter"  # destination table for filtered stocks
        self.quote_ctx = None  # OpenQuoteContext, created in connect_to_futu()

    def connect_to_futu(self) -> bool:
        """Open a quote context against the local FutuOpenD gateway.

        Returns:
            True on success; False on failure (the error is logged).
        """
        try:
            self.quote_ctx = OpenQuoteContext(host='127.0.0.1', port=11111)
            return True
        except Exception as e:
            logger.error(f"连接Futu API失败: {str(e)}")
            return False

    def disconnect(self):
        """Close the quote context if one is open."""
        if self.quote_ctx:
            self.quote_ctx.close()
            # BUGFIX: drop the stale handle so a repeated call (e.g. the
            # explicit disconnect() plus the finally-block in
            # run_direct_import) cannot close the same context twice.
            self.quote_ctx = None

    # Prepare the schema; columns can be extended later as needed.
    # The stock_filter table contents are refreshed daily.
    def prepare_db_structure(self) -> bool:
        """Create the destination table if it does not already exist.

        Returns:
            True when the DDL executed successfully, False otherwise.
        """
        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS `{self.table_name}` (
            `stock_code` varchar(255) NOT NULL,
            `stock_name` varchar(255) DEFAULT NULL,
            `float_share` float DEFAULT NULL,
            UNIQUE KEY `idx_stock_code` (`stock_code`)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        """

        try:
            with MySQLHelper(**self.db_config) as db:
                db.execute_update(create_table_sql)
                logger.info("数据库表准备就绪")
                return True
        except Exception as e:
            logger.error(f"准备数据库表失败: {str(e)}")
            return False

    def process_filter_result(self, ret_list: List) -> List[Dict]:
        """Convert filter-API result items into row dicts for the DB.

        Args:
            ret_list: items from ``get_stock_filter``; each is expected to
                expose ``stock_code`` / ``stock_name`` / ``float_share``
                attributes (per the futu filter-result interface).

        Returns:
            A list of dicts keyed by column name. Items that fail to convert
            are logged and skipped rather than aborting the batch.
        """
        processed_data = []
        for item in ret_list:
            try:
                processed_data.append({
                    'stock_code': item.stock_code,
                    'stock_name': item.stock_name,
                    'float_share': item.float_share,
                })
            except Exception as e:
                logger.error(f"处理股票 {getattr(item, 'stock_code', '未知')} 数据失败: {str(e)}")

        return processed_data

    def save_batch_to_database(self, data_batch: List[Dict]) -> int:
        """Bulk-upsert one batch of rows into the destination table.

        Uses ``INSERT ... ON DUPLICATE KEY UPDATE`` so re-imports refresh
        existing rows (keyed on the ``stock_code`` unique index).

        Args:
            data_batch: row dicts as produced by :meth:`process_filter_result`.

        Returns:
            Number of affected rows reported by the driver, or 0 on failure
            or for an empty batch.
        """
        if not data_batch:
            return 0

        # Build the parameterized upsert statement once per batch.
        columns = [
            'stock_code', 'stock_name', 'float_share'
        ]

        placeholders = ', '.join(['%s'] * len(columns))
        columns_str = ', '.join([f'`{col}`' for col in columns])
        # The unique key itself is never updated — only the payload columns.
        updates = ', '.join([f'`{col}`=VALUES(`{col}`)' for col in columns if col != 'stock_code'])

        sql = f"""
        INSERT INTO `{self.table_name}` ({columns_str})
        VALUES ({placeholders})
        ON DUPLICATE KEY UPDATE {updates}
        """

        try:
            with MySQLHelper(**self.db_config) as db:
                # One tuple per row, ordered to match `columns`.
                params = [tuple(item.get(col) for col in columns) for item in data_batch]

                affected_rows = db.execute_many(sql, params)
                logger.info(f"批量保存了 {len(data_batch)} 条记录,影响 {affected_rows} 行")
                return affected_rows
        except Exception as e:
            logger.error(f"批量保存失败: {str(e)}")
            return 0

    def run_direct_import(self) -> bool:
        """Full pipeline: connect, ensure the table, page through the filter, upsert.

        Returns:
            True when at least one row was imported, False otherwise.
        """
        if not self.connect_to_futu():
            return False

        if not self.prepare_db_structure():
            self.disconnect()
            return False

        logger.info("================= start ===================")
        # A near-no-op float_share range filter: effectively "all HK stocks",
        # but the API still returns the float_share field we want to store.
        simple_filter = SimpleFilter()
        simple_filter.filter_min = 0
        simple_filter.filter_max = 100000000000000
        simple_filter.stock_field = StockField.FLOAT_SHARE
        simple_filter.is_no_filter = False

        nBegin = 0          # paging offset into the filter result
        last_page = False
        total_imported = 0

        try:
            while not last_page:
                ret, ls = self.quote_ctx.get_stock_filter(
                    market=Market.HK,
                    filter_list=[simple_filter],
                    begin=nBegin
                )

                if ret == RET_OK:
                    last_page, all_count, ret_list = ls
                    logger.info(f'获取股票列表进度: {nBegin}/{all_count}')

                    # Process and persist the current page.
                    processed_data = self.process_filter_result(ret_list)
                    if processed_data:
                        affected_rows = self.save_batch_to_database(processed_data)
                        total_imported += affected_rows

                    # BUGFIX: an empty page with last_page still False would
                    # leave nBegin unchanged and loop forever — bail out.
                    if not ret_list and not last_page:
                        logger.error('获取股票列表错误: 返回空页')
                        break

                    nBegin += len(ret_list)
                else:
                    logger.error(f'获取股票列表错误: {ls}')
                    break

                time.sleep(1)  # throttle to stay under the API rate limit

            logger.info(f"导入完成! 共导入 {total_imported} 条记录")
            logger.info("================= end ===================")
            return total_imported > 0

        except Exception as e:
            logger.error(f"导入过程中发生错误: {str(e)}")
            return False
        finally:
            self.disconnect()