add broker leader
This commit is contained in:
20
weed/mq/balancer/allocate.go
Normal file
20
weed/mq/balancer/allocate.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package balancer
|
||||
|
||||
import (
|
||||
cmap "github.com/orcaman/concurrent-map/v2"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
)
|
||||
|
||||
func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], partitionCount int) (assignments []*mq_pb.BrokerPartitionAssignment) {
|
||||
return []*mq_pb.BrokerPartitionAssignment{
|
||||
{
|
||||
LeaderBroker: "localhost:17777",
|
||||
FollowerBrokers: []string{"localhost:17777"},
|
||||
Partition: &mq_pb.Partition{
|
||||
RingSize: MaxPartitionCount,
|
||||
RangeStart: 0,
|
||||
RangeStop: MaxPartitionCount,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
62
weed/mq/balancer/allocate_test.go
Normal file
62
weed/mq/balancer/allocate_test.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package balancer
|
||||
|
||||
import (
|
||||
cmap "github.com/orcaman/concurrent-map/v2"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func Test_allocateOneBroker(t *testing.T) {
|
||||
brokers := cmap.New[*BrokerStats]()
|
||||
brokers.SetIfAbsent("localhost:17777", &BrokerStats{
|
||||
TopicPartitionCount: 0,
|
||||
ConsumerCount: 0,
|
||||
CpuUsagePercent: 0,
|
||||
})
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantAssignments []*mq_pb.BrokerPartitionAssignment
|
||||
}{
|
||||
{
|
||||
name: "test only one broker",
|
||||
args: args{
|
||||
brokers: brokers,
|
||||
partitionCount: 6,
|
||||
},
|
||||
wantAssignments: []*mq_pb.BrokerPartitionAssignment{
|
||||
{
|
||||
LeaderBroker: "localhost:17777",
|
||||
FollowerBrokers: []string{"localhost:17777"},
|
||||
Partition: &mq_pb.Partition{
|
||||
RingSize: MaxPartitionCount,
|
||||
RangeStart: 0,
|
||||
RangeStop: MaxPartitionCount,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
testThem(t, tests)
|
||||
}
|
||||
|
||||
type args struct {
|
||||
brokers cmap.ConcurrentMap[string, *BrokerStats]
|
||||
partitionCount int
|
||||
}
|
||||
|
||||
func testThem(t *testing.T, tests []struct {
|
||||
name string
|
||||
args args
|
||||
wantAssignments []*mq_pb.BrokerPartitionAssignment
|
||||
}) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if gotAssignments := allocateTopicPartitions(tt.args.brokers, tt.args.partitionCount); !reflect.DeepEqual(gotAssignments, tt.wantAssignments) {
|
||||
t.Errorf("allocateTopicPartitions() = %v, want %v", gotAssignments, tt.wantAssignments)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,26 +0,0 @@
|
||||
package balancer
|
||||
|
||||
import "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
|
||||
func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) ([]*mq_pb.BrokerPartitionAssignment, error) {
|
||||
// TODO lock the topic
|
||||
|
||||
// find the topic partitions on the filer
|
||||
// if the topic is not found
|
||||
// if the request is_for_publish
|
||||
// create the topic
|
||||
// if the request is_for_subscribe
|
||||
// return error not found
|
||||
// t := topic.FromPbTopic(request.Topic)
|
||||
return []*mq_pb.BrokerPartitionAssignment{
|
||||
{
|
||||
LeaderBroker: "localhost:17777",
|
||||
FollowerBrokers: []string{"localhost:17777"},
|
||||
Partition: &mq_pb.Partition{
|
||||
RingSize: MaxPartitionCount,
|
||||
RangeStart: 0,
|
||||
RangeStop: MaxPartitionCount,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
MaxPartitionCount = 1024
|
||||
MaxPartitionCount = 8 * 9 * 5 * 7 //2520
|
||||
)
|
||||
|
||||
type Balancer struct {
|
||||
@@ -36,6 +36,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
|
||||
RangeStop: topicPartitionStats.Partition.RangeStop,
|
||||
},
|
||||
ConsumerCount: topicPartitionStats.ConsumerCount,
|
||||
IsLeader: topicPartitionStats.IsLeader,
|
||||
}
|
||||
consumerCount += topicPartitionStats.ConsumerCount
|
||||
key := tps.TopicPartition.String()
|
||||
@@ -60,6 +61,7 @@ type TopicPartition struct {
|
||||
type TopicPartitionStats struct {
|
||||
TopicPartition
|
||||
ConsumerCount int32
|
||||
IsLeader bool
|
||||
}
|
||||
|
||||
func NewBalancer() *Balancer {
|
||||
|
||||
43
weed/mq/balancer/lookup.go
Normal file
43
weed/mq/balancer/lookup.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package balancer
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
)
|
||||
|
||||
func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
|
||||
// find existing topic partition assignments
|
||||
for brokerStatsItem := range b.Brokers.IterBuffered() {
|
||||
broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
|
||||
for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
|
||||
topicPartitionStat := topicPartitionStatsItem.Val
|
||||
if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
|
||||
topicPartitionStat.TopicPartition.Topic == topic.Name {
|
||||
assignment := &mq_pb.BrokerPartitionAssignment{
|
||||
Partition: &mq_pb.Partition{
|
||||
RingSize: MaxPartitionCount,
|
||||
RangeStart: topicPartitionStat.RangeStart,
|
||||
RangeStop: topicPartitionStat.RangeStop,
|
||||
},
|
||||
}
|
||||
if topicPartitionStat.IsLeader {
|
||||
assignment.LeaderBroker = broker
|
||||
} else {
|
||||
assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker)
|
||||
}
|
||||
assignments = append(assignments, assignment)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(assignments) > 0 {
|
||||
return assignments, nil
|
||||
}
|
||||
|
||||
// find the topic partitions on the filer
|
||||
// if the topic is not found
|
||||
// if the request is_for_publish
|
||||
// create the topic
|
||||
// if the request is_for_subscribe
|
||||
// return error not found
|
||||
// t := topic.FromPbTopic(request.Topic)
|
||||
return allocateTopicPartitions(b.Brokers, 6), nil
|
||||
}
|
||||
@@ -1,81 +0,0 @@
|
||||
package topic_allocation
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
"modernc.org/mathutil"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultBrokerCount = 4
|
||||
)
|
||||
|
||||
// AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions
|
||||
func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) {
|
||||
// create a previous assignment if not exists
|
||||
if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 {
|
||||
prevAssignment = &mq_pb.TopicPartitionsAssignment{
|
||||
PartitionCount: topic.PartitionCount,
|
||||
}
|
||||
partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount
|
||||
for i := 0; i < DefaultBrokerCount; i++ {
|
||||
prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
|
||||
PartitionStart: int32(i * partitionCountForEachBroker),
|
||||
PartitionStop: mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// create a new assignment
|
||||
assignment = &mq_pb.TopicPartitionsAssignment{
|
||||
PartitionCount: prevAssignment.PartitionCount,
|
||||
}
|
||||
|
||||
// allocate partitions for each partition range
|
||||
for _, brokerPartition := range prevAssignment.BrokerPartitions {
|
||||
// allocate partitions for each partition range
|
||||
leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
followerBrokers := make([]string, len(followers))
|
||||
for i, follower := range followers {
|
||||
followerBrokers[i] = string(follower)
|
||||
}
|
||||
|
||||
assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
|
||||
PartitionStart: brokerPartition.PartitionStart,
|
||||
PartitionStop: brokerPartition.PartitionStop,
|
||||
LeaderBroker: string(leader),
|
||||
FollowerBrokers: followerBrokers,
|
||||
})
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) {
|
||||
// allocate leader
|
||||
leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// allocate followers
|
||||
followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) {
|
||||
return
|
||||
}
|
||||
Reference in New Issue
Block a user