Squashed commit of the following:

commit 4827425146
Author: chrislu <chris.lu@gmail.com>
Date:   Sat Sep 16 15:05:38 2023 -0700

    balancer works

commit 3b50139f68
Author: chrislu <chris.lu@gmail.com>
Date:   Fri Sep 15 22:22:32 2023 -0700

    comments

commit 7f685ce7ba
Author: chrislu <chris.lu@gmail.com>
Date:   Fri Sep 15 22:20:05 2023 -0700

    adjust APIs

commit 436d99443b
Author: chrislu <chris.lu@gmail.com>
Date:   Thu Sep 14 23:49:05 2023 -0700

    receive broker stats

commit b771fefa37
Merge: 0a851ec00 890881037
Author: chrislu <chris.lu@gmail.com>
Date:   Wed Sep 13 00:03:47 2023 -0700

    Merge branch 'master' into sub

commit 0a851ec00b
Author: chrislu <chris.lu@gmail.com>
Date:   Sun Sep 10 22:01:25 2023 -0700

    Create balancer.go

commit 39941edc0b
Author: chrislu <chris.lu@gmail.com>
Date:   Thu Sep 7 23:55:19 2023 -0700

    add publisher shutdown

commit 875f562779
Author: chrislu <chris.lu@gmail.com>
Date:   Wed Sep 6 23:16:41 2023 -0700

    server side send response at least once per second

commit 984b6c54cf
Author: chrislu <chris.lu@gmail.com>
Date:   Wed Sep 6 23:15:29 2023 -0700

    ack interval 128

commit 2492a45499
Author: chrislu <chris.lu@gmail.com>
Date:   Wed Sep 6 22:39:46 2023 -0700

    ack interval

commit ba67e6ca29
Author: chrislu <chris.lu@gmail.com>
Date:   Mon Sep 4 21:43:50 2023 -0700

    api for sub

commit 9e4f985698
Author: chrislu <chris.lu@gmail.com>
Date:   Mon Sep 4 21:43:30 2023 -0700

    publish, benchmark

commit cb470d44df
Author: chrislu <chris.lu@gmail.com>
Date:   Fri Sep 1 00:36:51 2023 -0700

    can pub and sub

commit 1eb2da46d5
Author: chrislu <chris.lu@gmail.com>
Date:   Mon Aug 28 09:02:12 2023 -0700

    connect and publish

commit 504ae8383a
Author: chrislu <chris.lu@gmail.com>
Date:   Mon Aug 28 09:01:25 2023 -0700

    protoc version

commit dbcba75271
Author: chrislu <chris.lu@gmail.com>
Date:   Sun Aug 27 18:59:04 2023 -0700

    rename to lookup

commit c9caf33119
Author: chrislu <chris.lu@gmail.com>
Date:   Sun Aug 27 18:33:46 2023 -0700

    move functions

commit 4d6c18d86f
Author: chrislu <chris.lu@gmail.com>
Date:   Sun Aug 27 17:50:59 2023 -0700

    pub sub initial tests

commit 4eb8e8624d
Author: chrislu <chris.lu@gmail.com>
Date:   Sun Aug 27 13:14:39 2023 -0700

    rename

commit 1990456670
Author: chrislu <chris.lu@gmail.com>
Date:   Sun Aug 27 13:13:14 2023 -0700

    sub

commit 905911853d
Author: chrislu <chris.lu@gmail.com>
Date:   Sat Aug 26 13:39:21 2023 -0700

    adjust proto
This commit is contained in:
chrislu
2023-09-16 15:06:16 -07:00
parent 8908810376
commit 89a1fd1751
36 changed files with 2874 additions and 804 deletions

View File

