Files
MADataManagment/webDataScraping.py
2025-08-20 17:30:14 +08:00

430 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Fetch and update A-share market data using akshare as the data source.
"""
from MySQLHelper import MySQLHelper # 导入我们创建的助手类
from LogHelper import LogHelper
import pandas as pd
import akshare as ak
import re
import time
from datetime import datetime, timedelta
import logging
# Module-wide logger named 'AkShare', built through the project's LogHelper.
# NOTE(review): the original note describes this as the basic usage that
# auto-creates a dated log file plus console output — confirm against
# LogHelper.setup(), which is not visible in this file.
logger = LogHelper(logger_name = 'AkShare').setup()
# Connection settings for the two MySQL schemas used by this script.
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from the environment or a config file kept out of version control.
def _make_db_config(database: str) -> dict:
    """Return MySQLHelper keyword arguments for the given schema name."""
    return {
        'host': 'localhost',
        'user': 'root',
        'password': 'bzskmysql',
        'database': database,
        'port': 3306,
        'charset': 'utf8mb4',
    }


# Schema holding the stock lists (queried for the stocks_sh / stocks_sz tables).
DB_CONFIG = _make_db_config('fullmarketdata_a')

# Schema holding the daily K-line tables (one table per stock).
DB_CONFIG_1D = _make_db_config('klinedata_1d_ma')
# Approach 1: open and close the database connection explicitly
def get_SH_stock_codes() -> list:
    """Read every non-empty ``a_stock_code`` from the ``stocks_sh`` table.

    The connection is opened and closed explicitly inside this function.

    Returns:
        list: The stock codes found; an empty list when the connection
        cannot be established or when any exception is raised (both cases
        are logged).
    """
    helper = MySQLHelper(**DB_CONFIG)
    try:
        if db_ready := helper.connect():
            rows = helper.execute_query("SELECT a_stock_code FROM stocks_sh")
            # Skip rows whose code is NULL / empty.
            candidates = (row['a_stock_code'] for row in rows)
            return [value for value in candidates if value]
        logger.error("数据库连接失败")
        return []
    except Exception as exc:
        logger.error(f"获取股票代码时出错: {exc}")
        return []
    finally:
        # Always release the connection, whatever happened above.
        helper.close()
def get_SZ_stock_codes() -> list:
    """Read every non-empty ``a_stock_code`` from the ``stocks_sz`` table.

    The connection is opened and closed explicitly inside this function.

    Returns:
        list: The stock codes found; an empty list when the connection
        cannot be established or when any exception is raised (both cases
        are logged).
    """
    query = "SELECT a_stock_code FROM stocks_sz"
    helper = MySQLHelper(**DB_CONFIG)
    try:
        connected = helper.connect()
        if not connected:
            logger.error("数据库连接失败")
            return []
        # Keep only rows that actually carry a code.
        fetched = helper.execute_query(query)
        codes = [record['a_stock_code'] for record in fetched if record['a_stock_code']]
        return codes
    except Exception as exc:
        logger.error(f"获取股票代码时出错: {exc}")
        return []
    finally:
        # Always release the connection, whatever happened above.
        helper.close()
# Approach 2: use a context manager (recommended)
def get_SH_stock_codes_with_context() -> list:
    """Read every non-empty ``a_stock_code`` from ``stocks_sh``.

    MySQLHelper is used as a context manager, so connection handling is
    delegated to its ``with`` support.

    Returns:
        list: The stock codes found, or an empty list when the query or the
        row extraction raises (the error is logged).
    """
    query = "SELECT a_stock_code FROM stocks_sh"
    with MySQLHelper(**DB_CONFIG) as conn:
        try:
            rows = conn.execute_query(query)
            # Drop rows whose code is NULL / empty.
            values = (row['a_stock_code'] for row in rows)
            return [value for value in values if value]
        except Exception as err:
            logger.error(f"获取股票代码时出错: {err}")
            return []
def get_SZ_stock_codes_with_context() -> list:
    """Read every non-empty ``a_stock_code`` from ``stocks_sz``.

    MySQLHelper is used as a context manager, so connection handling is
    delegated to its ``with`` support.

    Returns:
        list: The stock codes found, or an empty list when the query or the
        row extraction raises (the error is logged).
    """
    with MySQLHelper(**DB_CONFIG) as conn:
        try:
            fetched = conn.execute_query("SELECT a_stock_code FROM stocks_sz")
            # Drop rows whose code is NULL / empty.
            codes = [record['a_stock_code'] for record in fetched if record['a_stock_code']]
        except Exception as err:
            logger.error(f"获取股票代码时出错: {err}")
            return []
        return codes
def get_daily_k_data(stock_code: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch forward-adjusted daily K-line data for one stock (11-column layout).

    NOTE: a second function with the same name is defined later in this
    file. Python binds the name to the later definition, so this version is
    not the one the script's main section ends up calling.

    Args:
        stock_code: Symbol passed unchanged to ``ak.stock_zh_a_hist``.
        start_date: First day of the range, ``YYYYMMDD``.
        end_date: Last day of the range, ``YYYYMMDD``.

    Returns:
        DataFrame: The fetched rows with English column names and an added
        ``code`` column; the (empty) result as returned by akshare when
        there is no data; a fresh empty DataFrame when an exception occurs
        (the error is logged).
    """
    request = {
        'symbol': stock_code,
        'period': "daily",
        'start_date': start_date,
        'end_date': end_date,
        'adjust': "qfq",  # forward-adjusted prices
    }
    try:
        frame = ak.stock_zh_a_hist(**request)
        # NOTE(review): this "retry" sends exactly the same request again
        # (same symbol); it only triggers for codes without an exchange prefix.
        if frame.empty and not stock_code.startswith(('sh', 'sz', 'bj')):
            logger.info(f"尝试使用原始代码: {stock_code}")
            frame = ak.stock_zh_a_hist(**request)
        if frame.empty:
            return frame
        # Positional rename: assumes exactly these 11 columns, in this order.
        frame.columns = [
            'date', 'open', 'close', 'high', 'low',
            'volume', 'amount', 'amplitude', 'change_percent',
            'change_amount', 'turnover'
        ]
        frame['code'] = stock_code  # tag every row with the requested symbol
        return frame
    except Exception as exc:
        logger.error(f"获取 {stock_code} 日K数据时出错: {exc}")
        return pd.DataFrame()
