Merge branch 'mq-subscribe'

This commit is contained in:
chrislu
2024-03-16 11:42:23 -07:00
88 changed files with 5302 additions and 2368 deletions

View File

@@ -245,8 +245,8 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
fullpath := util.NewFullPath(req.Directory, req.EntryName)
lockClient := cluster.NewLockClient(fs.grpcDialOption, fs.option.Host)
lock := lockClient.NewLock(string(fullpath), string(fs.option.Host))
defer lock.StopLock()
lock := lockClient.NewShortLivedLock(string(fullpath), string(fs.option.Host))
defer lock.StopShortLivedLock()
var offset int64 = 0
entry, err := fs.filer.FindEntry(ctx, fullpath)

View File

@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -18,7 +19,7 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe
var movedTo pb.ServerAddress
expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano()
resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner)
resp.LockOwner, resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner)
glog.V(3).Infof("lock %s %v %v %v, isMoved=%v %v", req.Name, req.SecondsToLock, req.RenewToken, req.Owner, req.IsMoved, movedTo)
if movedTo != "" && movedTo != fs.option.Host && !req.IsMoved {
err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@@ -31,7 +32,7 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe
})
if err == nil {
resp.RenewToken = secondResp.RenewToken
} else {
resp.LockOwner = secondResp.LockOwner
resp.Error = secondResp.Error
}
return err
@@ -42,7 +43,7 @@ func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRe
resp.Error = fmt.Sprintf("%v", err)
}
if movedTo != "" {
resp.MovedTo = string(movedTo)
resp.LockHostMovedTo = string(movedTo)
}
return resp, nil
@@ -81,10 +82,7 @@ func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.Unlo
func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLockOwnerRequest) (*filer_pb.FindLockOwnerResponse, error) {
owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if !req.IsMoved && movedTo != "" {
if !req.IsMoved && movedTo != "" || err == lock_manager.LockNotFound {
err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
Name: req.Name,
@@ -100,6 +98,15 @@ func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLock
return nil, err
}
}
if owner == "" {
glog.V(0).Infof("find lock %s moved to %v: %v", req.Name, movedTo, err)
return nil, status.Error(codes.NotFound, fmt.Sprintf("lock %s not found", req.Name))
}
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &filer_pb.FindLockOwnerResponse{
Owner: owner,
}, nil

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/stats"
"strings"
"sync/atomic"
"time"
"google.golang.org/protobuf/proto"
@@ -32,7 +33,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
}
defer fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch)
lastReadTime := time.Unix(0, req.SinceNs)
lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2)
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
@@ -57,7 +58,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
}
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
@@ -113,7 +114,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch)
}()
lastReadTime := time.Unix(0, req.SinceNs)
lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2)
glog.V(0).Infof(" + %v local subscribe %s from %+v clientId:%d", clientName, req.PathPrefix, lastReadTime, req.ClientId)
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
@@ -138,7 +139,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
} else {
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
time.Sleep(1127 * time.Millisecond)
@@ -150,7 +151,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
fs.listenersLock.Lock()
atomic.AddInt64(&fs.listenersWaits, 1)
fs.listenersCond.Wait()
atomic.AddInt64(&fs.listenersWaits, -1)
fs.listenersLock.Unlock()
if !fs.hasClient(req.ClientId, req.ClientEpoch) {
return false
@@ -178,19 +181,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
}
func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error {
return func(logEntry *filer_pb.LogEntry) error {
func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) log_buffer.EachLogEntryFuncType {
return func(logEntry *filer_pb.LogEntry) (bool, error) {
event := &filer_pb.SubscribeMetadataResponse{}
if err := proto.Unmarshal(logEntry.Data, event); err != nil {
glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
return false, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
}
if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil {
return err
return false, err
}
return nil
return false, nil
}
}

View File

@@ -7,6 +7,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/stats"
@@ -76,7 +77,13 @@ type FilerOption struct {
}
type FilerServer struct {
inFlightDataSize int64
inFlightDataSize int64
listenersWaits int64
// notifying clients
listenersLock sync.Mutex
listenersCond *sync.Cond
inFlightDataLimitCond *sync.Cond
filer_pb.UnimplementedSeaweedFilerServer
@@ -90,10 +97,6 @@ type FilerServer struct {
metricsAddress string
metricsIntervalSec int
// notifying clients
listenersLock sync.Mutex
listenersCond *sync.Cond
// track known metadata listeners
knownListenersLock sync.Mutex
knownListeners map[int32]int32
@@ -135,7 +138,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
v.SetDefault("filer.options.max_file_name_length", 255)
maxFilenameLength := v.GetUint32("filer.options.max_file_name_length")
fs.filer = filer.NewFiler(*option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, maxFilenameLength, func() {
fs.listenersCond.Broadcast()
if atomic.LoadInt64(&fs.listenersWaits) > 0 {
fs.listenersCond.Broadcast()
}
})
fs.filer.Cipher = option.Cipher
// we do not support IP whitelist right now