@@ -0,0 +1,24 @@
package balancer
import (
cmap "github.com/orcaman/concurrent-map/v2"
)
type Balancer struct {
Brokers cmap.ConcurrentMap[string, *BrokerStats]
}
type BrokerStats struct {
TopicPartitionCount int32
ConsumerCount int32
CpuUsagePercent int32
}
func NewBalancer() *Balancer {
return &Balancer{
Brokers: cmap.New[*BrokerStats](),
}
}
func NewBrokerStats() *BrokerStats {
return &BrokerStats{}
}

View File

@@ -88,69 +88,38 @@ func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq
return ret, nil
}
// FindTopicBrokers returns the brokers that are serving the topic
//
// 1. lock the topic
//
// 2. find the topic partitions on the filer
// 2.1 if the topic is not found, return error
// 2.2 if the request is_for_publish, create the topic
// 2.2.1 if the request is_for_subscribe, return error not found
// 2.2.2 if the request is_for_publish, create the topic
// 2.2 if the topic is found, return the brokers
//
// 3. unlock the topic
func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *mq_pb.FindTopicBrokersRequest) (*mq_pb.FindTopicBrokersResponse, error) {
ret := &mq_pb.FindTopicBrokersResponse{}
// lock the topic
// find the topic partitions on the filer
// if the topic is not found
// if the request is_for_publish
// create the topic
// if the request is_for_subscribe
// return error not found
return ret, nil
}
// CheckTopicPartitionsStatus check the topic partitions on the broker
func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) {
ret := &mq_pb.CheckTopicPartitionsStatusResponse{}
return ret, nil
}
// createOrUpdateTopicPartitions creates the topic partitions on the broker
// 1. check
func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) {
func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignments []*mq_pb.BrokerPartitionAssignment) (err error) {
// create or update each partition
if prevAssignment == nil {
if prevAssignments == nil {
broker.createOrUpdateTopicPartition(topic, nil)
} else {
for _, partitionAssignment := range prevAssignment.BrokerPartitions {
broker.createOrUpdateTopicPartition(topic, partitionAssignment)
for _, brokerPartitionAssignment := range prevAssignments {
broker.createOrUpdateTopicPartition(topic, brokerPartitionAssignment)
}
}
return nil
}
func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (newAssignment *mq_pb.BrokerPartitionsAssignment) {
func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (newAssignment *mq_pb.BrokerPartitionAssignment) {
shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment)
if !shouldCreate {
}
return
}
func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (shouldCreate bool) {
func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (shouldCreate bool) {
if oldAssignment == nil {
return true
}
for _, b := range oldAssignment.FollowerBrokers {
pb.WithBrokerClient(false, pb.ServerAddress(b), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
pb.WithBrokerGrpcClient(false, b, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{
Namespace: string(topic.Namespace),
Topic: topic.Name,
BrokerPartitionsAssignment: oldAssignment,
ShouldCancelIfNotMatch: true,
Namespace: string(topic.Namespace),
Topic: topic.Name,
BrokerPartitionAssignment: oldAssignment,
ShouldCancelIfNotMatch: true,
})
if err != nil {
shouldCreate = true

View File

@@ -0,0 +1,52 @@
package broker
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// BrokerConnectToBalancer receives connections from brokers and collects stats
func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessaging_ConnectToBalancerServer) error {
if !broker.lockAsBalancer.IsLocked() {
return status.Errorf(codes.Unavailable, "not current broker balancer")
}
req, err := stream.Recv()
if err != nil {
return err
}
// process init message
initMessage := req.GetInit()
brokerStats := balancer.NewBrokerStats()
if initMessage != nil {
broker.Balancer.Brokers.Set(initMessage.Broker, brokerStats)
} else {
return status.Errorf(codes.InvalidArgument, "balancer init message is empty")
}
defer func() {
broker.Balancer.Brokers.Remove(initMessage.Broker)
}()
// process stats message
for {
req, err := stream.Recv()
if err != nil {
return err
}
if !broker.lockAsBalancer.IsLocked() {
return status.Errorf(codes.Unavailable, "not current broker balancer")
}
if receivedStats := req.GetStats(); receivedStats != nil {
brokerStats.TopicPartitionCount = receivedStats.TopicPartitionCount
brokerStats.ConsumerCount = receivedStats.ConsumerCount
brokerStats.CpuUsagePercent = receivedStats.CpuUsagePercent
glog.V(3).Infof("broker %s stats: %+v", initMessage.Broker, brokerStats)
}
}
return nil
}

View File

@@ -0,0 +1,50 @@
package broker
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
// FindTopicBrokers returns the brokers that are serving the topic
//
// 1. lock the topic
//
// 2. find the topic partitions on the filer
// 2.1 if the topic is not found, return error
// 2.2 if the request is_for_publish, create the topic
// 2.2.1 if the request is_for_subscribe, return error not found
// 2.2.2 if the request is_for_publish, create the topic
// 2.2 if the topic is found, return the brokers
//
// 3. unlock the topic
func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (*mq_pb.LookupTopicBrokersResponse, error) {
ret := &mq_pb.LookupTopicBrokersResponse{}
// TODO lock the topic
// find the topic partitions on the filer
// if the topic is not found
// if the request is_for_publish
// create the topic
// if the request is_for_subscribe
// return error not found
// t := topic.FromPbTopic(request.Topic)
ret.Topic = request.Topic
ret.BrokerPartitionAssignments = []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:17777",
FollowerBrokers: []string{"localhost:17777"},
Partition: &mq_pb.Partition{
RingSize: MaxPartitionCount,
RangeStart: 0,
RangeStop: MaxPartitionCount,
},
},
}
return ret, nil
}
// CheckTopicPartitionsStatus check the topic partitions on the broker
func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) {
ret := &mq_pb.CheckTopicPartitionsStatusResponse{}
return ret, nil
}

View File

@@ -7,6 +7,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"sync/atomic"
"time"
)
// For a new or re-configured topic, or one of the broker went offline,
@@ -73,39 +75,89 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
// 3. write to the filer
var localTopicPartition *topic.LocalPartition
req, err := stream.Recv()
if err != nil {
return err
}
response := &mq_pb.PublishResponse{}
// TODO check whether current broker should be the leader for the topic partition
ackInterval := 1
initMessage := req.GetInit()
if initMessage != nil {
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil {
localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
}
ackInterval = int(initMessage.AckInterval)
stream.Send(response)
} else {
response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
return stream.Send(response)
}
ackCounter := 0
var ackSequence int64
var isStopping int32
respChan := make(chan *mq_pb.PublishResponse, 128)
defer func() {
atomic.StoreInt32(&isStopping, 1)
close(respChan)
}()
go func() {
ticker := time.NewTicker(1 * time.Second)
for {
select {
case resp := <-respChan:
if resp != nil {
if err := stream.Send(resp); err != nil {
glog.Errorf("Error sending response %v: %v", resp, err)
}
} else {
return
}
case <-ticker.C:
if atomic.LoadInt32(&isStopping) == 0 {
response := &mq_pb.PublishResponse{
AckSequence: ackSequence,
}
respChan <- response
} else {
return
}
}
}
}()
// process each published messages
for {
// receive a message
req, err := stream.Recv()
if err != nil {
return err
}
// Process the received message
sequence := req.GetSequence()
response := &mq_pb.PublishResponse{
AckSequence: sequence,
}
if dataMessage := req.GetData(); dataMessage != nil {
if localTopicPartition == nil {
response.Error = "topic partition not initialized"
glog.Errorf("topic partition not found")
} else {
localTopicPartition.Publish(dataMessage)
}
} else if initMessage := req.GetInit(); initMessage != nil {
localTopicPartition = broker.localTopicManager.GetTopicPartition(
topic.NewTopic(topic.Namespace(initMessage.Segment.Namespace), initMessage.Segment.Topic),
topic.FromPbPartition(initMessage.Segment.Partition),
)
if localTopicPartition == nil {
response.Error = fmt.Sprintf("topic partition %v not found", initMessage.Segment)
glog.Errorf("topic partition %v not found", initMessage.Segment)
}
localTopicPartition.Publish(dataMessage)
}
if err := stream.Send(response); err != nil {
glog.Errorf("Error sending setup response: %v", err)
ackCounter++
ackSequence++
if ackCounter >= ackInterval {
ackCounter = 0
// send back the ack
response := &mq_pb.PublishResponse{
AckSequence: ackSequence,
}
respChan <- response
}
}
glog.Infof("publish stream closed")
return nil
}
@@ -114,14 +166,14 @@ func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, reque
ret := &mq_pb.AssignTopicPartitionsResponse{}
self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
for _, partition := range request.TopicPartitionsAssignment.BrokerPartitions {
localPartiton := topic.FromPbBrokerPartitionsAssignment(self, partition)
for _, brokerPartition := range request.BrokerPartitionAssignments {
localPartiton := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
broker.localTopicManager.AddTopicPartition(
topic.FromPbTopic(request.Topic),
localPartiton)
if request.IsLeader {
for _, follower := range localPartiton.FollowerBrokers {
err := pb.WithBrokerClient(false, follower, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
err := pb.WithBrokerGrpcClient(false, follower.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.AssignTopicPartitions(context.Background(), request)
return err
})

View File

@@ -0,0 +1,44 @@
package broker
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"time"
)
func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.Cursor.Topic),
topic.FromPbPartition(req.Cursor.Partition))
if localTopicPartition == nil {
stream.Send(&mq_pb.SubscribeResponse{
Message: &mq_pb.SubscribeResponse_Ctrl{
Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{
Error: "not initialized",
},
},
})
return nil
}
clientName := fmt.Sprintf("%s/%s-%s", req.Consumer.ConsumerGroup, req.Consumer.ConsumerId, req.Consumer.ClientId)
localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error {
value := logEntry.GetData()
if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{
Data: &mq_pb.DataMessage{
Key: []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)),
Value: value,
},
}}); err != nil {
glog.Errorf("Error sending setup response: %v", err)
return err
}
return nil
})
return nil
}

