renaming
This commit is contained in:
130
weed/mq/broker/broker_append.go
Normal file
130
weed/mq/broker/broker_append.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/security"
|
||||
"io"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *mq_pb.TopicConfiguration, data []byte) error {
|
||||
|
||||
assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
|
||||
dir, name := util.FullPath(targetFile).DirAndName()
|
||||
|
||||
// append the chunk
|
||||
if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
request := &filer_pb.AppendToEntryRequest{
|
||||
Directory: dir,
|
||||
EntryName: name,
|
||||
Chunks: []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(assignResult.Fid, 0)},
|
||||
}
|
||||
|
||||
_, err := client.AppendToEntry(context.Background(), request)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("append to file %v: %v", request, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return fmt.Errorf("append to file %v: %v", targetFile, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (broker *MessageBroker) assignAndUpload(topicConfig *mq_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
|
||||
|
||||
var assignResult = &operation.AssignResult{}
|
||||
|
||||
// assign a volume location
|
||||
if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
assignErr := util.Retry("assignVolume", func() error {
|
||||
request := &filer_pb.AssignVolumeRequest{
|
||||
Count: 1,
|
||||
Replication: topicConfig.Replication,
|
||||
Collection: topicConfig.Collection,
|
||||
}
|
||||
|
||||
resp, err := client.AssignVolume(context.Background(), request)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("assign volume failure %v: %v", request, err)
|
||||
return err
|
||||
}
|
||||
if resp.Error != "" {
|
||||
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
|
||||
}
|
||||
|
||||
assignResult.Auth = security.EncodedJwt(resp.Auth)
|
||||
assignResult.Fid = resp.FileId
|
||||
assignResult.Url = resp.Location.Url
|
||||
assignResult.PublicUrl = resp.Location.PublicUrl
|
||||
assignResult.GrpcPort = int(resp.Location.GrpcPort)
|
||||
assignResult.Count = uint64(resp.Count)
|
||||
|
||||
return nil
|
||||
})
|
||||
if assignErr != nil {
|
||||
return assignErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// upload data
|
||||
targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
|
||||
uploadOption := &operation.UploadOption{
|
||||
UploadUrl: targetUrl,
|
||||
Filename: "",
|
||||
Cipher: broker.option.Cipher,
|
||||
IsInputCompressed: false,
|
||||
MimeType: "",
|
||||
PairMap: nil,
|
||||
Jwt: assignResult.Auth,
|
||||
}
|
||||
uploadResult, err := operation.UploadData(data, uploadOption)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
|
||||
}
|
||||
// println("uploaded to", targetUrl)
|
||||
return assignResult, uploadResult, nil
|
||||
}
|
||||
|
||||
var _ = filer_pb.FilerClient(&MessageBroker{})
|
||||
|
||||
func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
|
||||
|
||||
for _, filer := range broker.option.Filers {
|
||||
if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil {
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
glog.V(0).Infof("fail to connect to %s: %v", filer, err)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
|
||||
return location.Url
|
||||
}
|
||||
37
weed/mq/broker/broker_grpc_server.go
Normal file
37
weed/mq/broker/broker_grpc_server.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
)
|
||||
|
||||
func (broker *MessageBroker) ConfigureTopic(c context.Context, request *mq_pb.ConfigureTopicRequest) (*mq_pb.ConfigureTopicResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (broker *MessageBroker) DeleteTopic(c context.Context, request *mq_pb.DeleteTopicRequest) (*mq_pb.DeleteTopicResponse, error) {
|
||||
resp := &mq_pb.DeleteTopicResponse{}
|
||||
dir, entry := genTopicDirEntry(request.Namespace, request.Topic)
|
||||
if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil {
|
||||
return nil, err
|
||||
} else if exists {
|
||||
err = filer_pb.Remove(broker, dir, entry, true, true, true, false, nil)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *mq_pb.GetTopicConfigurationRequest) (*mq_pb.GetTopicConfigurationResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func genTopicDir(namespace, topic string) string {
|
||||
return fmt.Sprintf("%s/%s/%s", filer.TopicsDir, namespace, topic)
|
||||
}
|
||||
|
||||
func genTopicDirEntry(namespace, topic string) (dir, entry string) {
|
||||
return fmt.Sprintf("%s/%s", filer.TopicsDir, namespace), topic
|
||||
}
|
||||
122
weed/mq/broker/broker_grpc_server_discovery.go
Normal file
122
weed/mq/broker/broker_grpc_server_discovery.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/cluster"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
)
|
||||
|
||||
/*
|
||||
Topic discovery:
|
||||
|
||||
When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker.
|
||||
|
||||
The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it.
|
||||
Otherwise, just host the topic.
|
||||
|
||||
So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy.
|
||||
If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help.
|
||||
|
||||
*/
|
||||
|
||||
func (broker *MessageBroker) FindBroker(c context.Context, request *mq_pb.FindBrokerRequest) (*mq_pb.FindBrokerResponse, error) {
|
||||
|
||||
t := &mq_pb.FindBrokerResponse{}
|
||||
var peers []string
|
||||
|
||||
targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
|
||||
|
||||
for _, filer := range broker.option.Filers {
|
||||
err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
|
||||
resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{
|
||||
Resource: targetTopicPartition,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp.Found && len(resp.Resources) > 0 {
|
||||
t.Broker = resp.Resources[0].GrpcAddresses
|
||||
return nil
|
||||
}
|
||||
for _, b := range resp.Resources {
|
||||
peers = append(peers, b.GrpcAddresses)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
t.Broker = PickMember(peers, []byte(targetTopicPartition))
|
||||
|
||||
return t, nil
|
||||
|
||||
}
|
||||
|
||||
func (broker *MessageBroker) checkFilers() {
|
||||
|
||||
// contact a filer about masters
|
||||
var masters []pb.ServerAddress
|
||||
found := false
|
||||
for !found {
|
||||
for _, filer := range broker.option.Filers {
|
||||
err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
|
||||
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, m := range resp.Masters {
|
||||
masters = append(masters, pb.ServerAddress(m))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err == nil {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
glog.V(0).Infof("received master list: %s", masters)
|
||||
|
||||
// contact each masters for filers
|
||||
var filers []pb.ServerAddress
|
||||
found = false
|
||||
for !found {
|
||||
for _, master := range masters {
|
||||
err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error {
|
||||
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
|
||||
ClientType: cluster.FilerType,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, clusterNode := range resp.ClusterNodes {
|
||||
filers = append(filers, pb.ServerAddress(clusterNode.Address))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err == nil {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
glog.V(0).Infof("failed to list filers: %v", err)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
glog.V(0).Infof("received filer list: %s", filers)
|
||||
|
||||
broker.option.Filers = filers
|
||||
|
||||
}
|
||||
112
weed/mq/broker/broker_grpc_server_publish.go
Normal file
112
weed/mq/broker/broker_grpc_server_publish.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
)
|
||||
|
||||
func (broker *MessageBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
|
||||
|
||||
// process initial request
|
||||
in, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO look it up
|
||||
topicConfig := &mq_pb.TopicConfiguration{
|
||||
// IsTransient: true,
|
||||
}
|
||||
|
||||
// send init response
|
||||
initResponse := &mq_pb.PublishResponse{
|
||||
Config: nil,
|
||||
Redirect: nil,
|
||||
}
|
||||
err = stream.Send(initResponse)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if initResponse.Redirect != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// get lock
|
||||
tp := TopicPartition{
|
||||
Namespace: in.Init.Namespace,
|
||||
Topic: in.Init.Topic,
|
||||
Partition: in.Init.Partition,
|
||||
}
|
||||
|
||||
tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic)
|
||||
md5File := fmt.Sprintf("p%02d.md5", tp.Partition)
|
||||
// println("chan data stored under", tpDir, "as", md5File)
|
||||
|
||||
if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists {
|
||||
return fmt.Errorf("channel is already closed")
|
||||
}
|
||||
|
||||
tl := broker.topicManager.RequestLock(tp, topicConfig, true)
|
||||
defer broker.topicManager.ReleaseLock(tp, true)
|
||||
|
||||
md5hash := md5.New()
|
||||
// process each message
|
||||
for {
|
||||
// println("recv")
|
||||
in, err := stream.Recv()
|
||||
// glog.V(0).Infof("recieved %v err: %v", in, err)
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if in.Data == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value))
|
||||
|
||||
data, err := proto.Marshal(in.Data)
|
||||
if err != nil {
|
||||
glog.Errorf("marshall error: %v\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs)
|
||||
|
||||
if in.Data.IsClose {
|
||||
// println("server received closing")
|
||||
break
|
||||
}
|
||||
|
||||
md5hash.Write(in.Data.Value)
|
||||
|
||||
}
|
||||
|
||||
if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
|
||||
glog.V(0).Infof("err writing %s: %v", md5File, err)
|
||||
}
|
||||
|
||||
// fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
|
||||
|
||||
// send the close ack
|
||||
// println("server send ack closing")
|
||||
if err := stream.Send(&mq_pb.PublishResponse{IsClosed: true}); err != nil {
|
||||
glog.V(0).Infof("err sending close response: %v", err)
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
178
weed/mq/broker/broker_grpc_server_subscribe.go
Normal file
178
weed/mq/broker/broker_grpc_server_subscribe.go
Normal file
@@ -0,0 +1,178 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
)
|
||||
|
||||
func (broker *MessageBroker) Subscribe(stream mq_pb.SeaweedMessaging_SubscribeServer) error {
|
||||
|
||||
// process initial request
|
||||
in, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var processedTsNs int64
|
||||
var messageCount int64
|
||||
subscriberId := in.Init.SubscriberId
|
||||
|
||||
// TODO look it up
|
||||
topicConfig := &mq_pb.TopicConfiguration{
|
||||
// IsTransient: true,
|
||||
}
|
||||
|
||||
// get lock
|
||||
tp := TopicPartition{
|
||||
Namespace: in.Init.Namespace,
|
||||
Topic: in.Init.Topic,
|
||||
Partition: in.Init.Partition,
|
||||
}
|
||||
fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String())
|
||||
defer func() {
|
||||
fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs))
|
||||
}()
|
||||
|
||||
lock := broker.topicManager.RequestLock(tp, topicConfig, false)
|
||||
defer broker.topicManager.ReleaseLock(tp, false)
|
||||
|
||||
isConnected := true
|
||||
go func() {
|
||||
for isConnected {
|
||||
if _, err := stream.Recv(); err != nil {
|
||||
// println("disconnecting connection to", subscriberId, tp.String())
|
||||
isConnected = false
|
||||
lock.cond.Signal()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
lastReadTime := time.Now()
|
||||
switch in.Init.StartPosition {
|
||||
case mq_pb.SubscriberMessage_InitMessage_TIMESTAMP:
|
||||
lastReadTime = time.Unix(0, in.Init.TimestampNs)
|
||||
case mq_pb.SubscriberMessage_InitMessage_LATEST:
|
||||
case mq_pb.SubscriberMessage_InitMessage_EARLIEST:
|
||||
lastReadTime = time.Unix(0, 0)
|
||||
}
|
||||
|
||||
// how to process each message
|
||||
// an error returned will end the subscription
|
||||
eachMessageFn := func(m *mq_pb.Message) error {
|
||||
err := stream.Send(&mq_pb.BrokerMessage{
|
||||
Data: m,
|
||||
})
|
||||
if err != nil {
|
||||
glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
|
||||
m := &mq_pb.Message{}
|
||||
if err = proto.Unmarshal(logEntry.Data, m); err != nil {
|
||||
glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
|
||||
return err
|
||||
}
|
||||
// fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
|
||||
if err = eachMessageFn(m); err != nil {
|
||||
glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
|
||||
return err
|
||||
}
|
||||
if m.IsClose {
|
||||
// println("processed EOF")
|
||||
return io.EOF
|
||||
}
|
||||
processedTsNs = logEntry.TsNs
|
||||
messageCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
// fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
|
||||
|
||||
for {
|
||||
|
||||
if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
|
||||
if err != io.EOF {
|
||||
// println("stopping from persisted logs", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if processedTsNs != 0 {
|
||||
lastReadTime = time.Unix(0, processedTsNs)
|
||||
}
|
||||
|
||||
lastReadTime, _, err = lock.logBuffer.LoopProcessLogData("broker", lastReadTime, 0, func() bool {
|
||||
lock.Mutex.Lock()
|
||||
lock.cond.Wait()
|
||||
lock.Mutex.Unlock()
|
||||
return isConnected
|
||||
}, eachLogEntryFn)
|
||||
if err != nil {
|
||||
if err == log_buffer.ResumeFromDiskError {
|
||||
continue
|
||||
}
|
||||
glog.Errorf("processed to %v: %v", lastReadTime, err)
|
||||
if err != log_buffer.ResumeError {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
|
||||
startTime = startTime.UTC()
|
||||
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
|
||||
startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
|
||||
|
||||
sizeBuf := make([]byte, 4)
|
||||
startTsNs := startTime.UnixNano()
|
||||
|
||||
topicDir := genTopicDir(tp.Namespace, tp.Topic)
|
||||
partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition)
|
||||
|
||||
return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
|
||||
dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
|
||||
return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
|
||||
if dayEntry.Name == startDate {
|
||||
hourMinute := util.FileNameBase(hourMinuteEntry.Name)
|
||||
if strings.Compare(hourMinute, startHourMinute) < 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) {
|
||||
return nil
|
||||
}
|
||||
// println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
|
||||
chunkedFileReader := filer.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
|
||||
defer chunkedFileReader.Close()
|
||||
if _, err := filer.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, 0, eachLogEntryFn); err != nil {
|
||||
chunkedFileReader.Close()
|
||||
if err == io.EOF {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err)
|
||||
}
|
||||
return nil
|
||||
}, "", false, 24*60)
|
||||
}, startDate, true, 366)
|
||||
|
||||
}
|
||||
116
weed/mq/broker/broker_server.go
Normal file
116
weed/mq/broker/broker_server.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
)
|
||||
|
||||
type MessageBrokerOption struct {
|
||||
Filers []pb.ServerAddress
|
||||
DefaultReplication string
|
||||
MaxMB int
|
||||
Ip string
|
||||
Port int
|
||||
Cipher bool
|
||||
}
|
||||
|
||||
type MessageBroker struct {
|
||||
mq_pb.UnimplementedSeaweedMessagingServer
|
||||
option *MessageBrokerOption
|
||||
grpcDialOption grpc.DialOption
|
||||
topicManager *TopicManager
|
||||
}
|
||||
|
||||
func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) {
|
||||
|
||||
messageBroker = &MessageBroker{
|
||||
option: option,
|
||||
grpcDialOption: grpcDialOption,
|
||||
}
|
||||
|
||||
messageBroker.topicManager = NewTopicManager(messageBroker)
|
||||
|
||||
messageBroker.checkFilers()
|
||||
|
||||
go messageBroker.keepConnectedToOneFiler()
|
||||
|
||||
return messageBroker, nil
|
||||
}
|
||||
|
||||
func (broker *MessageBroker) keepConnectedToOneFiler() {
|
||||
|
||||
for {
|
||||
for _, filer := range broker.option.Filers {
|
||||
broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
stream, err := client.KeepConnected(ctx)
|
||||
if err != nil {
|
||||
glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
|
||||
return err
|
||||
}
|
||||
|
||||
initRequest := &filer_pb.KeepConnectedRequest{
|
||||
Name: broker.option.Ip,
|
||||
GrpcPort: uint32(broker.option.Port),
|
||||
}
|
||||
for _, tp := range broker.topicManager.ListTopicPartitions() {
|
||||
initRequest.Resources = append(initRequest.Resources, tp.String())
|
||||
}
|
||||
if err := stream.Send(&filer_pb.KeepConnectedRequest{
|
||||
Name: broker.option.Ip,
|
||||
GrpcPort: uint32(broker.option.Port),
|
||||
}); err != nil {
|
||||
glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO send events of adding/removing topics
|
||||
|
||||
glog.V(0).Infof("conntected with filer: %v", filer)
|
||||
for {
|
||||
if err := stream.Send(&filer_pb.KeepConnectedRequest{
|
||||
Name: broker.option.Ip,
|
||||
GrpcPort: uint32(broker.option.Port),
|
||||
}); err != nil {
|
||||
glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
|
||||
return err
|
||||
}
|
||||
// println("send heartbeat")
|
||||
if _, err := stream.Recv(); err != nil {
|
||||
glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
|
||||
return err
|
||||
}
|
||||
// println("received reply")
|
||||
time.Sleep(11 * time.Second)
|
||||
// println("woke up")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
|
||||
|
||||
return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
|
||||
|
||||
}
|
||||
|
||||
func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
|
||||
|
||||
return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
|
||||
return fn(client)
|
||||
})
|
||||
|
||||
}
|
||||
38
weed/mq/broker/consistent_distribution.go
Normal file
38
weed/mq/broker/consistent_distribution.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"github.com/buraksezer/consistent"
|
||||
"github.com/cespare/xxhash"
|
||||
)
|
||||
|
||||
type Member string
|
||||
|
||||
func (m Member) String() string {
|
||||
return string(m)
|
||||
}
|
||||
|
||||
type hasher struct{}
|
||||
|
||||
func (h hasher) Sum64(data []byte) uint64 {
|
||||
return xxhash.Sum64(data)
|
||||
}
|
||||
|
||||
func PickMember(members []string, key []byte) string {
|
||||
cfg := consistent.Config{
|
||||
PartitionCount: 9791,
|
||||
ReplicationFactor: 2,
|
||||
Load: 1.25,
|
||||
Hasher: hasher{},
|
||||
}
|
||||
|
||||
cmembers := []consistent.Member{}
|
||||
for _, m := range members {
|
||||
cmembers = append(cmembers, Member(m))
|
||||
}
|
||||
|
||||
c := consistent.New(cmembers, cfg)
|
||||
|
||||
m := c.LocateKey(key)
|
||||
|
||||
return m.String()
|
||||
}
|
||||
32
weed/mq/broker/consistent_distribution_test.go
Normal file
32
weed/mq/broker/consistent_distribution_test.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPickMember(t *testing.T) {
|
||||
|
||||
servers := []string{
|
||||
"s1:port",
|
||||
"s2:port",
|
||||
"s3:port",
|
||||
"s5:port",
|
||||
"s4:port",
|
||||
}
|
||||
|
||||
total := 1000
|
||||
|
||||
distribution := make(map[string]int)
|
||||
for i := 0; i < total; i++ {
|
||||
tp := fmt.Sprintf("tp:%2d", i)
|
||||
m := PickMember(servers, []byte(tp))
|
||||
// println(tp, "=>", m)
|
||||
distribution[m]++
|
||||
}
|
||||
|
||||
for member, count := range distribution {
|
||||
fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers)))
|
||||
}
|
||||
|
||||
}
|
||||
124
weed/mq/broker/topic_manager.go
Normal file
124
weed/mq/broker/topic_manager.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
|
||||
)
|
||||
|
||||
type TopicPartition struct {
|
||||
Namespace string
|
||||
Topic string
|
||||
Partition int32
|
||||
}
|
||||
|
||||
const (
|
||||
TopicPartitionFmt = "%s/%s_%02d"
|
||||
)
|
||||
|
||||
func (tp *TopicPartition) String() string {
|
||||
return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
|
||||
}
|
||||
|
||||
type TopicControl struct {
|
||||
sync.Mutex
|
||||
cond *sync.Cond
|
||||
subscriberCount int
|
||||
publisherCount int
|
||||
logBuffer *log_buffer.LogBuffer
|
||||
}
|
||||
|
||||
type TopicManager struct {
|
||||
sync.Mutex
|
||||
topicControls map[TopicPartition]*TopicControl
|
||||
broker *MessageBroker
|
||||
}
|
||||
|
||||
func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
|
||||
return &TopicManager{
|
||||
topicControls: make(map[TopicPartition]*TopicControl),
|
||||
broker: messageBroker,
|
||||
}
|
||||
}
|
||||
|
||||
func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *mq_pb.TopicConfiguration) *log_buffer.LogBuffer {
|
||||
|
||||
flushFn := func(startTime, stopTime time.Time, buf []byte) {
|
||||
|
||||
if topicConfig.IsTransient {
|
||||
// return
|
||||
}
|
||||
|
||||
// fmt.Printf("flushing with topic config %+v\n", topicConfig)
|
||||
|
||||
startTime, stopTime = startTime.UTC(), stopTime.UTC()
|
||||
targetFile := fmt.Sprintf(
|
||||
"%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
|
||||
filer.TopicsDir, tp.Namespace, tp.Topic,
|
||||
startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
|
||||
tp.Partition,
|
||||
)
|
||||
|
||||
if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
|
||||
glog.V(0).Infof("log write failed %s: %v", targetFile, err)
|
||||
}
|
||||
}
|
||||
logBuffer := log_buffer.NewLogBuffer("broker", time.Minute, flushFn, func() {
|
||||
tl.cond.Broadcast()
|
||||
})
|
||||
|
||||
return logBuffer
|
||||
}
|
||||
|
||||
func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *mq_pb.TopicConfiguration, isPublisher bool) *TopicControl {
|
||||
tm.Lock()
|
||||
defer tm.Unlock()
|
||||
|
||||
tc, found := tm.topicControls[partition]
|
||||
if !found {
|
||||
tc = &TopicControl{}
|
||||
tc.cond = sync.NewCond(&tc.Mutex)
|
||||
tm.topicControls[partition] = tc
|
||||
tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
|
||||
}
|
||||
if isPublisher {
|
||||
tc.publisherCount++
|
||||
} else {
|
||||
tc.subscriberCount++
|
||||
}
|
||||
return tc
|
||||
}
|
||||
|
||||
func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {
|
||||
tm.Lock()
|
||||
defer tm.Unlock()
|
||||
|
||||
lock, found := tm.topicControls[partition]
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
if isPublisher {
|
||||
lock.publisherCount--
|
||||
} else {
|
||||
lock.subscriberCount--
|
||||
}
|
||||
if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
|
||||
delete(tm.topicControls, partition)
|
||||
lock.logBuffer.Shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
|
||||
tm.Lock()
|
||||
defer tm.Unlock()
|
||||
|
||||
for k := range tm.topicControls {
|
||||
tps = append(tps, k)
|
||||
}
|
||||
return
|
||||
}
|
||||
5
weed/mq/msgclient/chan_config.go
Normal file
5
weed/mq/msgclient/chan_config.go
Normal file
@@ -0,0 +1,5 @@
|
||||
package msgclient
|
||||
|
||||
func (mc *MessagingClient) DeleteChannel(chanName string) error {
|
||||
return mc.DeleteTopic("chan", chanName)
|
||||
}
|
||||
76
weed/mq/msgclient/chan_pub.go
Normal file
76
weed/mq/msgclient/chan_pub.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package msgclient
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"hash"
|
||||
"io"
|
||||
"log"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
)
|
||||
|
||||
type PubChannel struct {
|
||||
client mq_pb.SeaweedMessaging_PublishClient
|
||||
grpcConnection *grpc.ClientConn
|
||||
md5hash hash.Hash
|
||||
}
|
||||
|
||||
func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
|
||||
tp := broker.TopicPartition{
|
||||
Namespace: "chan",
|
||||
Topic: chanName,
|
||||
Partition: 0,
|
||||
}
|
||||
grpcConnection, err := mc.findBroker(tp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pc, err := setupPublisherClient(grpcConnection, tp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &PubChannel{
|
||||
client: pc,
|
||||
grpcConnection: grpcConnection,
|
||||
md5hash: md5.New(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (pc *PubChannel) Publish(m []byte) error {
|
||||
err := pc.client.Send(&mq_pb.PublishRequest{
|
||||
Data: &mq_pb.Message{
|
||||
Value: m,
|
||||
},
|
||||
})
|
||||
if err == nil {
|
||||
pc.md5hash.Write(m)
|
||||
}
|
||||
return err
|
||||
}
|
||||
func (pc *PubChannel) Close() error {
|
||||
|
||||
// println("send closing")
|
||||
if err := pc.client.Send(&mq_pb.PublishRequest{
|
||||
Data: &mq_pb.Message{
|
||||
IsClose: true,
|
||||
},
|
||||
}); err != nil {
|
||||
log.Printf("err send close: %v", err)
|
||||
}
|
||||
// println("receive closing")
|
||||
if _, err := pc.client.Recv(); err != nil && err != io.EOF {
|
||||
log.Printf("err receive close: %v", err)
|
||||
}
|
||||
// println("close connection")
|
||||
if err := pc.grpcConnection.Close(); err != nil {
|
||||
log.Printf("err connection close: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pc *PubChannel) Md5() []byte {
|
||||
return pc.md5hash.Sum(nil)
|
||||
}
|
||||
85
weed/mq/msgclient/chan_sub.go
Normal file
85
weed/mq/msgclient/chan_sub.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package msgclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"hash"
|
||||
"io"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
)
|
||||
|
||||
type SubChannel struct {
|
||||
ch chan []byte
|
||||
stream mq_pb.SeaweedMessaging_SubscribeClient
|
||||
md5hash hash.Hash
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) {
|
||||
tp := broker.TopicPartition{
|
||||
Namespace: "chan",
|
||||
Topic: chanName,
|
||||
Partition: 0,
|
||||
}
|
||||
grpcConnection, err := mc.findBroker(tp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t := &SubChannel{
|
||||
ch: make(chan []byte),
|
||||
stream: sc,
|
||||
md5hash: md5.New(),
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
resp, subErr := t.stream.Recv()
|
||||
if subErr == io.EOF {
|
||||
return
|
||||
}
|
||||
if subErr != nil {
|
||||
log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
|
||||
return
|
||||
}
|
||||
if resp.Data == nil {
|
||||
// this could be heartbeat from broker
|
||||
continue
|
||||
}
|
||||
if resp.Data.IsClose {
|
||||
t.stream.Send(&mq_pb.SubscriberMessage{
|
||||
IsClose: true,
|
||||
})
|
||||
close(t.ch)
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
t.ch <- resp.Data.Value
|
||||
t.md5hash.Write(resp.Data.Value)
|
||||
}
|
||||
}()
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (sc *SubChannel) Channel() chan []byte {
|
||||
return sc.ch
|
||||
}
|
||||
|
||||
func (sc *SubChannel) Md5() []byte {
|
||||
return sc.md5hash.Sum(nil)
|
||||
}
|
||||
|
||||
func (sc *SubChannel) Cancel() {
|
||||
sc.cancel()
|
||||
}
|
||||
55
weed/mq/msgclient/client.go
Normal file
55
weed/mq/msgclient/client.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package msgclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/security"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
type MessagingClient struct {
|
||||
bootstrapBrokers []string
|
||||
grpcConnections map[broker.TopicPartition]*grpc.ClientConn
|
||||
grpcDialOption grpc.DialOption
|
||||
}
|
||||
|
||||
func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
|
||||
return &MessagingClient{
|
||||
bootstrapBrokers: bootstrapBrokers,
|
||||
grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn),
|
||||
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
|
||||
}
|
||||
}
|
||||
|
||||
func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
|
||||
|
||||
for _, broker := range mc.bootstrapBrokers {
|
||||
grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
|
||||
if err != nil {
|
||||
log.Printf("dial broker %s: %v", broker, err)
|
||||
continue
|
||||
}
|
||||
defer grpcConnection.Close()
|
||||
|
||||
resp, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
|
||||
&mq_pb.FindBrokerRequest{
|
||||
Namespace: tp.Namespace,
|
||||
Topic: tp.Topic,
|
||||
Parition: tp.Partition,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
targetBroker := resp.Broker
|
||||
return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
|
||||
}
|
||||
return nil, fmt.Errorf("no broker found for %+v", tp)
|
||||
}
|
||||
63
weed/mq/msgclient/config.go
Normal file
63
weed/mq/msgclient/config.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package msgclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
)
|
||||
|
||||
func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
|
||||
|
||||
return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error {
|
||||
_, err := client.ConfigureTopic(context.Background(),
|
||||
&mq_pb.ConfigureTopicRequest{
|
||||
Namespace: tp.Namespace,
|
||||
Topic: tp.Topic,
|
||||
Configuration: &mq_pb.TopicConfiguration{
|
||||
PartitionCount: 0,
|
||||
Collection: "",
|
||||
Replication: "",
|
||||
IsTransient: false,
|
||||
Partitoning: 0,
|
||||
},
|
||||
})
|
||||
return err
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
|
||||
|
||||
return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error {
|
||||
_, err := client.DeleteTopic(context.Background(),
|
||||
&mq_pb.DeleteTopicRequest{
|
||||
Namespace: namespace,
|
||||
Topic: topic,
|
||||
})
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (mc *MessagingClient) withAnyBroker(fn func(client mq_pb.SeaweedMessagingClient) error) error {
|
||||
|
||||
var lastErr error
|
||||
for _, broker := range mc.bootstrapBrokers {
|
||||
grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
|
||||
if err != nil {
|
||||
log.Printf("dial broker %s: %v", broker, err)
|
||||
continue
|
||||
}
|
||||
defer grpcConnection.Close()
|
||||
|
||||
err = fn(mq_pb.NewSeaweedMessagingClient(grpcConnection))
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
lastErr = err
|
||||
}
|
||||
|
||||
return lastErr
|
||||
}
|
||||
118
weed/mq/msgclient/publisher.go
Normal file
118
weed/mq/msgclient/publisher.go
Normal file
@@ -0,0 +1,118 @@
|
||||
package msgclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OneOfOne/xxhash"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
)
|
||||
|
||||
type Publisher struct {
|
||||
publishClients []mq_pb.SeaweedMessaging_PublishClient
|
||||
topicConfiguration *mq_pb.TopicConfiguration
|
||||
messageCount uint64
|
||||
publisherId string
|
||||
}
|
||||
|
||||
func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
|
||||
// read topic configuration
|
||||
topicConfiguration := &mq_pb.TopicConfiguration{
|
||||
PartitionCount: 4,
|
||||
}
|
||||
publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
|
||||
for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
|
||||
tp := broker.TopicPartition{
|
||||
Namespace: namespace,
|
||||
Topic: topic,
|
||||
Partition: int32(i),
|
||||
}
|
||||
grpcClientConn, err := mc.findBroker(tp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client, err := setupPublisherClient(grpcClientConn, tp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
publishClients[i] = client
|
||||
}
|
||||
return &Publisher{
|
||||
publishClients: publishClients,
|
||||
topicConfiguration: topicConfiguration,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) {
|
||||
|
||||
stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// send init message
|
||||
err = stream.Send(&mq_pb.PublishRequest{
|
||||
Init: &mq_pb.PublishRequest_InitMessage{
|
||||
Namespace: tp.Namespace,
|
||||
Topic: tp.Topic,
|
||||
Partition: tp.Partition,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// process init response
|
||||
initResponse, err := stream.Recv()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if initResponse.Redirect != nil {
|
||||
// TODO follow redirection
|
||||
}
|
||||
if initResponse.Config != nil {
|
||||
}
|
||||
|
||||
// setup looks for control messages
|
||||
doneChan := make(chan error, 1)
|
||||
go func() {
|
||||
for {
|
||||
in, err := stream.Recv()
|
||||
if err != nil {
|
||||
doneChan <- err
|
||||
return
|
||||
}
|
||||
if in.Redirect != nil {
|
||||
}
|
||||
if in.Config != nil {
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return stream, nil
|
||||
|
||||
}
|
||||
|
||||
func (p *Publisher) Publish(m *mq_pb.Message) error {
|
||||
hashValue := p.messageCount
|
||||
p.messageCount++
|
||||
if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash {
|
||||
if m.Key != nil {
|
||||
hashValue = xxhash.Checksum64(m.Key)
|
||||
}
|
||||
} else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash {
|
||||
hashValue = xxhash.Checksum64(m.Key)
|
||||
} else {
|
||||
// round robin
|
||||
}
|
||||
|
||||
idx := int(hashValue) % len(p.publishClients)
|
||||
if idx < 0 {
|
||||
idx += len(p.publishClients)
|
||||
}
|
||||
return p.publishClients[idx].Send(&mq_pb.PublishRequest{
|
||||
Data: m,
|
||||
})
|
||||
}
|
||||
120
weed/mq/msgclient/subscriber.go
Normal file
120
weed/mq/msgclient/subscriber.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package msgclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/mq/broker"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type Subscriber struct {
|
||||
subscriberClients []mq_pb.SeaweedMessaging_SubscribeClient
|
||||
subscriberCancels []context.CancelFunc
|
||||
subscriberId string
|
||||
}
|
||||
|
||||
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
|
||||
// read topic configuration
|
||||
topicConfiguration := &mq_pb.TopicConfiguration{
|
||||
PartitionCount: 4,
|
||||
}
|
||||
subscriberClients := make([]mq_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
|
||||
subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount)
|
||||
|
||||
for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
|
||||
if partitionId >= 0 && i != partitionId {
|
||||
continue
|
||||
}
|
||||
tp := broker.TopicPartition{
|
||||
Namespace: namespace,
|
||||
Topic: topic,
|
||||
Partition: int32(i),
|
||||
}
|
||||
grpcClientConn, err := mc.findBroker(tp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
subscriberClients[i] = client
|
||||
subscriberCancels[i] = cancel
|
||||
}
|
||||
|
||||
return &Subscriber{
|
||||
subscriberClients: subscriberClients,
|
||||
subscriberCancels: subscriberCancels,
|
||||
subscriberId: subscriberId,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream mq_pb.SeaweedMessaging_SubscribeClient, err error) {
|
||||
stream, err = mq_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// send init message
|
||||
err = stream.Send(&mq_pb.SubscriberMessage{
|
||||
Init: &mq_pb.SubscriberMessage_InitMessage{
|
||||
Namespace: tp.Namespace,
|
||||
Topic: tp.Topic,
|
||||
Partition: tp.Partition,
|
||||
StartPosition: mq_pb.SubscriberMessage_InitMessage_TIMESTAMP,
|
||||
TimestampNs: startTime.UnixNano(),
|
||||
SubscriberId: subscriberId,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
func doSubscribe(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient, processFn func(m *mq_pb.Message)) error {
|
||||
for {
|
||||
resp, listenErr := subscriberClient.Recv()
|
||||
if listenErr == io.EOF {
|
||||
return nil
|
||||
}
|
||||
if listenErr != nil {
|
||||
println(listenErr.Error())
|
||||
return listenErr
|
||||
}
|
||||
if resp.Data == nil {
|
||||
// this could be heartbeat from broker
|
||||
continue
|
||||
}
|
||||
processFn(resp.Data)
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe starts goroutines to process the messages
|
||||
func (s *Subscriber) Subscribe(processFn func(m *mq_pb.Message)) {
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < len(s.subscriberClients); i++ {
|
||||
if s.subscriberClients[i] != nil {
|
||||
wg.Add(1)
|
||||
go func(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient) {
|
||||
defer wg.Done()
|
||||
doSubscribe(subscriberClient, processFn)
|
||||
}(s.subscriberClients[i])
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *Subscriber) Shutdown() {
|
||||
for i := 0; i < len(s.subscriberClients); i++ {
|
||||
if s.subscriberCancels[i] != nil {
|
||||
s.subscriberCancels[i]()
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user