#include "qorderprocessor.h"

// NOTE(review): the angle-bracket include targets were not legible in the
// reviewed copy of this file; the list below is reconstructed from the names
// the implementation uses. Confirm against the original include block.
#include <QtConcurrent>
#include <QDateTime>
#include <QDebug>
#include <QDir>
#include <QFile>
#include <QJsonArray>
#include <QJsonDocument>
#include <QJsonObject>
#include <QMutexLocker>
#include <QTextStream>
#include <QThread>

#include <algorithm>
#include <numeric>
#include <string>

#include "qlogmanager.h"
#include "qeventbus.h"

/**
 * Sizes the two worker pools and turns JSON dumping on.
 *
 * The processing pool is sized to the machine's ideal thread count, clamped to
 * the range [4, 8]; the JSON-save pool is fixed at two threads.
 */
QOrderProcessor::QOrderProcessor(QObject *parent)
    : QObject(parent)
{
    // Previously the ideal thread count was computed but never used and the
    // pool size was the constant qBound(4, 6, 8) == 6.
    const int idealThreads = QThread::idealThreadCount();
    m_threadPool.setMaxThreadCount(qBound(4, idealThreads, 8));
    m_jsonSaveThreadPool.setMaxThreadCount(2);
    setJsonSaveEnabled(true);
}

/**
 * Blocks until every task queued on either pool has finished, because those
 * tasks capture `this`.
 */
QOrderProcessor::~QOrderProcessor()
{
    m_threadPool.waitForDone();
    m_jsonSaveThreadPool.waitForDone();
}

/// Enables or disables order-book processing (guarded by m_dataMutex).
void QOrderProcessor::setProcessingEnabled(bool enabled)
{
    QMutexLocker lock(&m_dataMutex);
    m_enabled = enabled;
}

/// Enables or disables dumping each order book to a JSON file (guarded by m_dataMutex).
void QOrderProcessor::setJsonSaveEnabled(bool enabled)
{
    QMutexLocker lock(&m_dataMutex);
    m_jsonSaveEnabled = enabled;
}

/// Returns the sum of `volume` over all entries (0.0 for an empty vector).
double QOrderProcessor::sumQuantity(const QVector<OrderBookEntry>& items) const
{
    return std::accumulate(items.begin(), items.end(), 0.0,
                           [](double sum, const OrderBookEntry& item) {
                               return sum + item.volume;
                           });
}

/**
 * Returns the entry with the largest volume, provided that volume is at least
 * `volumeRatio` times the second-largest volume (with a 1e-9 tolerance).
 *
 * @param items        Entries to inspect.
 * @param volumeRatio  Required ratio between the largest and second-largest volume.
 * @return The dominant entry, or a default-constructed entry when `items` is
 *         empty or the ratio is not met. A single entry is returned only when
 *         `volumeRatio <= 1.0`.
 */
OrderBookEntry QOrderProcessor::findMaxVolumeItemEx(const QVector<OrderBookEntry>& items,
                                                    double volumeRatio) const
{
    if (items.isEmpty()) {
        return {};
    }
    if (items.size() == 1) {
        return volumeRatio <= 1.0 ? items[0] : OrderBookEntry{};
    }

    // Single pass tracking the largest entry and the runner-up volume.
    const OrderBookEntry* maxItem = &items[0];
    bool haveSecond = false;
    double secondMaxVolume = -1.0;

    for (int i = 1; i < items.size(); ++i) {
        const OrderBookEntry& item = items[i];
        if (item.volume > maxItem->volume) {
            // The previous leader becomes the runner-up.
            secondMaxVolume = maxItem->volume;
            haveSecond = true;
            maxItem = &item;
        } else if (!haveSecond || item.volume > secondMaxVolume) {
            secondMaxVolume = item.volume;
            haveSecond = true;
        }
    }

    constexpr double EPSILON = 1e-9;
    if (haveSecond && maxItem->volume + EPSILON >= secondMaxVolume * volumeRatio) {
        return *maxItem;
    }
    return OrderBookEntry{};
}

/**
 * Collects every individual order whose volume exceeds the threshold stored in
 * m_replyCodeQuantity for `data.code`.
 *
 * A price level is only inspected when its aggregate volume exceeds the
 * threshold; within such a level each detail order above the threshold yields
 * one BigOrderInfo. Asks are reported with nBigOrderType 0 and bids with 1, in
 * that order. `level` is the 1-based position of the price level on its side.
 * A side is skipped entirely when its timestamp compares equal to NULL.
 *
 * @param data  Parsed order book.
 * @return The detected big orders (possibly empty).
 */