View File

@@ -1,6 +1,9 @@
package broker
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"time"
@@ -34,6 +37,8 @@ type MessageQueueBroker struct {
filers map[pb.ServerAddress]struct{}
currentFiler pb.ServerAddress
localTopicManager *topic.LocalTopicManager
Balancer *balancer.Balancer
lockAsBalancer *cluster.LiveLock
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
@@ -41,9 +46,10 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
mqBroker = &MessageQueueBroker{
option: option,
grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
filers: make(map[pb.ServerAddress]struct{}),
localTopicManager: topic.NewLocalTopicManager(),
Balancer: balancer.NewBalancer(),
}
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
@@ -54,6 +60,25 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
mqBroker.OnBrokerUpdate(newNode, time.Now())
}
// keep connecting to balancer
go func() {
for mqBroker.currentFiler == "" {
time.Sleep(time.Millisecond * 237)
}
self := fmt.Sprintf("%s:%d", option.Ip, option.Port)
glog.V(1).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
mqBroker.lockAsBalancer = lockClient.StartLock(LockBrokerBalancer, self)
for {
err := mqBroker.BrokerConnectToBalancer(self)
if err != nil {
fmt.Printf("BrokerConnectToBalancer: %v\n", err)
}
time.Sleep(time.Second)
}
}()
return mqBroker, nil
}
@@ -112,7 +137,7 @@ func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb
func (broker *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error {
return pb.WithBrokerClient(streamingMode, server, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
return pb.WithBrokerGrpcClient(streamingMode, server.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
return fn(client)
})

View File

@@ -0,0 +1,74 @@
package broker
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"math/rand"
"time"
)
const (
LockBrokerBalancer = "broker_balancer"
)
// BrokerConnectToBalancer connects to the broker balancer and sends stats
func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
// find the lock owner
var brokerBalancer string
err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
Name: LockBrokerBalancer,
})
if err != nil {
return err
}
brokerBalancer = resp.Owner
return nil
})
if err != nil {
return err
}
glog.V(1).Infof("broker %s found balancer %s", self, brokerBalancer)
// connect to the lock owner
err = pb.WithBrokerGrpcClient(false, brokerBalancer, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
stream, err := client.ConnectToBalancer(context.Background())
if err != nil {
return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err)
}
defer stream.CloseSend()
err = stream.Send(&mq_pb.ConnectToBalancerRequest{
Message: &mq_pb.ConnectToBalancerRequest_Init{
Init: &mq_pb.ConnectToBalancerRequest_InitMessage{
Broker: self,
},
},
})
if err != nil {
return fmt.Errorf("send init message: %v", err)
}
for {
stats := broker.localTopicManager.CollectStats(time.Second * 5)
err = stream.Send(&mq_pb.ConnectToBalancerRequest{
Message: &mq_pb.ConnectToBalancerRequest_Stats{
Stats: stats,
},
})
if err != nil {
return fmt.Errorf("send stats message: %v", err)
}
time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond)
}
return nil
})
return err
}

