343 lines
8.5 KiB
C++
343 lines
8.5 KiB
C++
#include "qdatabuffer.h"
|
||
#include <QDebug>
|
||
#include <QThread>
|
||
#include <QDateTime>
|
||
|
||
/**
 * @brief Returns the shared QDataBuffer singleton.
 *
 * The object is a function-local static: it is constructed on the first call
 * and the same address is handed back on every later call.
 *
 * @return Pointer to the single shared instance (never null).
 */
QDataBuffer* QDataBuffer::instance()
{
    static QDataBuffer sharedBuffer;
    return &sharedBuffer;
}
|
||
|
||
/**
 * @brief Builds the buffer and starts its two periodic timers.
 *
 * Both timers are created with this object as their QObject parent.
 *
 * @param parent Forwarded to the QObject base-class constructor.
 */
QDataBuffer::QDataBuffer(QObject *parent)
    : QObject(parent)
    , m_cleanupTimer(new QTimer(this))
    , m_statsUpdateTimer(new QTimer(this))
{
    // Expired-packet sweep: fires every 30 seconds.
    connect(m_cleanupTimer, &QTimer::timeout, this, &QDataBuffer::cleanupExpiredPackets);
    m_cleanupTimer->start(30000);

    // Statistics refresh: fires once per second.
    connect(m_statsUpdateTimer, &QTimer::timeout, this, &QDataBuffer::updateStatistics);
    m_statsUpdateTimer->start(1000);

    // Begin measuring elapsed time for the packets-per-second calculation
    // performed in updateStatistics().
    m_rateTimer.start();
}
|
||
|
||
/**
 * @brief Stops both periodic timers when the buffer is destroyed.
 */
QDataBuffer::~QDataBuffer()
{
    // The two stops are independent of each other.
    m_statsUpdateTimer->stop();
    m_cleanupTimer->stop();
}
|
||
|
||
void QDataBuffer::setBufferSize(int maxSize)
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
m_maxBufferSize = maxSize;
|
||
}
|
||
|
||
void QDataBuffer::setFlowControlEnabled(bool enabled)
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
m_flowControlEnabled = enabled;
|
||
}
|
||
|
||
void QDataBuffer::setMaxPacketsPerSecond(int maxRate)
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
m_maxPacketsPerSecond = maxRate;
|
||
}
|
||
|
||
bool QDataBuffer::enqueue(const DataPacket& packet)
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
|
||
if (m_paused) {
|
||
emit packetDropped(packet, "Buffer paused");
|
||
return false;
|
||
}
|
||
|
||
if (shouldDropPacket(packet)) {
|
||
m_stats.droppedPackets++;
|
||
emit packetDropped(packet, "Flow control");
|
||
return false;
|
||
}
|
||
|
||
// 检查缓冲区是否已满
|
||
if (m_buffer.size() + m_highPriorityBuffer.size() >= m_maxBufferSize) {
|
||
m_stats.droppedPackets++;
|
||
emit bufferOverflow(packet.stockCode, packet.type);
|
||
emit packetDropped(packet, "Buffer overflow");
|
||
return false;
|
||
}
|
||
|
||
m_buffer.enqueue(packet);
|
||
m_stats.totalPackets++;
|
||
m_currentSecondPackets++;
|
||
|
||
// 通知等待的消费者
|
||
m_notEmpty.wakeOne();
|
||
|
||
return true;
|
||
}
|
||
|
||
bool QDataBuffer::enqueueHighPriority(const DataPacket& packet)
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
|
||
if (m_paused) {
|
||
emit packetDropped(packet, "Buffer paused");
|
||
return false;
|
||
}
|
||
|
||
// 高优先级包不受流量控制限制
|
||
if (m_highPriorityBuffer.size() + m_buffer.size() >= m_maxBufferSize) {
|
||
m_stats.droppedPackets++;
|
||
emit bufferOverflow(packet.stockCode, packet.type);
|
||
emit packetDropped(packet, "Buffer overflow");
|
||
return false;
|
||
}
|
||
|
||
m_highPriorityBuffer.enqueue(packet);
|
||
m_stats.totalPackets++;
|
||
m_currentSecondPackets++;
|
||
|
||
// 通知等待的消费者
|
||
m_notEmpty.wakeOne();
|
||
|
||
return true;
|
||
}
|
||
|
||
/**
 * @brief Removes and returns the next packet, waiting for one if necessary.
 *
 * While both queues are empty and the buffer is not paused, the call waits on
 * m_notEmpty. Each wait is given the full @p timeoutMs, so a wake-up that
 * finds the queues still empty starts a new wait of the same length.
 * High-priority packets are always served before normal ones.
 *
 * @param timeoutMs Timeout passed to each QWaitCondition::wait() call.
 * @return The dequeued packet, or a DataPacketType::Unknown packet with an
 *         empty stock code when the wait timed out or nothing is queued.
 */
