start consuming ASAP

This commit is contained in:
chrislu
2024-05-19 14:52:38 -07:00
parent c21b32dd20
commit c6db3f31a1
5 changed files with 357 additions and 307 deletions

View File

@@ -42,19 +42,41 @@ func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1),
}
}
func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
cg.onConsumerGroupInstanceChange("add consumer instance " + consumerGroupInstance)
func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) {
cg.onConsumerGroupInstanceChange(true, "add consumer instance " + consumerGroupInstance, maxPartitionCount, rebalanceSeconds)
}
func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
cg.onConsumerGroupInstanceChange("remove consumer instance " + consumerGroupInstance)
func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) {
cg.onConsumerGroupInstanceChange(false, "remove consumer instance " + consumerGroupInstance, maxPartitionCount, rebalanceSeconds)
}
func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string) {
func (cg *ConsumerGroup) onConsumerGroupInstanceChange(isAdd bool, reason string, maxPartitionCount, rebalanceSeconds int32) {
if cg.reBalanceTimer != nil {
cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil
}
cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() {
if maxPartitionCount == 0 {
maxPartitionCount = 1
}
if rebalanceSeconds == 0 {
rebalanceSeconds = 10
}
if isAdd {
if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
var sumMaxPartitionCount int32
for _, cgi := range cg.ConsumerGroupInstances.Items() {
sumMaxPartitionCount += cgi.MaxPartitionCount
}
if sumMaxPartitionCount < int32(len(conf.BrokerPartitionAssignments)) && sumMaxPartitionCount+maxPartitionCount >= int32(len(conf.BrokerPartitionAssignments)) {
partitionSlotToBrokerList := pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount)
for _, assignment := range conf.BrokerPartitionAssignments {
partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker, assignment.FollowerBroker)
}
cg.BalanceConsumerGroupInstances(partitionSlotToBrokerList, reason)
return
}
}
}
cg.reBalanceTimer = time.AfterFunc(time.Duration(rebalanceSeconds)*time.Second, func() {
cg.BalanceConsumerGroupInstances(nil, reason)
cg.reBalanceTimer = nil
})

View File

@@ -69,7 +69,7 @@ func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinato
}
}
cgi.MaxPartitionCount = initMessage.MaxPartitionCount
cg.OnAddConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic)
cg.OnAddConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic, initMessage.MaxPartitionCount, initMessage.RebalanceSeconds)
return cgi
}
@@ -83,7 +83,7 @@ func (c *Coordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordin
return
}
cg.ConsumerGroupInstances.Remove(initMessage.ConsumerGroupInstanceId)
cg.OnRemoveConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic)
cg.OnRemoveConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic, initMessage.MaxPartitionCount, initMessage.RebalanceSeconds)
if cg.ConsumerGroupInstances.Count() == 0 {
tcg.ConsumerGroups.Remove(initMessage.ConsumerGroup)
}