View File

@@ -0,0 +1,58 @@
package main
import (
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
"log"
"sync"
"time"
)
var (
messageCount = flag.Int("n", 1000, "message count")
concurrency = flag.Int("c", 4, "concurrency count")
)
func doPublish(publisher *pub_client.TopicPublisher, id int) {
startTime := time.Now()
for i := 0; i < *messageCount / *concurrency; i++ {
// Simulate publishing a message
key := []byte(fmt.Sprintf("key-%d-%d", id, i))
value := []byte(fmt.Sprintf("value-%d-%d", id, i))
publisher.Publish(key, value) // Call your publisher function here
// println("Published", string(key), string(value))
}
elapsed := time.Since(startTime)
log.Printf("Publisher %d finished in %s", id, elapsed)
}
func main() {
flag.Parse()
publisher := pub_client.NewTopicPublisher(
"test", "test")
if err := publisher.Connect("localhost:17777"); err != nil {
fmt.Println(err)
return
}
startTime := time.Now()
// Start multiple publishers
var wg sync.WaitGroup
for i := 0; i < *concurrency; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
doPublish(publisher, id)
}(i)
}
// Wait for all publishers to finish
wg.Wait()
elapsed := time.Since(startTime)
publisher.Shutdown()
log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds())
}

View File

