improve worm support (#5983)
* improve worm support Signed-off-by: lou <alex1988@outlook.com> * worm mode in filer Signed-off-by: lou <alex1988@outlook.com> * update after review Signed-off-by: lou <alex1988@outlook.com> * update after review Signed-off-by: lou <alex1988@outlook.com> * move to fs configure Signed-off-by: lou <alex1988@outlook.com> * remove flag Signed-off-by: lou <alex1988@outlook.com> * update after review Signed-off-by: lou <alex1988@outlook.com> * support worm hardlink Signed-off-by: lou <alex1988@outlook.com> * update after review Signed-off-by: lou <alex1988@outlook.com> * typo Signed-off-by: lou <alex1988@outlook.com> * sync filer conf Signed-off-by: lou <alex1988@outlook.com> --------- Signed-off-by: lou <alex1988@outlook.com>
This commit is contained in:
118
weed/mount/filer_conf.go
Normal file
118
weed/mount/filer_conf.go
Normal file
@@ -0,0 +1,118 @@
|
||||
package mount
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func (wfs *WFS) subscribeFilerConfEvents() (func(), error) {
|
||||
now := time.Now()
|
||||
confDir := filer.DirectoryEtcSeaweedFS
|
||||
confName := filer.FilerConfName
|
||||
confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName)
|
||||
|
||||
// read current conf
|
||||
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
content, err := filer.ReadInsideFiler(client, confDir, confName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fc := filer.NewFilerConf()
|
||||
if len(content) > 0 {
|
||||
if err := fc.LoadFromBytes(content); err != nil {
|
||||
return fmt.Errorf("parse %s: %v", confFullName, err)
|
||||
}
|
||||
}
|
||||
|
||||
wfs.FilerConf = fc
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||
message := resp.EventNotification
|
||||
if message.NewEntry == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
dir := resp.Directory
|
||||
name := resp.EventNotification.NewEntry.Name
|
||||
|
||||
if dir != confDir || name != confName {
|
||||
return nil
|
||||
}
|
||||
|
||||
content := message.NewEntry.Content
|
||||
fc := filer.NewFilerConf()
|
||||
if len(content) > 0 {
|
||||
if err = fc.LoadFromBytes(content); err != nil {
|
||||
return fmt.Errorf("parse %s: %v", confFullName, err)
|
||||
}
|
||||
}
|
||||
|
||||
wfs.FilerConf = fc
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
metadataFollowOption := &pb.MetadataFollowOption{
|
||||
ClientName: "fuse",
|
||||
ClientId: wfs.signature,
|
||||
ClientEpoch: 1,
|
||||
SelfSignature: 0,
|
||||
PathPrefix: confFullName,
|
||||
AdditionalPathPrefixes: nil,
|
||||
StartTsNs: now.UnixNano(),
|
||||
StopTsNs: 0,
|
||||
EventErrorType: pb.FatalOnError,
|
||||
}
|
||||
|
||||
return func() {
|
||||
// sync new conf changes
|
||||
util.RetryUntil("followFilerConfChanges", func() error {
|
||||
metadataFollowOption.ClientEpoch++
|
||||
i := atomic.LoadInt32(&wfs.option.filerIndex)
|
||||
n := len(wfs.option.FilerAddresses)
|
||||
err = pb.FollowMetadata(wfs.option.FilerAddresses[i], wfs.option.GrpcDialOption, metadataFollowOption, processEventFn)
|
||||
if err == nil {
|
||||
atomic.StoreInt32(&wfs.option.filerIndex, i)
|
||||
return nil
|
||||
}
|
||||
|
||||
i++
|
||||
if i >= int32(n) {
|
||||
i = 0
|
||||
}
|
||||
|
||||
return err
|
||||
}, func(err error) bool {
|
||||
glog.V(0).Infof("fuse follow filer conf changes: %v", err)
|
||||
return true
|
||||
})
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (wfs *WFS) wormEnabledForEntry(path util.FullPath, entry *filer_pb.Entry) bool {
|
||||
if entry == nil || entry.Attributes == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
rule := wfs.FilerConf.MatchStorageRule(string(path))
|
||||
if !rule.Worm {
|
||||
return false
|
||||
}
|
||||
|
||||
return entry.Attributes.FileSize > 0 || entry.Attributes.Crtime != entry.Attributes.Mtime
|
||||
}
|
||||
@@ -47,8 +47,6 @@ type Option struct {
|
||||
Quota int64
|
||||
DisableXAttr bool
|
||||
|
||||
WriteOnceReadMany bool
|
||||
|
||||
MountUid uint32
|
||||
MountGid uint32
|
||||
MountMode os.FileMode
|
||||
@@ -82,6 +80,7 @@ type WFS struct {
|
||||
fuseServer *fuse.Server
|
||||
IsOverQuota bool
|
||||
fhLockTable *util.LockTable[FileHandleId]
|
||||
FilerConf *filer.FilerConf
|
||||
}
|
||||
|
||||
func NewSeaweedFileSystem(option *Option) *WFS {
|
||||
@@ -141,10 +140,19 @@ func NewSeaweedFileSystem(option *Option) *WFS {
|
||||
return wfs
|
||||
}
|
||||
|
||||
func (wfs *WFS) StartBackgroundTasks() {
|
||||
func (wfs *WFS) StartBackgroundTasks() error {
|
||||
fn, err := wfs.subscribeFilerConfEvents()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go fn()
|
||||
|
||||
startTime := time.Now()
|
||||
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
|
||||
go wfs.loopCheckQuota()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wfs *WFS) String() string {
|
||||
|
||||
@@ -65,6 +65,7 @@ func (wfs *WFS) Open(cancel <-chan struct{}, in *fuse.OpenIn, out *fuse.OpenOut)
|
||||
fileHandle, status = wfs.AcquireHandle(in.NodeId, in.Flags, in.Uid, in.Gid)
|
||||
if status == fuse.OK {
|
||||
out.Fh = uint64(fileHandle.fh)
|
||||
out.OpenFlags = in.Flags
|
||||
// TODO https://github.com/libfuse/libfuse/blob/master/include/fuse_common.h#L64
|
||||
}
|
||||
return status
|
||||
|
||||
@@ -130,6 +130,10 @@ func (wfs *WFS) Unlink(cancel <-chan struct{}, header *fuse.InHeader, name strin
|
||||
return code
|
||||
}
|
||||
|
||||
if wfs.wormEnabledForEntry(entryFullPath, entry) {
|
||||
return fuse.EPERM
|
||||
}
|
||||
|
||||
// first, ensure the filer store can correctly delete
|
||||
glog.V(3).Infof("remove file: %v", entryFullPath)
|
||||
isDeleteData := entry != nil && entry.HardLinkCounter <= 1
|
||||
|
||||
@@ -3,13 +3,14 @@ package mount
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/hanwen/go-fuse/v2/fuse"
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
/**
|
||||
@@ -128,9 +129,6 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
|
||||
if entry.Attributes.Gid == 0 {
|
||||
entry.Attributes.Gid = gid
|
||||
}
|
||||
if entry.Attributes.Crtime == 0 {
|
||||
entry.Attributes.Crtime = time.Now().Unix()
|
||||
}
|
||||
entry.Attributes.Mtime = time.Now().Unix()
|
||||
}
|
||||
|
||||
|
||||
@@ -3,19 +3,16 @@ package mount
|
||||
import (
|
||||
"github.com/hanwen/go-fuse/v2/fuse"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"time"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func (wfs *WFS) AcquireHandle(inode uint64, flags, uid, gid uint32) (fileHandle *FileHandle, status fuse.Status) {
|
||||
var entry *filer_pb.Entry
|
||||
_, _, entry, status = wfs.maybeReadEntry(inode)
|
||||
var path util.FullPath
|
||||
path, _, entry, status = wfs.maybeReadEntry(inode)
|
||||
if status == fuse.OK {
|
||||
if entry != nil && wfs.option.WriteOnceReadMany {
|
||||
if entry.Attributes.Mtime+10 < time.Now().Unix() {
|
||||
if flags&fuse.O_ANYWRITE != 0 {
|
||||
return nil, fuse.EPERM
|
||||
}
|
||||
}
|
||||
if wfs.wormEnabledForEntry(path, entry) && flags&fuse.O_ANYWRITE != 0 {
|
||||
return nil, fuse.EPERM
|
||||
}
|
||||
// need to AcquireFileHandle again to ensure correct handle counter
|
||||
fileHandle = wfs.fhMap.AcquireFileHandle(wfs, inode, entry)
|
||||
|
||||
@@ -25,7 +25,6 @@ When creating a link:
|
||||
|
||||
/** Create a hard link to a file */
|
||||
func (wfs *WFS) Link(cancel <-chan struct{}, in *fuse.LinkIn, name string, out *fuse.EntryOut) (code fuse.Status) {
|
||||
|
||||
if wfs.IsOverQuota {
|
||||
return fuse.Status(syscall.ENOSPC)
|
||||
}
|
||||
@@ -49,6 +48,11 @@ func (wfs *WFS) Link(cancel <-chan struct{}, in *fuse.LinkIn, name string, out *
|
||||
return status
|
||||
}
|
||||
|
||||
// hardlink is not allowed in WORM mode
|
||||
if wfs.wormEnabledForEntry(oldEntryPath, oldEntry) {
|
||||
return fuse.EPERM
|
||||
}
|
||||
|
||||
// update old file to hardlink mode
|
||||
if len(oldEntry.HardLinkId) == 0 {
|
||||
oldEntry.HardLinkId = filer.NewHardLinkId()
|
||||
|
||||
Reference in New Issue
Block a user