add broker connects to filer

This commit is contained in:
Chris Lu
2020-05-05 02:05:28 -07:00
parent 47234760f4
commit 1e3e4b3072
17 changed files with 761 additions and 327 deletions

View File

@@ -0,0 +1,87 @@
package broker
import (
"context"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
/*
Topic discovery:
When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker.
The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it.
Otherwise, just host the topic.
So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy.
If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help.
*/
func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) {
panic("implement me")
}
func (broker *MessageBroker) checkPeers() {
// contact a filer about masters
var masters []string
found := false
for !found {
for _, filer := range broker.option.Filers {
err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
}
masters = append(masters, resp.Masters...)
return nil
})
if err == nil {
found = true
break
}
glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
time.Sleep(time.Second)
}
}
glog.V(0).Infof("received master list: %s", masters)
// contact each masters for filers
var filers []string
found = false
for !found {
for _, master := range masters {
err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
ClientType: "filer",
})
if err != nil {
return err
}
filers = append(filers, resp.GrpcAddresses...)
return nil
})
if err == nil {
found = true
break
}
glog.V(0).Infof("failed to list filers: %v", err)
time.Sleep(time.Second)
}
}
glog.V(0).Infof("received filer list: %s", filers)
broker.option.Filers = filers
}

View File

@@ -38,7 +38,6 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
}
if err = stream.Send(&messaging_pb.BrokerMessage{
Redirect: nil,
}); err != nil {
return err
}

View File

@@ -16,6 +16,7 @@ type MessageBrokerOption struct {
Filers []string
DefaultReplication string
MaxMB int
Ip string
Port int
Cipher bool
}
@@ -37,73 +38,44 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio
messageBroker.checkPeers()
// go messageBroker.loopForEver()
go messageBroker.keepConnectedToOneFiler()
return messageBroker, nil
}
func (broker *MessageBroker) loopForEver() {
func (broker *MessageBroker) keepConnectedToOneFiler() {
for {
broker.checkPeers()
time.Sleep(3 * time.Second)
}
}
func (broker *MessageBroker) checkPeers() {
// contact a filer about masters
var masters []string
found := false
for !found {
for _, filer := range broker.option.Filers {
err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
stream, err := client.KeepConnected(context.Background())
if err != nil {
glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
masters = append(masters, resp.Masters...)
return nil
})
if err == nil {
found = true
break
}
glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
time.Sleep(time.Second)
}
}
glog.V(0).Infof("received master list: %s", masters)
// contact each masters for filers
var filers []string
found = false
for !found {
for _, master := range masters {
err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
ClientType: "filer",
})
if err != nil {
return err
glog.V(0).Infof("conntected with filer: %v", filer)
for {
if err := stream.Send(&filer_pb.KeepConnectedRequest{
Name: broker.option.Ip,
GrpcPort: uint32(broker.option.Port),
}); err != nil {
glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
// println("send heartbeat")
if _, err := stream.Recv(); err != nil {
glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
// println("received reply")
time.Sleep(11*time.Second)
// println("woke up")
}
filers = append(filers, resp.GrpcAddresses...)
return nil
})
if err == nil {
found = true
break
}
glog.V(0).Infof("failed to list filers: %v", err)
time.Sleep(time.Second)
time.Sleep(3*time.Second)
}
}
glog.V(0).Infof("received filer list: %s", filers)
broker.option.Filers = filers
}

View File

@@ -1,30 +0,0 @@
package client
import (
"context"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
type MessagingClient struct {
bootstrapBrokers []string
grpcConnection *grpc.ClientConn
}
func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_client")
grpcConnection, err := pb.GrpcDial(context.Background(), "localhost:17777", grpcDialOption)
if err != nil {
return nil, err
}
return &MessagingClient{
bootstrapBrokers: bootstrapBrokers,
grpcConnection: grpcConnection,
}, nil
}

View File