@@ -0,0 +1,44 @@
package main
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
subscriberConfig := &sub_client.SubscriberConfiguration{
ClientId: "testSubscriber",
GroupId: "test",
GroupInstanceId: "test",
GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
}
contentConfig := &sub_client.ContentConfiguration{
Namespace: "test",
Topic: "test",
Filter: "",
}
subscriber := sub_client.NewTopicSubscriber(subscriberConfig, contentConfig)
if err := subscriber.Connect("localhost:17777"); err != nil {
fmt.Println(err)
return
}
subscriber.SetEachMessageFunc(func(key, value []byte) bool {
println(string(key), "=>", string(value))
return true
})
subscriber.SetCompletionFunc(func() {
println("done subscribing")
})
if err := subscriber.Subscribe(); err != nil {
fmt.Println(err)
}
}

View File

@@ -0,0 +1,116 @@
package pub_client
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (p *TopicPublisher) doLookup(brokerAddress string) error {
err := pb.WithBrokerGrpcClient(true,
brokerAddress,
p.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error {
lookupResp, err := client.LookupTopicBrokers(context.Background(),
&mq_pb.LookupTopicBrokersRequest{
Topic: &mq_pb.Topic{
Namespace: p.namespace,
Name: p.topic,
},
IsForPublish: true,
})
if err != nil {
return err
}
for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
// partition => publishClient
publishClient, redirectTo, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker)
if err != nil {
return err
}
for redirectTo != "" {
publishClient, redirectTo, err = p.doConnect(brokerPartitionAssignment.Partition, redirectTo)
if err != nil {
return err
}
}
p.partition2Broker.Insert(
brokerPartitionAssignment.Partition.RangeStart,
brokerPartitionAssignment.Partition.RangeStop,
publishClient)
}
return nil
})
if err != nil {
return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err)
}
return nil
}
// broker => publish client
// send init message
// save the publishing client
func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, redirectTo string, err error) {
grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
if err != nil {
return publishClient, redirectTo, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
}
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
stream, err := brokerClient.Publish(context.Background())
if err != nil {
return publishClient, redirectTo, fmt.Errorf("create publish client: %v", err)
}
publishClient = &PublishClient{
SeaweedMessaging_PublishClient: stream,
Broker: brokerAddress,
}
if err = publishClient.Send(&mq_pb.PublishRequest{
Message: &mq_pb.PublishRequest_Init{
Init: &mq_pb.PublishRequest_InitMessage{
Topic: &mq_pb.Topic{
Namespace: p.namespace,
Name: p.topic,
},
Partition: &mq_pb.Partition{
RingSize: partition.RingSize,
RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop,
},
AckInterval: 128,
},
},
}); err != nil {
return publishClient, redirectTo, fmt.Errorf("send init message: %v", err)
}
resp, err := stream.Recv()
if err != nil {
return publishClient, redirectTo, fmt.Errorf("recv init response: %v", err)
}
if resp.Error != "" {
return publishClient, redirectTo, fmt.Errorf("init response error: %v", resp.Error)
}
if resp.RedirectToBroker != "" {
redirectTo = resp.RedirectToBroker
return publishClient, redirectTo, nil
}
go func() {
for {
_, err := publishClient.Recv()
if err != nil {
e, ok := status.FromError(err)
if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
return
}
publishClient.Err = err
fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
return
}
}
}()
return publishClient, redirectTo, nil
}