def format_stock_code(code: str) -> str:
    """Prefix a numeric stock code with its exchange abbreviation.

    Mapping used (by the first character of the code):
        - ``6``        -> ``sh``
        - ``0`` / ``3`` -> ``sz``
        - ``4`` / ``8`` -> ``bj``

    A code that already starts with ``sh``, ``sz`` or ``bj`` is returned
    untouched. A code matching none of the rules is logged as an error and
    returned unchanged.

    Returns:
        str: Exchange prefix followed by the code, or the input itself.
    """
    known_prefixes = ('sh', 'sz', 'bj')
    if code.startswith(known_prefixes):
        return code

    # (exchange prefix, leading digits that map to it), checked in order.
    exchange_rules = (
        ('sh', ('6',)),
        ('sz', ('0', '3')),
        ('bj', ('4', '8')),
    )
    for exchange, leading_digits in exchange_rules:
        if code.startswith(leading_digits):
            return f"{exchange}{code}"

    logger.error(f"无法识别的股票代码格式: {code}")
    return code  # hand the raw value back unchanged
def get_daily_k_data(stock_code: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch forward-adjusted daily K-line data for one stock via akshare.

    This is the definition in effect at runtime (it replaces the earlier
    function of the same name in this file).

    Args:
        stock_code: Symbol passed unchanged to ``ak.stock_zh_a_hist``.
            NOTE(review): the script's main section passes the raw code
            read from the database (no exchange prefix); akshare's docs use
            plain six-digit symbols such as "000001" — confirm.
        start_date: First day of the range, ``YYYYMMDD``.
        end_date: Last day of the range, ``YYYYMMDD``.

    Returns:
        DataFrame: Columns ``date, code, open, close, high, low, volume,
        amount, amplitude, change_percent, change_amount, turnover``. The
        empty result is returned as-is when akshare has no rows; a fresh
        empty DataFrame is returned when any exception occurs (logged).
    """
    # Columns every supported layout has, in akshare's positional order.
    price_columns = [
        'date', 'open', 'close', 'high', 'low',
        'volume', 'amount', 'amplitude', 'change_percent',
        'change_amount', 'turnover'
    ]
    try:
        df = ak.stock_zh_a_hist(
            symbol=stock_code,
            period="daily",
            start_date=start_date,
            end_date=end_date,
            adjust="qfq"  # forward-adjusted prices
        )
        if df.empty:
            return df

        if len(df.columns) == len(price_columns):
            # Layout without a stock-code column: rename positionally and
            # add the code ourselves so downstream code always finds 'code'.
            df.columns = price_columns
            df.insert(1, 'code', stock_code)
        else:
            # Layout with the stock code as second column. Any other column
            # count raises ValueError here and is reported below.
            df.columns = [price_columns[0], 'code', *price_columns[1:]]
        return df
    except Exception as e:
        logger.error(f"获取 {stock_code} 日K数据时出错: {e}")
        return pd.DataFrame()
def create_stock_table(db: MySQLHelper, table_name: str) -> bool:
    """Create the per-stock daily K-line table if it does not exist yet.

    Args:
        db: Database helper; only its ``execute_update(sql)`` method is used.
        table_name: Target table, which must look like ``sh_600000``
            (two lowercase letters, an underscore, six digits).

    Returns:
        bool: True when the CREATE statement ran without raising; False when
        the name is rejected or the statement fails (a message is printed).
    """
    # The name is interpolated into the SQL text, so it must match the
    # pattern in full. fullmatch (unlike match + "$") also rejects a
    # trailing newline.
    if not re.fullmatch(r"[a-z]{2}_[0-9]{6}", table_name):
        print(f"表名 '{table_name}' 不符合命名规则")
        return False

    # NOTE(review): `code` is declared DECIMAL(10, 2), so a code such as
    # "000001" cannot keep its leading zeros; a character type looks
    # intended. `idx_date` also duplicates the primary key on `date`.
    # The schema is left unchanged here because tables may already exist.
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS `{table_name}` (
    `date` DATE NOT NULL COMMENT '日期',
    `code` DECIMAL(10, 2) NOT NULL COMMENT '代码',
    `open` DECIMAL(10, 2) NOT NULL COMMENT '开盘价',
    `close` DECIMAL(10, 2) NOT NULL COMMENT '收盘价',
    `high` DECIMAL(10, 2) NOT NULL COMMENT '最高价',
    `low` DECIMAL(10, 2) NOT NULL COMMENT '最低价',
    `volume` BIGINT NOT NULL COMMENT '成交量(手)',
    `amount` DECIMAL(20, 2) NOT NULL COMMENT '成交额(元)',
    `amplitude` DECIMAL(5, 2) NOT NULL COMMENT '振幅(%)',
    `change_percent` DECIMAL(5, 2) NOT NULL COMMENT '涨跌幅(%)',
    `change_amount` DECIMAL(5, 2) NOT NULL COMMENT '涨跌额(元)',
    `turnover` DECIMAL(5, 2) NOT NULL COMMENT '换手率(%)',
    PRIMARY KEY (`date`),
    INDEX `idx_date` (`date`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='股票日K数据表';
    """
    try:
        db.execute_update(create_table_sql)
        return True
    except Exception as e:
        print(f"创建表 {table_name} 失败: {e}")
        return False
def save_stock_data_to_db(db: MySQLHelper, df: pd.DataFrame, table_name: str) -> int:
    """Upsert the rows of ``df`` into the given per-stock table.

    Args:
        db: Database helper; only ``execute_many(sql, rows)`` is used.
        df: Daily K-line rows. Must contain the columns ``date, code, open,
            close, high, low, volume, amount, amplitude, change_percent,
            change_amount, turnover`` (extra columns are ignored).
        table_name: Target table, which must look like ``sh_600000``.

    Returns:
        int: The value returned by ``db.execute_many``; 0 when ``df`` is
        empty, the table name is rejected, or the insert raises (a message
        is printed in the last two cases).

    Raises:
        KeyError: If ``df`` is non-empty but lacks a required column.
    """
    if df.empty:
        return 0

    # The name is interpolated into the SQL text, so it must match the
    # pattern in full. fullmatch (unlike match + "$") also rejects a
    # trailing newline.
    if not re.fullmatch(r"[a-z]{2}_[0-9]{6}", table_name):
        print(f"表名 '{table_name}' 不符合命名规则")
        return 0

    # Column order must line up with the placeholders in the statement below.
    columns = [
        'date', 'code', 'open', 'close', 'high', 'low',
        'volume', 'amount', 'amplitude', 'change_percent',
        'change_amount', 'turnover'
    ]

    # Rows already present for the same date are overwritten.
    insert_sql = f"""
    INSERT INTO `{table_name}` (
    date, code, open, close, high, low,
    volume, amount, amplitude, change_percent,
    change_amount, turnover
    ) VALUES (
    %s, %s, %s, %s, %s,
    %s, %s, %s, %s,
    %s, %s, %s
    ) ON DUPLICATE KEY UPDATE
    code = VALUES(code),
    open = VALUES(open),
    close = VALUES(close),
    high = VALUES(high),
    low = VALUES(low),
    volume = VALUES(volume),
    amount = VALUES(amount),
    amplitude = VALUES(amplitude),
    change_percent = VALUES(change_percent),
    change_amount = VALUES(change_amount),
    turnover = VALUES(turnover)
    """

    # itertuples walks the frame column-wise and yields plain tuples of
    # Python scalars, avoiding the per-row Series that iterrows builds.
    data_to_insert = list(df[columns].itertuples(index=False, name=None))

    try:
        affected_rows = db.execute_many(insert_sql, data_to_insert)
        print(f"{table_name}: 成功插入/更新 {affected_rows} 条记录")
        return affected_rows
    except Exception as e:
        print(f"保存数据到表 {table_name} 失败: {e}")
        return 0
def generate_table_name(stock_code: str) -> str:
    """Build the per-stock table name ``<exchange>_<code>`` (e.g. ``sh_600000``).

    Args:
        stock_code: Stock code, either bare (``600000``) or already carrying
            a lowercase exchange prefix (``sh600000`` / ``sh_600000``).

    Returns:
        str: ``sh_`` / ``sz_`` / ``bj_`` followed by the numeric code. A
        code that matches no rule yields ``unknown_<stock_code>``.
    """
    # Prefixed input: reuse the prefix instead of falling through to
    # "unknown_", and tolerate an underscore that is already present.
    if stock_code.startswith(('sh', 'sz', 'bj')):
        return f"{stock_code[:2]}_{stock_code[2:].lstrip('_')}"

    # Bare input: the exchange is derived from the first digit.
    if stock_code.startswith('6'):
        return f"sh_{stock_code}"
    if stock_code.startswith(('0', '3')):
        return f"sz_{stock_code}"
    if stock_code.startswith(('4', '8')):
        return f"bj_{stock_code}"

    # Fallback for anything unrecognised.
    return f"unknown_{stock_code}"
if __name__ == "__main__":
    # 1-based position in the combined code list at which processing starts;
    # earlier entries are skipped. NOTE(review): looks like a manual resume
    # point from an interrupted run — set to 1 to process every stock.
    RESUME_FROM_POSITION = 1584
    # Pause between stocks so requests are not sent too quickly.
    REQUEST_INTERVAL_SECONDS = 5

    # Load the stock codes (Shanghai list first, then Shenzhen).
    logger.info("读取股票代码")
    sh_stock_codes_context = get_SH_stock_codes_with_context()
    sz_stock_codes_context = get_SZ_stock_codes_with_context()
    all_stock_codes = sh_stock_codes_context + sz_stock_codes_context
    if all_stock_codes:
        logger.info(f"前五个代码:{all_stock_codes[:5]}")
        # [-5:] is the real tail; the previous [-6:-1] dropped the last code.
        logger.info(f"后五个代码:{all_stock_codes[-5:]}")

    # Open the daily K-line database; nothing can be stored without it.
    db_1d = MySQLHelper(**DB_CONFIG_1D)
    if not db_1d.connect():
        logger.error("数据库连接失败")
        raise SystemExit(1)

    try:
        # Roughly the last three years, up to and including tomorrow.
        start_date = (datetime.now() - timedelta(days=3 * 365)).strftime("%Y%m%d")
        end_date = (datetime.now() + timedelta(days=1)).strftime("%Y%m%d")
        logger.info(f"获取数据时间范围: {start_date} ~ {end_date}")

        for position, code in enumerate(all_stock_codes, start=1):
            if position < RESUME_FROM_POSITION:
                continue

            df = get_daily_k_data(code, start_date, end_date)

            # One table per stock, named <exchange>_<code>.
            table_name = generate_table_name(code)
            if create_stock_table(db_1d, table_name):
                save_stock_data_to_db(db_1d, df, table_name)
            else:
                # No table, so there is nowhere to save the rows.
                logger.error(f"无法为股票 {code} 创建表 {table_name}")

            # Throttle regardless of the outcome: the request was already made.
            time.sleep(REQUEST_INTERVAL_SECONDS)
    finally:
        # Release the connection even if the loop is interrupted.
        db_1d.close()