DataPacket QDataBuffer::dequeue(int timeoutMs)
{
    QMutexLocker guard(&m_mutex);

    // Wait until data is available or the buffer is paused.
    for (;;) {
        const bool nothingQueued = m_highPriorityBuffer.isEmpty() && m_buffer.isEmpty();
        if (!nothingQueued || m_paused) {
            break;
        }
        if (!m_notEmpty.wait(&m_mutex, timeoutMs)) {
            // Timed out: hand back an empty packet.
            return DataPacket(DataPacketType::Unknown, "", QVariant());
        }
    }

    // Reached with both queues empty only when the buffer is paused.
    if (m_highPriorityBuffer.isEmpty() && m_buffer.isEmpty()) {
        return DataPacket(DataPacketType::Unknown, "", QVariant());
    }

    // Serve the high-priority queue first, falling back to the normal one.
    QQueue<DataPacket>& source = m_highPriorityBuffer.isEmpty() ? m_buffer : m_highPriorityBuffer;
    DataPacket next = source.dequeue();
    m_stats.processedPackets++;
    return next;
}
|
||
|
||
/**
 * @brief Removes and returns up to @p maxCount packets in one call.
 *
 * While both queues are empty and the buffer is not paused, the call waits on
 * m_notEmpty; each wait is given the full @p timeoutMs. Once it proceeds, the
 * high-priority queue is drained first and the normal queue fills whatever
 * room is left in the batch.
 *
 * @param maxCount  Upper bound on the number of packets returned.
 * @param timeoutMs Timeout passed to each QWaitCondition::wait() call.
 * @return The dequeued packets; empty when the wait timed out or nothing was
 *         queued.
 */
QList<DataPacket> QDataBuffer::dequeueBatch(int maxCount, int timeoutMs)
{
    QMutexLocker guard(&m_mutex);
    QList<DataPacket> batch;

    // Wait until data is available or the buffer is paused.
    for (;;) {
        const bool nothingQueued = m_highPriorityBuffer.isEmpty() && m_buffer.isEmpty();
        if (!nothingQueued || m_paused) {
            break;
        }
        if (!m_notEmpty.wait(&m_mutex, timeoutMs)) {
            return batch; // Timed out: empty list.
        }
    }

    // Moves packets from one queue into the batch until either runs out of room/data.
    const auto drain = [this, &batch, maxCount](QQueue<DataPacket>& queue) {
        while (!queue.isEmpty() && batch.size() < maxCount) {
            batch.append(queue.dequeue());
            m_stats.processedPackets++;
        }
    };

    drain(m_highPriorityBuffer); // high priority first
    drain(m_buffer);             // then the normal queue

    return batch;
}
|
||
|
||
bool QDataBuffer::isEmpty() const
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
return m_highPriorityBuffer.isEmpty() && m_buffer.isEmpty();
|
||
}
|
||
|
||
int QDataBuffer::size() const
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
return m_highPriorityBuffer.size() + m_buffer.size();
|
||
}
|
||
|
||
/**
 * @brief Counts the queued packets of one type across both queues.
 *
 * Scans every packet in both queues while m_mutex is held.
 *
 * @param type Packet type to count.
 * @return Number of packets whose type equals @p type.
 */
int QDataBuffer::getQueueSize(DataPacketType type) const
{
    QMutexLocker guard(&m_mutex);

    // Counts the entries of a single queue that carry the requested type.
    const auto countMatching = [type](const QQueue<DataPacket>& queue) {
        int matches = 0;
        for (auto it = queue.cbegin(); it != queue.cend(); ++it) {
            if (it->type == type) {
                ++matches;
            }
        }
        return matches;
    };

    const int highMatches = countMatching(m_highPriorityBuffer);
    const int normalMatches = countMatching(m_buffer);
    return highMatches + normalMatches;
}
|
||
|
||
/**
 * @brief Returns a copy of the current flow statistics.
 *
 * The copy is taken while m_mutex is held.
 *
 * @return Snapshot of m_stats.
 */