View File

@@ -0,0 +1,41 @@
package pub_client
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/broker"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func (p *TopicPublisher) Publish(key, value []byte) error {
hashKey := util.HashToInt32(key) % broker.MaxPartitionCount
if hashKey < 0 {
hashKey = -hashKey
}
publishClient, found := p.partition2Broker.Floor(hashKey, hashKey)
if !found {
return fmt.Errorf("no broker found for key %d", hashKey)
}
p.Lock()
defer p.Unlock()
// dead lock here
//google.golang.org/grpc/internal/transport.(*writeQuota).get(flowcontrol.go:59)
//google.golang.org/grpc/internal/transport.(*http2Client).Write(http2_client.go:1047)
//google.golang.org/grpc.(*csAttempt).sendMsg(stream.go:1040)
//google.golang.org/grpc.(*clientStream).SendMsg.func2(stream.go:892)
//google.golang.org/grpc.(*clientStream).withRetry(stream.go:752)
//google.golang.org/grpc.(*clientStream).SendMsg(stream.go:894)
//github.com/seaweedfs/seaweedfs/weed/pb/mq_pb.(*seaweedMessagingPublishClient).Send(mq_grpc.pb.go:141)
//github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client.(*TopicPublisher).Publish(publish.go:19)
if err := publishClient.Send(&mq_pb.PublishRequest{
Message: &mq_pb.PublishRequest_Data{
Data: &mq_pb.DataMessage{
Key: key,
Value: value,
},
},
}); err != nil {
return fmt.Errorf("send publish request: %v", err)
}
return nil
}

View File

