#include "qdatabuffer.h"

// NOTE(review): the names of the three angle-bracket includes were not
// recoverable from the reviewed copy of this file; the Qt headers below are
// the ones this translation unit visibly uses. Confirm against the original.
#include <QDateTime>
#include <QMutexLocker>
#include <QTimer>

#include <algorithm>

namespace {

// Period of the expired-packet sweep.
constexpr int kCleanupIntervalMs = 30000;
// Period of the statistics refresh, and minimum window for the rate estimate.
constexpr int kStatsIntervalMs = 1000;
// Maximum age, in seconds, a packet may reach before the sweep removes it.
constexpr int kHighPriorityMaxAgeSecs = 300; // 5 minutes
constexpr int kNormalMaxAgeSecs = 120;       // 2 minutes

// Removes, in place and preserving order, every element of `queue` for which
// `shouldRemove` returns true.
template <typename Queue, typename Predicate>
void removePacketsIf(Queue& queue, Predicate shouldRemove)
{
    queue.erase(std::remove_if(queue.begin(), queue.end(), shouldRemove), queue.end());
}

// Counts the elements of `queue` for which `matches` returns true.
template <typename Queue, typename Predicate>
int countPacketsIf(const Queue& queue, Predicate matches)
{
    return static_cast<int>(std::count_if(queue.cbegin(), queue.cend(), matches));
}

} // namespace

/// Returns the process-wide buffer, a function-local static created on first call.
QDataBuffer* QDataBuffer::instance()
{
    static QDataBuffer instance;
    return &instance;
}

/// Creates the buffer and starts its two periodic timers (expired-packet
/// cleanup and statistics refresh) plus the elapsed timer used for the rate.
QDataBuffer::QDataBuffer(QObject *parent)
    : QObject(parent)
    , m_cleanupTimer(new QTimer(this))
    , m_statsUpdateTimer(new QTimer(this))
{
    // Sweep expired packets every 30 seconds.
    m_cleanupTimer->setInterval(kCleanupIntervalMs);
    connect(m_cleanupTimer, &QTimer::timeout, this, &QDataBuffer::cleanupExpiredPackets);
    m_cleanupTimer->start();

    // Refresh the statistics once per second.
    m_statsUpdateTimer->setInterval(kStatsIntervalMs);
    connect(m_statsUpdateTimer, &QTimer::timeout, this, &QDataBuffer::updateStatistics);
    m_statsUpdateTimer->start();

    m_rateTimer.start();
}

/// Stops both periodic timers.
QDataBuffer::~QDataBuffer()
{
    m_cleanupTimer->stop();
    m_statsUpdateTimer->stop();
}

/// Sets the combined capacity shared by the normal and high-priority queues.
void QDataBuffer::setBufferSize(int maxSize)
{
    QMutexLocker locker(&m_mutex);
    m_maxBufferSize = maxSize;
}

/// Enables or disables the rate check applied by enqueue().
void QDataBuffer::setFlowControlEnabled(bool enabled)
{
    QMutexLocker locker(&m_mutex);
    m_flowControlEnabled = enabled;
}

/// Sets the packet count per rate window above which enqueue() drops packets.
void QDataBuffer::setMaxPacketsPerSecond(int maxRate)
{
    QMutexLocker locker(&m_mutex);
    m_maxPacketsPerSecond = maxRate;
}

/// Appends `packet` to the normal queue.
///
/// The packet is rejected (and packetDropped is emitted) when the buffer is
/// paused, when flow control decides to drop it, or when both queues together
/// have reached the configured capacity; overflow additionally emits
/// bufferOverflow. Only the flow-control and overflow paths increment
/// droppedPackets.
///
/// @return true if the packet was queued, false if it was dropped.
bool QDataBuffer::enqueue(const DataPacket& packet)
{
    QMutexLocker locker(&m_mutex);

    // Every rejection path releases the mutex before emitting: a directly
    // connected slot runs inside the emit, and one that calls back into this
    // object would otherwise block on m_mutex forever.
    if (m_paused) {
        locker.unlock();
        emit packetDropped(packet, "Buffer paused");
        return false;
    }

    if (shouldDropPacket(packet)) {
        m_stats.droppedPackets++;
        locker.unlock();
        emit packetDropped(packet, "Flow control");
        return false;
    }

    // Reject when the two queues together are at capacity.
    if (m_buffer.size() + m_highPriorityBuffer.size() >= m_maxBufferSize) {
        m_stats.droppedPackets++;
        locker.unlock();
        emit bufferOverflow(packet.stockCode, packet.type);
        emit packetDropped(packet, "Buffer overflow");
        return false;
    }

    m_buffer.enqueue(packet);
    m_stats.totalPackets++;
    m_currentSecondPackets++;

    // Wake one consumer blocked in dequeue()/dequeueBatch().
    m_notEmpty.wakeOne();
    return true;
}

