Squashed commit of the following:
commit 32f4b1a13057d56b6de487cdb80ff7c205af01a6
Author: chrislu <chris.lu@gmail.com>
Date: Sun Aug 20 22:52:19 2023 -0700
fix compilation
commit e77ad33b7ca0423138fbae26a4433b60923a9588
Author: chrislu <chris.lu@gmail.com>
Date: Sun Aug 20 22:46:44 2023 -0700
pub
commit f431f30cc7ca277ca299e3cd118c05537fb9f5c3
Author: chrislu <chris.lu@gmail.com>
Date: Sun Aug 20 13:27:39 2023 -0700
fix generic type
commit 4e9dcb18293fd1e3e306e2dceb995dfd67a35e1d
Merge: 30f942580 16e3f2d52
Author: chrislu <chris.lu@gmail.com>
Date: Sun Aug 20 12:47:14 2023 -0700
Merge branch 'master' into pubsub
commit 30f942580ad1bb32ae94aade2e3a21ec3ab63e21
Author: chrislu <chris.lu@gmail.com>
Date: Sun Aug 20 11:10:58 2023 -0700
wip
commit f8b00980bc2f3879bb43decffd9a08d842f196f2
Author: chrislu <chris.lu@gmail.com>
Date: Tue Jul 25 09:14:35 2023 -0700
add design document
commit 08d2bebe42a26ebc39f1542f54d99e73620727dd
Author: chrislu <chris.lu@gmail.com>
Date: Tue Jul 25 09:14:06 2023 -0700
minor
commit bcfa7982b262a40fcdce6fc6613fad2ce07c13da
Author: chrislu <chris.lu@gmail.com>
Date: Tue Jul 25 09:13:49 2023 -0700
rename
This commit is contained in:
@@ -4,7 +4,7 @@ import (
|
||||
"context"
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
@@ -12,6 +12,10 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
MaxPartitionCount = 1024
|
||||
)
|
||||
|
||||
func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
|
||||
ret := &mq_pb.FindBrokerLeaderResponse{}
|
||||
err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
|
||||
@@ -31,21 +35,9 @@ func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *m
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
|
||||
ret := &mq_pb.CheckSegmentStatusResponse{}
|
||||
// TODO add in memory active segment
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
|
||||
ret := &mq_pb.CheckBrokerLoadResponse{}
|
||||
// TODO read broker's load
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) {
|
||||
ret := &mq_pb.AssignSegmentBrokersResponse{}
|
||||
segment := mq.FromPbSegment(request.Segment)
|
||||
segment := topic.FromPbSegment(request.Segment)
|
||||
|
||||
// check existing segment locations on filer
|
||||
existingBrokers, err := broker.checkSegmentOnFiler(segment)
|
||||
@@ -84,7 +76,92 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) {
|
||||
func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
|
||||
ret := &mq_pb.CheckSegmentStatusResponse{}
|
||||
// TODO add in memory active segment
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
|
||||
ret := &mq_pb.CheckBrokerLoadResponse{}
|
||||
// TODO read broker's load
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// FindTopicBrokers returns the brokers that are serving the topic
|
||||
//
|
||||
// 1. lock the topic
|
||||
//
|
||||
// 2. find the topic partitions on the filer
|
||||
// 2.1 if the topic is not found, return error
|
||||
// 2.2 if the request is_for_publish, create the topic
|
||||
// 2.2.1 if the request is_for_subscribe, return error not found
|
||||
// 2.2.2 if the request is_for_publish, create the topic
|
||||
// 2.2 if the topic is found, return the brokers
|
||||
//
|
||||
// 3. unlock the topic
|
||||
func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *mq_pb.FindTopicBrokersRequest) (*mq_pb.FindTopicBrokersResponse, error) {
|
||||
ret := &mq_pb.FindTopicBrokersResponse{}
|
||||
// lock the topic
|
||||
|
||||
// find the topic partitions on the filer
|
||||
// if the topic is not found
|
||||
// if the request is_for_publish
|
||||
// create the topic
|
||||
// if the request is_for_subscribe
|
||||
// return error not found
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// CheckTopicPartitionsStatus check the topic partitions on the broker
|
||||
func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) {
|
||||
ret := &mq_pb.CheckTopicPartitionsStatusResponse{}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// createOrUpdateTopicPartitions creates the topic partitions on the broker
|
||||
// 1. check
|
||||
func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) {
|
||||
// create or update each partition
|
||||
if prevAssignment == nil {
|
||||
broker.createOrUpdateTopicPartition(topic, nil)
|
||||
} else {
|
||||
for _, partitionAssignment := range prevAssignment.BrokerPartitions {
|
||||
broker.createOrUpdateTopicPartition(topic, partitionAssignment)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (newAssignment *mq_pb.BrokerPartitionsAssignment) {
|
||||
shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment)
|
||||
if !shouldCreate {
|
||||
|
||||
}
|
||||
return
|
||||
}
|
||||
func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (shouldCreate bool) {
|
||||
if oldAssignment == nil {
|
||||
return true
|
||||
}
|
||||
for _, b := range oldAssignment.FollowerBrokers {
|
||||
pb.WithBrokerClient(false, pb.ServerAddress(b), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
|
||||
_, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{
|
||||
Namespace: string(topic.Namespace),
|
||||
Topic: topic.Name,
|
||||
BrokerPartitionsAssignment: oldAssignment,
|
||||
ShouldCancelIfNotMatch: true,
|
||||
})
|
||||
if err != nil {
|
||||
shouldCreate = true
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *topic.Segment, brokers []pb.ServerAddress) (active bool, err error) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, candidate := range brokers {
|
||||
|
||||
@@ -1,16 +1,136 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
)
|
||||
|
||||
// For a new or re-configured topic, or one of the broker went offline,
|
||||
// the pub clients ask one broker what are the brokers for all the topic partitions.
|
||||
// The broker will lock the topic on write.
|
||||
// 1. if the topic is not found, create the topic, and allocate the topic partitions to the brokers
|
||||
// 2. if the topic is found, return the brokers for the topic partitions
|
||||
// For a topic to read from, the sub clients ask one broker what are the brokers for all the topic partitions.
|
||||
// The broker will lock the topic on read.
|
||||
// 1. if the topic is not found, return error
|
||||
// 2. if the topic is found, return the brokers for the topic partitions
|
||||
//
|
||||
// If the topic needs to be re-balanced, the admin client will lock the topic,
|
||||
// 1. collect throughput information for all the brokers
|
||||
// 2. adjust the topic partitions to the brokers
|
||||
// 3. notify the brokers to add/remove partitions to host
|
||||
// 3.1 When locking the topic, the partitions and brokers should be remembered in the lock.
|
||||
// 4. the brokers will stop process incoming messages if not the right partition
|
||||
// 4.1 the pub clients will need to re-partition the messages and publish to the right brokers for the partition3
|
||||
// 4.2 the sub clients will need to change the brokers to read from
|
||||
//
|
||||
// The following is from each individual component's perspective:
|
||||
// For a pub client
|
||||
// For current topic/partition, ask one broker for the brokers for the topic partitions
|
||||
// 1. connect to the brokers and keep sending, until the broker returns error, or the broker leader is moved.
|
||||
// For a sub client
|
||||
// For current topic/partition, ask one broker for the brokers for the topic partitions
|
||||
// 1. connect to the brokers and keep reading, until the broker returns error, or the broker leader is moved.
|
||||
// For a broker
|
||||
// Upon a pub client lookup:
|
||||
// 1. lock the topic
|
||||
// 2. if already has topic partition assignment, check all brokers are healthy
|
||||
// 3. if not, create topic partition assignment
|
||||
// 2. return the brokers for the topic partitions
|
||||
// 3. unlock the topic
|
||||
// Upon a sub client lookup:
|
||||
// 1. lock the topic
|
||||
// 2. if already has topic partition assignment, check all brokers are healthy
|
||||
// 3. if not, return error
|
||||
// 2. return the brokers for the topic partitions
|
||||
// 3. unlock the topic
|
||||
// For an admin tool
|
||||
// 0. collect stats from all the brokers, and find the topic worth moving
|
||||
// 1. lock the topic
|
||||
// 2. collect throughput information for all the brokers
|
||||
// 3. adjust the topic partitions to the brokers
|
||||
// 4. notify the brokers to add/remove partitions to host
|
||||
// 5. the brokers will stop process incoming messages if not the right partition
|
||||
// 6. unlock the topic
|
||||
|
||||
/*
|
||||
The messages is buffered in memory, and saved to filer under
|
||||
The messages are buffered in memory, and saved to filer under
|
||||
/topics/<topic>/<date>/<hour>/<segment>/*.msg
|
||||
/topics/<topic>/<date>/<hour>/segment
|
||||
/topics/<topic>/info/segment_<id>.meta
|
||||
|
||||
|
||||
|
||||
*/
|
||||
|
||||
func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
|
||||
// 1. write to the volume server
|
||||
// 2. find the topic metadata owning filer
|
||||
// 3. write to the filer
|
||||
|
||||
var localTopicPartition *topic.LocalPartition
|
||||
for {
|
||||
req, err := stream.Recv()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Process the received message
|
||||
sequence := req.GetSequence()
|
||||
response := &mq_pb.PublishResponse{
|
||||
AckSequence: sequence,
|
||||
}
|
||||
if dataMessage := req.GetData(); dataMessage != nil {
|
||||
if localTopicPartition == nil {
|
||||
response.Error = "topic partition not initialized"
|
||||
glog.Errorf("topic partition not found")
|
||||
} else {
|
||||
localTopicPartition.Publish(dataMessage)
|
||||
}
|
||||
} else if initMessage := req.GetInit(); initMessage != nil {
|
||||
localTopicPartition = broker.localTopicManager.GetTopicPartition(
|
||||
topic.NewTopic(topic.Namespace(initMessage.Segment.Namespace), initMessage.Segment.Topic),
|
||||
topic.FromPbPartition(initMessage.Segment.Partition),
|
||||
)
|
||||
if localTopicPartition == nil {
|
||||
response.Error = fmt.Sprintf("topic partition %v not found", initMessage.Segment)
|
||||
glog.Errorf("topic partition %v not found", initMessage.Segment)
|
||||
}
|
||||
}
|
||||
if err := stream.Send(response); err != nil {
|
||||
glog.Errorf("Error sending setup response: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
|
||||
func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
|
||||
ret := &mq_pb.AssignTopicPartitionsResponse{}
|
||||
self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
|
||||
|
||||
for _, partition := range request.TopicPartitionsAssignment.BrokerPartitions {
|
||||
localPartiton := topic.FromPbBrokerPartitionsAssignment(self, partition)
|
||||
broker.localTopicManager.AddTopicPartition(
|
||||
topic.FromPbTopic(request.Topic),
|
||||
localPartiton)
|
||||
if request.IsLeader {
|
||||
for _, follower := range localPartiton.FollowerBrokers {
|
||||
err := pb.WithBrokerClient(false, follower, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
|
||||
_, err := client.AssignTopicPartitions(context.Background(), request)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return ret, err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq"
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
|
||||
@@ -12,8 +12,8 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) {
|
||||
info, found, err := broker.readSegmentOnFiler(segment)
|
||||
func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *topic.Segment) (brokers []pb.ServerAddress, err error) {
|
||||
info, found, err := broker.readSegmentInfoOnFiler(segment)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -27,12 +27,12 @@ func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brok
|
||||
return
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) {
|
||||
func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *topic.Segment, brokers []pb.ServerAddress) (err error) {
|
||||
var nodes []string
|
||||
for _, b := range brokers {
|
||||
nodes = append(nodes, string(b))
|
||||
}
|
||||
broker.saveSegmentToFiler(segment, &mq_pb.SegmentInfo{
|
||||
broker.saveSegmentInfoToFiler(segment, &mq_pb.SegmentInfo{
|
||||
Segment: segment.ToPbSegment(),
|
||||
StartTsNs: time.Now().UnixNano(),
|
||||
Brokers: nodes,
|
||||
@@ -43,7 +43,7 @@ func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment,
|
||||
return
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info *mq_pb.SegmentInfo, found bool, err error) {
|
||||
func (broker *MessageQueueBroker) readSegmentInfoOnFiler(segment *topic.Segment) (info *mq_pb.SegmentInfo, found bool, err error) {
|
||||
dir, name := segment.DirAndName()
|
||||
|
||||
found, err = filer_pb.Exists(broker, dir, name, false)
|
||||
@@ -70,7 +70,7 @@ func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info
|
||||
return
|
||||
}
|
||||
|
||||
func (broker *MessageQueueBroker) saveSegmentToFiler(segment *mq.Segment, info *mq_pb.SegmentInfo) (err error) {
|
||||
func (broker *MessageQueueBroker) saveSegmentInfoToFiler(segment *topic.Segment, info *mq_pb.SegmentInfo) (err error) {
|
||||
dir, name := segment.DirAndName()
|
||||
|
||||
var buf bytes.Buffer
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
||||
@@ -27,20 +28,22 @@ type MessageQueueBrokerOption struct {
|
||||
|
||||
type MessageQueueBroker struct {
|
||||
mq_pb.UnimplementedSeaweedMessagingServer
|
||||
option *MessageQueueBrokerOption
|
||||
grpcDialOption grpc.DialOption
|
||||
MasterClient *wdclient.MasterClient
|
||||
filers map[pb.ServerAddress]struct{}
|
||||
currentFiler pb.ServerAddress
|
||||
option *MessageQueueBrokerOption
|
||||
grpcDialOption grpc.DialOption
|
||||
MasterClient *wdclient.MasterClient
|
||||
filers map[pb.ServerAddress]struct{}
|
||||
currentFiler pb.ServerAddress
|
||||
localTopicManager *topic.LocalTopicManager
|
||||
}
|
||||
|
||||
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
|
||||
|
||||
mqBroker = &MessageQueueBroker{
|
||||
option: option,
|
||||
grpcDialOption: grpcDialOption,
|
||||
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
|
||||
filers: make(map[pb.ServerAddress]struct{}),
|
||||
option: option,
|
||||
grpcDialOption: grpcDialOption,
|
||||
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
|
||||
filers: make(map[pb.ServerAddress]struct{}),
|
||||
localTopicManager: topic.NewLocalTopicManager(),
|
||||
}
|
||||
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user