@@ -0,0 +1,57 @@
package pub_client
import (
"github.com/rdleal/intervalst/interval"
"github.com/seaweedfs/seaweedfs/weed/mq/broker"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sync"
"time"
)
type PublisherConfiguration struct {
}
type PublishClient struct {
mq_pb.SeaweedMessaging_PublishClient
Broker string
Err error
}
type TopicPublisher struct {
namespace string
topic string
partition2Broker *interval.SearchTree[*PublishClient, int32]
grpcDialOption grpc.DialOption
sync.Mutex // protects grpc
}
func NewTopicPublisher(namespace, topic string) *TopicPublisher {
return &TopicPublisher{
namespace: namespace,
topic: topic,
partition2Broker: interval.NewSearchTree[*PublishClient](func(a, b int32) int {
return int(a - b)
}),
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
}
}
func (p *TopicPublisher) Connect(bootstrapBroker string) error {
if err := p.doLookup(bootstrapBroker); err != nil {
return err
}
return nil
}
func (p *TopicPublisher) Shutdown() error {
if clients, found := p.partition2Broker.AllIntersections(0, broker.MaxPartitionCount); found {
for _, client := range clients {
client.CloseSend()
}
}
time.Sleep(1100 * time.Millisecond)
return nil
}

View File

@@ -0,0 +1,34 @@
package sub_client
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
func (sub *TopicSubscriber) doLookup(brokerAddress string) error {
err := pb.WithBrokerGrpcClient(true,
brokerAddress,
sub.SubscriberConfig.GrpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error {
lookupResp, err := client.LookupTopicBrokers(context.Background(),
&mq_pb.LookupTopicBrokersRequest{
Topic: &mq_pb.Topic{
Namespace: sub.ContentConfig.Namespace,
Name: sub.ContentConfig.Topic,
},
IsForPublish: false,
})
if err != nil {
return err
}
sub.brokerPartitionAssignments = lookupResp.BrokerPartitionAssignments
return nil
})
if err != nil {
return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
}
return nil
}

View File

@@ -0,0 +1,72 @@
package sub_client
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"sync"
)
// Subscribe subscribes to a topic's specified partitions.
// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
func (sub *TopicSubscriber) Subscribe() error {
var wg sync.WaitGroup
for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments {
brokerAddress := brokerPartitionAssignment.LeaderBroker
grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption)
if err != nil {
return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
}
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{
Consumer: &mq_pb.SubscribeRequest_Consumer{
ConsumerGroup: sub.SubscriberConfig.GroupId,
ConsumerId: sub.SubscriberConfig.GroupInstanceId,
},
Cursor: &mq_pb.SubscribeRequest_Cursor{
Topic: &mq_pb.Topic{
Namespace: sub.ContentConfig.Namespace,
Name: sub.ContentConfig.Topic,
},
Partition: &mq_pb.Partition{
RingSize: brokerPartitionAssignment.Partition.RingSize,
RangeStart: brokerPartitionAssignment.Partition.RangeStart,
RangeStop: brokerPartitionAssignment.Partition.RangeStop,
},
Filter: sub.ContentConfig.Filter,
},
})
if err != nil {
return fmt.Errorf("create subscribe client: %v", err)
}
wg.Add(1)
go func() {
defer wg.Done()
if sub.OnCompletionFunc != nil {
defer sub.OnCompletionFunc()
}
for {
resp, err := subscribeClient.Recv()
if err != nil {
fmt.Printf("subscribe error: %v\n", err)
return
}
if resp.Message == nil {
continue
}
switch m := resp.Message.(type) {
case *mq_pb.SubscribeResponse_Data:
if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) {
return
}
case *mq_pb.SubscribeResponse_Ctrl:
// ignore
}
}
}()
}
wg.Wait()
return nil
}

View File

