2025-08-20 17:30:14 +08:00
|
|
|
|
"""
|
|
|
|
|
|
使用 baostock 数据源获取/更新A股数据
|
|
|
|
|
|
|
|
|
|
|
|
***
|
|
|
|
|
|
该数据源中会返回非交易日数据,导致数据存储异常
|
|
|
|
|
|
"""
|
2025-08-19 21:35:08 +08:00
|
|
|
|
from MySQLHelper import MySQLHelper
|
|
|
|
|
|
from LogHelper import LogHelper
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
import re
|
|
|
|
|
|
import time
|
|
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
from tqdm import tqdm
|
|
|
|
|
|
import baostock as bs
|
|
|
|
|
|
|
2025-08-20 17:30:14 +08:00
|
|
|
|
# Basic usage (automatically creates a dated log file + console output).
# Module-level logger shared by every function in this script.
logger = LogHelper(logger_name = 'baoStock').setup()
|
2025-08-19 21:35:08 +08:00
|
|
|
|
|
|
|
|
|
|
# Database configuration.
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from the environment or a secrets store instead.

# Connection settings used by the stock-code lookup functions
# (tables `stocks_sh` / `stocks_sz`).
DB_CONFIG = dict(
    host='localhost',
    user='root',
    password='bzskmysql',
    database='fullmarketdata_a',
    port=3306,
    charset='utf8mb4',
)

# Connection settings for the per-stock daily K-line tables. Identical to
# DB_CONFIG except for the schema name, so it is derived from it (as an
# independent dict, not an alias).
DB_CONFIG_1D = {**DB_CONFIG, 'database': 'klinedata_1d_ma_bao'}
|
|
|
|
|
|
|
|
|
|
|
|
def get_SH_stock_codes_with_context() -> list:
    """Return the Shanghai stock codes stored in the ``stocks_sh`` table.

    Opens a ``MySQLHelper`` (configured by ``DB_CONFIG``) as a context
    manager, reads the ``a_stock_code`` column and keeps only truthy values.

    Returns:
        A list of codes, or an empty list when the query or the row
        processing raises (the error is logged, not propagated).
    """
    query = "SELECT a_stock_code FROM stocks_sh"
    with MySQLHelper(**DB_CONFIG) as db:
        try:
            rows = db.execute_query(query)
            codes = []
            for record in rows:
                # Skip NULL / empty codes.
                if record['a_stock_code']:
                    codes.append(record['a_stock_code'])
            return codes
        except Exception as err:
            logger.error(f"获取股票代码时出错: {err}")
            return []
|
|
|
|
|
|
|
|
|
|
|
|
def get_SZ_stock_codes_with_context() -> list:
    """Return the Shenzhen stock codes stored in the ``stocks_sz`` table.

    Opens a ``MySQLHelper`` (configured by ``DB_CONFIG``) as a context
    manager, reads the ``a_stock_code`` column and keeps only truthy values.

    Returns:
        A list of codes, or an empty list when the query or the row
        processing raises (the error is logged, not propagated).
    """
    query = "SELECT a_stock_code FROM stocks_sz"
    with MySQLHelper(**DB_CONFIG) as db:
        try:
            rows = db.execute_query(query)
            # Drop NULL / empty codes while preserving the query order.
            return [
                record['a_stock_code']
                for record in rows
                if record['a_stock_code']
            ]
        except Exception as err:
            logger.error(f"获取股票代码时出错: {err}")
            return []
|
|
|
|
|
|
|
|
|
|
|
|
def generate_table_name(code: str) -> str:
    """Build the per-stock table name from a stock code.

    The exchange prefix of the table name is inferred from the first digit
    of the numeric code, not from any ``sh.``/``sz.``/``bj.`` prefix present
    in the input:

    * ``6...``          -> ``sh_<code>``
    * ``0...`` / ``3...`` -> ``sz_<code>``
    * ``4...`` / ``8...`` -> ``bj_<code>``
    * anything else     -> ``unknown_<code>``

    Args:
        code: Stock code, optionally prefixed with ``sh.``, ``sz.`` or
            ``bj.`` (e.g. ``"sh.600000"`` or ``"600000"``).

    Returns:
        The table name, e.g. ``"sh_600000"``.
    """
    # Strip the exchange prefix. ``bj.`` is handled as well so that the
    # ``bj_`` branch below is reachable for prefixed input.
    clean_code = code.replace("sh.", "").replace("sz.", "").replace("bj.", "")

    if clean_code.startswith('6'):
        return f"sh_{clean_code}"
    elif clean_code.startswith(('0', '3')):
        return f"sz_{clean_code}"
    elif clean_code.startswith(('4', '8')):
        return f"bj_{clean_code}"
    return f"unknown_{clean_code}"
