client side stop partition subscribing if unassigned

This commit is contained in:
chrislu
2024-05-21 08:42:04 -07:00
parent 6634b42981
commit fa98ecf71e
2 changed files with 27 additions and 12 deletions

View File

@@ -9,6 +9,7 @@ import (
)
type ProcessorState struct {
stopCh chan struct{}
}
// Subscribe subscribes to a topic's specified partitions.
@@ -41,8 +42,11 @@ func (sub *TopicSubscriber) startProcessors() {
sub.waitUntilNoOverlappingPartitionInFlight(topicPartition)
// start a processors
stopChan := make(chan struct{})
sub.activeProcessorsLock.Lock()
sub.activeProcessors[topicPartition] = &ProcessorState{}
sub.activeProcessors[topicPartition] = &ProcessorState{
stopCh: stopChan,
}
sub.activeProcessorsLock.Unlock()
go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) {
@@ -55,7 +59,7 @@ func (sub *TopicSubscriber) startProcessors() {
wg.Done()
}()
glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
err := sub.onEachPartition(assigned)
err := sub.onEachPartition(assigned, stopChan)
if err != nil {
glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
} else {
@@ -64,7 +68,13 @@ func (sub *TopicSubscriber) startProcessors() {
}(assigned.PartitionAssignment, topicPartition)
}
if unAssignment := message.GetUnAssignment(); unAssignment != nil {
topicPartition := topic.FromPbPartition(unAssignment.Partition)
sub.activeProcessorsLock.Lock()
if processor, found := sub.activeProcessors[topicPartition]; found {
close(processor.stopCh)
delete(sub.activeProcessors, topicPartition)
}
sub.activeProcessorsLock.Unlock()
}
}