@@ -0,0 +1,53 @@
package sub_client
import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
)
type SubscriberConfiguration struct {
ClientId string
GroupId string
GroupInstanceId string
BootstrapServers []string
GrpcDialOption grpc.DialOption
}
type ContentConfiguration struct {
Namespace string
Topic string
Filter string
}
type OnEachMessageFunc func(key, value []byte) (shouldContinue bool)
type OnCompletionFunc func()
type TopicSubscriber struct {
SubscriberConfig *SubscriberConfiguration
ContentConfig *ContentConfiguration
brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
OnEachMessageFunc OnEachMessageFunc
OnCompletionFunc OnCompletionFunc
}
func NewTopicSubscriber(subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
return &TopicSubscriber{
SubscriberConfig: subscriber,
ContentConfig: content,
}
}
func (sub *TopicSubscriber) Connect(bootstrapBroker string) error {
if err := sub.doLookup(bootstrapBroker); err != nil {
return err
}
return nil
}
func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) {
sub.OnEachMessageFunc = onEachMessageFn
}
func (sub *TopicSubscriber) SetCompletionFunc(onCompeletionFn OnCompletionFunc) {
sub.OnCompletionFunc = onCompeletionFn
}

View File

@@ -2,6 +2,9 @@ package topic
import (
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/shirou/gopsutil/v3/cpu"
"time"
)
// LocalTopicManager manages topics on local broker
@@ -25,6 +28,7 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition
Partitions: make([]*LocalPartition, 0),
}
}
manager.topics.SetIfAbsent(topic.String(), localTopic)
if localTopic.findPartition(localPartition.Partition) != nil {
return
}
@@ -52,3 +56,22 @@ func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Pa
}
return localTopic.removePartition(partition)
}
func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats {
stats := &mq_pb.BrokerStats{}
manager.topics.IterCb(func(topic string, localTopic *LocalTopic) {
for _, localPartition := range localTopic.Partitions {
stats.TopicPartitionCount++
stats.ConsumerCount += localPartition.ConsumerCount
}
})
// collect current broker's cpu usage
usages, err := cpu.Percent(duration, false)
if err == nil && len(usages) > 0 {
stats.CpuUsagePercent = int32(usages[0])
}
return stats
}

View File

@@ -1,7 +1,9 @@
package topic
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"time"
@@ -12,21 +14,44 @@ type LocalPartition struct {
isLeader bool
FollowerBrokers []pb.ServerAddress
logBuffer *log_buffer.LogBuffer
ConsumerCount int32
}
func (p LocalPartition) Publish(message *mq_pb.PublishRequest_DataMessage) {
func NewLocalPartition(topic Topic, partition Partition, isLeader bool, followerBrokers []pb.ServerAddress) *LocalPartition {
return &LocalPartition{
Partition: partition,
isLeader: isLeader,
FollowerBrokers: followerBrokers,
logBuffer: log_buffer.NewLogBuffer(
fmt.Sprintf("%s/%s/%4d-%4d", topic.Namespace, topic.Name, partition.RangeStart, partition.RangeStop),
2*time.Minute,
func(startTime, stopTime time.Time, buf []byte) {
},
func() {
},
),
}
}
type OnEachMessageFn func(logEntry *filer_pb.LogEntry) error
func (p LocalPartition) Publish(message *mq_pb.DataMessage) {
p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
}
func FromPbBrokerPartitionsAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionsAssignment) *LocalPartition {
func (p LocalPartition) Subscribe(clientName string, startReadTime time.Time, eachMessageFn OnEachMessageFn) {
p.logBuffer.LoopProcessLogData(clientName, startReadTime, 0, func() bool {
return true
}, eachMessageFn)
}
func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment) *LocalPartition {
isLeaer := assignment.LeaderBroker == string(self)
localPartition := &LocalPartition{
Partition: Partition{
RangeStart: assignment.PartitionStart,
RangeStop: assignment.PartitionStop,
RingSize: PartitionCount,
},
isLeader: isLeaer,
Partition: FromPbPartition(assignment.Partition),
isLeader: isLeaer,
}
if !isLeaer {
return localPartition