|
|
|
|
|
|
|
|
|
|
|
|
def create_stock_table(db: MySQLHelper, table_name: str) -> bool:
    """Create the daily K-line table for one stock if it does not exist.

    The columns match the baostock fields requested by this script
    (date, code, OHLC, preclose, volume, amount, adjustflag, turn,
    tradestatus, pctChg, isST).

    Args:
        db: Helper whose ``execute_update`` runs the DDL statement.
        table_name: Target table name. Must consist of exactly two lowercase
            letters, an underscore and six digits (e.g. ``sh_600000``).

    Returns:
        True when the statement executed without raising; False when the
        name is rejected or execution raised (both cases are logged).
    """
    # The name is interpolated into the DDL, so it is validated against a
    # strict whitelist. ``fullmatch`` is used because ``$`` in ``re.match``
    # would also accept a name followed by a trailing newline.
    if not re.fullmatch(r"[a-z]{2}_[0-9]{6}", table_name):
        logger.error(f"表名 '{table_name}' 不符合命名规则")
        return False

    # `date` is the primary key, which already indexes that column, so no
    # additional secondary index on `date` is declared.
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS `{table_name}` (
        `date` DATE NOT NULL COMMENT '日期',
        `code` VARCHAR(10) NOT NULL COMMENT '代码',
        `open` DECIMAL(10, 2) NOT NULL COMMENT '开盘价',
        `high` DECIMAL(10, 2) NOT NULL COMMENT '最高价',
        `low` DECIMAL(10, 2) NOT NULL COMMENT '最低价',
        `close` DECIMAL(10, 2) NOT NULL COMMENT '收盘价',
        `preclose` DECIMAL(10, 2) NOT NULL COMMENT '前收盘价',
        `volume` BIGINT NOT NULL COMMENT '成交量(股)',
        `amount` DECIMAL(20, 2) NOT NULL COMMENT '成交额(元)',
        `adjustflag` TINYINT NOT NULL COMMENT '复权状态',
        `turn` DECIMAL(10, 2) NOT NULL COMMENT '换手率(%)',
        `tradestatus` TINYINT NOT NULL COMMENT '交易状态',
        `pctChg` DECIMAL(10, 2) NOT NULL COMMENT '涨跌幅(%)',
        `isST` TINYINT NOT NULL COMMENT '是否ST股',
        PRIMARY KEY (`date`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='股票日K数据表';
    """

    try:
        db.execute_update(create_table_sql)
        logger.info(f"成功打开表: {table_name}")
        return True
    except Exception as e:
        logger.error(f"创建表 {table_name} 失败: {e}")
        return False
|
|
|
|
|
|
|
|
|
|
|
|
def save_stock_data_to_db(db: MySQLHelper, df: pd.DataFrame, table_name: str) -> int:
    """Upsert baostock daily K-line rows into a per-stock table.

    Rows whose ``tradestatus`` is ``0`` (no trading that day) are skipped.
    Rows whose values cannot be converted to the expected numeric types are
    logged and skipped; the remaining rows are still written.

    Args:
        db: Helper whose ``execute_many`` runs the batched statement.
        df: Frame with the columns date, code, open, high, low, close,
            preclose, volume, amount, adjustflag, turn, tradestatus,
            pctChg and isST.
        table_name: Target table name. Must consist of exactly two lowercase
            letters, an underscore and six digits (e.g. ``sh_600000``).

    Returns:
        The value returned by ``db.execute_many`` (logged as the number of
        inserted/updated records), or 0 when there is nothing to write, the
        table name is rejected, or the write raised.
    """
    if df.empty:
        logger.warning(f"表 {table_name}: 无数据可保存")
        return 0

    # The name is interpolated into the SQL text, so it is validated against
    # a strict whitelist. ``fullmatch`` is used because ``$`` in ``re.match``
    # would also accept a name followed by a trailing newline.
    if not re.fullmatch(r"[a-z]{2}_[0-9]{6}", table_name):
        logger.error(f"表名 '{table_name}' 不符合命名规则")
        return 0

    # Upsert statement; the column order must match the tuples built below.
    insert_sql = f"""
    INSERT INTO `{table_name}` (
        date, code, open, high, low, close, preclose,
        volume, amount, adjustflag, turn, tradestatus, pctChg, isST
    ) VALUES (
        %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s
    ) ON DUPLICATE KEY UPDATE
        date = VALUES(date),
        code = VALUES(code),
        open = VALUES(open),
        high = VALUES(high),
        low = VALUES(low),
        close = VALUES(close),
        preclose = VALUES(preclose),
        volume = VALUES(volume),
        amount = VALUES(amount),
        adjustflag = VALUES(adjustflag),
        turn = VALUES(turn),
        tradestatus = VALUES(tradestatus),
        pctChg = VALUES(pctChg),
        isST = VALUES(isST)
    """

    data_to_insert = []
    for _, row in df.iterrows():
        # Skip days without trading. The value is normalised through str()
        # so the filter also works when the column holds integers rather
        # than the strings baostock returns.
        if str(row['tradestatus']).strip() == '0':
            continue

        # Convert the string fields to the column types; a row that fails
        # conversion is logged and dropped without aborting the batch.
        try:
            data_to_insert.append((
                row['date'],
                row['code'],
                float(row['open']),
                float(row['high']),
                float(row['low']),
                float(row['close']),
                float(row['preclose']),
                int(row['volume']),  # baostock reports volume in shares
                float(row['amount']),
                int(row['adjustflag']),
                float(row['turn']),
                int(row['tradestatus']),
                float(row['pctChg']),
                int(row['isST']),
            ))
        except Exception as e:
            logger.error(f"处理行数据时出错: {e}\n行数据: {row}")

    # Nothing left after filtering / conversion.
    if not data_to_insert:
        return 0

    # Batched write.
    try:
        affected_rows = db.execute_many(insert_sql, data_to_insert)
        logger.info(f"表 {table_name}: 成功插入/更新 {affected_rows} 条记录")
        return affected_rows
    except Exception as e:
        logger.error(f"保存数据到表 {table_name} 失败: {e}")
        return 0
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: log in to baostock, read the Shanghai/Shenzhen
    # code lists, then download and store the latest daily bars per stock.

    # Log in to baostock.
    lg = bs.login()
    logger.info(f'登陆返回: error_code={lg.error_code}, error_msg={lg.error_msg}')

    if lg.error_code != '0':
        logger.error("baostock登录失败,程序终止")
        raise SystemExit(1)

    # From here on the baostock session must be released on every exit path
    # (normal completion, early exit, or an unexpected exception).
    try:
        # Read the stock codes.
        logger.info("开始读取股票代码")
        sh_codes = get_SH_stock_codes_with_context()
        sz_codes = get_SZ_stock_codes_with_context()
        logger.info(f"获取到上海股票数量: {len(sh_codes)}, 深圳股票数量: {len(sz_codes)}")

        # Connect to the K-line database.
        db_1d = MySQLHelper(**DB_CONFIG_1D)
        if not db_1d.connect():
            logger.error("数据库连接失败")
            raise SystemExit(1)

        # The connection is open now; make sure it is closed again.
        try:
            # The script is meant to run once after each trading day, so the
            # window only covers yesterday through today. For an initial
            # backfill widen it, e.g. timedelta(days=5 * 365).
            now = datetime.now()
            end_date = now.strftime("%Y-%m-%d")
            start_date = (now - timedelta(days=1)).strftime("%Y-%m-%d")
            logger.info(f"获取数据时间范围: {start_date} 至 {end_date}")

            # (exchange, code) pairs: Shanghai first, then Shenzhen.
            all_codes = [("sh", code) for code in sh_codes]
            all_codes += [("sz", code) for code in sz_codes]

            logger.info(f"总股票数量: {len(all_codes)}")

            # Iterate with a tqdm progress bar.
            for exchange, code in tqdm(all_codes, desc="下载股票数据", unit="支"):
                full_code = f"{exchange}.{code}"
                table_name = generate_table_name(full_code)

                # Create the table if it does not exist yet.
                if not create_stock_table(db_1d, table_name):
                    logger.error(f"跳过股票 {full_code}")
                    continue

                # Fetch the daily bars.
                rs = bs.query_history_k_data_plus(
                    full_code,
                    "date,code,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,isST",
                    start_date=start_date,
                    end_date=end_date,
                    frequency="d",
                    adjustflag="2"  # forward-adjusted prices
                )

                if rs.error_code != '0':
                    logger.error(f"获取 {full_code} 数据失败: {rs.error_msg}")
                    continue

                # Drain the result set row by row.
                data_list = []
                while rs.next():
                    data_list.append(rs.get_row_data())

                if not data_list:
                    logger.warning(f"股票 {full_code} 无数据返回")
                    continue

                df = pd.DataFrame(data_list, columns=rs.fields)

                # Persist the rows.
                save_stock_data_to_db(db_1d, df, table_name)

                # Throttle so requests are not sent too quickly.
                time.sleep(5)
        finally:
            db_1d.close()
    finally:
        bs.logout()

    logger.info("程序执行完成")
|