QVector<BigOrderInfo> QOrderProcessor::findMaxVolumeItem(const OrderBookData& data) const
{
    QVector<BigOrderInfo> bigOrders;

    // NOTE(review): presumably a code with no configured entry yields the
    // container's default value here - confirm against the member's type.
    const float threshold = m_replyCodeQuantity[data.code];

    // Shared scan for both sides; iterating by const reference avoids the deep
    // copy that indexing a non-const QVector copy would trigger.
    const auto collect = [&data, &bigOrders, threshold](const QVector<OrderBookEntry>& levels,
                                                        bool isBid) {
        for (int i = 0; i < levels.size(); ++i) {
            const OrderBookEntry& level = levels[i];
            if (!(threshold < level.volume)) {
                continue;
            }
            for (const auto& detail : level.details) {
                if (!(threshold < detail.volume)) {
                    continue;
                }
                BigOrderInfo info;
                info.nBigOrderType = isBid ? 1 : 0;
                info.isBigOrder = true;
                info.code = data.code;
                info.name = data.name;
                info.orderId = detail.orderId;
                info.price = level.price;
                info.volume = detail.volume;
                // Previously only the ask side set the level, so bid entries
                // carried a stale or unset value.
                info.level = i + 1;
                if (isBid) {
                    info.svrRecvTime = data.bidTime;
                } else {
                    info.svrRecvTime = data.askTime;
                }
                bigOrders.append(info);
            }
        }
    };

    // The comparison against NULL is kept verbatim because the timestamp type
    // is declared outside this file.
    if (!(data.askTime == NULL)) {
        collect(data.asks, false);
    }
    if (!(data.bidTime == NULL)) {
        collect(data.bids, true);
    }
    return bigOrders;
}

/// Returns the entry with the lowest price, or a default-constructed entry when `items` is empty.
OrderBookEntry QOrderProcessor::findMinPriceItem(const QVector<OrderBookEntry>& items) const
{
    if (items.isEmpty()) {
        return {};
    }
    // Non-empty range: min_element always yields a valid iterator.
    const auto cheapest = std::min_element(items.begin(), items.end(),
                                           [](const OrderBookEntry& a, const OrderBookEntry& b) {
                                               return a.price < b.price;
                                           });
    return *cheapest;
}

/**
 * Looks up a cached result under m_cacheMutex and updates the hit/miss counters.
 *
 * @param cacheKey  Key built by findExtremeOrders().
 * @param result    Receives the cached value on a hit; untouched on a miss.
 * @return true on a cache hit.
 */
bool QOrderProcessor::getCachedResult(const QString& cacheKey, QVector<BigOrderInfo>& result) const
{
    QMutexLocker lock(&m_cacheMutex);
    const auto it = m_orderCache.constFind(cacheKey);
    if (it == m_orderCache.constEnd()) {
        m_cacheMisses++;
        return false;
    }
    result = it.value();
    m_cacheHits++;
    return true;
}

/**
 * Stores a result under m_cacheMutex unless the key is already cached.
 *
 * When the number of tracked keys has reached m_cacheMaxSize, the oldest
 * inserted key is evicted first.
 */
void QOrderProcessor::cacheResult(const QString& cacheKey, const QVector<BigOrderInfo>& result) const
{
    QMutexLocker lock(&m_cacheMutex);
    if (m_orderCache.contains(cacheKey)) {
        return;
    }
    if (m_cacheKeys.size() >= m_cacheMaxSize) {
        const QString oldestKey = m_cacheKeys.takeFirst();
        m_orderCache.remove(oldestKey);
    }
    m_orderCache.insert(cacheKey, result);
    m_cacheKeys.append(cacheKey);
}

/**
 * Returns the big orders for an order book, using the result cache.
 *
 * The cache key is "<code>_<askTime>_<bidTime>".
 */
QVector<BigOrderInfo> QOrderProcessor::findExtremeOrders(const OrderBookData& data) const
{
    const QString cacheKey =
        QString("%1_%2_%3").arg(data.code).arg(data.askTime).arg(data.bidTime);

    QVector<BigOrderInfo> result;
    if (getCachedResult(cacheKey, result)) {
        return result;
    }

    result = findMaxVolumeItem(data);
    cacheResult(cacheKey, result);
    return result;
}

/**
 * Processes one parsed order book: optionally dumps it to JSON, detects big
 * orders, emits maxOrderReady for each, and publishes the matching events.
 *
 * Does nothing when processing is disabled or the order book is empty. A
 * std::exception raised while processing is reported through errorOccurred and
 * a SYSTEM_ERROR event rather than propagated.
 */
void QOrderProcessor::internalProcess(const OrderBookData& orderData)
{
    // Snapshot the flags under the same mutex the setters use.
    bool enabled = false;
    bool jsonSaveEnabled = false;
    {
        QMutexLocker lock(&m_dataMutex);
        enabled = m_enabled;
        jsonSaveEnabled = m_jsonSaveEnabled;
    }

    if (!enabled || orderData.isEmpty()) {
        return;
    }

    try {
        QEventBus::instance()->publish(EventType::PROCESSING_STARTED,
                                       QVariant::fromValue(orderData.code),
                                       "QOrderProcessor");

        if (jsonSaveEnabled) {
            saveOrderDataAsJson(orderData);
        }

        const auto bigOrders = findExtremeOrders(orderData);
        for (const auto& order : bigOrders) {
            if (!order.isBigOrder) {
                continue;
            }
            emit maxOrderReady(order);
            QEventBus::instance()->publish(EventType::BIG_ORDER_DETECTED,
                                           QVariant::fromValue(order),
                                           "QOrderProcessor");
        }

        QEventBus::instance()->publish(EventType::PROCESSING_FINISHED,
                                       QVariant::fromValue(orderData.code),
                                       "QOrderProcessor");
    } catch (const std::exception& e) {
        const QString errorMsg = QString::fromUtf8(e.what());
        emit errorOccurred(orderData.code, errorMsg);
        QEventBus::instance()->publish(
            EventType::SYSTEM_ERROR,
            QVariant::fromValue(QString("Order processing error for %1: %2")
                                    .arg(orderData.code)
                                    .arg(errorMsg)),
            "QOrderProcessor");
    }
}