/// Appends `packet` to the high-priority queue.
///
/// Same as enqueue() except that the flow-control rate check is skipped; the
/// pause and capacity checks still apply.
///
/// @return true if the packet was queued, false if it was dropped.
bool QDataBuffer::enqueueHighPriority(const DataPacket& packet)
{
    QMutexLocker locker(&m_mutex);

    // As in enqueue(), signals are emitted only after the mutex is released.
    if (m_paused) {
        locker.unlock();
        emit packetDropped(packet, "Buffer paused");
        return false;
    }

    // High-priority packets bypass flow control but not the capacity limit.
    if (m_highPriorityBuffer.size() + m_buffer.size() >= m_maxBufferSize) {
        m_stats.droppedPackets++;
        locker.unlock();
        emit bufferOverflow(packet.stockCode, packet.type);
        emit packetDropped(packet, "Buffer overflow");
        return false;
    }

    m_highPriorityBuffer.enqueue(packet);
    m_stats.totalPackets++;
    m_currentSecondPackets++;

    // Wake one consumer blocked in dequeue()/dequeueBatch().
    m_notEmpty.wakeOne();
    return true;
}

/// Removes and returns the next packet, preferring the high-priority queue.
///
/// Blocks while both queues are empty and the buffer is not paused. Each wait
/// is given the full `timeoutMs`, so a caller that is woken without finding
/// data waits again for up to `timeoutMs`.
///
/// @return the dequeued packet, or a DataPacketType::Unknown packet when a
///         wait times out or when the buffer is paused with nothing queued.
DataPacket QDataBuffer::dequeue(int timeoutMs)
{
    QMutexLocker locker(&m_mutex);

    // Wait for data; being paused ends the wait immediately.
    while (m_highPriorityBuffer.isEmpty() && m_buffer.isEmpty() && !m_paused) {
        if (!m_notEmpty.wait(&m_mutex, timeoutMs)) {
            // Timed out: hand back an empty packet.
            return DataPacket(DataPacketType::Unknown, "", QVariant());
        }
    }

    // High-priority packets are served first.
    if (!m_highPriorityBuffer.isEmpty()) {
        DataPacket packet = m_highPriorityBuffer.dequeue();
        m_stats.processedPackets++;
        return packet;
    }

    if (!m_buffer.isEmpty()) {
        DataPacket packet = m_buffer.dequeue();
        m_stats.processedPackets++;
        return packet;
    }

    // Paused with nothing queued: hand back an empty packet.
    return DataPacket(DataPacketType::Unknown, "", QVariant());
}

/// Removes and returns up to `maxCount` packets, high-priority ones first.
///
/// Waits exactly like dequeue().
///
/// @return the dequeued packets in service order; empty when a wait times out
///         or when nothing could be taken.
QList<DataPacket> QDataBuffer::dequeueBatch(int maxCount, int timeoutMs)
{
    QMutexLocker locker(&m_mutex);
    QList<DataPacket> result;

    // Wait for data; being paused ends the wait immediately.
    while (m_highPriorityBuffer.isEmpty() && m_buffer.isEmpty() && !m_paused) {
        if (!m_notEmpty.wait(&m_mutex, timeoutMs)) {
            return result; // Timed out: empty list.
        }
    }

    // Drain the high-priority queue first...
    while (!m_highPriorityBuffer.isEmpty() && result.size() < maxCount) {
        result.append(m_highPriorityBuffer.dequeue());
        m_stats.processedPackets++;
    }

    // ...then fill the remainder from the normal queue.
    while (!m_buffer.isEmpty() && result.size() < maxCount) {
        result.append(m_buffer.dequeue());
        m_stats.processedPackets++;
    }

    return result;
}

/// Returns true when both queues are empty.
bool QDataBuffer::isEmpty() const
{
    QMutexLocker locker(&m_mutex);
    return m_highPriorityBuffer.isEmpty() && m_buffer.isEmpty();
}

/// Returns the total number of packets held in both queues.
int QDataBuffer::size() const
{
    QMutexLocker locker(&m_mutex);
    return m_highPriorityBuffer.size() + m_buffer.size();
}

/// Returns how many queued packets, across both queues, have the given type.
int QDataBuffer::getQueueSize(DataPacketType type) const
{
    QMutexLocker locker(&m_mutex);

    const auto hasType = [type](const DataPacket& packet) { return packet.type == type; };
    return countPacketsIf(m_highPriorityBuffer, hasType) + countPacketsIf(m_buffer, hasType);
}

/// Returns a copy of the current statistics, taken under the mutex.
FlowStatistics QDataBuffer::getStatistics() const
{
    QMutexLocker locker(&m_mutex);
    return m_stats;
}

