Files
QTradeProgram/Sqbase/qorderprocessor.cpp

467 lines
13 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "qorderprocessor.h"
#include <QJsonDocument>
#include <QJsonArray>
#include <QDebug>
#include <immintrin.h>
#include <algorithm>
#include <QtConcurrent/QtConcurrent>
#include "qlogmanager.h"
#include "qeventbus.h"
/// Constructs the processor, sizes both worker pools and switches JSON
/// snapshot saving on.
/// @param parent Qt parent object forwarded to QObject.
QOrderProcessor::QOrderProcessor(QObject *parent)
    : QObject(parent)
{
    // Fixed worker count for order-book processing. This is the value the
    // former qBound(4, 6, 8) expression always produced.
    // Hardware-scaled alternative that was disabled in the original code:
    //   m_threadPool.setMaxThreadCount(qBound(4, QThread::idealThreadCount(), 16));
    constexpr int kProcessingThreads = 6;
    m_threadPool.setMaxThreadCount(kProcessingThreads);

    // Keep the JSON-save pool small to avoid excessive file I/O.
    m_jsonSaveThreadPool.setMaxThreadCount(2);

    // Persist order-book snapshots (temporary measure, 2025.11.10).
    setJsonSaveEnabled(true);
}
// Destructor: blocks until every task queued on both pools has finished.
// The queued lambdas capture `this`, so they must not outlive the object.
QOrderProcessor::~QOrderProcessor()
{
// Drain the processing pool first: its tasks may still enqueue JSON-save
// work (internalProcess -> saveOrderDataAsJson) onto the second pool.
m_threadPool.waitForDone();
m_jsonSaveThreadPool.waitForDone();
}
/// Switches order-book processing on or off.
/// The flag is written while holding m_dataMutex.
/// @param enabled New value for the processing flag.
void QOrderProcessor::setProcessingEnabled(bool enabled)
{
    QMutexLocker guard(&m_dataMutex);
    m_enabled = enabled;
}
/// Switches saving of order-book snapshots as JSON on or off.
/// The flag is written while holding m_dataMutex.
/// @param enabled New value for the JSON-save flag.
void QOrderProcessor::setJsonSaveEnabled(bool enabled)
{
    QMutexLocker guard(&m_dataMutex);
    m_jsonSaveEnabled = enabled;
}
double QOrderProcessor::sumQuantity(const QVector<OrderBookEntry>& items)const {
return std::accumulate(items.begin(), items.end(), 0.0,
[](double sum, const OrderBookEntry& item) {
return sum + item.volume;
});
}
/*
 * Returns the entry with the largest volume, but only when it dominates the
 * runner-up by a given factor.
 *
 *   1. Locate the largest and the second-largest volume in the list.
 *   2. Accept the largest only if largest >= second-largest * volumeRatio
 *      (with a small epsilon tolerance).
 *
 * An empty list yields a default-constructed entry. A single-element list
 * has no runner-up to compare against, so its element is returned only when
 * volumeRatio <= 1.0.
 */
OrderBookEntry QOrderProcessor::findMaxVolumeItemEx(const QVector<OrderBookEntry>& items, double volumeRatio) const
{
    const auto count = items.size();
    if (count == 0) {
        return {};
    }
    if (count == 1) {
        if (volumeRatio <= 1.0) {
            return items[0];
        }
        return OrderBookEntry{};
    }

    // Track the leader by index and the runner-up by its volume only.
    int topIdx = 0;
    bool haveRunnerUp = false;
    double runnerUpVolume = -1.0;

    for (int idx = 1; idx < count; ++idx) {
        const OrderBookEntry& candidate = items[idx];
        if (candidate.volume > items[topIdx].volume) {
            // New leader: the previous leader becomes the runner-up.
            runnerUpVolume = items[topIdx].volume;
            haveRunnerUp = true;
            topIdx = idx;
        }
        else if (!haveRunnerUp || candidate.volume > runnerUpVolume) {
            // Not a new leader, but better than the current runner-up.
            runnerUpVolume = candidate.volume;
            haveRunnerUp = true;
        }
    }

    // Ratio check with a tolerance for floating-point rounding.
    constexpr double kEpsilon = 1e-9;
    const bool dominates = haveRunnerUp
        && items[topIdx].volume + kEpsilon >= runnerUpVolume * volumeRatio;
    if (dominates) {
        return items[topIdx];
    }
    return OrderBookEntry{};
}
/**
 * Scans both sides of an order book and reports every individual order whose
 * volume exceeds the threshold stored for the instrument.
 *
 * A price level is inspected only when its aggregate volume exceeds the
 * threshold; within such a level each order detail above the threshold
 * becomes one BigOrderInfo. Asks are reported with nBigOrderType 0, bids
 * with nBigOrderType 1, and `level` is the 1-based position of the price
 * level on its side.
 *
 * A side whose timestamp compares equal to NULL is skipped entirely: that is
 * the snapshot pushed right after the program starts.
 *
 * @param data Parsed order book for one instrument.
 * @return All big orders found, asks first, then bids; empty if none.
 */
