track offset
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
@@ -53,6 +54,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
|
||||
}()
|
||||
|
||||
startPosition := b.getRequestPosition(req.GetInit())
|
||||
imt := sub_coordinator.NewInflightMessageTracker(int(req.GetInit().Concurrency))
|
||||
|
||||
// connect to the follower
|
||||
var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient
|
||||
@@ -97,8 +99,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
|
||||
glog.V(0).Infof("topic %v partition %v subscriber %s error: %v", t, partition, clientName, err)
|
||||
break
|
||||
}
|
||||
lastOffset = ack.GetAck().Sequence
|
||||
if subscribeFollowMeStream != nil {
|
||||
imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
|
||||
currentLastOffset := imt.GetOldest()
|
||||
if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
|
||||
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
|
||||
Message: &mq_pb.SubscribeFollowMeRequest_Ack{
|
||||
Ack: &mq_pb.SubscribeFollowMeRequest_AckMessage{
|
||||
@@ -110,6 +113,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
|
||||
break
|
||||
}
|
||||
println("forwarding ack", lastOffset)
|
||||
lastOffset = currentLastOffset
|
||||
}
|
||||
}
|
||||
if lastOffset > 0 {
|
||||
|
||||
@@ -37,7 +37,7 @@ func main() {
|
||||
}
|
||||
|
||||
processorConfig := sub_client.ProcessorConfiguration{
|
||||
ConcurrentPartitionLimit: 3,
|
||||
MaxPartitionCount: 3,
|
||||
}
|
||||
|
||||
brokers := strings.Split(*seedBrokers, ",")
|
||||
|
||||
@@ -63,7 +63,8 @@ func main() {
|
||||
}
|
||||
|
||||
processorConfig := sub_client.ProcessorConfiguration{
|
||||
ConcurrentPartitionLimit: 3,
|
||||
MaxPartitionCount: 3,
|
||||
PerPartitionConcurrency: 1,
|
||||
}
|
||||
|
||||
brokers := strings.Split(*seedBrokers, ",")
|
||||
|
||||
@@ -51,7 +51,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
|
||||
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
|
||||
ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId,
|
||||
Topic: sub.ContentConfig.Topic.ToPbTopic(),
|
||||
MaxPartitionCount: sub.ProcessorConfig.ConcurrentPartitionLimit,
|
||||
MaxPartitionCount: sub.ProcessorConfig.MaxPartitionCount,
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
@@ -107,6 +107,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
|
||||
},
|
||||
Filter: sub.ContentConfig.Filter,
|
||||
FollowerBroker: assigned.FollowerBroker,
|
||||
Concurrency: sub.ProcessorConfig.PerPartitionConcurrency,
|
||||
},
|
||||
},
|
||||
});err != nil {
|
||||
@@ -124,7 +125,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
|
||||
close(partitionOffsetChan)
|
||||
}()
|
||||
|
||||
concurrentPartitionLimit := int(sub.ProcessorConfig.ConcurrentPartitionLimit)
|
||||
concurrentPartitionLimit := int(sub.ProcessorConfig.MaxPartitionCount)
|
||||
if concurrentPartitionLimit <= 0 {
|
||||
concurrentPartitionLimit = 1
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ func (sub *TopicSubscriber) startProcessors() {
|
||||
// listen to the messages from the sub coordinator
|
||||
// start one processor per partition
|
||||
var wg sync.WaitGroup
|
||||
semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit)
|
||||
semaphore := make(chan struct{}, sub.ProcessorConfig.MaxPartitionCount)
|
||||
|
||||
for assigned := range sub.brokerPartitionAssignmentChan {
|
||||
wg.Add(1)
|
||||
|
||||
@@ -22,7 +22,8 @@ type ContentConfiguration struct {
|
||||
}
|
||||
|
||||
type ProcessorConfiguration struct {
|
||||
ConcurrentPartitionLimit int32 // how many partitions to process concurrently
|
||||
MaxPartitionCount int32 // how many partitions to process concurrently
|
||||
PerPartitionConcurrency int32 // how many messages to process concurrently per partition
|
||||
}
|
||||
|
||||
type OnEachMessageFunc func(key, value []byte) (err error)
|
||||
|
||||
@@ -61,6 +61,10 @@ func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bo
|
||||
return true
|
||||
}
|
||||
|
||||
func (imt *InflightMessageTracker) GetOldest() int64 {
|
||||
return imt.timestamps.Oldest()
|
||||
}
|
||||
|
||||
// RingBuffer represents a circular buffer to hold timestamps.
|
||||
type RingBuffer struct {
|
||||
buffer []int64
|
||||
|
||||
Reference in New Issue
Block a user