#ifndef DATA_BUFFER_H #define DATA_BUFFER_H #include #include #include #include #include #include #include #include #include "../common_structures/TradingStructures.h" struct FlowStatistics { qint64 totalPackets = 0; qint64 droppedPackets = 0; qint64 processedPackets = 0; double packetsPerSecond = 0.0; double dropRate = 0.0; QDateTime lastUpdate; void updateStats(qint64 total, qint64 dropped, qint64 processed) { totalPackets = total; droppedPackets = dropped; processedPackets = processed; dropRate = total > 0 ? static_cast(dropped) / total : 0.0; lastUpdate = QDateTime::currentDateTime(); } }; class DataBuffer : public QObject { Q_OBJECT public: static DataBuffer* instance(); void setBufferSize(int maxSize); void setFlowControlEnabled(bool enabled); void setMaxPacketsPerSecond(int maxRate); bool enqueue(const Trading::DataPacket& packet); bool enqueueHighPriority(const Trading::DataPacket& packet); Trading::DataPacket dequeue(int timeoutMs = 1000); QList dequeueBatch(int maxCount, int timeoutMs = 1000); bool isEmpty() const; int size() const; int getQueueSize(Trading::DataPacketType type) const; FlowStatistics getStatistics() const; void pause(); void resume(); bool isPaused() const; void clear(); void clearByType(Trading::DataPacketType type); void clearByStock(const QString& stockCode); signals: void bufferOverflow(const QString& stockCode, Trading::DataPacketType type); void flowControlActivated(bool activated); void statisticsUpdated(const FlowStatistics& stats); void packetDropped(const Trading::DataPacket& packet, const QString& reason); private: explicit DataBuffer(QObject *parent = nullptr); ~DataBuffer(); void updateStatistics(); bool shouldDropPacket(const Trading::DataPacket& packet) const; void cleanupExpiredPackets(); QQueue m_buffer; QQueue m_highPriorityBuffer; mutable QMutex m_mutex; QWaitCondition m_notEmpty; QWaitCondition m_notFull; int m_maxBufferSize = 10000; int m_maxPacketsPerSecond = 1000; bool m_flowControlEnabled = true; bool m_paused = false; FlowStatistics m_stats; QElapsedTimer m_rateTimer; qint64 m_currentSecondPackets = 0; QTimer* m_cleanupTimer; QTimer* m_statsUpdateTimer; }; #endif