fix inflight message tracker
This commit is contained in:
@@ -101,11 +101,15 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
|
|||||||
}})
|
}})
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
glog.V(0).Infof("topic %v partition %v subscriber %s error: %v", t, partition, clientName, err)
|
glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
if ack.GetAck().Key == nil {
|
||||||
|
// skip ack for control messages
|
||||||
|
continue
|
||||||
|
}
|
||||||
imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
|
imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
|
||||||
currentLastOffset := imt.GetOldest()
|
currentLastOffset := imt.GetOldestAckedTimestamp()
|
||||||
fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
|
fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
|
||||||
if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
|
if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
|
||||||
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
|
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
|
||||||
@@ -124,16 +128,16 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
|
|||||||
}
|
}
|
||||||
if lastOffset > 0 {
|
if lastOffset > 0 {
|
||||||
if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
|
if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
|
||||||
glog.Errorf("saveConsumerGroupOffset: %v", err)
|
glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
|
||||||
}
|
}
|
||||||
if subscribeFollowMeStream != nil {
|
}
|
||||||
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
|
if subscribeFollowMeStream != nil {
|
||||||
Message: &mq_pb.SubscribeFollowMeRequest_Close{
|
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
|
||||||
Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{},
|
Message: &mq_pb.SubscribeFollowMeRequest_Close{
|
||||||
},
|
Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{},
|
||||||
}); err != nil {
|
},
|
||||||
glog.Errorf("Error sending close to follower: %v", err)
|
}); err != nil {
|
||||||
}
|
glog.Errorf("Error sending close to follower: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -170,8 +174,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
|
|||||||
for imt.IsInflight(logEntry.Key) {
|
for imt.IsInflight(logEntry.Key) {
|
||||||
time.Sleep(137 * time.Millisecond)
|
time.Sleep(137 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
if logEntry.Key != nil {
|
||||||
imt.InflightMessage(logEntry.Key, logEntry.TsNs)
|
imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
|
||||||
|
}
|
||||||
|
|
||||||
if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
|
if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
|
||||||
Data: &mq_pb.DataMessage{
|
Data: &mq_pb.DataMessage{
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package sub_coordinator
|
package sub_coordinator
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
@@ -18,13 +19,14 @@ func NewInflightMessageTracker(capacity int) *InflightMessageTracker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// InflightMessage tracks the message with the key and timestamp.
|
// EnflightMessage tracks the message with the key and timestamp.
|
||||||
// These messages are sent to the consumer group instances and waiting for ack.
|
// These messages are sent to the consumer group instances and waiting for ack.
|
||||||
func (imt *InflightMessageTracker) InflightMessage(key []byte, tsNs int64) {
|
func (imt *InflightMessageTracker) EnflightMessage(key []byte, tsNs int64) {
|
||||||
|
fmt.Printf("EnflightMessage(%s,%d)\n", string(key), tsNs)
|
||||||
imt.mu.Lock()
|
imt.mu.Lock()
|
||||||
defer imt.mu.Unlock()
|
defer imt.mu.Unlock()
|
||||||
imt.messages[string(key)] = tsNs
|
imt.messages[string(key)] = tsNs
|
||||||
imt.timestamps.Add(tsNs)
|
imt.timestamps.EnflightTimestamp(tsNs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsMessageAcknowledged returns true if the message has been acknowledged.
|
// IsMessageAcknowledged returns true if the message has been acknowledged.
|
||||||
@@ -35,7 +37,7 @@ func (imt *InflightMessageTracker) IsMessageAcknowledged(key []byte, tsNs int64)
|
|||||||
imt.mu.Lock()
|
imt.mu.Lock()
|
||||||
defer imt.mu.Unlock()
|
defer imt.mu.Unlock()
|
||||||
|
|
||||||
if tsNs < imt.timestamps.Oldest() {
|
if tsNs <= imt.timestamps.OldestAckedTimestamp() {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if tsNs > imt.timestamps.Latest() {
|
if tsNs > imt.timestamps.Latest() {
|
||||||
@@ -51,6 +53,7 @@ func (imt *InflightMessageTracker) IsMessageAcknowledged(key []byte, tsNs int64)
|
|||||||
|
|
||||||
// AcknowledgeMessage acknowledges the message with the key and timestamp.
|
// AcknowledgeMessage acknowledges the message with the key and timestamp.
|
||||||
func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bool {
|
func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bool {
|
||||||
|
fmt.Printf("AcknowledgeMessage(%s,%d)\n", string(key), tsNs)
|
||||||
imt.mu.Lock()
|
imt.mu.Lock()
|
||||||
defer imt.mu.Unlock()
|
defer imt.mu.Unlock()
|
||||||
timestamp, exists := imt.messages[string(key)]
|
timestamp, exists := imt.messages[string(key)]
|
||||||
@@ -59,12 +62,12 @@ func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bo
|
|||||||
}
|
}
|
||||||
delete(imt.messages, string(key))
|
delete(imt.messages, string(key))
|
||||||
// Remove the specific timestamp from the ring buffer.
|
// Remove the specific timestamp from the ring buffer.
|
||||||
imt.timestamps.Remove(tsNs)
|
imt.timestamps.AckTimestamp(tsNs)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (imt *InflightMessageTracker) GetOldest() int64 {
|
func (imt *InflightMessageTracker) GetOldestAckedTimestamp() int64 {
|
||||||
return imt.timestamps.Oldest()
|
return imt.timestamps.OldestAckedTimestamp()
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsInflight returns true if the message with the key is inflight.
|
// IsInflight returns true if the message with the key is inflight.
|
||||||
@@ -75,63 +78,81 @@ func (imt *InflightMessageTracker) IsInflight(key []byte) bool {
|
|||||||
return found
|
return found
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TimestampStatus struct {
|
||||||
|
Timestamp int64
|
||||||
|
Acked bool
|
||||||
|
}
|
||||||
|
|
||||||
// RingBuffer represents a circular buffer to hold timestamps.
|
// RingBuffer represents a circular buffer to hold timestamps.
|
||||||
type RingBuffer struct {
|
type RingBuffer struct {
|
||||||
buffer []int64
|
buffer []*TimestampStatus
|
||||||
head int
|
head int
|
||||||
size int
|
size int
|
||||||
|
maxTimestamp int64
|
||||||
|
minAckedTs int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRingBuffer creates a new RingBuffer of the given capacity.
|
// NewRingBuffer creates a new RingBuffer of the given capacity.
|
||||||
func NewRingBuffer(capacity int) *RingBuffer {
|
func NewRingBuffer(capacity int) *RingBuffer {
|
||||||
return &RingBuffer{
|
return &RingBuffer{
|
||||||
buffer: make([]int64, capacity),
|
buffer: newBuffer(capacity),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add adds a new timestamp to the ring buffer.
|
func newBuffer(capacity int) []*TimestampStatus {
|
||||||
func (rb *RingBuffer) Add(timestamp int64) {
|
buffer := make([]*TimestampStatus, capacity)
|
||||||
rb.buffer[rb.head] = timestamp
|
for i := range buffer {
|
||||||
rb.head = (rb.head + 1) % len(rb.buffer)
|
buffer[i] = &TimestampStatus{}
|
||||||
|
}
|
||||||
|
return buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnflightTimestamp adds a new timestamp to the ring buffer.
|
||||||
|
func (rb *RingBuffer) EnflightTimestamp(timestamp int64) {
|
||||||
if rb.size < len(rb.buffer) {
|
if rb.size < len(rb.buffer) {
|
||||||
rb.size++
|
rb.size++
|
||||||
|
} else {
|
||||||
|
newBuf := newBuffer(2*len(rb.buffer))
|
||||||
|
for i := 0; i < rb.size; i++ {
|
||||||
|
newBuf[i] = rb.buffer[(rb.head+len(rb.buffer)-rb.size+i)%len(rb.buffer)]
|
||||||
|
}
|
||||||
|
rb.buffer = newBuf
|
||||||
|
rb.head = rb.size
|
||||||
|
rb.size++
|
||||||
|
}
|
||||||
|
head := rb.buffer[rb.head]
|
||||||
|
head.Timestamp = timestamp
|
||||||
|
head.Acked = false
|
||||||
|
rb.head = (rb.head + 1) % len(rb.buffer)
|
||||||
|
if timestamp > rb.maxTimestamp {
|
||||||
|
rb.maxTimestamp = timestamp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove removes the specified timestamp from the ring buffer.
|
// AckTimestamp removes the specified timestamp from the ring buffer.
|
||||||
func (rb *RingBuffer) Remove(timestamp int64) {
|
func (rb *RingBuffer) AckTimestamp(timestamp int64) {
|
||||||
// Perform binary search
|
// Perform binary search
|
||||||
index := sort.Search(rb.size, func(i int) bool {
|
index := sort.Search(rb.size, func(i int) bool {
|
||||||
return rb.buffer[(rb.head+len(rb.buffer)-rb.size+i)%len(rb.buffer)] >= timestamp
|
return rb.buffer[(rb.head+len(rb.buffer)-rb.size+i)%len(rb.buffer)].Timestamp >= timestamp
|
||||||
})
|
})
|
||||||
actualIndex := (rb.head + len(rb.buffer) - rb.size + index) % len(rb.buffer)
|
actualIndex := (rb.head + len(rb.buffer) - rb.size + index) % len(rb.buffer)
|
||||||
|
|
||||||
if index < rb.size && rb.buffer[actualIndex] == timestamp {
|
rb.buffer[actualIndex].Acked = true
|
||||||
// Shift elements to maintain the buffer order
|
|
||||||
for i := index; i < rb.size-1; i++ {
|
// Remove all the acknowledged timestamps from the buffer
|
||||||
fromIndex := (rb.head + len(rb.buffer) - rb.size + i + 1) % len(rb.buffer)
|
startPos := (rb.head + len(rb.buffer) - rb.size) % len(rb.buffer)
|
||||||
toIndex := (rb.head + len(rb.buffer) - rb.size + i) % len(rb.buffer)
|
for i := 0; i < len(rb.buffer) && rb.buffer[(startPos+i)%len(rb.buffer)].Acked; i++ {
|
||||||
rb.buffer[toIndex] = rb.buffer[fromIndex]
|
|
||||||
}
|
|
||||||
rb.size--
|
rb.size--
|
||||||
rb.buffer[(rb.head+len(rb.buffer)-1)%len(rb.buffer)] = 0 // Clear the last element
|
rb.minAckedTs = rb.buffer[(startPos+i)%len(rb.buffer)].Timestamp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Oldest returns the oldest timestamp in the ring buffer.
|
// OldestAckedTimestamp returns the oldest that is already acked timestamp in the ring buffer.
|
||||||
func (rb *RingBuffer) Oldest() int64 {
|
func (rb *RingBuffer) OldestAckedTimestamp() int64 {
|
||||||
if rb.size == 0 {
|
return rb.minAckedTs
|
||||||
return 0
|
|
||||||
}
|
|
||||||
oldestIndex := (rb.head + len(rb.buffer) - rb.size) % len(rb.buffer)
|
|
||||||
return rb.buffer[oldestIndex]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Latest returns the most recently added timestamp in the ring buffer.
|
// Latest returns the most recently known timestamp in the ring buffer.
|
||||||
func (rb *RingBuffer) Latest() int64 {
|
func (rb *RingBuffer) Latest() int64 {
|
||||||
if rb.size == 0 {
|
return rb.maxTimestamp
|
||||||
return 0
|
|
||||||
}
|
|
||||||
latestIndex := (rb.head + len(rb.buffer) - 1) % len(rb.buffer)
|
|
||||||
return rb.buffer[latestIndex]
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,9 +1,8 @@
|
|||||||
package sub_coordinator
|
package sub_coordinator
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sort"
|
"github.com/stretchr/testify/assert"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRingBuffer(t *testing.T) {
|
func TestRingBuffer(t *testing.T) {
|
||||||
@@ -13,7 +12,7 @@ func TestRingBuffer(t *testing.T) {
|
|||||||
// Add timestamps to the buffer
|
// Add timestamps to the buffer
|
||||||
timestamps := []int64{100, 200, 300, 400, 500}
|
timestamps := []int64{100, 200, 300, 400, 500}
|
||||||
for _, ts := range timestamps {
|
for _, ts := range timestamps {
|
||||||
rb.Add(ts)
|
rb.EnflightTimestamp(ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test Add method and buffer size
|
// Test Add method and buffer size
|
||||||
@@ -22,38 +21,25 @@ func TestRingBuffer(t *testing.T) {
|
|||||||
t.Errorf("Expected buffer size %d, got %d", expectedSize, rb.size)
|
t.Errorf("Expected buffer size %d, got %d", expectedSize, rb.size)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test Oldest and Latest methods
|
assert.Equal(t, int64(0), rb.OldestAckedTimestamp())
|
||||||
expectedOldest := int64(100)
|
assert.Equal(t, int64(500), rb.Latest())
|
||||||
if oldest := rb.Oldest(); oldest != expectedOldest {
|
|
||||||
t.Errorf("Expected oldest timestamp %d, got %d", expectedOldest, oldest)
|
|
||||||
}
|
|
||||||
expectedLatest := int64(500)
|
|
||||||
if latest := rb.Latest(); latest != expectedLatest {
|
|
||||||
t.Errorf("Expected latest timestamp %d, got %d", expectedLatest, latest)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test Remove method
|
rb.AckTimestamp(200)
|
||||||
rb.Remove(200)
|
assert.Equal(t, int64(0), rb.OldestAckedTimestamp())
|
||||||
expectedSize--
|
rb.AckTimestamp(100)
|
||||||
if rb.size != expectedSize {
|
assert.Equal(t, int64(200), rb.OldestAckedTimestamp())
|
||||||
t.Errorf("Expected buffer size %d after removal, got %d", expectedSize, rb.size)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test removal of non-existent element
|
rb.EnflightTimestamp(int64(600))
|
||||||
rb.Remove(600)
|
rb.EnflightTimestamp(int64(700))
|
||||||
if rb.size != expectedSize {
|
|
||||||
t.Errorf("Expected buffer size %d after attempting removal of non-existent element, got %d", expectedSize, rb.size)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test binary search correctness
|
rb.AckTimestamp(500)
|
||||||
target := int64(300)
|
assert.Equal(t, int64(200), rb.OldestAckedTimestamp())
|
||||||
index := sort.Search(rb.size, func(i int) bool {
|
rb.AckTimestamp(400)
|
||||||
return rb.buffer[(rb.head+len(rb.buffer)-rb.size+i)%len(rb.buffer)] >= target
|
assert.Equal(t, int64(200), rb.OldestAckedTimestamp())
|
||||||
})
|
rb.AckTimestamp(300)
|
||||||
actualIndex := (rb.head + len(rb.buffer) - rb.size + index) % len(rb.buffer)
|
assert.Equal(t, int64(500), rb.OldestAckedTimestamp())
|
||||||
if rb.buffer[actualIndex] != target {
|
|
||||||
t.Errorf("Binary search failed to find the correct index for timestamp %d", target)
|
assert.Equal(t, int64(700), rb.Latest())
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInflightMessageTracker(t *testing.T) {
|
func TestInflightMessageTracker(t *testing.T) {
|
||||||
@@ -61,9 +47,9 @@ func TestInflightMessageTracker(t *testing.T) {
|
|||||||
tracker := NewInflightMessageTracker(5)
|
tracker := NewInflightMessageTracker(5)
|
||||||
|
|
||||||
// Add inflight messages
|
// Add inflight messages
|
||||||
key := []byte("exampleKey")
|
key := []byte("1")
|
||||||
timestamp := time.Now().UnixNano()
|
timestamp := int64(1)
|
||||||
tracker.InflightMessage(key, timestamp)
|
tracker.EnflightMessage(key, timestamp)
|
||||||
|
|
||||||
// Test IsMessageAcknowledged method
|
// Test IsMessageAcknowledged method
|
||||||
isOld := tracker.IsMessageAcknowledged(key, timestamp-10)
|
isOld := tracker.IsMessageAcknowledged(key, timestamp-10)
|
||||||
@@ -82,4 +68,29 @@ func TestInflightMessageTracker(t *testing.T) {
|
|||||||
if tracker.timestamps.size != 0 {
|
if tracker.timestamps.size != 0 {
|
||||||
t.Error("Expected buffer size to be 0 after ack")
|
t.Error("Expected buffer size to be 0 after ack")
|
||||||
}
|
}
|
||||||
|
assert.Equal(t, timestamp, tracker.GetOldestAckedTimestamp())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInflightMessageTracker2(t *testing.T) {
|
||||||
|
// Initialize an InflightMessageTracker with initial capacity 1
|
||||||
|
tracker := NewInflightMessageTracker(1)
|
||||||
|
|
||||||
|
tracker.EnflightMessage([]byte("1"), int64(1))
|
||||||
|
tracker.EnflightMessage([]byte("2"), int64(2))
|
||||||
|
tracker.EnflightMessage([]byte("3"), int64(3))
|
||||||
|
tracker.EnflightMessage([]byte("4"), int64(4))
|
||||||
|
tracker.EnflightMessage([]byte("5"), int64(5))
|
||||||
|
assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1)))
|
||||||
|
assert.Equal(t, int64(1), tracker.GetOldestAckedTimestamp())
|
||||||
|
|
||||||
|
// Test IsMessageAcknowledged method
|
||||||
|
isAcked := tracker.IsMessageAcknowledged([]byte("2"), int64(2))
|
||||||
|
if isAcked {
|
||||||
|
t.Error("Expected message to be not acked")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test AcknowledgeMessage method
|
||||||
|
assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2)))
|
||||||
|
assert.Equal(t, int64(2), tracker.GetOldestAckedTimestamp())
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user