Merge accumulated changes related to message queue (#5098)
* balance partitions on brokers
* prepare topic partition first and then publish, move partition
* purge unused APIs
* clean up
* adjust logs
* add BalanceTopics() grpc API
* configure topic
* configure topic command
* refactor
* repair missing partitions
* sequence of operations to ensure ordering
* proto to close publishers and consumers
* rename file
* topic partition versioned by unixTimeNs
* create local topic partition
* close publishers
* randomize the client name
* wait until no publishers
* logs
* close stop publisher channel
* send last ack
* comments
* comment
* comments
* support list of brokers
* add cli options
* Update .gitignore
* logs
* return io.EOF directly
* refactor
* optionally create topic
* refactoring
* detect consumer disconnection
* sub client wait for more messages
* subscribe by timestamp
* rename
* rename to sub_balancer
* rename
* adjust comments
* rename
* fix compilation
* rename
* rename
* SubscriberToSubCoordinator
* sticky rebalance
* go fmt
* add tests
* tracking topic=>broker
* merge
* comment
@@ -3,18 +3,13 @@ package broker
 import (
     "context"
     "github.com/seaweedfs/seaweedfs/weed/cluster"
-    "github.com/seaweedfs/seaweedfs/weed/glog"
-    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
-    "github.com/seaweedfs/seaweedfs/weed/pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
-    "sort"
-    "sync"
 )

-func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
+func (b *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
     ret := &mq_pb.FindBrokerLeaderResponse{}
-    err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
+    err := b.withMasterClient(false, b.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
         resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
             ClientType: cluster.BrokerType,
             FilerGroup: request.FilerGroup,
@@ -30,219 +25,3 @@ func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *m
     })
     return ret, err
 }
-
-func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) {
-    ret := &mq_pb.AssignSegmentBrokersResponse{}
-    segment := topic.FromPbSegment(request.Segment)
-
-    // check existing segment locations on filer
-    existingBrokers, err := broker.checkSegmentOnFiler(segment)
-    if err != nil {
-        return ret, err
-    }
-
-    if len(existingBrokers) > 0 {
-        // good if the segment is still on the brokers
-        isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
-        if err != nil {
-            return ret, err
-        }
-        if isActive {
-            for _, broker := range existingBrokers {
-                ret.Brokers = append(ret.Brokers, string(broker))
-            }
-            return ret, nil
-        }
-    }
-
-    // randomly pick up to 10 brokers, and find the ones with the lightest load
-    selectedBrokers, err := broker.selectBrokers()
-    if err != nil {
-        return ret, err
-    }
-
-    // save the allocated brokers info for this segment on the filer
-    if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil {
-        return ret, err
-    }
-
-    for _, broker := range selectedBrokers {
-        ret.Brokers = append(ret.Brokers, string(broker))
-    }
-    return ret, nil
-}
-
-func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
-    ret := &mq_pb.CheckSegmentStatusResponse{}
-    // TODO add in memory active segment
-    return ret, nil
-}
-
-func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
-    ret := &mq_pb.CheckBrokerLoadResponse{}
-    // TODO read broker's load
-    return ret, nil
-}
-
-// createOrUpdateTopicPartitions creates the topic partitions on the broker
-// 1. check
-func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignments []*mq_pb.BrokerPartitionAssignment) (err error) {
-    // create or update each partition
-    if prevAssignments == nil {
-        broker.createOrUpdateTopicPartition(topic, nil)
-    } else {
-        for _, brokerPartitionAssignment := range prevAssignments {
-            broker.createOrUpdateTopicPartition(topic, brokerPartitionAssignment)
-        }
-    }
-    return nil
-}
-
-func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (newAssignment *mq_pb.BrokerPartitionAssignment) {
-    shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment)
-    if !shouldCreate {
-
-    }
-    return
-}
-func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (shouldCreate bool) {
-    if oldAssignment == nil {
-        return true
-    }
-    for _, b := range oldAssignment.FollowerBrokers {
-        pb.WithBrokerGrpcClient(false, b, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
-            _, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{
-                Namespace: string(topic.Namespace),
-                Topic: topic.Name,
-                BrokerPartitionAssignment: oldAssignment,
-                ShouldCancelIfNotMatch: true,
-            })
-            if err != nil {
-                shouldCreate = true
-            }
-            return nil
-        })
-    }
-    return
-}
-
-func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *topic.Segment, brokers []pb.ServerAddress) (active bool, err error) {
-    var wg sync.WaitGroup
-
-    for _, candidate := range brokers {
-        wg.Add(1)
-        go func(candidate pb.ServerAddress) {
-            defer wg.Done()
-            broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
-                resp, checkErr := client.CheckSegmentStatus(context.Background(), &mq_pb.CheckSegmentStatusRequest{
-                    Segment: &mq_pb.Segment{
-                        Namespace: string(segment.Topic.Namespace),
-                        Topic: segment.Topic.Name,
-                        Id: segment.Id,
-                    },
-                })
-                if checkErr != nil {
-                    err = checkErr
-                    glog.V(0).Infof("check segment status on %s: %v", candidate, checkErr)
-                    return nil
-                }
-                if resp.IsActive == false {
-                    active = false
-                }
-                return nil
-            })
-        }(candidate)
-    }
-    wg.Wait()
-    return
-}
-
-func (broker *MessageQueueBroker) selectBrokers() (brokers []pb.ServerAddress, err error) {
-    candidates, err := broker.selectCandidatesFromMaster(10)
-    if err != nil {
-        return
-    }
-    brokers, err = broker.pickLightestCandidates(candidates, 3)
-    return
-}
-
-func (broker *MessageQueueBroker) selectCandidatesFromMaster(limit int32) (candidates []pb.ServerAddress, err error) {
-    err = broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
-        resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
-            ClientType: cluster.BrokerType,
-            FilerGroup: broker.option.FilerGroup,
-            Limit: limit,
-        })
-        if err != nil {
-            return err
-        }
-        if len(resp.ClusterNodes) == 0 {
-            return nil
-        }
-        for _, node := range resp.ClusterNodes {
-            candidates = append(candidates, pb.ServerAddress(node.Address))
-        }
-        return nil
-    })
-    return
-}
-
-type CandidateStatus struct {
-    address      pb.ServerAddress
-    messageCount int64
-    bytesCount   int64
-    load         int64
-}
-
-func (broker *MessageQueueBroker) pickLightestCandidates(candidates []pb.ServerAddress, limit int) (selected []pb.ServerAddress, err error) {
-
-    if len(candidates) <= limit {
-        return candidates, nil
-    }
-
-    candidateStatuses, err := broker.checkBrokerStatus(candidates)
-    if err != nil {
-        return nil, err
-    }
-
-    sort.Slice(candidateStatuses, func(i, j int) bool {
-        return candidateStatuses[i].load < candidateStatuses[j].load
-    })
-
-    for i, candidate := range candidateStatuses {
-        if i >= limit {
-            break
-        }
-        selected = append(selected, candidate.address)
-    }
-
-    return
-}
-
-func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddress) (candidateStatuses []*CandidateStatus, err error) {
-
-    candidateStatuses = make([]*CandidateStatus, len(candidates))
-    var wg sync.WaitGroup
-    for i, candidate := range candidates {
-        wg.Add(1)
-        go func(i int, candidate pb.ServerAddress) {
-            defer wg.Done()
-            err = broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
-                resp, checkErr := client.CheckBrokerLoad(context.Background(), &mq_pb.CheckBrokerLoadRequest{})
-                if checkErr != nil {
-                    err = checkErr
-                    return err
-                }
-                candidateStatuses[i] = &CandidateStatus{
-                    address:      candidate,
-                    messageCount: resp.MessageCount,
-                    bytesCount:   resp.BytesCount,
-                    load:         resp.MessageCount + resp.BytesCount/(64*1024),
-                }
-                return nil
-            })
-        }(i, candidate)
-    }
-    wg.Wait()
-    return
-}
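The deleted checkBrokerStatus above fans out one CheckBrokerLoad call per candidate and lets each goroutine write only its own slice slot, so no mutex is needed; the lightest brokers are then chosen by sorting on a load score of messageCount + bytesCount/(64*1024). A self-contained sketch of that pattern, with the gRPC call replaced by a plain callback (Status and checkAll are illustrative names, not SeaweedFS APIs):

package main

import (
    "fmt"
    "sort"
    "sync"
)

type Status struct {
    address      string
    messageCount int64
    bytesCount   int64
    load         int64
}

// checkAll queries every candidate concurrently; slot i is owned by
// exactly one goroutine, so the slice needs no locking.
func checkAll(candidates []string, check func(addr string) (msgs, bytes int64)) []*Status {
    statuses := make([]*Status, len(candidates))
    var wg sync.WaitGroup
    for i, addr := range candidates {
        wg.Add(1)
        go func(i int, addr string) {
            defer wg.Done()
            msgs, bytes := check(addr)
            statuses[i] = &Status{
                address:      addr,
                messageCount: msgs,
                bytesCount:   bytes,
                // same weighting as the removed code: one message
                // costs as much as 64KB of payload
                load: msgs + bytes/(64*1024),
            }
        }(i, addr)
    }
    wg.Wait()
    return statuses
}

func main() {
    fake := func(addr string) (int64, int64) { return int64(len(addr)), 1 << 20 }
    statuses := checkAll([]string{"broker-1:17777", "broker-2:17777"}, fake)
    sort.Slice(statuses, func(i, j int) bool { return statuses[i].load < statuses[j].load })
    fmt.Println("lightest:", statuses[0].address)
}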
weed/mq/broker/broker_grpc_balance.go (new file, 31 lines)
@@ -0,0 +1,31 @@
+package broker
+
+import (
+    "context"
+    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/status"
+)
+
+func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) {
+    if b.currentBalancer == "" {
+        return nil, status.Errorf(codes.Unavailable, "no balancer")
+    }
+    if !b.lockAsBalancer.IsLocked() {
+        proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+            resp, err = client.BalanceTopics(ctx, request)
+            return nil
+        })
+        if proxyErr != nil {
+            return nil, proxyErr
+        }
+        return resp, err
+    }
+
+    ret := &mq_pb.BalanceTopicsResponse{}
+
+    actions := b.Balancer.BalancePublishers()
+    err = b.Balancer.ExecuteBalanceAction(actions, b.grpcDialOption)
+
+    return ret, err
+}
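BalanceTopics uses the proxy idiom shared by every balancer-scoped RPC in this change: a broker that does not hold the balancer lock forwards the request to b.currentBalancer and returns the proxied result; only the lock holder does the real work. A schematic sketch of the idiom, with the lock and the gRPC client reduced to placeholder types (none of these names are SeaweedFS APIs):

package main

import (
    "context"
    "errors"
    "fmt"
)

type request struct{ topic string }
type response struct{ assignments []string }

type broker struct {
    currentBalancer string
    holdsLock       bool
    // forward stands in for the real gRPC client call to another broker.
    forward func(ctx context.Context, target string, req *request) (*response, error)
}

func (b *broker) balanceTopics(ctx context.Context, req *request) (*response, error) {
    if b.currentBalancer == "" {
        return nil, errors.New("no balancer") // mirrors codes.Unavailable
    }
    if !b.holdsLock {
        // not the balancer: proxy to whoever holds the lock
        return b.forward(ctx, b.currentBalancer, req)
    }
    // this broker is the balancer: do the real work locally
    return &response{assignments: []string{"broker-1", "broker-2"}}, nil
}

func main() {
    leaf := &broker{currentBalancer: "balancer:17777", forward: func(ctx context.Context, target string, req *request) (*response, error) {
        fmt.Println("proxying", req.topic, "to", target)
        return &response{}, nil
    }}
    _, _ = leaf.balanceTopics(context.Background(), &request{topic: "events"})
}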
weed/mq/broker/broker_grpc_configure.go (new file, 107 lines)
@@ -0,0 +1,107 @@
+package broker
+
+import (
+    "context"
+    "fmt"
+    "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
+    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+    "github.com/seaweedfs/seaweedfs/weed/pb"
+    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/status"
+)
+
+// ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
+// It generates an assignment based on existing allocations,
+// and then assigns the partitions to the brokers.
+func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) {
+    if b.currentBalancer == "" {
+        return nil, status.Errorf(codes.Unavailable, "no balancer")
+    }
+    if !b.lockAsBalancer.IsLocked() {
+        proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+            resp, err = client.ConfigureTopic(ctx, request)
+            return nil
+        })
+        if proxyErr != nil {
+            return nil, proxyErr
+        }
+        return resp, err
+    }
+
+    ret := &mq_pb.ConfigureTopicResponse{}
+    ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
+
+    for _, bpa := range ret.BrokerPartitionAssignments {
+        // fmt.Printf("create topic %s on %s\n", request.Topic, bpa.LeaderBroker)
+        if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
+            _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
+                Topic: request.Topic,
+                BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
+                    {
+                        Partition: bpa.Partition,
+                    },
+                },
+                IsLeader:   true,
+                IsDraining: false,
+            })
+            if doCreateErr != nil {
+                return fmt.Errorf("do create topic %s on %s: %v", request.Topic, bpa.LeaderBroker, doCreateErr)
+            }
+            brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
+            if !found {
+                brokerStats = pub_balancer.NewBrokerStats()
+                if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
+                    brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
+                }
+            }
+            brokerStats.RegisterAssignment(request.Topic, bpa.Partition)
+            return nil
+        }); doCreateErr != nil {
+            return nil, doCreateErr
+        }
+    }
+
+    // TODO revert if some error happens in the middle of the assignments
+
+    return ret, err
+}
+
+// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
+func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
+    ret := &mq_pb.AssignTopicPartitionsResponse{}
+    self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port))
+
+    // drain existing topic partition subscriptions
+    for _, brokerPartition := range request.BrokerPartitionAssignments {
+        localPartition := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
+        if request.IsDraining {
+            // TODO drain existing topic partition subscriptions
+
+            b.localTopicManager.RemoveTopicPartition(
+                topic.FromPbTopic(request.Topic),
+                localPartition.Partition)
+        } else {
+            b.localTopicManager.AddTopicPartition(
+                topic.FromPbTopic(request.Topic),
+                localPartition)
+        }
+    }
+
+    // if is leader, notify the followers to drain existing topic partition subscriptions
+    if request.IsLeader {
+        for _, brokerPartition := range request.BrokerPartitionAssignments {
+            for _, follower := range brokerPartition.FollowerBrokers {
+                err := pb.WithBrokerGrpcClient(false, follower, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+                    _, err := client.AssignTopicPartitions(context.Background(), request)
+                    return err
+                })
+                if err != nil {
+                    return ret, err
+                }
+            }
+        }
+    }
+
+    return ret, nil
+}
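ConfigureTopic registers stats with a Get / SetIfAbsent / Get sequence so that concurrent callers converge on a single BrokerStats instance per broker. The same race-safe get-or-create can be sketched with the standard library's sync.Map, standing in for the concurrent-map package actually used:

package main

import (
    "fmt"
    "sync"
)

type BrokerStats struct{ assignments int }

// getOrCreate mirrors the Get / SetIfAbsent / Get sequence in
// ConfigureTopic: if two goroutines race to create stats for the same
// broker, both end up holding the one instance that won the insert.
func getOrCreate(m *sync.Map, broker string) *BrokerStats {
    if v, found := m.Load(broker); found {
        return v.(*BrokerStats)
    }
    v, _ := m.LoadOrStore(broker, &BrokerStats{})
    return v.(*BrokerStats)
}

func main() {
    var m sync.Map
    s1 := getOrCreate(&m, "broker-1:17777")
    s2 := getOrCreate(&m, "broker-1:17777")
    fmt.Println(s1 == s2) // true: a single shared stats object
}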
@@ -1,72 +0,0 @@
-package broker
-
-import (
-    "context"
-    "fmt"
-    "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
-    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
-    "github.com/seaweedfs/seaweedfs/weed/pb"
-    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
-    "google.golang.org/grpc/codes"
-    "google.golang.org/grpc/status"
-)
-
-// ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
-func (broker *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) {
-    if broker.currentBalancer == "" {
-        return nil, status.Errorf(codes.Unavailable, "no balancer")
-    }
-    if !broker.lockAsBalancer.IsLocked() {
-        proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
-            resp, err = client.ConfigureTopic(ctx, request)
-            return nil
-        })
-        if proxyErr != nil {
-            return nil, proxyErr
-        }
-        return resp, err
-    }
-
-    ret := &mq_pb.ConfigureTopicResponse{}
-    ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
-
-    for _, bpa := range ret.BrokerPartitionAssignments {
-        // fmt.Printf("create topic %s on %s\n", request.Topic, bpa.LeaderBroker)
-        if doCreateErr := broker.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
-            _, doCreateErr := client.DoConfigureTopic(ctx, &mq_pb.DoConfigureTopicRequest{
-                Topic:     request.Topic,
-                Partition: bpa.Partition,
-            })
-            if doCreateErr != nil {
-                return fmt.Errorf("do create topic %s on %s: %v", request.Topic, bpa.LeaderBroker, doCreateErr)
-            }
-            brokerStats, found := broker.Balancer.Brokers.Get(bpa.LeaderBroker)
-            if !found {
-                brokerStats = balancer.NewBrokerStats()
-                if !broker.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
-                    brokerStats, _ = broker.Balancer.Brokers.Get(bpa.LeaderBroker)
-                }
-            }
-            brokerStats.RegisterAssignment(request.Topic, bpa.Partition)
-            return nil
-        }); doCreateErr != nil {
-            return nil, doCreateErr
-        }
-    }
-
-    // TODO revert if some error happens in the middle of the assignments
-
-    return ret, err
-}
-
-func (broker *MessageQueueBroker) DoConfigureTopic(ctx context.Context, req *mq_pb.DoConfigureTopicRequest) (resp *mq_pb.DoConfigureTopicResponse, err error) {
-    ret := &mq_pb.DoConfigureTopicResponse{}
-    t, p := topic.FromPbTopic(req.Topic), topic.FromPbPartition(req.Partition)
-    localTopicPartition := broker.localTopicManager.GetTopicPartition(t, p)
-    if localTopicPartition == nil {
-        localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
-        broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
-    }
-
-    return ret, err
-}
@@ -20,12 +20,12 @@ import (
 // 2.2 if the topic is found, return the brokers
 //
 // 3. unlock the topic
-func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
-    if broker.currentBalancer == "" {
+func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
+    if b.currentBalancer == "" {
         return nil, status.Errorf(codes.Unavailable, "no balancer")
     }
-    if !broker.lockAsBalancer.IsLocked() {
-        proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+    if !b.lockAsBalancer.IsLocked() {
+        proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
             resp, err = client.LookupTopicBrokers(ctx, request)
             return nil
         })
@@ -37,22 +37,16 @@ func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, reques
     ret := &mq_pb.LookupTopicBrokersResponse{}
     ret.Topic = request.Topic
-    ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish, 6)
+    ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish, 6)
     return ret, err
 }

-// CheckTopicPartitionsStatus check the topic partitions on the broker
-func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) {
-    ret := &mq_pb.CheckTopicPartitionsStatusResponse{}
-    return ret, nil
-}
-
-func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
-    if broker.currentBalancer == "" {
+func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
+    if b.currentBalancer == "" {
         return nil, status.Errorf(codes.Unavailable, "no balancer")
     }
-    if !broker.lockAsBalancer.IsLocked() {
-        proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+    if !b.lockAsBalancer.IsLocked() {
+        proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
             resp, err = client.ListTopics(ctx, request)
             return nil
         })
@@ -64,9 +58,9 @@ func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb
     ret := &mq_pb.ListTopicsResponse{}
     knownTopics := make(map[string]struct{})
-    for brokerStatsItem := range broker.Balancer.Brokers.IterBuffered() {
+    for brokerStatsItem := range b.Balancer.Brokers.IterBuffered() {
         _, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
-        for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
+        for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
             topicPartitionStat := topicPartitionStatsItem.Val
             topic := &mq_pb.Topic{
                 Namespace: topicPartitionStat.TopicPartition.Namespace,
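ListTopics walks per-broker, per-partition stats, so the same topic would otherwise surface once per partition; the knownTopics set suppresses the duplicates. A minimal sketch of that dedup step (the types here are illustrative, not the mq_pb messages):

package main

import "fmt"

type topicPartitionStat struct{ namespace, name string }

// uniqueTopics mirrors the knownTopics map in ListTopics: stats arrive
// once per (broker, partition), but each topic should be listed once.
func uniqueTopics(stats []topicPartitionStat) []string {
    known := make(map[string]struct{})
    var out []string
    for _, st := range stats {
        key := st.namespace + "/" + st.name
        if _, seen := known[key]; seen {
            continue
        }
        known[key] = struct{}{}
        out = append(out, key)
    }
    return out
}

func main() {
    stats := []topicPartitionStat{{"ns", "events"}, {"ns", "events"}, {"ns", "clicks"}}
    fmt.Println(uniqueTopics(stats)) // [ns/events ns/clicks]
}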
@@ -5,71 +5,36 @@ import (
     "fmt"
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/mq/topic"
     "github.com/seaweedfs/seaweedfs/weed/pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+    "google.golang.org/grpc/peer"
     "math/rand"
+    "net"
     "sync/atomic"
     "time"
 )

 // For a new or re-configured topic, or one of the brokers went offline,
 // the pub clients ask one broker what are the brokers for all the topic partitions.
 // The broker will lock the topic on write.
 // 1. if the topic is not found, create the topic, and allocate the topic partitions to the brokers
 // 2. if the topic is found, return the brokers for the topic partitions
 // For a topic to read from, the sub clients ask one broker what are the brokers for all the topic partitions.
 // The broker will lock the topic on read.
 // 1. if the topic is not found, return error
 // 2. if the topic is found, return the brokers for the topic partitions
 //
 // If the topic needs to be re-balanced, the admin client will lock the topic,
 // 1. collect throughput information for all the brokers
 // 2. adjust the topic partitions to the brokers
 // 3. notify the brokers to add/remove partitions to host
 // 3.1 When locking the topic, the partitions and brokers should be remembered in the lock.
 // 4. the brokers will stop processing incoming messages if not the right partition
 // 4.1 the pub clients will need to re-partition the messages and publish to the right brokers for the partition
 // 4.2 the sub clients will need to change the brokers to read from
 //
 // The following is from each individual component's perspective:
 // For a pub client
 // For current topic/partition, ask one broker for the brokers for the topic partitions
 // 1. connect to the brokers and keep sending, until the broker returns error, or the broker leader is moved.
 // For a sub client
 // For current topic/partition, ask one broker for the brokers for the topic partitions
 // 1. connect to the brokers and keep reading, until the broker returns error, or the broker leader is moved.
 // For a broker
 // Upon a pub client lookup:
 // 1. lock the topic
 // 2. if already has topic partition assignment, check all brokers are healthy
 // 3. if not, create topic partition assignment
 // 2. return the brokers for the topic partitions
 // 3. unlock the topic
 // Upon a sub client lookup:
 // 1. lock the topic
 // 2. if already has topic partition assignment, check all brokers are healthy
 // 3. if not, return error
 // 2. return the brokers for the topic partitions
 // 3. unlock the topic
 // For an admin tool
 // 0. collect stats from all the brokers, and find the topic worth moving
 // 1. lock the topic
 // 2. collect throughput information for all the brokers
 // 3. adjust the topic partitions to the brokers
 // 4. notify the brokers to add/remove partitions to host
 // 5. the brokers will stop processing incoming messages if not the right partition
 // 6. unlock the topic
 // PUB
 // 1. gRPC API to configure a topic
 // 1.1 create a topic with existing partition count
 // 1.2 assign partitions to brokers
 // 2. gRPC API to lookup topic partitions
 // 3. gRPC API to publish by topic partitions

 /*
 The messages are buffered in memory, and saved to filer under
     /topics/<topic>/<date>/<hour>/<segment>/*.msg
     /topics/<topic>/<date>/<hour>/segment
     /topics/<topic>/info/segment_<id>.meta
 // SUB
 // 1. gRPC API to lookup a topic's partitions

 // Re-balance topic partitions for publishing
 // 1. collect stats from all the brokers
 // 2. Rebalance and configure new generation of partitions on brokers
 // 3. Tell brokers to close current generation of publishing.
 // Publishers need to lookup again and publish to the new generation of partitions.

 // Re-balance topic partitions for subscribing
 // 1. collect stats from all the brokers
 // Subscribers need to listen for new partitions and connect to the brokers.
 // Each subscription may not get data. It can act as a backup.

 */

-func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
+func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
     // 1. write to the volume server
     // 2. find the topic metadata owning filer
     // 3. write to the filer
@@ -85,19 +50,23 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
     initMessage := req.GetInit()
     if initMessage != nil {
         t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
-        localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p)
+        localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
         if localTopicPartition == nil {
-            localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
-            broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
+            response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
+            glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
+            return stream.Send(response)
         }
         ackInterval = int(initMessage.AckInterval)
         stream.Send(response)
     } else {
-        response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
-        glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
+        response.Error = fmt.Sprintf("missing init message")
+        glog.Errorf("missing init message")
         return stream.Send(response)
     }

+    clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
+    localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
+
     ackCounter := 0
     var ackSequence int64
     var isStopping int32
@@ -105,6 +74,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
     defer func() {
         atomic.StoreInt32(&isStopping, 1)
         close(respChan)
+        localTopicPartition.Publishers.RemovePublisher(clientName)
     }()
     go func() {
         ticker := time.NewTicker(1 * time.Second)
@@ -127,6 +97,11 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
             } else {
                 return
             }
+        case <-localTopicPartition.StopPublishersCh:
+            respChan <- &mq_pb.PublishResponse{
+                AckSequence: ackSequence,
+                ShouldClose: true,
+            }
         }
     }
 }()
@@ -156,33 +131,22 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
         }
     }

-    glog.Infof("publish stream closed")
+    glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)

     return nil
 }

-// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
-func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
-    ret := &mq_pb.AssignTopicPartitionsResponse{}
-    self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
-
-    for _, brokerPartition := range request.BrokerPartitionAssignments {
-        localPartiton := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
-        broker.localTopicManager.AddTopicPartition(
-            topic.FromPbTopic(request.Topic),
-            localPartiton)
-        if request.IsLeader {
-            for _, follower := range localPartiton.FollowerBrokers {
-                err := pb.WithBrokerGrpcClient(false, follower.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
-                    _, err := client.AssignTopicPartitions(context.Background(), request)
-                    return err
-                })
-                if err != nil {
-                    return ret, err
-                }
-            }
-        }
-    }
-
-    return ret, nil
-}
+// duplicated from master_grpc_server.go
+func findClientAddress(ctx context.Context) string {
+    // fmt.Printf("FromContext %+v\n", ctx)
+    pr, ok := peer.FromContext(ctx)
+    if !ok {
+        glog.Error("failed to get peer from ctx")
+        return ""
+    }
+    if pr.Addr == net.Addr(nil) {
+        glog.Error("failed to get peer address")
+        return ""
+    }
+    return pr.Addr.String()
+}
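The Publish handler separates ingestion from acknowledgment: the receive loop advances an ack sequence, while a dedicated goroutine flushes an ack at most once per second and also watches the partition's stop channel so a final ShouldClose response can be sent. A reduced sketch of that select loop, assuming simplified response and channel types rather than the real mq_pb messages:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

type publishResponse struct {
    AckSequence int64
    ShouldClose bool
}

// ackLoop mirrors the goroutine in Publish: tick once per second and
// emit an ack when the sequence advanced; when the partition asks all
// publishers to stop, emit a final response with ShouldClose set.
func ackLoop(ackSequence *int64, stopPublishers <-chan struct{}, respChan chan<- *publishResponse) {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    var lastAcked int64
    for {
        select {
        case <-ticker.C:
            if seq := atomic.LoadInt64(ackSequence); seq > lastAcked {
                lastAcked = seq
                respChan <- &publishResponse{AckSequence: seq}
            }
        case <-stopPublishers:
            respChan <- &publishResponse{AckSequence: atomic.LoadInt64(ackSequence), ShouldClose: true}
            return
        }
    }
}

func main() {
    var seq int64
    stop := make(chan struct{})
    resp := make(chan *publishResponse, 4)
    go ackLoop(&seq, stop, resp)
    atomic.AddInt64(&seq, 3) // pretend three messages arrived
    time.Sleep(1200 * time.Millisecond)
    close(stop)
    for r := range resp {
        fmt.Printf("ack=%d close=%v\n", r.AckSequence, r.ShouldClose)
        if r.ShouldClose {
            return
        }
    }
}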
@@ -2,15 +2,15 @@ package broker
 import (
     "github.com/seaweedfs/seaweedfs/weed/glog"
-    "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
+    "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"
 )

-// ConnectToBalancer receives connections from brokers and collects stats
-func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessaging_ConnectToBalancerServer) error {
-    if !broker.lockAsBalancer.IsLocked() {
+// PublisherToPubBalancer receives connections from brokers and collects stats
+func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessaging_PublisherToPubBalancerServer) error {
+    if !b.lockAsBalancer.IsLocked() {
         return status.Errorf(codes.Unavailable, "not current broker balancer")
     }
     req, err := stream.Recv()
@@ -20,21 +20,14 @@ func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessagin
     // process init message
     initMessage := req.GetInit()
-    var brokerStats *balancer.BrokerStats
+    var brokerStats *pub_balancer.BrokerStats
     if initMessage != nil {
-        var found bool
-        brokerStats, found = broker.Balancer.Brokers.Get(initMessage.Broker)
-        if !found {
-            brokerStats = balancer.NewBrokerStats()
-            if !broker.Balancer.Brokers.SetIfAbsent(initMessage.Broker, brokerStats) {
-                brokerStats, _ = broker.Balancer.Brokers.Get(initMessage.Broker)
-            }
-        }
+        brokerStats = b.Balancer.OnBrokerConnected(initMessage.Broker)
     } else {
         return status.Errorf(codes.InvalidArgument, "balancer init message is empty")
     }
     defer func() {
-        broker.Balancer.Brokers.Remove(initMessage.Broker)
+        b.Balancer.OnBrokerDisconnected(initMessage.Broker, brokerStats)
     }()

     // process stats message
@@ -43,12 +36,11 @@ func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessagin
     if err != nil {
         return err
     }
-    if !broker.lockAsBalancer.IsLocked() {
+    if !b.lockAsBalancer.IsLocked() {
         return status.Errorf(codes.Unavailable, "not current broker balancer")
     }
     if receivedStats := req.GetStats(); receivedStats != nil {
-        brokerStats.UpdateStats(receivedStats)
+        b.Balancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats)
         glog.V(4).Infof("broker %s stats: %+v", initMessage.Broker, brokerStats)
         glog.V(4).Infof("received stats: %+v", receivedStats)
     }
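PublisherToPubBalancer replaces the old inline map bookkeeping with OnBrokerConnected / OnBrokerStatsUpdated / OnBrokerDisconnected hooks, so the balancer owns every state transition. A single-goroutine sketch of such a lifecycle (the real Balancer uses a concurrent map and also triggers rebalancing; this only shows the transitions):

package main

import "fmt"

type BrokerStats struct{ messageCount int64 }

// Balancer centralizes the connect/update/disconnect transitions that
// the old handler spelled out inline against the brokers map.
type Balancer struct {
    brokers map[string]*BrokerStats
}

func (b *Balancer) OnBrokerConnected(broker string) *BrokerStats {
    if s, ok := b.brokers[broker]; ok {
        return s
    }
    s := &BrokerStats{}
    b.brokers[broker] = s
    return s
}

func (b *Balancer) OnBrokerStatsUpdated(broker string, s *BrokerStats, messages int64) {
    s.messageCount = messages // the real code also feeds rebalancing decisions
}

func (b *Balancer) OnBrokerDisconnected(broker string, s *BrokerStats) {
    delete(b.brokers, broker)
}

func main() {
    bal := &Balancer{brokers: map[string]*BrokerStats{}}
    s := bal.OnBrokerConnected("broker-1:17777")
    bal.OnBrokerStatsUpdated("broker-1:17777", s, 42)
    bal.OnBrokerDisconnected("broker-1:17777", s)
    fmt.Println(len(bal.brokers)) // 0
}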
@@ -1,6 +1,7 @@
 package broker

 import (
+    "context"
     "fmt"
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/mq/topic"
@@ -9,10 +10,11 @@ import (
     "time"
 )

-func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
+func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {

-    localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.GetInit().Topic),
-        topic.FromPbPartition(req.GetInit().Partition))
+    t := topic.FromPbTopic(req.GetInit().Topic)
+    partition := topic.FromPbPartition(req.GetInit().Partition)
+    localTopicPartition := b.localTopicManager.GetTopicPartition(t, partition)
     if localTopicPartition == nil {
         stream.Send(&mq_pb.SubscribeResponse{
             Message: &mq_pb.SubscribeResponse_Ctrl{
@@ -25,13 +27,59 @@ func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream
     }

     clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
     localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
+    glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
+    isConnected := true
+    sleepIntervalCount := 0
+    defer func() {
+        isConnected = false
+        localTopicPartition.Subscribers.RemoveSubscriber(clientName)
+        glog.V(0).Infof("Subscriber %s on %v %v disconnected", clientName, t, partition)
+    }()
+
+    ctx := stream.Context()
+    var startTime time.Time
+    if startTs := req.GetInit().GetStartTimestampNs(); startTs > 0 {
+        startTime = time.Unix(0, startTs)
+    } else {
+        startTime = time.Now()
+    }
+
+    localTopicPartition.Subscribe(clientName, startTime, func() bool {
+        if !isConnected {
+            return false
+        }
+        sleepIntervalCount++
+        if sleepIntervalCount > 10 {
+            sleepIntervalCount = 10
+        }
+        time.Sleep(time.Duration(sleepIntervalCount) * 2339 * time.Millisecond)
+
+        // Check if the client has disconnected by monitoring the context
+        select {
+        case <-ctx.Done():
+            err := ctx.Err()
+            if err == context.Canceled {
+                // Client disconnected
+                return false
+            }
+            glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+            return false
+        default:
+            // Continue processing the request
+        }
+
+        return true
+    }, func(logEntry *filer_pb.LogEntry) error {
+        // reset the sleep interval count
+        sleepIntervalCount = 0
-    localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error {
         value := logEntry.GetData()
         if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{
             Data: &mq_pb.DataMessage{
                 Key:   []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)),
                 Value: value,
                 TsNs:  logEntry.TsNs,
             },
         }}); err != nil {
             glog.Errorf("Error sending setup response: %v", err)
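The subscribe loop above polls with a capped, linearly growing sleep (the odd 2339 ms step keeps idle subscribers from waking in lockstep) and stops as soon as the stream context is canceled. A runnable sketch of that wait callback, folding the sleep and the context check into one select:

package main

import (
    "context"
    "fmt"
    "time"
)

// waitForMore mirrors the Subscribe polling callback: sleep a little
// longer after every empty read (capped at 10 intervals), and bail out
// as soon as the subscriber's context is canceled.
func waitForMore(ctx context.Context, sleepIntervalCount *int) bool {
    *sleepIntervalCount++
    if *sleepIntervalCount > 10 {
        *sleepIntervalCount = 10
    }
    select {
    case <-time.After(time.Duration(*sleepIntervalCount) * 2339 * time.Millisecond):
    case <-ctx.Done():
        return false // client disconnected
    }
    return true
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    n := 0
    for waitForMore(ctx, &n) {
        fmt.Println("no data yet, interval", n)
    }
    fmt.Println("subscriber stopped")
}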
weed/mq/broker/broker_grpc_sub_coordinator.go (new file, 77 lines)
@@ -0,0 +1,77 @@
+package broker
+
+import (
+    "context"
+    "github.com/seaweedfs/seaweedfs/weed/glog"
+    "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
+    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/status"
+)
+
+// SubscriberToSubCoordinator coordinates the subscribers
+func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMessaging_SubscriberToSubCoordinatorServer) error {
+    if !b.lockAsBalancer.IsLocked() {
+        return status.Errorf(codes.Unavailable, "not current broker balancer")
+    }
+    req, err := stream.Recv()
+    if err != nil {
+        return err
+    }
+
+    var cgi *sub_coordinator.ConsumerGroupInstance
+    // process init message
+    initMessage := req.GetInit()
+    if initMessage != nil {
+        cgi = b.Coordinator.AddSubscriber(initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic)
+        glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic)
+    } else {
+        return status.Errorf(codes.InvalidArgument, "subscriber init message is empty")
+    }
+    defer func() {
+        b.Coordinator.RemoveSubscriber(initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic)
+        glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
+    }()
+
+    ctx := stream.Context()
+
+    // process ack messages
+    go func() {
+        for {
+            _, err := stream.Recv()
+            if err != nil {
+                glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
+            }
+
+            select {
+            case <-ctx.Done():
+                err := ctx.Err()
+                if err == context.Canceled {
+                    // Client disconnected
+                    return
+                }
+                return
+            default:
+                // Continue processing the request
+            }
+        }
+    }()
+
+    // send commands to subscriber
+    for {
+        select {
+        case <-ctx.Done():
+            err := ctx.Err()
+            if err == context.Canceled {
+                // Client disconnected
+                return err
+            }
+            glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
+            return err
+        case message := <-cgi.ResponseChan:
+            if err := stream.Send(message); err != nil {
+                glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
+            }
+        }
+    }
+}
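SubscriberToSubCoordinator pushes rebalancing commands through the consumer-group instance's ResponseChan while racing it against the stream context. The core send loop reduces to this sketch (assignment and the callbacks are illustrative stand-ins, not the mq_pb types):

package main

import (
    "context"
    "fmt"
    "time"
)

type assignment struct{ partition int }

// pushLoop mirrors the coordinator send loop: forward queued commands
// to the subscriber until its stream context ends.
func pushLoop(ctx context.Context, commands <-chan assignment, send func(assignment) error) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case cmd := <-commands:
            if err := send(cmd); err != nil {
                return err
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
    defer cancel()
    cmds := make(chan assignment, 1)
    cmds <- assignment{partition: 2}
    err := pushLoop(ctx, cmds, func(a assignment) error {
        fmt.Println("assigned partition", a.partition)
        return nil
    })
    fmt.Println("loop ended:", err)
}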
weed/mq/broker/broker_grpc_topic_partition_control.go (new file, 28 lines)
@@ -0,0 +1,28 @@
+package broker
+
+import (
+    "context"
+    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+func (b *MessageQueueBroker) ClosePublishers(ctx context.Context, request *mq_pb.ClosePublishersRequest) (resp *mq_pb.ClosePublishersResponse, err error) {
+    resp = &mq_pb.ClosePublishersResponse{}
+
+    t := topic.FromPbTopic(request.Topic)
+
+    b.localTopicManager.ClosePublishers(t, request.UnixTimeNs)
+
+    // wait until all publishers are closed
+    b.localTopicManager.WaitUntilNoPublishers(t)
+
+    return
+}
+
+func (b *MessageQueueBroker) CloseSubscribers(ctx context.Context, request *mq_pb.CloseSubscribersRequest) (resp *mq_pb.CloseSubscribersResponse, err error) {
+    resp = &mq_pb.CloseSubscribersResponse{}
+
+    b.localTopicManager.CloseSubscribers(topic.FromPbTopic(request.Topic), request.UnixTimeNs)
+
+    return
+}
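ClosePublishers is a close-then-wait barrier: broadcast the close (timestamped with UnixTimeNs), then block until the partition reports no publishers, so the partition can be moved safely. WaitUntilNoPublishers itself is not shown in this diff; a plausible poll-based reading of the barrier, with an atomic counter standing in for the publisher registry:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

type partition struct{ publisherCount int64 }

func (p *partition) closePublishers() {
    // in the broker this sends ShouldClose to every connected publisher;
    // here we simply drop the count once they have all drained
    atomic.StoreInt64(&p.publisherCount, 0)
}

// waitUntilNoPublishers polls until every publisher has drained,
// mirroring the wait that ClosePublishers performs before returning.
func (p *partition) waitUntilNoPublishers() {
    for atomic.LoadInt64(&p.publisherCount) > 0 {
        time.Sleep(113 * time.Millisecond)
    }
}

func main() {
    p := &partition{publisherCount: 3}
    go func() {
        time.Sleep(200 * time.Millisecond)
        p.closePublishers()
    }()
    p.waitUntilNoPublishers()
    fmt.Println("all publishers closed; safe to move the partition")
}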
@@ -1,89 +0,0 @@
-package broker
-
-import (
-    "bytes"
-    "fmt"
-    "github.com/seaweedfs/seaweedfs/weed/filer"
-    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
-    "github.com/seaweedfs/seaweedfs/weed/pb"
-    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
-    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
-    jsonpb "google.golang.org/protobuf/encoding/protojson"
-    "time"
-)
-
-func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *topic.Segment) (brokers []pb.ServerAddress, err error) {
-    info, found, err := broker.readSegmentInfoOnFiler(segment)
-    if err != nil {
-        return
-    }
-    if !found {
-        return
-    }
-    for _, b := range info.Brokers {
-        brokers = append(brokers, pb.ServerAddress(b))
-    }
-
-    return
-}
-
-func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *topic.Segment, brokers []pb.ServerAddress) (err error) {
-    var nodes []string
-    for _, b := range brokers {
-        nodes = append(nodes, string(b))
-    }
-    broker.saveSegmentInfoToFiler(segment, &mq_pb.SegmentInfo{
-        Segment:          segment.ToPbSegment(),
-        StartTsNs:        time.Now().UnixNano(),
-        Brokers:          nodes,
-        StopTsNs:         0,
-        PreviousSegments: nil,
-        NextSegments:     nil,
-    })
-    return
-}
-
-func (broker *MessageQueueBroker) readSegmentInfoOnFiler(segment *topic.Segment) (info *mq_pb.SegmentInfo, found bool, err error) {
-    dir, name := segment.DirAndName()
-
-    found, err = filer_pb.Exists(broker, dir, name, false)
-    if !found || err != nil {
-        return
-    }
-
-    err = pb.WithFilerClient(false, 0, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-        // read filer conf first
-        data, err := filer.ReadInsideFiler(client, dir, name)
-        if err != nil {
-            return fmt.Errorf("ReadEntry: %v", err)
-        }
-
-        // parse into filer conf object
-        info = &mq_pb.SegmentInfo{}
-        if err = jsonpb.Unmarshal(data, info); err != nil {
-            return err
-        }
-        found = true
-        return nil
-    })
-
-    return
-}
-
-func (broker *MessageQueueBroker) saveSegmentInfoToFiler(segment *topic.Segment, info *mq_pb.SegmentInfo) (err error) {
-    dir, name := segment.DirAndName()
-
-    var buf bytes.Buffer
-    filer.ProtoToText(&buf, info)
-
-    err = pb.WithFilerClient(false, 0, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-        // read filer conf first
-        err := filer.SaveInsideFiler(client, dir, name, buf.Bytes())
-        if err != nil {
-            return fmt.Errorf("save segment info: %v", err)
-        }
-        return nil
-    })
-
-    return
-}
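The deleted helpers persisted SegmentInfo as protojson text inside the filer and parsed it back with jsonpb.Unmarshal. The round trip itself is standard protojson usage, shown here with a well-known proto type standing in for mq_pb.SegmentInfo (the real write path went through filer.ProtoToText):

package main

import (
    "fmt"

    "google.golang.org/protobuf/encoding/protojson"
    "google.golang.org/protobuf/types/known/structpb"
)

func main() {
    // stand-in for mq_pb.SegmentInfo: any proto message works the same way
    info, err := structpb.NewStruct(map[string]interface{}{
        "brokers":   []interface{}{"broker-1:17777"},
        "startTsNs": 1700000000,
    })
    if err != nil {
        panic(err)
    }

    // save path: proto -> JSON text, the form the broker wrote to the filer
    data, err := protojson.Marshal(info)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(data))

    // read path: JSON text -> proto, as in readSegmentInfoOnFiler
    restored := &structpb.Struct{}
    if err := protojson.Unmarshal(data, restored); err != nil {
        panic(err)
    }
    fmt.Println(restored.Fields["startTsNs"].GetNumberValue())
}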
@@ -3,7 +3,8 @@ package broker
 import (
     "fmt"
     "github.com/seaweedfs/seaweedfs/weed/glog"
-    "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
+    "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
+    "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
     "github.com/seaweedfs/seaweedfs/weed/mq/topic"
     "time"

@@ -37,20 +38,24 @@ type MessageQueueBroker struct {
     filers            map[pb.ServerAddress]struct{}
     currentFiler      pb.ServerAddress
     localTopicManager *topic.LocalTopicManager
-    Balancer          *balancer.Balancer
+    Balancer          *pub_balancer.Balancer
     lockAsBalancer    *cluster.LiveLock
     currentBalancer   pb.ServerAddress
+    Coordinator       *sub_coordinator.Coordinator
 }

 func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {

+    pub_broker_balancer := pub_balancer.NewBalancer()
+
     mqBroker = &MessageQueueBroker{
         option:            option,
         grpcDialOption:    grpcDialOption,
         MasterClient:      wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
         filers:            make(map[pb.ServerAddress]struct{}),
         localTopicManager: topic.NewLocalTopicManager(),
-        Balancer:          balancer.NewBalancer(),
+        Balancer:          pub_broker_balancer,
+        Coordinator:       sub_coordinator.NewCoordinator(pub_broker_balancer),
     }
     mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)

@@ -67,10 +72,10 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
         time.Sleep(time.Millisecond * 237)
     }
     self := fmt.Sprintf("%s:%d", option.Ip, option.Port)
-    glog.V(1).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
+    glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler)

     lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
-    mqBroker.lockAsBalancer = lockClient.StartLock(balancer.LockBrokerBalancer, self)
+    mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, self)
     for {
         err := mqBroker.BrokerConnectToBalancer(self)
         if err != nil {
@@ -83,22 +88,22 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
     return mqBroker, nil
 }

-func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
+func (b *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
     if update.NodeType != cluster.FilerType {
         return
     }

     address := pb.ServerAddress(update.Address)
     if update.IsAdd {
-        broker.filers[address] = struct{}{}
-        if broker.currentFiler == "" {
-            broker.currentFiler = address
+        b.filers[address] = struct{}{}
+        if b.currentFiler == "" {
+            b.currentFiler = address
         }
     } else {
-        delete(broker.filers, address)
-        if broker.currentFiler == address {
-            for filer := range broker.filers {
-                broker.currentFiler = filer
+        delete(b.filers, address)
+        if b.currentFiler == address {
+            for filer := range b.filers {
+                b.currentFiler = filer
                 break
             }
         }
@@ -106,39 +111,39 @@ func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUp

 }

-func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress {
-    return broker.currentFiler
+func (b *MessageQueueBroker) GetFiler() pb.ServerAddress {
+    return b.currentFiler
 }

-func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
-    return pb.WithFilerClient(streamingMode, 0, broker.GetFiler(), broker.grpcDialOption, fn)
+func (b *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+    return pb.WithFilerClient(streamingMode, 0, b.GetFiler(), b.grpcDialOption, fn)
 }

-func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string {
+func (b *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string {
     return location.Url
 }

-func (broker *MessageQueueBroker) GetDataCenter() string {
+func (b *MessageQueueBroker) GetDataCenter() string {
     return ""
 }

-func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
-    return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+func (b *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
+    return pb.WithMasterClient(streamingMode, master, b.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
         return fn(client)
     })
 }

-func (broker *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error {
-    return pb.WithBrokerGrpcClient(streamingMode, server.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+func (b *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error {
+    return pb.WithBrokerGrpcClient(streamingMode, server.String(), b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
         return fn(client)
     })
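OnBrokerUpdate keeps a set of live filers and fails over currentFiler when the active one is removed; the first surviving filer found in the map becomes current. A minimal sketch of that bookkeeping (filerSet is an illustrative type):

package main

import "fmt"

type filerSet struct {
    filers  map[string]struct{}
    current string
}

// onUpdate mirrors OnBrokerUpdate: adds keep the first filer sticky,
// removals promote any surviving filer to be the current one.
func (f *filerSet) onUpdate(address string, isAdd bool) {
    if isAdd {
        f.filers[address] = struct{}{}
        if f.current == "" {
            f.current = address
        }
        return
    }
    delete(f.filers, address)
    if f.current == address {
        f.current = ""
        for filer := range f.filers {
            f.current = filer // map order is arbitrary; any live filer works
            break
        }
    }
}

func main() {
    fs := &filerSet{filers: map[string]struct{}{}}
    fs.onUpdate("filer-1:8888", true)
    fs.onUpdate("filer-2:8888", true)
    fs.onUpdate("filer-1:8888", false)
    fmt.Println("current filer:", fs.current) // filer-2:8888
}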
@@ -4,21 +4,22 @@ import (
     "context"
     "fmt"
     "github.com/seaweedfs/seaweedfs/weed/glog"
-    "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
+    "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
     "github.com/seaweedfs/seaweedfs/weed/pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
     "io"
     "math/rand"
     "time"
 )

 // BrokerConnectToBalancer connects to the broker balancer and sends stats
-func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
+func (b *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
     // find the lock owner
     var brokerBalancer string
-    err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+    err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
         resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
-            Name: balancer.LockBrokerBalancer,
+            Name: pub_balancer.LockBrokerBalancer,
         })
         if err != nil {
             return err
@@ -29,7 +30,7 @@ func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
     if err != nil {
         return err
     }
-    broker.currentBalancer = pb.ServerAddress(brokerBalancer)
+    b.currentBalancer = pb.ServerAddress(brokerBalancer)

     glog.V(0).Infof("broker %s found balancer %s", self, brokerBalancer)
     if brokerBalancer == "" {
@@ -37,15 +38,15 @@ func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
     }

     // connect to the lock owner
-    err = pb.WithBrokerGrpcClient(false, brokerBalancer, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
-        stream, err := client.ConnectToBalancer(context.Background())
+    err = pb.WithBrokerGrpcClient(false, brokerBalancer, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+        stream, err := client.PublisherToPubBalancer(context.Background())
         if err != nil {
             return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err)
         }
         defer stream.CloseSend()
-        err = stream.Send(&mq_pb.ConnectToBalancerRequest{
-            Message: &mq_pb.ConnectToBalancerRequest_Init{
-                Init: &mq_pb.ConnectToBalancerRequest_InitMessage{
+        err = stream.Send(&mq_pb.PublisherToPubBalancerRequest{
+            Message: &mq_pb.PublisherToPubBalancerRequest_Init{
+                Init: &mq_pb.PublisherToPubBalancerRequest_InitMessage{
                     Broker: self,
                 },
             },
@@ -55,13 +56,16 @@ func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
         }

         for {
-            stats := broker.localTopicManager.CollectStats(time.Second * 5)
-            err = stream.Send(&mq_pb.ConnectToBalancerRequest{
-                Message: &mq_pb.ConnectToBalancerRequest_Stats{
+            stats := b.localTopicManager.CollectStats(time.Second * 5)
+            err = stream.Send(&mq_pb.PublisherToPubBalancerRequest{
+                Message: &mq_pb.PublisherToPubBalancerRequest_Stats{
                     Stats: stats,
                 },
             })
             if err != nil {
                 if err == io.EOF {
                     return err
                 }
                 return fmt.Errorf("send stats message: %v", err)
             }
             glog.V(3).Infof("sent stats: %+v", stats)
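BrokerConnectToBalancer ends in a loop that snapshots local topic stats and streams them to the balancer until a send fails; the caller in NewMessageBroker retries the whole connection on error. A schematic of that reporting loop with the stream reduced to a callback (collect/send are stand-ins, and the fixed cadence here is an assumption; in the real code the pacing comes from CollectStats):

package main

import (
    "errors"
    "fmt"
    "time"
)

// reportStats mirrors the loop at the end of BrokerConnectToBalancer:
// push a stats snapshot on a fixed cadence until the stream fails, and
// let the caller decide whether to redial.
func reportStats(collect func() string, send func(string) error, interval time.Duration) error {
    for {
        if err := send(collect()); err != nil {
            return fmt.Errorf("send stats message: %w", err)
        }
        time.Sleep(interval)
    }
}

func main() {
    sent := 0
    err := reportStats(
        func() string { return "topics=1 partitions=6" },
        func(s string) error {
            sent++
            if sent > 3 {
                return errors.New("stream closed") // simulated balancer restart
            }
            fmt.Println("sent:", s)
            return nil
        },
        10*time.Millisecond,
    )
    // the broker's outer loop sleeps briefly and reconnects on error
    fmt.Println("reporting stopped:", err)
}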