subscriber keep connected to the balancer

This commit is contained in:
chrislu
2023-12-28 11:56:37 -08:00
parent bebbc9fe44
commit c950a40aad
9 changed files with 375 additions and 246 deletions

View File

@@ -5,6 +5,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"time"
)
type ConsumerGroupInstance struct {
@@ -16,7 +17,8 @@ type ConsumerGroupInstance struct {
type ConsumerGroup struct {
// map a consumer group instance id to a consumer group instance
ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
mapping *PartitionConsumerMapping
mapping *PartitionConsumerMapping
reBalanceTimer *time.Timer
}
func NewConsumerGroup() *ConsumerGroup {
@@ -33,10 +35,30 @@ func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
}
}
func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
cg.onConsumerGroupInstanceChange()
}
func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
cg.onConsumerGroupInstanceChange()
}
func (cg *ConsumerGroup) onConsumerGroupInstanceChange(){
if cg.reBalanceTimer != nil {
cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil
}
cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() {
cg.Rebalance()
cg.reBalanceTimer = nil
})
}
func (cg *ConsumerGroup) OnPartitionListChange() {
if cg.reBalanceTimer != nil {
cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil
}
cg.Rebalance()
}
func (cg *ConsumerGroup) Rebalance() {
println("rebalance...")
}