balancer works
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
package balancer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
cmap "github.com/orcaman/concurrent-map/v2"
|
||||
)
|
||||
|
||||
@@ -10,24 +9,10 @@ type Balancer struct {
|
||||
}
|
||||
type BrokerStats struct {
|
||||
TopicPartitionCount int32
|
||||
MessageCount int64
|
||||
BytesCount int64
|
||||
ConsumerCount int32
|
||||
CpuUsagePercent int32
|
||||
}
|
||||
|
||||
type TopicPartition struct {
|
||||
Topic string
|
||||
RangeStart int32
|
||||
RangeStop int32
|
||||
}
|
||||
|
||||
type TopicPartitionStats struct {
|
||||
TopicPartition
|
||||
Throughput int64
|
||||
ConsumerCount int64
|
||||
TopicPartitionCount int64
|
||||
}
|
||||
|
||||
func NewBalancer() *Balancer {
|
||||
return &Balancer{
|
||||
Brokers: cmap.New[*BrokerStats](),
|
||||
@@ -37,7 +22,3 @@ func NewBalancer() *Balancer {
|
||||
func NewBrokerStats() *BrokerStats {
|
||||
return &BrokerStats{}
|
||||
}
|
||||
|
||||
func (tp *TopicPartition) String() string {
|
||||
return fmt.Sprintf("%v-%04d-%04d", tp.Topic, tp.RangeStart, tp.RangeStop)
|
||||
}
|
||||
|
||||
@@ -1,39 +1,50 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/balancer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// BrokerConnectToBalancer receives connections from brokers and collects stats
|
||||
func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessaging_ConnectToBalancerServer) error {
|
||||
if !broker.lockAsBalancer.IsLocked() {
|
||||
return status.Errorf(codes.Unavailable, "not current broker balancer")
|
||||
}
|
||||
req, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
response := &mq_pb.ConnectToBalancerResponse{}
|
||||
|
||||
// process init message
|
||||
initMessage := req.GetInit()
|
||||
brokerStats := balancer.NewBrokerStats()
|
||||
if initMessage != nil {
|
||||
broker.Balancer.Brokers.Set(initMessage.Broker, brokerStats)
|
||||
} else {
|
||||
response.Error = "balancer init message is empty"
|
||||
return stream.Send(response)
|
||||
return status.Errorf(codes.InvalidArgument, "balancer init message is empty")
|
||||
}
|
||||
defer func() {
|
||||
broker.Balancer.Brokers.Remove(initMessage.Broker)
|
||||
}()
|
||||
stream.Send(response)
|
||||
|
||||
// process stats message
|
||||
for {
|
||||
req, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !broker.lockAsBalancer.IsLocked() {
|
||||
return status.Errorf(codes.Unavailable, "not current broker balancer")
|
||||
}
|
||||
if receivedStats := req.GetStats(); receivedStats != nil {
|
||||
brokerStats.TopicPartitionCount = receivedStats.TopicPartitionCount
|
||||
brokerStats.MessageCount = receivedStats.MessageCount
|
||||
brokerStats.BytesCount = receivedStats.BytesCount
|
||||
brokerStats.ConsumerCount = receivedStats.ConsumerCount
|
||||
brokerStats.CpuUsagePercent = receivedStats.CpuUsagePercent
|
||||
|
||||
glog.V(3).Infof("broker %s stats: %+v", initMessage.Broker, brokerStats)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/balancer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"time"
|
||||
@@ -36,6 +38,7 @@ type MessageQueueBroker struct {
|
||||
currentFiler pb.ServerAddress
|
||||
localTopicManager *topic.LocalTopicManager
|
||||
Balancer *balancer.Balancer
|
||||
lockAsBalancer *cluster.LiveLock
|
||||
}
|
||||
|
||||
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
|
||||
@@ -57,6 +60,25 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
|
||||
mqBroker.OnBrokerUpdate(newNode, time.Now())
|
||||
}
|
||||
|
||||
// keep connecting to balancer
|
||||
go func() {
|
||||
for mqBroker.currentFiler == "" {
|
||||
time.Sleep(time.Millisecond * 237)
|
||||
}
|
||||
self := fmt.Sprintf("%s:%d", option.Ip, option.Port)
|
||||
glog.V(1).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
|
||||
|
||||
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
|
||||
mqBroker.lockAsBalancer = lockClient.StartLock(LockBrokerBalancer, self)
|
||||
for {
|
||||
err := mqBroker.BrokerConnectToBalancer(self)
|
||||
if err != nil {
|
||||
fmt.Printf("BrokerConnectToBalancer: %v\n", err)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}()
|
||||
|
||||
return mqBroker, nil
|
||||
}
|
||||
|
||||
|
||||
74
weed/mq/broker/broker_stats.go
Normal file
74
weed/mq/broker/broker_stats.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
"math/rand"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
LockBrokerBalancer = "broker_balancer"
|
||||
)
|
||||
|
||||
// BrokerConnectToBalancer connects to the broker balancer and sends stats
|
||||
func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
|
||||
// find the lock owner
|
||||
var brokerBalancer string
|
||||
err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
|
||||
Name: LockBrokerBalancer,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
brokerBalancer = resp.Owner
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
glog.V(1).Infof("broker %s found balancer %s", self, brokerBalancer)
|
||||
|
||||
// connect to the lock owner
|
||||
err = pb.WithBrokerGrpcClient(false, brokerBalancer, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
|
||||
stream, err := client.ConnectToBalancer(context.Background())
|
||||
if err != nil {
|
||||
return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err)
|
||||
}
|
||||
defer stream.CloseSend()
|
||||
err = stream.Send(&mq_pb.ConnectToBalancerRequest{
|
||||
Message: &mq_pb.ConnectToBalancerRequest_Init{
|
||||
Init: &mq_pb.ConnectToBalancerRequest_InitMessage{
|
||||
Broker: self,
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("send init message: %v", err)
|
||||
}
|
||||
|
||||
for {
|
||||
stats := broker.localTopicManager.CollectStats(time.Second * 5)
|
||||
err = stream.Send(&mq_pb.ConnectToBalancerRequest{
|
||||
Message: &mq_pb.ConnectToBalancerRequest_Stats{
|
||||
Stats: stats,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("send stats message: %v", err)
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -2,6 +2,9 @@ package topic
|
||||
|
||||
import (
|
||||
cmap "github.com/orcaman/concurrent-map/v2"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
"github.com/shirou/gopsutil/v3/cpu"
|
||||
"time"
|
||||
)
|
||||
|
||||
// LocalTopicManager manages topics on local broker
|
||||
@@ -53,3 +56,22 @@ func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Pa
|
||||
}
|
||||
return localTopic.removePartition(partition)
|
||||
}
|
||||
|
||||
func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats {
|
||||
stats := &mq_pb.BrokerStats{}
|
||||
manager.topics.IterCb(func(topic string, localTopic *LocalTopic) {
|
||||
for _, localPartition := range localTopic.Partitions {
|
||||
stats.TopicPartitionCount++
|
||||
stats.ConsumerCount += localPartition.ConsumerCount
|
||||
}
|
||||
})
|
||||
|
||||
// collect current broker's cpu usage
|
||||
usages, err := cpu.Percent(duration, false)
|
||||
if err == nil && len(usages) > 0 {
|
||||
stats.CpuUsagePercent = int32(usages[0])
|
||||
}
|
||||
|
||||
return stats
|
||||
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ type LocalPartition struct {
|
||||
isLeader bool
|
||||
FollowerBrokers []pb.ServerAddress
|
||||
logBuffer *log_buffer.LogBuffer
|
||||
ConsumerCount int32
|
||||
}
|
||||
|
||||
func NewLocalPartition(topic Topic, partition Partition, isLeader bool, followerBrokers []pb.ServerAddress) *LocalPartition {
|
||||
|
||||
Reference in New Issue
Block a user