Files
QTradeProgram/Sqbase/qdatabuffer.h
2026-02-25 23:01:42 +08:00

129 lines
3.2 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#ifndef QDATABUFFER_H
#define QDATABUFFER_H
#include <QDateTime>
#include <QElapsedTimer>
#include <QList>
#include <QMap>
#include <QMutex>
#include <QObject>
#include <QQueue>
#include <QSet>
#include <QString>
#include <QTimer>
#include <QVariant>
#include <QWaitCondition>
#include "BZStruct.h"
// Data packet type: tags the kind of payload carried by a packet.
// The numeric values are spelled out explicitly; they match the implicit
// sequential numbering, so existing casts/serialized values are unaffected.
enum class DataPacketType {
    RealTimeQuote = 0,
    OrderBook     = 1,
    KLine         = 2,
    Tick          = 3,
    BrokerQueue   = 4,
    Unknown       = 5
};
// Data packet structure: one unit of data tagged with its type, the stock
// code it belongs to, a creation timestamp, an opaque payload and a priority.
struct DataPacket {
    DataPacketType type = DataPacketType::Unknown;  // kind of payload
    QString stockCode;                              // stock code this packet refers to
    QDateTime timestamp;                            // stamped by the value constructor below
    QVariant data;                                  // payload
    // Priority. NOTE(review): the original comment was garbled ("0-99 is
    // highest"); it likely meant a 0-9 scale with 9 highest — confirm against
    // the code that consumes this field.
    int priority = 5;

    // Default constructor: yields an "empty" packet (type Unknown, empty
    // stock code, null timestamp, invalid QVariant, priority 5). Needed so the
    // type can be default-constructed, e.g. when registering it with Qt's
    // meta-type system for use in queued signal/slot connections.
    DataPacket() = default;

    // Builds a packet and stamps it with the current local date/time.
    //   t    - packet type
    //   code - stock code
    //   d    - payload
    //   prio - priority, defaults to 5
    DataPacket(DataPacketType t, const QString& code, const QVariant& d, int prio = 5)
        : type(t), stockCode(code), timestamp(QDateTime::currentDateTime()), data(d), priority(prio) {}
};
// Flow statistics: packet counters plus derived rates.
struct FlowStatistics {
    qint64 totalPackets = 0;        // packets counted in total
    qint64 droppedPackets = 0;      // packets counted as dropped
    qint64 processedPackets = 0;    // packets counted as processed
    double packetsPerSecond = 0.0;  // not modified by updateStats(); set elsewhere
    double dropRate = 0.0;          // droppedPackets / totalPackets, 0 when total <= 0
    QDateTime lastUpdate;           // local time of the last updateStats() call

    // Overwrites the three counters with the supplied values, recomputes the
    // drop rate and records the current local time as the update time.
    void updateStats(qint64 total, qint64 dropped, qint64 processed) {
        totalPackets = total;
        droppedPackets = dropped;
        processedPackets = processed;

        // Guard the division: a non-positive total yields a zero drop rate.
        if (total > 0) {
            dropRate = static_cast<double>(dropped) / total;
        } else {
            dropRate = 0.0;
        }

        lastUpdate = QDateTime::currentDateTime();
    }
};
class QDataBuffer : public QObject
{
Q_OBJECT
public:
static QDataBuffer* instance();
// 缓冲区配置
void setBufferSize(int maxSize);
void setFlowControlEnabled(bool enabled);
void setMaxPacketsPerSecond(int maxRate);
// 数据入队
bool enqueue(const DataPacket& packet);
bool enqueueHighPriority(const DataPacket& packet);
// 数据出队
DataPacket dequeue(int timeoutMs = 1000);
QList<DataPacket> dequeueBatch(int maxCount, int timeoutMs = 1000);
// 状态查询
bool isEmpty() const;
int size() const;
int getQueueSize(DataPacketType type) const;
FlowStatistics getStatistics() const;
// 流量控制
void pause();
void resume();
bool isPaused() const;
// 清理
void clear();
void clearByType(DataPacketType type);
void clearByStock(const QString& stockCode);
signals:
void bufferOverflow(const QString& stockCode, DataPacketType type);
void flowControlActivated(bool activated);
void statisticsUpdated(const FlowStatistics& stats);
void packetDropped(const DataPacket& packet, const QString& reason);
private:
explicit QDataBuffer(QObject *parent = nullptr);
~QDataBuffer();
void updateStatistics();
bool shouldDropPacket(const DataPacket& packet) const;
void cleanupExpiredPackets();
// 缓冲区
QQueue<DataPacket> m_buffer;
QQueue<DataPacket> m_highPriorityBuffer;
// 同步控制
mutable QMutex m_mutex;
QWaitCondition m_notEmpty;
QWaitCondition m_notFull;
// 配置
int m_maxBufferSize = 10000;
int m_maxPacketsPerSecond = 1000;
bool m_flowControlEnabled = true;
bool m_paused = false;
// 统计
FlowStatistics m_stats;
QElapsedTimer m_rateTimer;
qint64 m_currentSecondPackets = 0;
// 清理定时器
QTimer* m_cleanupTimer;
QTimer* m_statsUpdateTimer;
};
#endif // QDATABUFFER_H