/// Marks the buffer as paused and emits flowControlActivated(true).
void QDataBuffer::pause()
{
    {
        QMutexLocker locker(&m_mutex);
        m_paused = true;
    }
    // Emitted after the mutex is released so a directly connected slot may
    // call back into this object.
    emit flowControlActivated(true);
}

/// Clears the paused flag, wakes all waiting consumers and emits
/// flowControlActivated(false).
void QDataBuffer::resume()
{
    {
        QMutexLocker locker(&m_mutex);
        m_paused = false;
        m_notEmpty.wakeAll(); // Wake every waiting consumer.
    }
    // Emitted after the mutex is released so a directly connected slot may
    // call back into this object.
    emit flowControlActivated(false);
}

/// Returns whether the buffer is currently paused.
bool QDataBuffer::isPaused() const
{
    QMutexLocker locker(&m_mutex);
    return m_paused;
}

/// Discards every queued packet from both queues.
void QDataBuffer::clear()
{
    QMutexLocker locker(&m_mutex);
    m_highPriorityBuffer.clear();
    m_buffer.clear();
}

/// Discards every queued packet of the given type from both queues, keeping
/// the remaining packets in their original order.
void QDataBuffer::clearByType(DataPacketType type)
{
    QMutexLocker locker(&m_mutex);

    const auto hasType = [type](const DataPacket& packet) { return packet.type == type; };
    removePacketsIf(m_highPriorityBuffer, hasType);
    removePacketsIf(m_buffer, hasType);
}

/// Discards every queued packet for the given stock code from both queues,
/// keeping the remaining packets in their original order.
void QDataBuffer::clearByStock(const QString& stockCode)
{
    QMutexLocker locker(&m_mutex);

    const auto isForStock = [&stockCode](const DataPacket& packet) {
        return packet.stockCode == stockCode;
    };
    removePacketsIf(m_highPriorityBuffer, isForStock);
    removePacketsIf(m_buffer, isForStock);
}

/// Recomputes the packet rate once at least a second has elapsed, refreshes
/// the derived statistics and emits statisticsUpdated with a snapshot.
void QDataBuffer::updateStatistics()
{
    QMutexLocker locker(&m_mutex);

    // Recompute packets-per-second over the window measured by m_rateTimer.
    const auto elapsedMs = m_rateTimer.elapsed();
    if (elapsedMs >= kStatsIntervalMs) {
        // NOTE(review): the cast's target type was not recoverable from the
        // reviewed copy; double is assumed. Confirm against the declared type
        // of FlowStatistics::packetsPerSecond.
        m_stats.packetsPerSecond = static_cast<double>(m_currentSecondPackets) * 1000 / elapsedMs;
        m_currentSecondPackets = 0;
        m_rateTimer.restart();
    }

    // Refresh the derived statistics.
    m_stats.updateStats(m_stats.totalPackets, m_stats.droppedPackets, m_stats.processedPackets);

    // Copy while the mutex is still held: reading m_stats after unlocking
    // would race with producers and consumers updating the counters.
    FlowStatistics snapshot = m_stats;
    locker.unlock();

    emit statisticsUpdated(snapshot);
}

/// Flow-control decision for a packet about to enter the normal queue.
///
/// Reads shared state without locking; enqueue() calls it with m_mutex held.
/// `packet` is not inspected yet.
///
/// @return true when flow control is enabled and the current window's packet
///         count has reached the configured maximum.
bool QDataBuffer::shouldDropPacket(const DataPacket& packet) const
{
    Q_UNUSED(packet);

    if (!m_flowControlEnabled) {
        return false;
    }

    // Rate limit check.
    if (m_currentSecondPackets >= m_maxPacketsPerSecond) {
        return true;
    }

    // A more elaborate drop policy based on packet type, stock code, etc.
    // could be added here, e.g. dropping low-priority historical packets.
    return false;
}

/// Removes packets whose timestamp is too old: 5 minutes or more for the
/// high-priority queue, 2 minutes or more for the normal queue.
void QDataBuffer::cleanupExpiredPackets()
{
    QMutexLocker locker(&m_mutex);

    const QDateTime now = QDateTime::currentDateTime();

    // Drop high-priority packets aged 5 minutes or more.
    removePacketsIf(m_highPriorityBuffer, [&now](const DataPacket& packet) {
        return packet.timestamp.secsTo(now) >= kHighPriorityMaxAgeSecs;
    });

    // Drop normal packets aged 2 minutes or more.
    removePacketsIf(m_buffer, [&now](const DataPacket& packet) {
        return packet.timestamp.secsTo(now) >= kNormalMaxAgeSecs;
    });
}