#include "qorderprocessor.h" #include #include #include #include #include #include #include "qlogmanager.h" #include "qeventbus.h" QOrderProcessor::QOrderProcessor(QObject *parent) : QObject(parent) { int idealThreads = QThread::idealThreadCount(); //m_threadPool.setMaxThreadCount(qBound(4, idealThreads, 16)); m_threadPool.setMaxThreadCount(qBound(4, 6, 8)); // 初始化JSON保存线程池 m_jsonSaveThreadPool.setMaxThreadCount(2); // 限制JSON保存线程数量,避免过多IO操作 // 存储盘口数据 临时使用 2025.11.10 setJsonSaveEnabled(true); } QOrderProcessor::~QOrderProcessor() { m_threadPool.waitForDone(); m_jsonSaveThreadPool.waitForDone(); } void QOrderProcessor::setProcessingEnabled(bool enabled) { QMutexLocker lock(&m_dataMutex); m_enabled = enabled; } void QOrderProcessor::setJsonSaveEnabled(bool enabled) { QMutexLocker lock(&m_dataMutex); m_jsonSaveEnabled = enabled; } double QOrderProcessor::sumQuantity(const QVector& items)const { return std::accumulate(items.begin(), items.end(), 0.0, [](double sum, const OrderBookEntry& item) { return sum + item.volume; }); } /* 1. 找到列表中的最大值和次大值 2. 最大值是次大值的指定倍数 */ OrderBookEntry QOrderProcessor::findMaxVolumeItemEx(const QVector& items, double volumeRatio) const { if (items.isEmpty()) return{}; // 特殊情况处理:只有一个元素时无法比较倍数 if (items.size() == 1) { return volumeRatio <= 1.0 ? items[0] : OrderBookEntry{}; } // 初始化最大值和次大值 const OrderBookEntry* maxItem = &items[0]; const OrderBookEntry* secondMaxItem = nullptr; double secondMaxVolume = -1.0; // 遍历查找最大值和次大值 for (int i = 1; i < items.size(); ++i) { const OrderBookEntry& item = items[i]; if (item.volume > maxItem->volume) { // 更新次大值为原最大值 secondMaxItem = maxItem; secondMaxVolume = maxItem->volume; // 更新最大值 maxItem = &item; } else if (!secondMaxItem || item.volume > secondMaxVolume) { // 更新次大值 secondMaxItem = &item; secondMaxVolume = item.volume; } } // 检查是否满足倍数条件 constexpr double EPSILON = 1e-9; if (secondMaxItem && maxItem->volume + EPSILON >= secondMaxVolume * volumeRatio) { return *maxItem; } return OrderBookEntry{}; } QVector QOrderProcessor::findMaxVolumeItem(const OrderBookData & data) const { bool findBigOrder = false; BigOrderInfo bigOrderInfo; QVector bigOrderInfoList; float volume = m_replyCodeQuantity[data.code]; const OrderBookEntry* maxItem; QVector items = data.asks; for (int i = 0; i < items.size(); i++) { // 不处理刚打开程序时,推送过来的摆盘数据 if (data.askTime == NULL) continue; // 同一个价格挡位的订单中股票数量大于,阈值才有可能是大单 if (volume < items[i].volume) { for (int j = 0; j< items[i].details.size(); j++) // 超过list的最大数量,就不会返回数据了 { try { if (volume < items[i].details[j].volume) { findBigOrder = true; bigOrderInfo.nBigOrderType = 0; bigOrderInfo.isBigOrder = findBigOrder; bigOrderInfo.code = data.code; bigOrderInfo.name = data.name; bigOrderInfo.orderId = items[i].details[j].orderId; bigOrderInfo.price = items[i].price; bigOrderInfo.volume = items[i].details[j].volume; bigOrderInfo.level = i + 1; bigOrderInfo.svrRecvTime = data.askTime; bigOrderInfoList.append(bigOrderInfo); } } catch (const char* msg) { qDebug() << "err findMax"; } } } } items = data.bids; for (int i = 0; i < items.size(); i++) { // 不处理刚打开程序时,推送过来的摆盘数据 if (data.bidTime == NULL) continue; // 同一个价格挡位的订单中股票数量大于,阈值才有可能是大单 if (volume < items[i].volume) { for (int j = 0; j< items[i].details.size(); j++) { try { if (volume < items[i].details[j].volume) { findBigOrder = true; bigOrderInfo.nBigOrderType = 1; bigOrderInfo.isBigOrder = findBigOrder; bigOrderInfo.code = data.code; bigOrderInfo.name = data.name; bigOrderInfo.orderId = items[i].details[j].orderId; bigOrderInfo.price = items[i].price; bigOrderInfo.volume = items[i].details[j].volume; bigOrderInfo.svrRecvTime = data.bidTime; bigOrderInfoList.append(bigOrderInfo); } } catch (const char* msg) { qDebug() << "err findMax"; } } } } return bigOrderInfoList; } // //// 1.简单粗暴,直接通过数量阈值进行筛选,遍历全部价格档位 //// 大单检测接口,返回 QVector, 暂时找到一个就返回 //QVector QOrderProcessor::findMaxVolumeItem(const OrderBookData& data) const; //{ // if (items.isEmpty()) // return{}; // bool findBigOrder = false; // QVector bigOrderInfoList; // // float volume = m_replyCodeQuantity[data.code]; // const OrderBookEntry* maxItem; // for (int i = 0; i < items.size(); i++) { // // 同一个价格挡位的订单中股票数量大于,阈值才有可能是大单 // if (volume < items[i].volume) { // for (int j = 0; j< items[i].orderCount; j++) // { // try { // if (items[i].details.size() == 0) continue; // if (volume < items[i].details[j].volume) // { // findBigOrder = true; // // bigOrderInfo.isBigOrder = findBigOrder; // bigOrderInfo.code = data.code; // bigOrderInfo.name = data.name; // bigOrderInfo.orderId = items[i].details[j].orderId; // bigOrderInfo.price = items[i].price; // bigOrderInfo.volume = items[i].details[j].volume; // bigOrderInfo.svrRecvTime = data.askTime; // // return bigOrderInfo; // } // } // catch (const char* msg) { // qDebug() << "err findMax"; // } // } // } // } // 大单检测,分为两步: // 1、判断股票数量是否达到大单的阈值,未达到时直接返回 // 2、判断订单详情是否达到大单阈值 // 下一步增加摆盘数据统计,大单占全部摆盘数量的比例,计算出来一个置信度 // 使用指针遍历避免拷贝 // 这里是找到当前摆盘数据中,每一档价位数量最大的 //const OrderBookEntry* maxItem = &items[0]; //for (int i = 1; i < items.size(); ++i) { // if (items[i].volume > maxItem->volume) { // maxItem = &items[i]; // } //} // 添加业务规则验证(如果需要) // if (!isValidMaxVolumeItem(*maxItem, items)) // return {}; // return bigOrderInfoList; //} OrderBookEntry QOrderProcessor::findMinPriceItem(const QVector& items) const { if (items.isEmpty()) return{}; auto it = std::min_element(items.begin(), items.end(), [](const OrderBookEntry& a, const OrderBookEntry& b) { return a.price < b.price; }); return it != items.end() ? *it : OrderBookEntry{}; } // 缓存相关方法 - 手动缓存实现 bool QOrderProcessor::getCachedResult(const QString& cacheKey, QVector& result) const { QMutexLocker lock(&m_cacheMutex); if (m_orderCache.contains(cacheKey)) { result = m_orderCache.value(cacheKey); m_cacheHits++; return true; } m_cacheMisses++; return false; } void QOrderProcessor::cacheResult(const QString& cacheKey, const QVector& result) const { QMutexLocker lock(&m_cacheMutex); if (!m_orderCache.contains(cacheKey)) { // 如果缓存已满,移除最旧的条目 if (m_cacheKeys.size() >= m_cacheMaxSize) { QString oldestKey = m_cacheKeys.takeFirst(); m_orderCache.remove(oldestKey); } m_orderCache.insert(cacheKey, result); m_cacheKeys.append(cacheKey); } } QVector QOrderProcessor::findExtremeOrders(const OrderBookData& data) const { // 生成缓存键 QString cacheKey = QString("%1_%2_%3").arg(data.code).arg(data.askTime).arg(data.bidTime); // 尝试从缓存获取结果 QVector result; if (getCachedResult(cacheKey, result)) { return result; } // 使用现有的算法 result = findMaxVolumeItem(data); // 缓存结果 cacheResult(cacheKey, result); return result; } /* // 设计一个单独的大单检测引擎 // 包括:历史数据分析、订单分析、成交记录分析 */ void QOrderProcessor::internalProcess(const OrderBookData& orderData) { if (!m_enabled || orderData.isEmpty()) { return; } try { // 发布处理开始事件 QEventBus::instance()->publish(EventType::PROCESSING_STARTED, QVariant::fromValue(orderData.code), "QOrderProcessor"); // 如果JSON保存功能启用,则在独立线程中保存orderData为JSON格式 if (m_jsonSaveEnabled) { saveOrderDataAsJson(orderData); } const auto result = findExtremeOrders(orderData); for (int i = 0; i < result.size(); i++) { if(result.at(i).isBigOrder) { // 保持向后兼容,发送原有信号 emit maxOrderReady(result.at(i)); // 发布大单检测事件 QEventBus::instance()->publish(EventType::BIG_ORDER_DETECTED, QVariant::fromValue(result.at(i)), "QOrderProcessor"); } } // 发布处理完成事件 QEventBus::instance()->publish(EventType::PROCESSING_FINISHED, QVariant::fromValue(orderData.code), "QOrderProcessor"); } catch (const std::exception& e) { QString errorMsg = QString::fromUtf8(e.what()); emit errorOccurred(orderData.code, errorMsg); // 发布错误事件 QEventBus::instance()->publish(EventType::SYSTEM_ERROR, QVariant::fromValue(QString("Order processing error for %1: %2").arg(orderData.code).arg(errorMsg)), "QOrderProcessor"); } } void QOrderProcessor::processOrderBook(const Qot_UpdateOrderBook::Response &stRsp) { QtConcurrent::run(&m_threadPool, [this, stRsp]() { try { OrderBookData orderBook; parser.parse(stRsp.SerializeAsString().c_str(), stRsp.ByteSize(), orderBook); internalProcess(orderBook); } catch (const std::exception& e) { emit errorOccurred("UNKNOWN", QString::fromUtf8(e.what())); } }); } // JSON序列化方法 - 将OrderBookData转换为QJsonObject QJsonObject QOrderProcessor::orderBookDataToJson(const OrderBookData& data) const { QJsonObject jsonObj; // 基本字段 jsonObj["code"] = data.code; jsonObj["name"] = data.name; jsonObj["askTime"] = data.askTime; jsonObj["bidTime"] = data.bidTime; jsonObj["timestamp"] = QDateTime::currentDateTime().toString(Qt::ISODate); // 买盘数据 (bids) QJsonArray bidsArray; for (const auto& bid : data.bids) { QJsonObject bidObj; bidObj["price"] = bid.price; bidObj["volume"] = bid.volume; bidObj["orderCount"] = bid.orderCount; bidObj["code"] = bid.code; // 订单详情 QJsonArray detailsArray; for (const auto& detail : bid.details) { QJsonObject detailObj; detailObj["orderId"] = QString::number(detail.orderId); detailObj["volume"] = detail.volume; detailsArray.append(detailObj); } bidObj["details"] = detailsArray; bidsArray.append(bidObj); } jsonObj["bids"] = bidsArray; // 卖盘数据 (asks) QJsonArray asksArray; for (const auto& ask : data.asks) { QJsonObject askObj; askObj["price"] = ask.price; askObj["volume"] = ask.volume; askObj["orderCount"] = ask.orderCount; askObj["code"] = ask.code; // 订单详情 QJsonArray detailsArray; for (const auto& detail : ask.details) { QJsonObject detailObj; detailObj["orderId"] = QString::number(detail.orderId); detailObj["volume"] = detail.volume; detailsArray.append(detailObj); } askObj["details"] = detailsArray; asksArray.append(askObj); } jsonObj["asks"] = asksArray; return jsonObj; } // 保存orderData为JSON文件 - 在独立线程中执行 void QOrderProcessor::saveOrderDataAsJson(const OrderBookData& orderData) { // 使用独立的线程池执行JSON保存操作,避免阻塞主处理线程 QtConcurrent::run(&m_jsonSaveThreadPool, [this, orderData]() { try { // 转换为JSON QJsonObject jsonObj = orderBookDataToJson(orderData); QJsonDocument doc(jsonObj); QString jsonString = doc.toJson(QJsonDocument::Indented); // 创建保存目录 QDir dir; QString saveDir = "order_data_json"; if (!dir.exists(saveDir)) { dir.mkpath(saveDir); } // 生成文件名(包含时间戳和股票代码) QString timestamp = QDateTime::currentDateTime().toString("yyyyMMdd_hhmmss_zzz"); QString filename = QString("%1/%2_%3.json").arg(saveDir).arg(orderData.code).arg(timestamp); // 保存文件 QFile file(filename); if (file.open(QIODevice::WriteOnly | QIODevice::Text)) { QTextStream stream(&file); stream << jsonString; file.close(); qDebug() << "Order data saved as JSON:" << filename; } else { qWarning() << "Failed to save order data JSON:" << filename; } } catch (const std::exception& e) { qWarning() << "Error saving order data as JSON:" << e.what(); } }); }