Files
QTradeProgram/TradingCore/data_processing/DataBuffer.cpp

343 lines
8.5 KiB
C++
Raw Normal View History

2026-02-25 23:01:42 +08:00
#include "qdatabuffer.h"

#include <QDateTime>
#include <QDeadlineTimer>
#include <QDebug>
#include <QThread>
// Returns the shared QDataBuffer object.
//
// The object is a function-local static: it is constructed on the first call
// and the same address is returned on every subsequent call.
QDataBuffer* QDataBuffer::instance()
{
    static QDataBuffer sharedBuffer;
    return &sharedBuffer;
}
// Constructs the buffer and starts its two periodic timers.
//
// parent: forwarded to QObject. Both timers are created with this object as
// their QObject parent.
QDataBuffer::QDataBuffer(QObject *parent)
    : QObject(parent)
    , m_cleanupTimer(new QTimer(this))
    , m_statsUpdateTimer(new QTimer(this))
{
    // Wire each timer to its handler before anything is started.
    connect(m_cleanupTimer, &QTimer::timeout, this, &QDataBuffer::cleanupExpiredPackets);
    connect(m_statsUpdateTimer, &QTimer::timeout, this, &QDataBuffer::updateStatistics);

    // Expired packets are purged every 30 seconds.
    m_cleanupTimer->setInterval(30000);
    // Statistics are refreshed once per second.
    m_statsUpdateTimer->setInterval(1000);

    m_cleanupTimer->start();
    m_statsUpdateTimer->start();

    // Start the clock that updateStatistics() reads to compute the packet rate.
    m_rateTimer.start();
}
// Stops both periodic timers so that no further cleanup or statistics
// callbacks are triggered by them.
QDataBuffer::~QDataBuffer()
{
    m_statsUpdateTimer->stop();
    m_cleanupTimer->stop();
}
void QDataBuffer::setBufferSize(int maxSize)
{
QMutexLocker locker(&m_mutex);
m_maxBufferSize = maxSize;
}
void QDataBuffer::setFlowControlEnabled(bool enabled)
{
QMutexLocker locker(&m_mutex);
m_flowControlEnabled = enabled;
}
void QDataBuffer::setMaxPacketsPerSecond(int maxRate)
{
QMutexLocker locker(&m_mutex);
m_maxPacketsPerSecond = maxRate;
}
bool QDataBuffer::enqueue(const DataPacket& packet)
{
QMutexLocker locker(&m_mutex);
if (m_paused) {
emit packetDropped(packet, "Buffer paused");
return false;
}
if (shouldDropPacket(packet)) {
m_stats.droppedPackets++;
emit packetDropped(packet, "Flow control");
return false;
}
// 检查缓冲区是否已满
if (m_buffer.size() + m_highPriorityBuffer.size() >= m_maxBufferSize) {
m_stats.droppedPackets++;
emit bufferOverflow(packet.stockCode, packet.type);
emit packetDropped(packet, "Buffer overflow");
return false;
}
m_buffer.enqueue(packet);
m_stats.totalPackets++;
m_currentSecondPackets++;
// 通知等待的消费者
m_notEmpty.wakeOne();
return true;
}
bool QDataBuffer::enqueueHighPriority(const DataPacket& packet)
{
QMutexLocker locker(&m_mutex);
if (m_paused) {
emit packetDropped(packet, "Buffer paused");
return false;
}
// 高优先级包不受流量控制限制
if (m_highPriorityBuffer.size() + m_buffer.size() >= m_maxBufferSize) {
m_stats.droppedPackets++;
emit bufferOverflow(packet.stockCode, packet.type);
emit packetDropped(packet, "Buffer overflow");
return false;
}
m_highPriorityBuffer.enqueue(packet);
m_stats.totalPackets++;
m_currentSecondPackets++;
// 通知等待的消费者
m_notEmpty.wakeOne();
return true;
}
// Removes and returns the next packet, preferring the high-priority queue.
//
// If both queues are empty and the buffer is not paused, the call blocks on
// m_notEmpty until a packet arrives or the timeout expires.
//
// timeoutMs: maximum total time to block, in milliseconds. A negative value
//            waits without a time limit.
//            NOTE(review): the previous code passed the int straight to
//            QWaitCondition::wait(), where -1 converts to ULONG_MAX (no
//            timeout); other negative values are treated the same way here.
//
// Returns the dequeued packet, or a DataPacket of type
// DataPacketType::Unknown when the wait timed out or when the buffer is
// paused and empty.
DataPacket QDataBuffer::dequeue(int timeoutMs)
{
    QMutexLocker locker(&m_mutex);

    // One deadline for the whole call. Re-waiting with the full timeout after
    // every wake-up (for example when another consumer took the packet first)
    // would let the total blocking time exceed timeoutMs.
    const QDeadlineTimer deadline = (timeoutMs < 0)
        ? QDeadlineTimer(QDeadlineTimer::Forever)
        : QDeadlineTimer(static_cast<qint64>(timeoutMs));

    // Wait until data is available.
    while (m_highPriorityBuffer.isEmpty() && m_buffer.isEmpty() && !m_paused) {
        if (!m_notEmpty.wait(&m_mutex, deadline)) {
            // Timed out: hand back an empty packet.
            return DataPacket(DataPacketType::Unknown, "", QVariant());
        }
    }

    // High-priority packets are served first.
    if (!m_highPriorityBuffer.isEmpty()) {
        DataPacket packet = m_highPriorityBuffer.dequeue();
        m_stats.processedPackets++;
        return packet;
    }

    if (!m_buffer.isEmpty()) {
        DataPacket packet = m_buffer.dequeue();
        m_stats.processedPackets++;
        return packet;
    }

    // Both queues are empty, so the loop ended because the buffer is paused.
    return DataPacket(DataPacketType::Unknown, "", QVariant());
}
// Removes and returns up to maxCount packets, taking high-priority packets
// first and then normal ones.
//
// If both queues are empty and the buffer is not paused, the call blocks on
// m_notEmpty until a packet arrives or the timeout expires.
//
// maxCount:  upper bound on the number of packets returned.
// timeoutMs: maximum total time to block, in milliseconds. A negative value
//            waits without a time limit.
//            NOTE(review): the previous code passed the int straight to
//            QWaitCondition::wait(), where -1 converts to ULONG_MAX (no
//            timeout); other negative values are treated the same way here.
//
// Returns the dequeued packets in service order; the list is empty when the
// wait timed out or when the buffer is paused and empty.
QList<DataPacket> QDataBuffer::dequeueBatch(int maxCount, int timeoutMs)
{
    QMutexLocker locker(&m_mutex);
    QList<DataPacket> result;

    // One deadline for the whole call. Re-waiting with the full timeout after
    // every wake-up would let the total blocking time exceed timeoutMs.
    const QDeadlineTimer deadline = (timeoutMs < 0)
        ? QDeadlineTimer(QDeadlineTimer::Forever)
        : QDeadlineTimer(static_cast<qint64>(timeoutMs));

    // Wait until data is available.
    while (m_highPriorityBuffer.isEmpty() && m_buffer.isEmpty() && !m_paused) {
        if (!m_notEmpty.wait(&m_mutex, deadline)) {
            return result; // Timed out: return the empty list.
        }
    }

    // Drain the high-priority queue first.
    while (!m_highPriorityBuffer.isEmpty() && result.size() < maxCount) {
        result.append(m_highPriorityBuffer.dequeue());
        m_stats.processedPackets++;
    }

    // Fill the remaining capacity from the normal queue.
    while (!m_buffer.isEmpty() && result.size() < maxCount) {
        result.append(m_buffer.dequeue());
        m_stats.processedPackets++;
    }

    return result;
}
bool QDataBuffer::isEmpty() const
{
QMutexLocker locker(&m_mutex);
return m_highPriorityBuffer.isEmpty() && m_buffer.isEmpty();
}
int QDataBuffer::size() const
{
QMutexLocker locker(&m_mutex);
return m_highPriorityBuffer.size() + m_buffer.size();
}
// Counts the queued packets whose type equals the given type.
//
// Both queues are scanned linearly under the buffer mutex.
//
// type: packet type to match against DataPacket::type.
// Returns the number of matching packets across both queues.
int QDataBuffer::getQueueSize(DataPacketType type) const
{
    const QMutexLocker guard(&m_mutex);

    // Tallies the entries of one queue that carry the requested type.
    const auto countOfType = [type](const QQueue<DataPacket>& queue) -> int {
        int matches = 0;
        for (const DataPacket& entry : queue) {
            if (entry.type == type) {
                ++matches;
            }
        }
        return matches;
    };

    const int highPriorityMatches = countOfType(m_highPriorityBuffer);
    const int normalMatches = countOfType(m_buffer);
    return highPriorityMatches + normalMatches;
}
// Returns a copy of the current flow statistics, taken under the buffer
// mutex.
FlowStatistics QDataBuffer::getStatistics() const
{
    const QMutexLocker guard(&m_mutex);
    return m_stats;
}
// Puts the buffer into the paused state and emits flowControlActivated(true).
//
// While paused, enqueue() and enqueueHighPriority() reject packets, and
// dequeue()/dequeueBatch() stop waiting for new data.
//
// The signal is emitted after m_mutex has been released. The mutex is the one
// used with m_notEmpty, so it cannot be re-entered: a directly connected slot
// that called back into this object (for example isPaused()) while the lock
// was still held would deadlock.
void QDataBuffer::pause()
{
    {
        QMutexLocker locker(&m_mutex);
        m_paused = true;
    }
    emit flowControlActivated(true);
}
// Leaves the paused state, wakes every consumer blocked on m_notEmpty, and
// emits flowControlActivated(false).
//
// The signal is emitted after m_mutex has been released. The mutex is the one
// used with m_notEmpty, so it cannot be re-entered: a directly connected slot
// that called back into this object while the lock was still held would
// deadlock.
void QDataBuffer::resume()
{
    {
        QMutexLocker locker(&m_mutex);
        m_paused = false;
        m_notEmpty.wakeAll(); // Wake all waiting consumers.
    }
    emit flowControlActivated(false);
}
bool QDataBuffer::isPaused() const
{
QMutexLocker locker(&m_mutex);
return m_paused;
}
void QDataBuffer::clear()
{
QMutexLocker locker(&m_mutex);
m_highPriorityBuffer.clear();
m_buffer.clear();
}
// Removes every queued packet of the given type from both queues.
//
// Each queue is rebuilt from the packets that survive the filter, so the
// relative order of the remaining packets is preserved.
//
// type: packet type to remove.
void QDataBuffer::clearByType(DataPacketType type)
{
    const QMutexLocker guard(&m_mutex);

    // Builds a copy of one queue that omits packets of the unwanted type.
    const auto withoutType = [type](const QQueue<DataPacket>& source) -> QQueue<DataPacket> {
        QQueue<DataPacket> kept;
        for (const DataPacket& entry : source) {
            if (entry.type != type) {
                kept.enqueue(entry);
            }
        }
        return kept;
    };

    m_highPriorityBuffer = withoutType(m_highPriorityBuffer);
    m_buffer = withoutType(m_buffer);
}
void QDataBuffer::clearByStock(const QString& stockCode)
{
QMutexLocker locker(&m_mutex);
// 清理高优先级缓冲区
QQueue<DataPacket> newHighPriority;
for (const DataPacket& packet : m_highPriorityBuffer) {
if (packet.stockCode != stockCode) {
newHighPriority.enqueue(packet);
}
}
m_highPriorityBuffer = newHighPriority;
// 清理普通缓冲区
QQueue<DataPacket> newBuffer;
for (const DataPacket& packet : m_buffer) {
if (packet.stockCode != stockCode) {
newBuffer.enqueue(packet);
}
}
m_buffer = newBuffer;
}
void QDataBuffer::updateStatistics()
{
QMutexLocker locker(&m_mutex);
// 计算每秒包数
if (m_rateTimer.elapsed() >= 1000) {
m_stats.packetsPerSecond = static_cast<double>(m_currentSecondPackets) * 1000 / m_rateTimer.elapsed();
m_currentSecondPackets = 0;
m_rateTimer.restart();
}
// 更新统计信息
m_stats.updateStats(m_stats.totalPackets, m_stats.droppedPackets, m_stats.processedPackets);
locker.unlock();
emit statisticsUpdated(m_stats);
}
// Decides whether flow control should reject a packet.
//
// Returns true only when flow control is enabled and the number of packets
// accepted since the last rate reset has reached m_maxPacketsPerSecond.
// The packet itself is not inspected yet; a finer policy (for example by
// packet type or stock code, dropping low-priority historical data) could be
// added here.
//
// Takes no lock itself; the visible caller, enqueue(), holds m_mutex.
bool QDataBuffer::shouldDropPacket(const DataPacket& packet) const
{
    const bool rateLimitReached = m_currentSecondPackets >= m_maxPacketsPerSecond;
    return m_flowControlEnabled && rateLimitReached;
}
void QDataBuffer::cleanupExpiredPackets()
{
QMutexLocker locker(&m_mutex);
QDateTime now = QDateTime::currentDateTime();
// 清理过期的高优先级包超过5分钟
QQueue<DataPacket> newHighPriority;
for (const DataPacket& packet : m_highPriorityBuffer) {
if (packet.timestamp.secsTo(now) < 300) { // 5分钟
newHighPriority.enqueue(packet);
}
}
m_highPriorityBuffer = newHighPriority;
// 清理过期的普通包超过2分钟
QQueue<DataPacket> newBuffer;
for (const DataPacket& packet : m_buffer) {
if (packet.timestamp.secsTo(now) < 120) { // 2分钟
newBuffer.enqueue(packet);
}
}
m_buffer = newBuffer;
}