QVector<BigOrderInfo> QOrderProcessor::findMaxVolumeItem(const OrderBookData & data) const
{
    QVector<BigOrderInfo> bigOrderInfoList;

    // Per-instrument volume threshold.
    // NOTE(review): an unknown code presumably yields 0 here, which would
    // flag every order with a positive volume - confirm against the caller.
    const float threshold = m_replyCodeQuantity[data.code];

    // One record is reused for every hit, as before; each field that is
    // reported is overwritten before the record is appended.
    BigOrderInfo bigOrderInfo;
    bigOrderInfo.isBigOrder = true;
    bigOrderInfo.code = data.code;
    bigOrderInfo.name = data.name;

    // Appends one record per order detail above the threshold. The levels are
    // read through a const reference so the implicitly shared containers are
    // never detached (deep-copied) by non-const element access.
    const auto appendBigOrders = [&](const QVector<OrderBookEntry>& levels) {
        for (int i = 0; i < levels.size(); ++i) {
            const OrderBookEntry& level = levels[i];
            // Only a level whose total volume exceeds the threshold can
            // contain a big order.
            if (!(threshold < level.volume)) {
                continue;
            }
            // Iterate the details actually present; the list may hold fewer
            // entries than the level's order count.
            for (const auto& detail : level.details) {
                if (!(threshold < detail.volume)) {
                    continue;
                }
                bigOrderInfo.orderId = detail.orderId;
                bigOrderInfo.price = level.price;
                bigOrderInfo.volume = detail.volume;
                // Set for both sides; previously bids kept a stale level.
                bigOrderInfo.level = i + 1;
                bigOrderInfoList.append(bigOrderInfo);
            }
        }
    };

    // NOTE(review): the comparison against NULL is kept verbatim because the
    // timestamp type is not visible here - confirm and replace with an
    // explicit "no timestamp" test.
    if (!(data.askTime == NULL)) {
        bigOrderInfo.nBigOrderType = 0;
        bigOrderInfo.svrRecvTime = data.askTime;
        appendBigOrders(data.asks);
    }
    if (!(data.bidTime == NULL)) {
        bigOrderInfo.nBigOrderType = 1;
        bigOrderInfo.svrRecvTime = data.bidTime;
        appendBigOrders(data.bids);
    }
    return bigOrderInfoList;
}
//
//// 1.简单粗暴,直接通过数量阈值进行筛选,遍历全部价格档位
//// 大单检测接口,返回 QVector<BigOrderInfo>, 暂时找到一个就返回
//QVector<BigOrderInfo> QOrderProcessor::findMaxVolumeItem(const OrderBookData& data) const;
//{
// if (items.isEmpty())
// return{};
// bool findBigOrder = false;
// QVector<BigOrderInfo> bigOrderInfoList;
//
// float volume = m_replyCodeQuantity[data.code];
// const OrderBookEntry* maxItem;
// for (int i = 0; i < items.size(); i++) {
// // 同一个价格挡位的订单中股票数量大于,阈值才有可能是大单
// if (volume < items[i].volume) {
// for (int j = 0; j< items[i].orderCount; j++)
// {
// try {
// if (items[i].details.size() == 0) continue;
// if (volume < items[i].details[j].volume)
// {
// findBigOrder = true;
//
// bigOrderInfo.isBigOrder = findBigOrder;
// bigOrderInfo.code = data.code;
// bigOrderInfo.name = data.name;
// bigOrderInfo.orderId = items[i].details[j].orderId;
// bigOrderInfo.price = items[i].price;
// bigOrderInfo.volume = items[i].details[j].volume;
// bigOrderInfo.svrRecvTime = data.askTime;
//
// return bigOrderInfo;
// }
// }
// catch (const char* msg) {
// qDebug() << "err findMax";
// }
// }
// }
// }
// 大单检测,分为两步:
// 1、判断股票数量是否达到大单的阈值未达到时直接返回
// 2、判断订单详情是否达到大单阈值
// 下一步增加摆盘数据统计,大单占全部摆盘数量的比例,计算出来一个置信度
// 使用指针遍历避免拷贝
// 这里是找到当前摆盘数据中,每一档价位数量最大的
//const OrderBookEntry* maxItem = &items[0];
//for (int i = 1; i < items.size(); ++i) {
// if (items[i].volume > maxItem->volume) {
// maxItem = &items[i];
// }
//}
// 添加业务规则验证(如果需要)
// if (!isValidMaxVolumeItem(*maxItem, items))
// return {};
// return bigOrderInfoList;
//}
/// Returns the entry with the lowest price.
/// When several entries share the lowest price the first one wins.
/// @param items Price levels to search.
/// @return A copy of the cheapest entry, or a default-constructed entry when
///         `items` is empty.
OrderBookEntry QOrderProcessor::findMinPriceItem(const QVector<OrderBookEntry>& items) const
{
    const OrderBookEntry* cheapest = nullptr;
    for (const OrderBookEntry& entry : items) {
        // Strict '<' keeps the earliest entry among equal prices.
        if (cheapest == nullptr || entry.price < cheapest->price) {
            cheapest = &entry;
        }
    }
    if (cheapest == nullptr) {
        return OrderBookEntry{};
    }
    return *cheapest;
}
// Cache helpers - hand-rolled cache implementation.