FlowStatistics QDataBuffer::getStatistics() const
{
    QMutexLocker guard(&m_mutex);
    FlowStatistics snapshot = m_stats;
    return snapshot;
}
|
||
|
||
void QDataBuffer::pause()
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
m_paused = true;
|
||
emit flowControlActivated(true);
|
||
}
|
||
|
||
void QDataBuffer::resume()
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
m_paused = false;
|
||
m_notEmpty.wakeAll(); // 唤醒所有等待的消费者
|
||
emit flowControlActivated(false);
|
||
}
|
||
|
||
bool QDataBuffer::isPaused() const
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
return m_paused;
|
||
}
|
||
|
||
void QDataBuffer::clear()
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
m_highPriorityBuffer.clear();
|
||
m_buffer.clear();
|
||
}
|
||
|
||
/**
 * @brief Removes every queued packet of the given type from both queues.
 *
 * Each queue is rebuilt from the packets that do not match, preserving their
 * relative order. m_stats is not modified here.
 *
 * @param type Packet type to discard.
 */
void QDataBuffer::clearByType(DataPacketType type)
{
    QMutexLocker guard(&m_mutex);

    // Returns a copy of `source` without the packets of the discarded type.
    const auto withoutType = [type](const QQueue<DataPacket>& source) {
        QQueue<DataPacket> kept;
        for (auto it = source.cbegin(); it != source.cend(); ++it) {
            if (it->type != type) {
                kept.enqueue(*it);
            }
        }
        return kept;
    };

    m_highPriorityBuffer = withoutType(m_highPriorityBuffer);
    m_buffer = withoutType(m_buffer);
}
|
||
|
||
void QDataBuffer::clearByStock(const QString& stockCode)
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
|
||
// 清理高优先级缓冲区
|
||
QQueue<DataPacket> newHighPriority;
|
||
for (const DataPacket& packet : m_highPriorityBuffer) {
|
||
if (packet.stockCode != stockCode) {
|
||
newHighPriority.enqueue(packet);
|
||
}
|
||
}
|
||
m_highPriorityBuffer = newHighPriority;
|
||
|
||
// 清理普通缓冲区
|
||
QQueue<DataPacket> newBuffer;
|
||
for (const DataPacket& packet : m_buffer) {
|
||
if (packet.stockCode != stockCode) {
|
||
newBuffer.enqueue(packet);
|
||
}
|
||
}
|
||
m_buffer = newBuffer;
|
||
}
|
||
|
||
void QDataBuffer::updateStatistics()
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
|
||
// 计算每秒包数
|
||
if (m_rateTimer.elapsed() >= 1000) {
|
||
m_stats.packetsPerSecond = static_cast<double>(m_currentSecondPackets) * 1000 / m_rateTimer.elapsed();
|
||
m_currentSecondPackets = 0;
|
||
m_rateTimer.restart();
|
||
}
|
||
|
||
// 更新统计信息
|
||
m_stats.updateStats(m_stats.totalPackets, m_stats.droppedPackets, m_stats.processedPackets);
|
||
|
||
locker.unlock();
|
||
|
||
emit statisticsUpdated(m_stats);
|
||
}
|
||
|
||
bool QDataBuffer::shouldDropPacket(const DataPacket& packet) const
|
||
{
|
||
if (!m_flowControlEnabled) {
|
||
return false;
|
||
}
|
||
|
||
// 检查速率限制
|
||
if (m_currentSecondPackets >= m_maxPacketsPerSecond) {
|
||
return true;
|
||
}
|
||
|
||
// 可以根据包类型、股票代码等实现更复杂的丢弃策略
|
||
// 例如:丢弃低优先级的历史数据包等
|
||
|
||
return false;
|
||
}
|
||
|
||
void QDataBuffer::cleanupExpiredPackets()
|
||
{
|
||
QMutexLocker locker(&m_mutex);
|
||
QDateTime now = QDateTime::currentDateTime();
|
||
|
||
// 清理过期的高优先级包(超过5分钟)
|
||
QQueue<DataPacket> newHighPriority;
|
||
for (const DataPacket& packet : m_highPriorityBuffer) {
|
||
if (packet.timestamp.secsTo(now) < 300) { // 5分钟
|
||
newHighPriority.enqueue(packet);
|
||
}
|
||
}
|
||
m_highPriorityBuffer = newHighPriority;
|
||
|
||
// 清理过期的普通包(超过2分钟)
|
||
QQueue<DataPacket> newBuffer;
|
||
for (const DataPacket& packet : m_buffer) {
|
||
if (packet.timestamp.secsTo(now) < 120) { // 2分钟
|
||
newBuffer.enqueue(packet);
|
||
}
|
||
}
|
||
m_buffer = newBuffer;
|
||
}
|