@@ -8,8 +8,8 @@ import (
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// ConnectToPubCoordinator receives connections from brokers and collects stats
|
||||
func (broker *MessageQueueBroker) ConnectToPubCoordinator(stream mq_pb.SeaweedMessaging_ConnectToPubCoordinatorServer) error {
|
||||
// ConnectToBalancer receives connections from brokers and collects stats
|
||||
func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessaging_ConnectToBalancerServer) error {
|
||||
if !broker.lockAsBalancer.IsLocked() {
|
||||
return status.Errorf(codes.Unavailable, "not current broker balancer")
|
||||
}
|
||||
|
||||
@@ -72,9 +72,9 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
|
||||
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
|
||||
mqBroker.lockAsBalancer = lockClient.StartLock(balancer.LockBrokerBalancer, self)
|
||||
for {
|
||||
err := mqBroker.BrokerConnectToPubCoordinator(self)
|
||||
err := mqBroker.BrokerConnectToBalancer(self)
|
||||
if err != nil {
|
||||
fmt.Printf("BrokerConnectToPubCoordinator: %v\n", err)
|
||||
fmt.Printf("BrokerConnectToBalancer: %v\n", err)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
@@ -12,8 +12,8 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// BrokerConnectToPubCoordinator connects to the broker balancer and sends stats
|
||||
func (broker *MessageQueueBroker) BrokerConnectToPubCoordinator(self string) error {
|
||||
// BrokerConnectToBalancer connects to the broker balancer and sends stats
|
||||
func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
|
||||
// find the lock owner
|
||||
var brokerBalancer string
|
||||
err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
@@ -35,14 +35,14 @@ func (broker *MessageQueueBroker) BrokerConnectToPubCoordinator(self string) err
|
||||
|
||||
// connect to the lock owner
|
||||
err = pb.WithBrokerGrpcClient(false, brokerBalancer, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
|
||||
stream, err := client.ConnectToPubCoordinator(context.Background())
|
||||
stream, err := client.ConnectToBalancer(context.Background())
|
||||
if err != nil {
|
||||
return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err)
|
||||
}
|
||||
defer stream.CloseSend()
|
||||
err = stream.Send(&mq_pb.ConnectToPubCoordinatorRequest{
|
||||
Message: &mq_pb.ConnectToPubCoordinatorRequest_Init{
|
||||
Init: &mq_pb.ConnectToPubCoordinatorRequest_InitMessage{
|
||||
err = stream.Send(&mq_pb.ConnectToBalancerRequest{
|
||||
Message: &mq_pb.ConnectToBalancerRequest_Init{
|
||||
Init: &mq_pb.ConnectToBalancerRequest_InitMessage{
|
||||
Broker: self,
|
||||
},
|
||||
},
|
||||
@@ -53,8 +53,8 @@ func (broker *MessageQueueBroker) BrokerConnectToPubCoordinator(self string) err
|
||||
|
||||
for {
|
||||
stats := broker.localTopicManager.CollectStats(time.Second * 5)
|
||||
err = stream.Send(&mq_pb.ConnectToPubCoordinatorRequest{
|
||||
Message: &mq_pb.ConnectToPubCoordinatorRequest_Stats{
|
||||
err = stream.Send(&mq_pb.ConnectToBalancerRequest{
|
||||
Message: &mq_pb.ConnectToBalancerRequest_Stats{
|
||||
Stats: stats,
|
||||
},
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user