Files
seaweedFS/weed/command/mount_std.go
Chris Lu 3d872e86f8 Implement POSIX file locking for FUSE mount (#8750)
* Add POSIX byte-range lock table for FUSE mount

Implement PosixLockTable with per-inode range lock tracking supporting:
- Shared (F_RDLCK) and exclusive (F_WRLCK) byte-range locks
- Conflict detection across different lock owners
- Lock coalescing for adjacent/overlapping same-owner same-type locks
- Lock splitting on partial-range unlock
- Blocking waiter support for SetLkw with cancellation
- Owner-based cleanup for Release

* Wire POSIX lock handlers into FUSE mount

Implement GetLk, SetLk, SetLkw on WFS delegating to PosixLockTable.
Add posixLocks field to WFS and initialize in constructor.
Clean up locks on Release via ReleaseOwner using ReleaseIn.LockOwner.
Remove ENOSYS stubs from weedfs_unsupported.go.

* Enable POSIX and flock lock capabilities in FUSE mount

Set EnableLocks: true in mount options to advertise
CAP_POSIX_LOCKS and CAP_FLOCK_LOCKS during FUSE INIT.

* Avoid thundering herd in lock waiter wake-up

Replace broadcast-all wakeWaiters with selective wakeEligibleWaiters
that checks each waiter's requested lock against remaining held locks.
Only waiters whose request no longer conflicts are woken; others stay
queued. Store the requested lockRange in each lockWaiter to enable this.

* Fix uint64 overflow in adjacency check for lock coalescing

Guard h.End+1 and lk.End+1 with < ^uint64(0) checks so that
End == math.MaxUint64 (EOF) does not wrap to 0 and falsely merge
non-adjacent locks.

* Add test for non-adjacent ranges with gap not being coalesced
2026-03-23 22:18:51 -07:00

410 lines
13 KiB
Go

//go:build linux || darwin || freebsd
package command
import (
"context"
"fmt"
"net"
"net/http"
"os"
"os/user"
"path"
"runtime"
"strconv"
"strings"
"syscall"
"time"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/mount/unmount"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"google.golang.org/grpc/reflection"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
)
func runMount(cmd *Command, args []string) bool {
if *mountOptions.debug {
go http.ListenAndServe(fmt.Sprintf(":%d", *mountOptions.debugPort), nil)
}
grace.SetupProfiling(*mountCpuProfile, *mountMemProfile)
if *mountReadRetryTime < time.Second {
*mountReadRetryTime = time.Second
}
util.RetryWaitTime = *mountReadRetryTime
umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
if umaskErr != nil {
fmt.Printf("can not parse umask %s", *mountOptions.umaskString)
return false
}
if len(args) > 0 {
return false
}
return RunMount(&mountOptions, os.FileMode(umask))
}
func ensureBucketAllowEmptyFolders(ctx context.Context, filerClient filer_pb.FilerClient, mountRoot, bucketRootPath string) error {
bucketPath, isBucketRootMount := bucketPathForMountRoot(mountRoot, bucketRootPath)
if !isBucketRootMount {
return nil
}
entry, err := filer_pb.GetEntry(ctx, filerClient, util.FullPath(bucketPath))
if err != nil {
return err
}
if entry == nil {
return fmt.Errorf("bucket %s not found", bucketPath)
}
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
if strings.EqualFold(strings.TrimSpace(string(entry.Extended[s3_constants.ExtAllowEmptyFolders])), "true") {
return nil
}
entry.Extended[s3_constants.ExtAllowEmptyFolders] = []byte("true")
bucketFullPath := util.FullPath(bucketPath)
parent, _ := bucketFullPath.DirAndName()
if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{
Directory: parent,
Entry: entry,
})
}); err != nil {
return err
}
glog.V(3).Infof("RunMount: set bucket %s %s=true", bucketPath, s3_constants.ExtAllowEmptyFolders)
return nil
}
func bucketPathForMountRoot(mountRoot, bucketRootPath string) (string, bool) {
cleanPath := path.Clean("/" + strings.TrimPrefix(mountRoot, "/"))
cleanBucketRoot := path.Clean("/" + strings.TrimPrefix(bucketRootPath, "/"))
if cleanBucketRoot == "/" {
return "", false
}
prefix := cleanBucketRoot + "/"
if !strings.HasPrefix(cleanPath, prefix) {
return "", false
}
rest := strings.TrimPrefix(cleanPath, prefix)
bucketParts := strings.Split(rest, "/")
if len(bucketParts) != 1 || bucketParts[0] == "" {
return "", false
}
return cleanBucketRoot + "/" + bucketParts[0], true
}
func RunMount(option *MountOptions, umask os.FileMode) bool {
// basic checks
chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
if chunkSizeLimitMB <= 0 {
fmt.Printf("Please specify a reasonable buffer size.\n")
return false
}
// try to connect to filer
filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses()
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
var bucketRootPath string
var err error
for i := 0; i < 10; i++ {
err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer grpc address %v configuration: %w", filerAddresses, err)
}
cipher = resp.Cipher
bucketRootPath = resp.DirBuckets
return nil
})
if err != nil {
glog.V(0).Infof("failed to talk to filer %v: %v", filerAddresses, err)
glog.V(0).Infof("wait for %d seconds ...", i+1)
time.Sleep(time.Duration(i+1) * time.Second)
}
}
if err != nil {
glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err)
return true
}
if bucketRootPath == "" {
bucketRootPath = "/buckets"
}
filerMountRootPath := *option.filerMountRootPath
// clean up mount point
dir := util.ResolvePath(*option.dir)
if dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
return false
}
unmount.Unmount(dir)
// start on local unix socket
if *option.localSocket == "" {
mountDirHash := util.HashToInt32([]byte(dir))
if mountDirHash < 0 {
mountDirHash = -mountDirHash
}
*option.localSocket = fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", mountDirHash)
}
if err := os.Remove(*option.localSocket); err != nil && !os.IsNotExist(err) {
glog.Fatalf("Failed to remove %s, error: %s", *option.localSocket, err.Error())
}
montSocketListener, err := net.Listen("unix", *option.localSocket)
if err != nil {
glog.Fatalf("Failed to listen on %s: %v", *option.localSocket, err)
}
// detect mount folder mode
if *option.dirAutoCreate {
os.MkdirAll(dir, os.FileMode(0777)&^umask)
}
fileInfo, err := os.Stat(dir)
// collect uid, gid
uid, gid := uint32(0), uint32(0)
mountMode := os.ModeDir | 0777
if err == nil {
mountMode = os.ModeDir | os.FileMode(0777)&^umask
uid, gid = util.GetFileUidGid(fileInfo)
fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, mountMode)
} else {
fmt.Printf("can not stat %s\n", dir)
return false
}
// detect uid, gid
if uid == 0 {
if u, err := user.Current(); err == nil {
if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
uid = uint32(parsedId)
}
if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
gid = uint32(parsedId)
}
fmt.Printf("current uid=%d gid=%d\n", uid, gid)
}
}
// mapping uid, gid
uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
if err != nil {
fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err)
return false
}
// Ensure target mount point availability
skipAutofs := option.hasAutofs != nil && *option.hasAutofs
if isValid := checkMountPointAvailable(dir, skipAutofs); !isValid {
glog.Fatalf("Target mount point is not available: %s, please check!", dir)
return true
}
serverFriendlyName := strings.ReplaceAll(*option.filer, ",", "+")
// When autofs/systemd-mount is used, FsName must be "fuse" so util-linux/mount can recognize
// it as a pseudo filesystem. Otherwise, preserve the descriptive name for mount/df output.
fsName := serverFriendlyName + ":" + filerMountRootPath
if skipAutofs {
fsName = "fuse"
}
// mount fuse
fuseMountOptions := &fuse.MountOptions{
AllowOther: *option.allowOthers,
Options: option.extraOptions,
MaxBackground: 128,
MaxWrite: 1024 * 1024 * 2,
MaxReadAhead: 1024 * 1024 * 2,
IgnoreSecurityLabels: false,
RememberInodes: false,
FsName: fsName,
Name: "seaweedfs",
SingleThreaded: false,
DisableXAttrs: *option.disableXAttr,
Debug: *option.debug,
EnableLocks: true,
ExplicitDataCacheControl: false,
DirectMount: true,
DirectMountFlags: 0,
//SyncRead: false, // set to false to enable the FUSE_CAP_ASYNC_READ capability
EnableAcl: true,
}
if *option.defaultPermissions {
fuseMountOptions.Options = append(fuseMountOptions.Options, "default_permissions")
}
if *option.nonempty {
fuseMountOptions.Options = append(fuseMountOptions.Options, "nonempty")
}
if *option.readOnly {
if runtime.GOOS == "darwin" {
fuseMountOptions.Options = append(fuseMountOptions.Options, "rdonly")
} else {
fuseMountOptions.Options = append(fuseMountOptions.Options, "ro")
}
}
if runtime.GOOS == "darwin" {
// https://github-wiki-see.page/m/macfuse/macfuse/wiki/Mount-Options
ioSizeMB := 1
for ioSizeMB*2 <= *option.chunkSizeLimitMB && ioSizeMB*2 <= 32 {
ioSizeMB *= 2
}
fuseMountOptions.Options = append(fuseMountOptions.Options, "daemon_timeout=600")
if runtime.GOARCH == "amd64" {
fuseMountOptions.Options = append(fuseMountOptions.Options, "noapplexattr")
}
if option.novncache != nil && *option.novncache {
fuseMountOptions.Options = append(fuseMountOptions.Options, "novncache")
}
fuseMountOptions.Options = append(fuseMountOptions.Options, "slow_statfs")
fuseMountOptions.Options = append(fuseMountOptions.Options, "volname="+serverFriendlyName)
fuseMountOptions.Options = append(fuseMountOptions.Options, fmt.Sprintf("iosize=%d", ioSizeMB*1024*1024))
}
if option.writebackCache != nil {
fuseMountOptions.EnableWriteback = *option.writebackCache
}
if option.asyncDio != nil {
fuseMountOptions.EnableAsyncDio = *option.asyncDio
}
if option.cacheSymlink != nil && *option.cacheSymlink {
fuseMountOptions.EnableSymlinkCaching = true
}
// find mount point
mountRoot := filerMountRootPath
if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") {
mountRoot = mountRoot[0 : len(mountRoot)-1]
}
cacheDirForWrite := *option.cacheDirForWrite
if cacheDirForWrite == "" {
cacheDirForWrite = *option.cacheDirForRead
}
seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{
MountDirectory: dir,
FilerAddresses: filerAddresses,
GrpcDialOption: grpcDialOption,
FilerSigningKey: security.SigningKey(util.GetViper().GetString("jwt.filer_signing.key")),
FilerSigningExpiresAfterSec: util.GetViper().GetInt("jwt.filer_signing.expires_after_seconds"),
FilerMountRootPath: mountRoot,
Collection: *option.collection,
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
DiskType: types.ToDiskType(*option.diskType),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
ConcurrentWriters: *option.concurrentWriters,
ConcurrentReaders: *option.concurrentReaders,
CacheDirForRead: *option.cacheDirForRead,
CacheSizeMBForRead: *option.cacheSizeMBForRead,
CacheDirForWrite: cacheDirForWrite,
CacheMetaTTlSec: *option.cacheMetaTtlSec,
DataCenter: *option.dataCenter,
Quota: int64(*option.collectionQuota) * 1024 * 1024,
MountUid: uid,
MountGid: gid,
MountMode: mountMode,
MountCtime: fileInfo.ModTime(),
MountMtime: time.Now(),
Umask: umask,
VolumeServerAccess: *mountOptions.volumeServerAccess,
Cipher: cipher,
UidGidMapper: uidGidMapper,
DisableXAttr: *option.disableXAttr,
IsMacOs: runtime.GOOS == "darwin",
MetadataFlushSeconds: *option.metadataFlushSeconds,
// RDMA acceleration options
RdmaEnabled: *option.rdmaEnabled,
RdmaSidecarAddr: *option.rdmaSidecarAddr,
RdmaFallback: *option.rdmaFallback,
RdmaReadOnly: *option.rdmaReadOnly,
RdmaMaxConcurrent: *option.rdmaMaxConcurrent,
RdmaTimeoutMs: *option.rdmaTimeoutMs,
DirIdleEvictSec: *option.dirIdleEvictSec,
WritebackCache: option.writebackCache != nil && *option.writebackCache,
})
// create mount root
mountRootPath := util.FullPath(mountRoot)
mountRootParent, mountDir := mountRootPath.DirAndName()
if err = filer_pb.Mkdir(context.Background(), seaweedFileSystem, mountRootParent, mountDir, nil); err != nil {
fmt.Printf("failed to create dir %s on filer %s: %v\n", mountRoot, filerAddresses, err)
return false
}
if err := ensureBucketAllowEmptyFolders(context.Background(), seaweedFileSystem, mountRoot, bucketRootPath); err != nil {
fmt.Printf("failed to set bucket auto-remove-empty-folders policy for %s: %v\n", mountRoot, err)
return false
}
server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions)
if err != nil {
glog.Fatalf("Mount fail: %v", err)
}
grace.OnInterrupt(func() {
unmount.Unmount(dir)
})
if mountOptions.fuseCommandPid != 0 {
// send a signal to the parent process to notify that the mount is ready
err = syscall.Kill(mountOptions.fuseCommandPid, syscall.SIGTERM)
if err != nil {
fmt.Printf("failed to notify parent process: %v\n", err)
return false
}
}
grpcS := pb.NewGrpcServer()
mount_pb.RegisterSeaweedMountServer(grpcS, seaweedFileSystem)
reflection.Register(grpcS)
go grpcS.Serve(montSocketListener)
err = seaweedFileSystem.StartBackgroundTasks()
if err != nil {
fmt.Printf("failed to start background tasks: %v\n", err)
return false
}
glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir)
glog.V(0).Infof("This is SeaweedFS version %s %s %s", version.Version(), runtime.GOOS, runtime.GOARCH)
server.Serve()
// Wait for any pending background flushes (writebackCache async mode)
// before clearing caches, to prevent data loss during clean unmount.
seaweedFileSystem.WaitForAsyncFlush()
seaweedFileSystem.ClearCacheDir()
return true
}