rename to lookup

This commit is contained in:
chrislu
2023-08-27 18:59:04 -07:00
parent c9caf33119
commit dbcba75271
14 changed files with 432 additions and 515 deletions

View File

@@ -90,36 +90,36 @@ func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq
// createOrUpdateTopicPartitions creates the topic partitions on the broker
// 1. check
func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) {
func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignments []*mq_pb.BrokerPartitionAssignment) (err error) {
// create or update each partition
if prevAssignment == nil {
if prevAssignments == nil {
broker.createOrUpdateTopicPartition(topic, nil)
} else {
for _, partitionAssignment := range prevAssignment.BrokerPartitions {
broker.createOrUpdateTopicPartition(topic, partitionAssignment)
for _, brokerPartitionAssignment := range prevAssignments {
broker.createOrUpdateTopicPartition(topic, brokerPartitionAssignment)
}
}
return nil
}
func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (newAssignment *mq_pb.BrokerPartitionsAssignment) {
func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (newAssignment *mq_pb.BrokerPartitionAssignment) {
shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment)
if !shouldCreate {
}
return
}
func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (shouldCreate bool) {
func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (shouldCreate bool) {
if oldAssignment == nil {
return true
}
for _, b := range oldAssignment.FollowerBrokers {
pb.WithBrokerGrpcClient(false, b, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{
Namespace: string(topic.Namespace),
Topic: topic.Name,
BrokerPartitionsAssignment: oldAssignment,
ShouldCancelIfNotMatch: true,
Namespace: string(topic.Namespace),
Topic: topic.Name,
BrokerPartitionAssignment: oldAssignment,
ShouldCancelIfNotMatch: true,
})
if err != nil {
shouldCreate = true

View File

@@ -17,8 +17,8 @@ import (
// 2.2 if the topic is found, return the brokers
//
// 3. unlock the topic
func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *mq_pb.FindTopicBrokersRequest) (*mq_pb.FindTopicBrokersResponse, error) {
ret := &mq_pb.FindTopicBrokersResponse{}
func (broker *MessageQueueBroker) FindTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (*mq_pb.LookupTopicBrokersResponse, error) {
ret := &mq_pb.LookupTopicBrokersResponse{}
// TODO lock the topic
// find the topic partitions on the filer
@@ -27,6 +27,19 @@ func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *m
// create the topic
// if the request is_for_subscribe
// return error not found
// t := topic.FromPbTopic(request.Topic)
ret.Topic = request.Topic
ret.BrokerPartitionAssignments = []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:17777",
FollowerBrokers: []string{"localhost:17777"},
Partition: &mq_pb.Partition{
RingSize: MaxPartitionCount,
RangeStart: 0,
RangeStop: MaxPartitionCount,
},
},
}
return ret, nil
}

View File

@@ -120,8 +120,8 @@ func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, reque
ret := &mq_pb.AssignTopicPartitionsResponse{}
self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
for _, partition := range request.TopicPartitionsAssignment.BrokerPartitions {
localPartiton := topic.FromPbBrokerPartitionsAssignment(self, partition)
for _, brokerPartition := range request.BrokerPartitionAssignments {
localPartiton := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
broker.localTopicManager.AddTopicPartition(
topic.FromPbTopic(request.Topic),
localPartiton)

View File

@@ -27,15 +27,11 @@ func (p LocalPartition) Subscribe(clientName string, startReadTime time.Time, ea
}, eachMessageFn)
}
func FromPbBrokerPartitionsAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionsAssignment) *LocalPartition {
func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment) *LocalPartition {
isLeaer := assignment.LeaderBroker == string(self)
localPartition := &LocalPartition{
Partition: Partition{
RangeStart: assignment.PartitionStart,
RangeStop: assignment.PartitionStop,
RingSize: PartitionCount,
},
isLeader: isLeaer,
Partition: FromPbPartition(assignment.Partition),
isLeader: isLeaer,
}
if !isLeaer {
return localPartition