simplify
This commit is contained in:
@@ -14,7 +14,6 @@ import (
|
||||
// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
|
||||
func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
|
||||
ret := &mq_pb.AssignTopicPartitionsResponse{}
|
||||
self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port))
|
||||
|
||||
// drain existing topic partition subscriptions
|
||||
for _, assignment := range request.BrokerPartitionAssignments {
|
||||
@@ -27,7 +26,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
|
||||
} else {
|
||||
var localPartition *topic.LocalPartition
|
||||
if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
|
||||
localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
|
||||
localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
|
||||
b.localTopicManager.AddTopicPartition(t, localPartition)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,7 +88,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition
|
||||
self := b.option.BrokerAddress()
|
||||
for _, assignment := range conf.BrokerPartitionAssignments {
|
||||
if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
|
||||
localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
|
||||
localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
|
||||
b.localTopicManager.AddTopicPartition(t, localPartition)
|
||||
isGenerated = true
|
||||
break
|
||||
|
||||
@@ -21,8 +21,6 @@ func TestBalanceTopicPartitionOnBrokers(t *testing.T) {
|
||||
Topic: topic.Topic{Namespace: "topic1", Name: "topic1"},
|
||||
Partition: topic.Partition{RangeStart: 0, RangeStop: 512, RingSize: 1024},
|
||||
},
|
||||
ConsumerCount: 1,
|
||||
IsLeader: true,
|
||||
})
|
||||
broker2Stats := &BrokerStats{
|
||||
TopicPartitionCount: 2,
|
||||
@@ -35,16 +33,12 @@ func TestBalanceTopicPartitionOnBrokers(t *testing.T) {
|
||||
Topic: topic.Topic{Namespace: "topic1", Name: "topic1"},
|
||||
Partition: topic.Partition{RangeStart: 512, RangeStop: 1024, RingSize: 1024},
|
||||
},
|
||||
ConsumerCount: 1,
|
||||
IsLeader: true,
|
||||
})
|
||||
broker2Stats.TopicPartitionStats.Set("topic2:0", &TopicPartitionStats{
|
||||
TopicPartition: topic.TopicPartition{
|
||||
Topic: topic.Topic{Namespace: "topic2", Name: "topic2"},
|
||||
Partition: topic.Partition{RangeStart: 0, RangeStop: 1024, RingSize: 1024},
|
||||
},
|
||||
ConsumerCount: 1,
|
||||
IsLeader: true,
|
||||
})
|
||||
brokers.Set("broker1", broker1Stats)
|
||||
brokers.Set("broker2", broker2Stats)
|
||||
|
||||
@@ -9,15 +9,16 @@ import (
|
||||
|
||||
type BrokerStats struct {
|
||||
TopicPartitionCount int32
|
||||
ConsumerCount int32
|
||||
PublisherCount int32
|
||||
SubscriberCount int32
|
||||
CpuUsagePercent int32
|
||||
TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition
|
||||
Topics []topic.Topic
|
||||
}
|
||||
type TopicPartitionStats struct {
|
||||
topic.TopicPartition
|
||||
ConsumerCount int32
|
||||
IsLeader bool
|
||||
PublisherCount int32
|
||||
SubscriberCount int32
|
||||
}
|
||||
|
||||
func NewBrokerStats() *BrokerStats {
|
||||
@@ -26,15 +27,15 @@ func NewBrokerStats() *BrokerStats {
|
||||
}
|
||||
}
|
||||
func (bs *BrokerStats) String() string {
|
||||
return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}",
|
||||
bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
|
||||
return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, Publishers:%d, Subscribers:%d CpuUsagePercent:%d, Stats:%+v}",
|
||||
bs.TopicPartitionCount, bs.PublisherCount, bs.SubscriberCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
|
||||
}
|
||||
|
||||
func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
|
||||
bs.TopicPartitionCount = int32(len(stats.Stats))
|
||||
bs.CpuUsagePercent = stats.CpuUsagePercent
|
||||
|
||||
var consumerCount int32
|
||||
var publisherCount, subscriberCount int32
|
||||
currentTopicPartitions := bs.TopicPartitionStats.Items()
|
||||
for _, topicPartitionStats := range stats.Stats {
|
||||
tps := &TopicPartitionStats{
|
||||
@@ -47,10 +48,11 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
|
||||
UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs,
|
||||
},
|
||||
},
|
||||
ConsumerCount: topicPartitionStats.ConsumerCount,
|
||||
IsLeader: topicPartitionStats.IsLeader,
|
||||
PublisherCount: topicPartitionStats.PublisherCount,
|
||||
SubscriberCount: topicPartitionStats.SubscriberCount,
|
||||
}
|
||||
consumerCount += topicPartitionStats.ConsumerCount
|
||||
publisherCount += topicPartitionStats.PublisherCount
|
||||
subscriberCount += topicPartitionStats.SubscriberCount
|
||||
key := tps.TopicPartition.String()
|
||||
bs.TopicPartitionStats.Set(key, tps)
|
||||
delete(currentTopicPartitions, key)
|
||||
@@ -59,8 +61,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
|
||||
for key := range currentTopicPartitions {
|
||||
bs.TopicPartitionStats.Remove(key)
|
||||
}
|
||||
bs.ConsumerCount = consumerCount
|
||||
|
||||
bs.PublisherCount = publisherCount
|
||||
bs.SubscriberCount = subscriberCount
|
||||
}
|
||||
|
||||
func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
|
||||
@@ -74,8 +76,8 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti
|
||||
UnixTimeNs: partition.UnixTimeNs,
|
||||
},
|
||||
},
|
||||
ConsumerCount: 0,
|
||||
IsLeader: true,
|
||||
PublisherCount: 0,
|
||||
SubscriberCount: 0,
|
||||
}
|
||||
key := tps.TopicPartition.String()
|
||||
if isAdd {
|
||||
|
||||
@@ -14,8 +14,7 @@ func (balancer *Balancer) RepairTopics() []BalanceAction {
|
||||
}
|
||||
|
||||
type TopicPartitionInfo struct {
|
||||
Leader string
|
||||
Followers []string
|
||||
Broker string
|
||||
}
|
||||
|
||||
// RepairMissingTopicPartitions check the stats of all brokers,
|
||||
@@ -38,11 +37,7 @@ func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStat
|
||||
tpi = &TopicPartitionInfo{}
|
||||
topicPartitionToInfo[topicPartitionStat.Partition] = tpi
|
||||
}
|
||||
if topicPartitionStat.IsLeader {
|
||||
tpi.Leader = broker
|
||||
} else {
|
||||
tpi.Followers = append(tpi.Followers, broker)
|
||||
}
|
||||
tpi.Broker = broker
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -96,8 +96,9 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
|
||||
Namespace: string(localTopic.Namespace),
|
||||
Name: localTopic.Name,
|
||||
},
|
||||
Partition: localPartition.Partition.ToPbPartition(),
|
||||
ConsumerCount: localPartition.ConsumerCount,
|
||||
Partition: localPartition.Partition.ToPbPartition(),
|
||||
PublisherCount: int32(localPartition.Publishers.Size()),
|
||||
SubscriberCount: int32(localPartition.Subscribers.Size()),
|
||||
}
|
||||
// fmt.Printf("collect topic %+v partition %+v\n", topicPartition, localPartition.Partition)
|
||||
}
|
||||
|
||||
@@ -22,10 +22,7 @@ type LocalPartition struct {
|
||||
ListenersCond *sync.Cond
|
||||
|
||||
Partition
|
||||
isLeader bool
|
||||
FollowerBrokers []pb.ServerAddress
|
||||
LogBuffer *log_buffer.LogBuffer
|
||||
ConsumerCount int32
|
||||
Publishers *LocalPartitionPublishers
|
||||
Subscribers *LocalPartitionSubscribers
|
||||
FollowerId int32
|
||||
@@ -37,11 +34,9 @@ type LocalPartition struct {
|
||||
|
||||
var TIME_FORMAT = "2006-01-02-15-04-05"
|
||||
|
||||
func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
|
||||
func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
|
||||
lp := &LocalPartition{
|
||||
Partition: partition,
|
||||
isLeader: isLeader,
|
||||
FollowerBrokers: followerBrokers,
|
||||
Publishers: NewLocalPartitionPublishers(),
|
||||
Subscribers: NewLocalPartitionSubscribers(),
|
||||
}
|
||||
@@ -121,15 +116,6 @@ func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.Message
|
||||
return p.LogBuffer.GetEarliestPosition()
|
||||
}
|
||||
|
||||
func FromPbBrokerPartitionAssignment(self pb.ServerAddress, partition Partition, assignment *mq_pb.BrokerPartitionAssignment, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
|
||||
isLeader := assignment.LeaderBroker == string(self)
|
||||
followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
|
||||
for i, followerBroker := range assignment.FollowerBrokers {
|
||||
followers[i] = pb.ServerAddress(followerBroker)
|
||||
}
|
||||
return NewLocalPartition(partition, isLeader, followers, logFlushFn, readFromDiskFn)
|
||||
}
|
||||
|
||||
func (p *LocalPartition) closePublishers() {
|
||||
p.Publishers.SignalShutdown()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user