/**
 * Queues parsing and processing of an order-book update on the processing pool.
 *
 * The response is copied into the task. A std::exception raised while parsing
 * or processing is reported through errorOccurred with the code "UNKNOWN".
 */
void QOrderProcessor::processOrderBook(const Qot_UpdateOrderBook::Response &stRsp)
{
    QtConcurrent::run(&m_threadPool, [this, stRsp]() {
        try {
            // Serialize once and pass the buffer's own length so the pointer
            // and the size can never disagree.
            const std::string payload = stRsp.SerializeAsString();
            OrderBookData orderBook;
            parser.parse(payload.c_str(), static_cast<int>(payload.size()), orderBook);
            internalProcess(orderBook);
        } catch (const std::exception& e) {
            emit errorOccurred("UNKNOWN", QString::fromUtf8(e.what()));
        }
    });
}

/**
 * Converts an order book to a JSON object.
 *
 * The object carries "code", "name", "askTime", "bidTime", a "timestamp" with
 * the current local time in ISO format, and "bids"/"asks" arrays. Each level
 * holds "price", "volume", "orderCount", "code" and a "details" array of
 * {"orderId" (as a string), "volume"}.
 */
QJsonObject QOrderProcessor::orderBookDataToJson(const OrderBookData& data) const
{
    // Both sides share the same layout, so they share one serializer.
    const auto levelsToJson = [](const QVector<OrderBookEntry>& levels) {
        QJsonArray levelsArray;
        for (const auto& level : levels) {
            QJsonArray detailsArray;
            for (const auto& detail : level.details) {
                QJsonObject detailObj;
                detailObj["orderId"] = QString::number(detail.orderId);
                detailObj["volume"] = detail.volume;
                detailsArray.append(detailObj);
            }

            QJsonObject levelObj;
            levelObj["price"] = level.price;
            levelObj["volume"] = level.volume;
            levelObj["orderCount"] = level.orderCount;
            levelObj["code"] = level.code;
            levelObj["details"] = detailsArray;
            levelsArray.append(levelObj);
        }
        return levelsArray;
    };

    QJsonObject jsonObj;
    jsonObj["code"] = data.code;
    jsonObj["name"] = data.name;
    jsonObj["askTime"] = data.askTime;
    jsonObj["bidTime"] = data.bidTime;
    jsonObj["timestamp"] = QDateTime::currentDateTime().toString(Qt::ISODate);
    jsonObj["bids"] = levelsToJson(data.bids);
    jsonObj["asks"] = levelsToJson(data.asks);
    return jsonObj;
}

/**
 * Queues a task on the JSON-save pool that writes the order book to
 * "order_data_json/<code>_<yyyyMMdd_hhmmss_zzz>.json" (relative to the current
 * working directory), creating the directory if needed.
 *
 * Failures are logged with qWarning and never propagated to the caller.
 */
void QOrderProcessor::saveOrderDataAsJson(const OrderBookData& orderData)
{
    QtConcurrent::run(&m_jsonSaveThreadPool, [this, orderData]() {
        try {
            // toJson() already yields UTF-8 bytes; write them as-is instead of
            // round-tripping through QString and a QTextStream codec.
            const QByteArray payload =
                QJsonDocument(orderBookDataToJson(orderData)).toJson(QJsonDocument::Indented);

            const QString saveDir = "order_data_json";
            QDir dir;
            if (!dir.exists(saveDir) && !dir.mkpath(saveDir)) {
                qWarning() << "Failed to create order data JSON directory:" << saveDir;
                return;
            }

            const QString timestamp =
                QDateTime::currentDateTime().toString("yyyyMMdd_hhmmss_zzz");
            const QString filename =
                QString("%1/%2_%3.json").arg(saveDir).arg(orderData.code).arg(timestamp);

            QFile file(filename);
            if (!file.open(QIODevice::WriteOnly | QIODevice::Text)) {
                qWarning() << "Failed to save order data JSON:" << filename;
                return;
            }

            const bool written = file.write(payload) != -1;
            file.close();
            if (written) {
                qDebug() << "Order data saved as JSON:" << filename;
            } else {
                qWarning() << "Failed to save order data JSON:" << filename;
            }
        } catch (const std::exception& e) {
            qWarning() << "Error saving order data as JSON:" << e.what();
        }
    });
}