add CreateTopic API

This commit is contained in:
chrislu
2023-09-24 14:22:11 -07:00
parent 0f8168c0c9
commit 0361c321b4
9 changed files with 714 additions and 400 deletions

View File

@@ -7,7 +7,8 @@ import (
)
const (
MaxPartitionCount = 8 * 9 * 5 * 7 //2520
MaxPartitionCount = 8 * 9 * 5 * 7 //2520
LockBrokerBalancer = "broker_balancer"
)
type Balancer struct {

View File

@@ -7,6 +7,26 @@ import (
"google.golang.org/grpc/status"
)
func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_pb.CreateTopicRequest) (resp *mq_pb.CreateTopicResponse, err error) {
if broker.currentBalancer == "" {
return nil, status.Errorf(codes.Unavailable, "no balancer")
}
if !broker.lockAsBalancer.IsLocked() {
proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.CreateTopic(ctx, request)
return nil
})
if proxyErr != nil {
return nil, proxyErr
}
return resp, err
}
ret := &mq_pb.CreateTopicResponse{}
ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true)
return ret, err
}
// FindTopicBrokers returns the brokers that are serving the topic
//
// 1. lock the topic

View File

@@ -70,7 +70,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
glog.V(1).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
mqBroker.lockAsBalancer = lockClient.StartLock(LockBrokerBalancer, self)
mqBroker.lockAsBalancer = lockClient.StartLock(balancer.LockBrokerBalancer, self)
for {
err := mqBroker.BrokerConnectToBalancer(self)
if err != nil {

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/balancer"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -11,17 +12,13 @@ import (
"time"
)
const (
LockBrokerBalancer = "broker_balancer"
)
// BrokerConnectToBalancer connects to the broker balancer and sends stats
func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
// find the lock owner
var brokerBalancer string
err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
Name: LockBrokerBalancer,
Name: balancer.LockBrokerBalancer,
})
if err != nil {
return err
@@ -32,6 +29,7 @@ func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
if err != nil {
return err
}
broker.currentBalancer = pb.ServerAddress(brokerBalancer)
glog.V(1).Infof("broker %s found balancer %s", self, brokerBalancer)