add stock data Scraping from baostock/akshare
This commit is contained in:
440
webDataScraping.py
Normal file
440
webDataScraping.py
Normal file
@@ -0,0 +1,440 @@
|
||||
from MySQLHelper import MySQLHelper # 导入我们创建的助手类
|
||||
from LogHelper import LogHelper
|
||||
import logging
|
||||
import pandas as pd
|
||||
import akshare as ak
|
||||
import re
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from tqdm import tqdm # 用于显示进度条
|
||||
|
||||
# 创建配置实例
|
||||
logHelper = LogHelper(
|
||||
level=logging.DEBUG, # 设置日志级别为 DEBUG
|
||||
format='%(asctime)s [%(levelname)s] %(message)s' # 自定义格式
|
||||
)
|
||||
|
||||
# # 添加处理器
|
||||
logHelper.add_console_handler() # 默认输出到 stdout
|
||||
logHelper.add_file_handler('Debug.log') # 添加文件日志
|
||||
|
||||
# # 应用配置
|
||||
logHelper.setup()
|
||||
logger = logging.getLogger('StockDataImporter')
|
||||
|
||||
# 数据库配置信息 股票列表
|
||||
DB_CONFIG = {
|
||||
'host': 'localhost',
|
||||
'user': 'root',
|
||||
'password': 'bzskmysql',
|
||||
'database': 'fullmarketdata_a',
|
||||
'port': 3306,
|
||||
'charset': 'utf8mb4'
|
||||
}
|
||||
|
||||
# 日K数据库
|
||||
DB_CONFIG_1D = {
|
||||
'host': 'localhost',
|
||||
'user': 'root',
|
||||
'password': 'bzskmysql',
|
||||
'database': 'klinedata_1d_ma',
|
||||
'port': 3306,
|
||||
'charset': 'utf8mb4'
|
||||
}
|
||||
|
||||
# 方法1:显式连接和关闭
|
||||
def get_SH_stock_codes() -> list:
|
||||
"""
|
||||
从数据库中获取所有 a_stock_code 值
|
||||
|
||||
返回:
|
||||
list: 包含所有股票代码的列表
|
||||
"""
|
||||
# 创建数据库助手实例
|
||||
db = MySQLHelper(**DB_CONFIG)
|
||||
|
||||
try:
|
||||
# 连接数据库
|
||||
if not db.connect():
|
||||
logger.error("数据库连接失败")
|
||||
return []
|
||||
|
||||
# 执行查询
|
||||
results = db.execute_query("SELECT a_stock_code FROM stocks_sh")
|
||||
|
||||
# 提取股票代码
|
||||
stock_codes = [row['a_stock_code'] for row in results if row['a_stock_code']]
|
||||
|
||||
return stock_codes
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取股票代码时出错: {e}")
|
||||
return []
|
||||
|
||||
finally:
|
||||
# 确保关闭数据库连接
|
||||
db.close()
|
||||
|
||||
def get_SZ_stock_codes() -> list:
|
||||
"""
|
||||
从数据库中获取所有 a_stock_code 值
|
||||
|
||||
返回:
|
||||
list: 包含所有股票代码的列表
|
||||
"""
|
||||
# 创建数据库助手实例
|
||||
db = MySQLHelper(**DB_CONFIG)
|
||||
|
||||
try:
|
||||
# 连接数据库
|
||||
if not db.connect():
|
||||
logger.error("数据库连接失败")
|
||||
return []
|
||||
|
||||
# 执行查询
|
||||
results = db.execute_query("SELECT a_stock_code FROM stocks_sz")
|
||||
|
||||
# 提取股票代码
|
||||
stock_codes = [row['a_stock_code'] for row in results if row['a_stock_code']]
|
||||
|
||||
return stock_codes
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取股票代码时出错: {e}")
|
||||
return []
|
||||
|
||||
finally:
|
||||
# 确保关闭数据库连接
|
||||
db.close()
|
||||
|
||||
# 方法2:使用上下文管理器(推荐)
|
||||
def get_SH_stock_codes_with_context() -> list:
|
||||
"""
|
||||
使用上下文管理器获取所有 a_stock_code 值
|
||||
|
||||
返回:
|
||||
list: 包含所有股票代码的列表
|
||||
"""
|
||||
# 使用上下文管理器自动处理连接
|
||||
with MySQLHelper(**DB_CONFIG) as db:
|
||||
try:
|
||||
# 执行查询
|
||||
results = db.execute_query("SELECT a_stock_code FROM stocks_sh")
|
||||
|
||||
# 提取股票代码
|
||||
return [row['a_stock_code'] for row in results if row['a_stock_code']]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取股票代码时出错: {e}")
|
||||
return []
|
||||
|
||||
def get_SZ_stock_codes_with_context() -> list:
|
||||
"""
|
||||
使用上下文管理器获取所有 a_stock_code 值
|
||||
|
||||
返回:
|
||||
list: 包含所有股票代码的列表
|
||||
"""
|
||||
# 使用上下文管理器自动处理连接
|
||||
with MySQLHelper(**DB_CONFIG) as db:
|
||||
try:
|
||||
# 执行查询
|
||||
results = db.execute_query("SELECT a_stock_code FROM stocks_sz")
|
||||
|
||||
# 提取股票代码
|
||||
return [row['a_stock_code'] for row in results if row['a_stock_code']]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取股票代码时出错: {e}")
|
||||
return []
|
||||
|
||||
def get_daily_k_data(stock_code: str, start_date: str, end_date: str) -> pd.DataFrame:
|
||||
"""
|
||||
获取单只股票的日K线数据
|
||||
|
||||
参数:
|
||||
stock_code: 格式化后的股票代码 (如 sh600000)
|
||||
start_date: 开始日期 (YYYYMMDD)
|
||||
end_date: 结束日期 (YYYYMMDD)
|
||||
|
||||
返回:
|
||||
DataFrame: 包含日K线数据的DataFrame
|
||||
"""
|
||||
try:
|
||||
# 获取股票历史行情数据
|
||||
df = ak.stock_zh_a_hist(
|
||||
symbol=stock_code,
|
||||
period="daily",
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
adjust="qfq" # 前复权
|
||||
)
|
||||
|
||||
# 如果数据为空,尝试使用原始代码
|
||||
if df.empty and not stock_code.startswith(('sh', 'sz', 'bj')):
|
||||
logger.info(f"尝试使用原始代码: {stock_code}")
|
||||
df = ak.stock_zh_a_hist(
|
||||
symbol=stock_code,
|
||||
period="daily",
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
adjust="qfq"
|
||||
)
|
||||
|
||||
# 重命名列
|
||||
if not df.empty:
|
||||
df.columns = [
|
||||
'date', 'open', 'close', 'high', 'low',
|
||||
'volume', 'amount', 'amplitude', 'change_percent',
|
||||
'change_amount', 'turnover'
|
||||
]
|
||||
df['code'] = stock_code # 添加股票代码列
|
||||
|
||||
return df
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取 {stock_code} 日K数据时出错: {e}")
|
||||
return pd.DataFrame()
|
||||
|
||||
def format_stock_code(code: str) -> str:
|
||||
"""
|
||||
格式化股票代码为akshare需要的格式
|
||||
|
||||
规则:
|
||||
- 6开头: 上海证券交易所 (sh)
|
||||
- 0或3开头: 深圳证券交易所 (sz)
|
||||
- 4或8开头: 北京证券交易所 (bj)
|
||||
|
||||
返回: 交易所前缀 + 股票代码
|
||||
"""
|
||||
# 如果代码已经是带前缀的格式,直接返回
|
||||
if code.startswith(('sh', 'sz', 'bj')):
|
||||
return code
|
||||
|
||||
# 根据数字前缀判断交易所
|
||||
if code.startswith('6'):
|
||||
return f"sh{code}"
|
||||
elif code.startswith(('0', '3')):
|
||||
return f"sz{code}"
|
||||
elif code.startswith(('4', '8')):
|
||||
return f"bj{code}"
|
||||
else:
|
||||
logger.error(f"无法识别的股票代码格式: {code}")
|
||||
return code # 返回原始格式,让akshare尝试处理
|
||||
|
||||
def get_daily_k_data(stock_code: str, start_date: str, end_date: str) -> pd.DataFrame:
|
||||
"""
|
||||
获取单只股票的日K线数据
|
||||
|
||||
参数:
|
||||
stock_code: 格式化后的股票代码 (如 sh600000)
|
||||
start_date: 开始日期 (YYYYMMDD)
|
||||
end_date: 结束日期 (YYYYMMDD)
|
||||
|
||||
返回:
|
||||
DataFrame: 包含日K线数据的DataFrame
|
||||
"""
|
||||
try:
|
||||
# 获取股票历史行情数据
|
||||
df = ak.stock_zh_a_hist(
|
||||
symbol=stock_code,
|
||||
period="daily",
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
adjust="qfq" # 前复权
|
||||
)
|
||||
|
||||
# 重命名列
|
||||
if not df.empty:
|
||||
df.columns = [
|
||||
'date', 'code', 'open', 'close', 'high', 'low',
|
||||
'volume', 'amount', 'amplitude', 'change_percent',
|
||||
'change_amount', 'turnover'
|
||||
]
|
||||
return df
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取 {stock_code} 日K数据时出错: {e}")
|
||||
return pd.DataFrame()
|
||||
|
||||
def create_stock_table(db: MySQLHelper, table_name: str) -> bool:
|
||||
"""
|
||||
创建股票数据表
|
||||
|
||||
参数:
|
||||
db: 数据库连接
|
||||
table_name: 表名 (格式: 交易所_股票代码, 如 sh_600000)
|
||||
|
||||
返回:
|
||||
bool: 是否成功
|
||||
"""
|
||||
# 检查表名是否合法
|
||||
if not re.match(r"^[a-z]{2}_[0-9]{6}$", table_name):
|
||||
print(f"表名 '{table_name}' 不符合命名规则")
|
||||
return False
|
||||
|
||||
# 创建表SQL
|
||||
create_table_sql = f"""
|
||||
CREATE TABLE IF NOT EXISTS `{table_name}` (
|
||||
`date` DATE NOT NULL COMMENT '日期',
|
||||
`code` DECIMAL(10, 2) NOT NULL COMMENT '代码',
|
||||
`open` DECIMAL(10, 2) NOT NULL COMMENT '开盘价',
|
||||
`close` DECIMAL(10, 2) NOT NULL COMMENT '收盘价',
|
||||
`high` DECIMAL(10, 2) NOT NULL COMMENT '最高价',
|
||||
`low` DECIMAL(10, 2) NOT NULL COMMENT '最低价',
|
||||
`volume` BIGINT NOT NULL COMMENT '成交量(手)',
|
||||
`amount` DECIMAL(20, 2) NOT NULL COMMENT '成交额(元)',
|
||||
`amplitude` DECIMAL(5, 2) NOT NULL COMMENT '振幅(%)',
|
||||
`change_percent` DECIMAL(5, 2) NOT NULL COMMENT '涨跌幅(%)',
|
||||
`change_amount` DECIMAL(5, 2) NOT NULL COMMENT '涨跌额(元)',
|
||||
`turnover` DECIMAL(5, 2) NOT NULL COMMENT '换手率(%)',
|
||||
PRIMARY KEY (`date`),
|
||||
INDEX `idx_date` (`date`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='股票日K数据表';
|
||||
"""
|
||||
|
||||
try:
|
||||
db.execute_update(create_table_sql)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"创建表 {table_name} 失败: {e}")
|
||||
return False
|
||||
|
||||
def save_stock_data_to_db(db: MySQLHelper, df: pd.DataFrame, table_name: str) -> int:
|
||||
"""
|
||||
将股票数据保存到数据库表中
|
||||
|
||||
参数:
|
||||
db: 数据库连接
|
||||
df: 包含股票数据的DataFrame
|
||||
table_name: 表名 (格式: 交易所_股票代码, 如 sh_600000)
|
||||
|
||||
返回:
|
||||
int: 成功插入的记录数
|
||||
"""
|
||||
if df.empty:
|
||||
return 0
|
||||
|
||||
# 检查表名是否合法
|
||||
if not re.match(r"^[a-z]{2}_[0-9]{6}$", table_name):
|
||||
print(f"表名 '{table_name}' 不符合命名规则")
|
||||
return 0
|
||||
|
||||
# 准备插入SQL
|
||||
insert_sql = f"""
|
||||
INSERT INTO `{table_name}` (
|
||||
date, code, open, close, high, low,
|
||||
volume, amount, amplitude, change_percent,
|
||||
change_amount, turnover
|
||||
) VALUES (
|
||||
%s, %s, %s, %s, %s,
|
||||
%s, %s, %s, %s,
|
||||
%s, %s, %s
|
||||
) ON DUPLICATE KEY UPDATE
|
||||
code = VALUES(code),
|
||||
open = VALUES(open),
|
||||
close = VALUES(close),
|
||||
high = VALUES(high),
|
||||
low = VALUES(low),
|
||||
volume = VALUES(volume),
|
||||
amount = VALUES(amount),
|
||||
amplitude = VALUES(amplitude),
|
||||
change_percent = VALUES(change_percent),
|
||||
change_amount = VALUES(change_amount),
|
||||
turnover = VALUES(turnover)
|
||||
"""
|
||||
|
||||
# 准备数据
|
||||
data_to_insert = []
|
||||
for _, row in df.iterrows():
|
||||
# 确保日期格式正确
|
||||
date_value = row['date']
|
||||
# if len(date_str) == 10: # YYYY-MM-DD
|
||||
# date_value = date_str
|
||||
# else:
|
||||
# try:
|
||||
# date_value = datetime.strptime(date_str, '%Y-%m-%d').strftime('%Y-%m-%d')
|
||||
# except:
|
||||
# # 尝试其他日期格式
|
||||
# date_value = date_str[:10] # 取前10个字符
|
||||
|
||||
data_to_insert.append((
|
||||
date_value, row['code'], row['open'], row['close'],
|
||||
row['high'], row['low'], row['volume'], row['amount'],
|
||||
row['amplitude'], row['change_percent'],
|
||||
row['change_amount'], row['turnover']
|
||||
))
|
||||
|
||||
# 批量插入
|
||||
if data_to_insert:
|
||||
try:
|
||||
affected_rows = db.execute_many(insert_sql, data_to_insert)
|
||||
print(f"表 {table_name}: 成功插入/更新 {affected_rows} 条记录")
|
||||
return affected_rows
|
||||
except Exception as e:
|
||||
print(f"保存数据到表 {table_name} 失败: {e}")
|
||||
return 0
|
||||
return 0
|
||||
|
||||
def generate_table_name(stock_code: str) -> str:
|
||||
"""
|
||||
根据股票代码生成表名 (格式: 交易所_股票代码)
|
||||
|
||||
参数:
|
||||
stock_code: 股票代码 (带或不带交易所前缀)
|
||||
|
||||
返回:
|
||||
str: 表名 (如 sh_600000)
|
||||
"""
|
||||
if stock_code.startswith('6'):
|
||||
return f"sh_{stock_code}"
|
||||
elif stock_code.startswith(('0', '3')):
|
||||
return f"sz_{stock_code}"
|
||||
elif stock_code.startswith(('4', '8')):
|
||||
return f"bj_{stock_code}"
|
||||
|
||||
# 默认处理
|
||||
return f"unknown_{stock_code}"
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
# 读取股票代码
|
||||
logger.info("读取股票代码")
|
||||
sh_stock_codes_context = get_SH_stock_codes_with_context()
|
||||
sz_stock_codes_context = get_SZ_stock_codes_with_context()
|
||||
all_stock_codes = sh_stock_codes_context + sz_stock_codes_context
|
||||
|
||||
if all_stock_codes:
|
||||
logger.info(f"前五个代码:{all_stock_codes[:5]}")
|
||||
logger.info(f"后五个代码:{all_stock_codes[-6:-1]}")
|
||||
|
||||
# 存储日K数据
|
||||
db_1d = MySQLHelper(**DB_CONFIG_1D)
|
||||
if not db_1d.connect():
|
||||
logger.error("数据库连接失败")
|
||||
|
||||
# 获取最近3年的数据
|
||||
start_date = (datetime.now() - timedelta(days = 3 * 365)).strftime("%Y%m%d")
|
||||
end_date = (datetime.now() + timedelta(days = 1)).strftime("%Y%m%d")
|
||||
logger.info(f"获取数据时间范围: {start_date} 至 {end_date}")
|
||||
|
||||
nCount = 0
|
||||
for code in all_stock_codes:
|
||||
nCount = nCount+1
|
||||
if nCount < 1584:
|
||||
continue
|
||||
df = get_daily_k_data(code,start_date,end_date)
|
||||
|
||||
# 生成表名 (交易所_股票代码)
|
||||
table_name = generate_table_name(code)
|
||||
|
||||
# 创建表(如果不存在)
|
||||
if not create_stock_table(db_1d, table_name):
|
||||
logger.error(f"无法为股票 {code} 创建表 {table_name}")
|
||||
|
||||
# 保存数据到表
|
||||
save_stock_data_to_db(db_1d, df, table_name)
|
||||
|
||||
# 添加延迟,避免请求过快
|
||||
time.sleep(5)
|
||||
Reference in New Issue
Block a user