@@ -0,0 +1,56 @@
package msgclient
import (
"context"
"fmt"
"log"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/messaging/broker"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
type MessagingClient struct {
bootstrapBrokers []string
grpcConnections map[broker.TopicPartition]*grpc.ClientConn
grpcDialOption grpc.DialOption
}
func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
return &MessagingClient{
bootstrapBrokers: bootstrapBrokers,
grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn),
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
}
}
func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
for _, broker := range mc.bootstrapBrokers {
grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
if err != nil {
log.Printf("dial broker %s: %v", broker, err)
continue
}
defer grpcConnection.Close()
resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
&messaging_pb.FindBrokerRequest{
Namespace: tp.Namespace,
Topic: tp.Topic,
Parition: tp.Partition,
})
if err != nil {
return nil, err
}
targetBroker := resp.Broker
return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
}
return nil, fmt.Errorf("no broker found for %+v", tp)
}

View File

@@ -0,0 +1,96 @@
package msgclient
import (
"io"
"log"
"time"
"github.com/chrislusf/seaweedfs/weed/messaging/broker"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
type PubChannel struct {
client messaging_pb.SeaweedMessaging_PublishClient
}
func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
tp := broker.TopicPartition{
Namespace: "chan",
Topic: chanName,
Partition: 0,
}
grpcConnection, err := mc.findBroker(tp)
if err != nil {
return nil, err
}
pc, err := setupPublisherClient(grpcConnection, tp)
if err != nil {
return nil, err
}
return &PubChannel{
client: pc,
}, nil
}
func (pc *PubChannel) Publish(m []byte) error {
return pc.client.Send(&messaging_pb.PublishRequest{
Data: &messaging_pb.Message{
Value: m,
},
})
}
func (pc *PubChannel) Close() error {
return pc.client.CloseSend()
}
type SubChannel struct {
ch chan []byte
stream messaging_pb.SeaweedMessaging_SubscribeClient
}
func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) {
tp := broker.TopicPartition{
Namespace: "chan",
Topic: chanName,
Partition: 0,
}
grpcConnection, err := mc.findBroker(tp)
if err != nil {
return nil, err
}
sc, err := setupSubscriberClient(grpcConnection, "", "chan", chanName, 0, time.Unix(0,0))
if err != nil {
return nil, err
}
t := &SubChannel{
ch: make(chan []byte),
stream: sc,
}
go func() {
for {
resp, subErr := t.stream.Recv()
if subErr == io.EOF {
return
}
if subErr != nil {
log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
return
}
if resp.IsClose {
close(t.ch)
return
}
if resp.Data != nil {
t.ch <- resp.Data.Value
}
}
}()
return t, nil
}
func (sc *SubChannel) Channel() chan []byte {
return sc.ch
}

View File

@@ -1,10 +1,12 @@
package client
package msgclient
import (
"context"
"github.com/OneOfOne/xxhash"
"github.com/chrislusf/seaweedfs/weed/messaging/broker"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
@@ -34,9 +36,9 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*
}, nil
}
func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_PublishClient, error) {
func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) {
stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background())
stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
if err != nil {
return nil, err
}
@@ -44,9 +46,9 @@ func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partiti
// send init message
err = stream.Send(&messaging_pb.PublishRequest{
Init: &messaging_pb.PublishRequest_InitMessage{
Namespace: namespace,
Topic: topic,
Partition: partition,
Namespace: tp.Namespace,
Topic: tp.Topic,
Partition: tp.Partition,
},
})
if err != nil {

View File

@@ -1,4 +1,4 @@
package client
package msgclient
import (
"context"
@@ -36,9 +36,22 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string,
func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background())
stream, newBroker, err := mc.initSubscriberClient(subscriberId, namespace, topic, partition, startTime)
if err != nil {
return nil, err
return client, err
}
if newBroker != nil {
}
return stream, nil
}
func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background())
if err != nil {
return
}
// send init message
@@ -53,20 +66,18 @@ func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic
},
})
if err != nil {
return nil, err
return
}
// process init response
initResponse, err := stream.Recv()
if err != nil {
return nil, err
return
}
if initResponse.Redirect != nil {
// TODO follow redirection
}
return stream, nil
}
func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {