refactoring
This commit is contained in:
@@ -89,7 +89,7 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Use buffer_start offset for precise deduplication
|
// Use buffer_start offset for precise deduplication
|
||||||
lastFlushTsNs := localPartition.LogBuffer.LastFlushTsNs
|
lastFlushTsNs := localPartition.LogBuffer.GetLastFlushTsNs()
|
||||||
startBufferOffset := req.StartBufferOffset
|
startBufferOffset := req.StartBufferOffset
|
||||||
startTimeNs := lastFlushTsNs // Still respect last flush time for safety
|
startTimeNs := lastFlushTsNs // Still respect last flush time for safety
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package broker
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
@@ -41,14 +40,14 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano())
|
logBuffer.SetLastFlushTsNs(stopTime.UnixNano())
|
||||||
|
|
||||||
b.accessLock.Lock()
|
b.accessLock.Lock()
|
||||||
defer b.accessLock.Unlock()
|
defer b.accessLock.Unlock()
|
||||||
if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil {
|
if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil {
|
||||||
localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
|
localPartition.NotifyLogFlushed(logBuffer.GetLastFlushTsNs())
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(0).Infof("flushing at %d to %s size %d from buffer %s (offset %d)", logBuffer.LastFlushTsNs, targetFile, len(buf), logBuffer.GetName(), bufferOffset)
|
glog.V(0).Infof("flushing at %d to %s size %d from buffer %s (offset %d)", logBuffer.GetLastFlushTsNs(), targetFile, len(buf), logBuffer.GetName(), bufferOffset)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -55,35 +55,36 @@ type CachedDiskChunk struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type LogBuffer struct {
|
type LogBuffer struct {
|
||||||
LastFlushTsNs int64
|
// 8-byte aligned fields
|
||||||
name string
|
LastTsNs atomic.Int64
|
||||||
prevBuffers *SealedBuffers
|
lastFlushTsNs atomic.Int64
|
||||||
buf []byte
|
lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet)
|
||||||
offset int64 // Last offset in current buffer (endOffset)
|
offset int64
|
||||||
bufferStartOffset int64 // First offset in current buffer
|
bufferStartOffset int64
|
||||||
idx []int
|
minOffset int64
|
||||||
pos int
|
maxOffset int64
|
||||||
|
flushInterval time.Duration
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
stopTime time.Time
|
stopTime time.Time
|
||||||
lastFlushDataTime time.Time
|
|
||||||
sizeBuf []byte
|
// Other fields
|
||||||
flushInterval time.Duration
|
name string
|
||||||
flushFn LogFlushFuncType
|
prevBuffers *SealedBuffers
|
||||||
ReadFromDiskFn LogReadFromDiskFuncType
|
buf []byte
|
||||||
notifyFn func()
|
idx []int
|
||||||
|
pos int
|
||||||
|
sizeBuf []byte
|
||||||
|
flushFn LogFlushFuncType
|
||||||
|
ReadFromDiskFn LogReadFromDiskFuncType
|
||||||
|
notifyFn func()
|
||||||
// Per-subscriber notification channels for instant wake-up
|
// Per-subscriber notification channels for instant wake-up
|
||||||
subscribersMu sync.RWMutex
|
subscribersMu sync.RWMutex
|
||||||
subscribers map[string]chan struct{} // subscriberID -> notification channel
|
subscribers map[string]chan struct{} // subscriberID -> notification channel
|
||||||
isStopping *atomic.Bool
|
isStopping *atomic.Bool
|
||||||
isAllFlushed bool
|
isAllFlushed bool
|
||||||
flushChan chan *dataToFlush
|
flushChan chan *dataToFlush
|
||||||
LastTsNs atomic.Int64
|
|
||||||
// Offset range tracking for Kafka integration
|
// Offset range tracking for Kafka integration
|
||||||
minOffset int64
|
hasOffsets bool
|
||||||
maxOffset int64
|
|
||||||
hasOffsets bool
|
|
||||||
lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet)
|
|
||||||
lastFlushTsNs atomic.Int64 // Latest timestamp that has been flushed to disk (0 = nothing flushed yet)
|
|
||||||
// Disk chunk cache for historical data reads
|
// Disk chunk cache for historical data reads
|
||||||
diskChunkCache *DiskChunkCache
|
diskChunkCache *DiskChunkCache
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
@@ -235,7 +236,7 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn
|
|||||||
logBuffer.bufferStartOffset = nextOffset
|
logBuffer.bufferStartOffset = nextOffset
|
||||||
// CRITICAL: Track that data [0...highestOffset] is on disk
|
// CRITICAL: Track that data [0...highestOffset] is on disk
|
||||||
logBuffer.lastFlushedOffset.Store(highestOffset)
|
logBuffer.lastFlushedOffset.Store(highestOffset)
|
||||||
// Set lastFlushedTime to current time (we know data up to highestOffset is on disk)
|
// Set lastFlushTsNs to current time (we know data up to highestOffset is on disk)
|
||||||
logBuffer.lastFlushTsNs.Store(time.Now().UnixNano())
|
logBuffer.lastFlushTsNs.Store(time.Now().UnixNano())
|
||||||
} else {
|
} else {
|
||||||
logBuffer.bufferStartOffset = 0 // Start from offset 0
|
logBuffer.bufferStartOffset = 0 // Start from offset 0
|
||||||
@@ -507,10 +508,6 @@ func (logBuffer *LogBuffer) loopFlush() {
|
|||||||
logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset)
|
logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset)
|
||||||
d.releaseMemory()
|
d.releaseMemory()
|
||||||
// local logbuffer is different from aggregate logbuffer here
|
// local logbuffer is different from aggregate logbuffer here
|
||||||
logBuffer.lastFlushDataTime = d.stopTime
|
|
||||||
|
|
||||||
// CRITICAL: Track what's been flushed to disk for both offset-based and time-based reads
|
|
||||||
// Use >= 0 to include offset 0 (first message in a topic)
|
|
||||||
if d.maxOffset >= 0 {
|
if d.maxOffset >= 0 {
|
||||||
logBuffer.lastFlushedOffset.Store(d.maxOffset)
|
logBuffer.lastFlushedOffset.Store(d.maxOffset)
|
||||||
}
|
}
|
||||||
@@ -567,8 +564,6 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush
|
|||||||
if withCallback {
|
if withCallback {
|
||||||
d.done = make(chan struct{})
|
d.done = make(chan struct{})
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
logBuffer.lastFlushDataTime = logBuffer.stopTime
|
|
||||||
}
|
}
|
||||||
// CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1
|
// CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1
|
||||||
lastOffsetInBuffer := logBuffer.offset - 1
|
lastOffsetInBuffer := logBuffer.offset - 1
|
||||||
@@ -624,6 +619,10 @@ func (logBuffer *LogBuffer) GetLastFlushTsNs() int64 {
|
|||||||
return logBuffer.lastFlushTsNs.Load()
|
return logBuffer.lastFlushTsNs.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (logBuffer *LogBuffer) SetLastFlushTsNs(ts int64) {
|
||||||
|
logBuffer.lastFlushTsNs.Store(ts)
|
||||||
|
}
|
||||||
|
|
||||||
func (d *dataToFlush) releaseMemory() {
|
func (d *dataToFlush) releaseMemory() {
|
||||||
d.data.Reset()
|
d.data.Reset()
|
||||||
bufferPool.Put(d.data)
|
bufferPool.Put(d.data)
|
||||||
|
|||||||
Reference in New Issue
Block a user