/// Looks up a previously cached detection result.
/// Runs under m_cacheMutex and bumps the hit or miss counter.
/// @param cacheKey Key identifying the order-book snapshot.
/// @param result   Receives the cached list on a hit; untouched on a miss.
/// @return true on a cache hit, false otherwise.
bool QOrderProcessor::getCachedResult(const QString& cacheKey, QVector<BigOrderInfo>& result) const
{
    QMutexLocker guard(&m_cacheMutex);
    if (!m_orderCache.contains(cacheKey)) {
        m_cacheMisses++;
        return false;
    }
    result = m_orderCache.value(cacheKey);
    m_cacheHits++;
    return true;
}
/// Stores a detection result under `cacheKey` unless the key is already cached.
/// Runs under m_cacheMutex. When the key list has reached m_cacheMaxSize the
/// oldest inserted key is evicted first.
/// @param cacheKey Key identifying the order-book snapshot.
/// @param result   Detection result to remember.
void QOrderProcessor::cacheResult(const QString& cacheKey, const QVector<BigOrderInfo>& result) const
{
    QMutexLocker guard(&m_cacheMutex);
    if (m_orderCache.contains(cacheKey)) {
        return;  // Existing entries are never overwritten.
    }
    // Cache full: drop the entry that was inserted first.
    if (m_cacheKeys.size() >= m_cacheMaxSize) {
        m_orderCache.remove(m_cacheKeys.takeFirst());
    }
    m_orderCache.insert(cacheKey, result);
    m_cacheKeys.append(cacheKey);
}
/// Returns the big orders for an order book, consulting the result cache first.
/// The cache key is built from the instrument code and both side timestamps.
/// On a miss the result of findMaxVolumeItem() is computed and cached.
/// @param data Parsed order book for one instrument.
/// @return Big orders detected for this snapshot.
QVector<BigOrderInfo> QOrderProcessor::findExtremeOrders(const OrderBookData& data) const
{
    const QString key = QString("%1_%2_%3").arg(data.code).arg(data.askTime).arg(data.bidTime);

    QVector<BigOrderInfo> orders;
    if (!getCachedResult(key, orders)) {
        // Cache miss: run the detection and remember its outcome.
        orders = findMaxVolumeItem(data);
        cacheResult(key, orders);
    }
    return orders;
}
/*
// 设计一个单独的大单检测引擎
// 包括:历史数据分析、订单分析、成交记录分析
*/
void QOrderProcessor::internalProcess(const OrderBookData& orderData)
{
if (!m_enabled || orderData.isEmpty()) {
return;
}
try {
// 发布处理开始事件
QEventBus::instance()->publish(EventType::PROCESSING_STARTED,
QVariant::fromValue(orderData.code),
"QOrderProcessor");
// 如果JSON保存功能启用则在独立线程中保存orderData为JSON格式
if (m_jsonSaveEnabled) {
saveOrderDataAsJson(orderData);
}
const auto result = findExtremeOrders(orderData);
for (int i = 0; i < result.size(); i++)
{
if(result.at(i).isBigOrder) {
// 保持向后兼容,发送原有信号
emit maxOrderReady(result.at(i));
// 发布大单检测事件
QEventBus::instance()->publish(EventType::BIG_ORDER_DETECTED,
QVariant::fromValue(result.at(i)),
"QOrderProcessor");
}
}
// 发布处理完成事件
QEventBus::instance()->publish(EventType::PROCESSING_FINISHED,
QVariant::fromValue(orderData.code),
"QOrderProcessor");
}
catch (const std::exception& e) {
QString errorMsg = QString::fromUtf8(e.what());
emit errorOccurred(orderData.code, errorMsg);
// 发布错误事件
QEventBus::instance()->publish(EventType::SYSTEM_ERROR,
QVariant::fromValue(QString("Order processing error for %1: %2").arg(orderData.code).arg(errorMsg)),
"QOrderProcessor");
}
}
/// Queues one order-book push message for asynchronous processing.
///
/// The response is copied into a task on m_threadPool. The task serializes
/// it, feeds the bytes to `parser` and hands the parsed book to
/// internalProcess(). A std::exception raised in the task is reported
/// through errorOccurred with the code "UNKNOWN".
///
/// @param stRsp Order-book update response received from the quote service.
void QOrderProcessor::processOrderBook(const Qot_UpdateOrderBook::Response &stRsp)
{
    QtConcurrent::run(&m_threadPool, [this, stRsp]() {
        try {
            // Serialize once and pass the buffer's real length, so the size
            // given to the parser always matches the bytes it receives (an
            // empty buffer is reported as length 0).
            const auto payload = stRsp.SerializeAsString();

            OrderBookData orderBook;
            // NOTE(review): `parser` is shared by all pool threads - confirm
            // that parse() is safe to call concurrently.
            parser.parse(payload.c_str(), static_cast<int>(payload.size()), orderBook);
            internalProcess(orderBook);
        }
        catch (const std::exception& e) {
            emit errorOccurred("UNKNOWN", QString::fromUtf8(e.what()));
        }
    });
}
/// JSON serialization: converts an OrderBookData into a QJsonObject.
///
/// The object holds "code", "name", "askTime", "bidTime", a "timestamp" with
/// the current local time in ISO format, and the arrays "bids" and "asks".
/// Each array element carries "price", "volume", "orderCount", "code" and a
/// "details" array of {"orderId" (as string), "volume"} objects.
///
/// @param data Parsed order book to serialize.
/// @return The JSON representation.
QJsonObject QOrderProcessor::orderBookDataToJson(const OrderBookData& data) const
{
    // Both sides share the same layout, so one converter serves bids and asks.
    const auto levelsToJson = [](const QVector<OrderBookEntry>& levels) -> QJsonArray {
        QJsonArray levelArray;
        for (const auto& level : levels) {
            // Individual orders queued at this price level.
            QJsonArray orderArray;
            for (const auto& order : level.details) {
                QJsonObject orderObj;
                orderObj["orderId"] = QString::number(order.orderId);
                orderObj["volume"] = order.volume;
                orderArray.append(orderObj);
            }

            QJsonObject levelObj;
            levelObj["price"] = level.price;
            levelObj["volume"] = level.volume;
            levelObj["orderCount"] = level.orderCount;
            levelObj["code"] = level.code;
            levelObj["details"] = orderArray;
            levelArray.append(levelObj);
        }
        return levelArray;
    };

    QJsonObject root;
    // Header fields.
    root["code"] = data.code;
    root["name"] = data.name;
    root["askTime"] = data.askTime;
    root["bidTime"] = data.bidTime;
    root["timestamp"] = QDateTime::currentDateTime().toString(Qt::ISODate);
    // Buy side, then sell side.
    root["bids"] = levelsToJson(data.bids);
    root["asks"] = levelsToJson(data.asks);
    return root;
}
// 保存orderData为JSON文件 - 在独立线程中执行
void QOrderProcessor::saveOrderDataAsJson(const OrderBookData& orderData)
{
// 使用独立的线程池执行JSON保存操作避免阻塞主处理线程
QtConcurrent::run(&m_jsonSaveThreadPool, [this, orderData]() {
try {
// 转换为JSON
QJsonObject jsonObj = orderBookDataToJson(orderData);
QJsonDocument doc(jsonObj);
QString jsonString = doc.toJson(QJsonDocument::Indented);
// 创建保存目录
QDir dir;
QString saveDir = "order_data_json";
if (!dir.exists(saveDir)) {
dir.mkpath(saveDir);
}
// 生成文件名(包含时间戳和股票代码)
QString timestamp = QDateTime::currentDateTime().toString("yyyyMMdd_hhmmss_zzz");
QString filename = QString("%1/%2_%3.json").arg(saveDir).arg(orderData.code).arg(timestamp);
// 保存文件
QFile file(filename);
if (file.open(QIODevice::WriteOnly | QIODevice::Text)) {
QTextStream stream(&file);
stream << jsonString;
file.close();
qDebug() << "Order data saved as JSON:" << filename;
} else {
qWarning() << "Failed to save order data JSON:" << filename;
}
}
catch (const std::exception& e) {
qWarning() << "Error saving order data as JSON:" << e.what();
}
});
}