FUSE Mount: Fix buffer allocation during copy (#6863)
Fix buffer allocation during FUSE copy
This commit is contained in:
@@ -3,11 +3,11 @@ package mount
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/util/version"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
|
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/util/grace"
|
"github.com/seaweedfs/seaweedfs/weed/util/grace"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/util/version"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
||||||
|
|
||||||
"github.com/hanwen/go-fuse/v2/fs"
|
"github.com/hanwen/go-fuse/v2/fs"
|
||||||
@@ -77,6 +78,8 @@ type WFS struct {
|
|||||||
chunkCache *chunk_cache.TieredChunkCache
|
chunkCache *chunk_cache.TieredChunkCache
|
||||||
signature int32
|
signature int32
|
||||||
concurrentWriters *util.LimitedConcurrentExecutor
|
concurrentWriters *util.LimitedConcurrentExecutor
|
||||||
|
copyBufferPool sync.Pool
|
||||||
|
concurrentCopiersSem chan struct{}
|
||||||
inodeToPath *InodeToPath
|
inodeToPath *InodeToPath
|
||||||
fhMap *FileHandleToInode
|
fhMap *FileHandleToInode
|
||||||
dhMap *DirectoryHandleToInode
|
dhMap *DirectoryHandleToInode
|
||||||
@@ -139,6 +142,10 @@ func NewSeaweedFileSystem(option *Option) *WFS {
|
|||||||
|
|
||||||
if wfs.option.ConcurrentWriters > 0 {
|
if wfs.option.ConcurrentWriters > 0 {
|
||||||
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
|
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
|
||||||
|
wfs.concurrentCopiersSem = make(chan struct{}, wfs.option.ConcurrentWriters)
|
||||||
|
}
|
||||||
|
wfs.copyBufferPool.New = func() any {
|
||||||
|
return make([]byte, option.ChunkSizeLimit)
|
||||||
}
|
}
|
||||||
return wfs
|
return wfs
|
||||||
}
|
}
|
||||||
@@ -183,7 +190,6 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
|
func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
|
||||||
|
|
||||||
// glog.V(3).Infof("read entry cache miss %s", fullpath)
|
// glog.V(3).Infof("read entry cache miss %s", fullpath)
|
||||||
dir, name := fullpath.DirAndName()
|
dir, name := fullpath.DirAndName()
|
||||||
|
|
||||||
|
|||||||
@@ -1,13 +1,13 @@
|
|||||||
package mount
|
package mount
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hanwen/go-fuse/v2/fuse"
|
"github.com/hanwen/go-fuse/v2/fuse"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CopyFileRange copies data from one file to another from and to specified offsets.
|
// CopyFileRange copies data from one file to another from and to specified offsets.
|
||||||
@@ -70,30 +70,85 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
|
|||||||
in.OffOut, in.OffOut+in.Len,
|
in.OffOut, in.OffOut+in.Len,
|
||||||
)
|
)
|
||||||
|
|
||||||
data := make([]byte, in.Len)
|
// Concurrent copy operations could allocate too much memory, so we want to
|
||||||
totalRead, err := readDataByFileHandle(data, fhIn, int64(in.OffIn))
|
// throttle our concurrency, scaling with the number of writers the mount
|
||||||
|
// was configured with.
|
||||||
|
if wfs.concurrentCopiersSem != nil {
|
||||||
|
wfs.concurrentCopiersSem <- struct{}{}
|
||||||
|
defer func() { <-wfs.concurrentCopiersSem }()
|
||||||
|
}
|
||||||
|
|
||||||
|
// We want to stream the copy operation to avoid allocating massive buffers.
|
||||||
|
nowUnixNano := time.Now().UnixNano()
|
||||||
|
totalCopied := int64(0)
|
||||||
|
buff := wfs.copyBufferPool.Get().([]byte)
|
||||||
|
defer wfs.copyBufferPool.Put(buff)
|
||||||
|
for {
|
||||||
|
// Comply with cancellation as best as we can, given that the underlying
|
||||||
|
// IO functions aren't cancellation-aware.
|
||||||
|
select {
|
||||||
|
case <-cancel:
|
||||||
|
glog.Warningf("canceled CopyFileRange for %s (copied %d)",
|
||||||
|
fhIn.FullPath(), totalCopied)
|
||||||
|
return uint32(totalCopied), fuse.EINTR
|
||||||
|
default: // keep going
|
||||||
|
}
|
||||||
|
|
||||||
|
// We can save one IO by breaking early if we already know the next read
|
||||||
|
// will result in zero bytes.
|
||||||
|
remaining := int64(in.Len) - totalCopied
|
||||||
|
readLen := min(remaining, int64(len(buff)))
|
||||||
|
if readLen == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform the read
|
||||||
|
offsetIn := totalCopied + int64(in.OffIn)
|
||||||
|
numBytesRead, err := readDataByFileHandle(
|
||||||
|
buff[:readLen], fhIn, offsetIn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Warningf("file handle read %s %d: %v", fhIn.FullPath(), totalRead, err)
|
glog.Warningf("file handle read %s %d (total %d): %v",
|
||||||
|
fhIn.FullPath(), numBytesRead, totalCopied, err)
|
||||||
return 0, fuse.EIO
|
return 0, fuse.EIO
|
||||||
}
|
}
|
||||||
data = data[:totalRead]
|
|
||||||
|
|
||||||
if totalRead == 0 {
|
// Break if we're done copying (no more bytes to read)
|
||||||
|
if numBytesRead == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
offsetOut := int64(in.OffOut) + totalCopied
|
||||||
|
|
||||||
|
// Detect mime type only during the beginning of our stream, since
|
||||||
|
// DetectContentType is expecting some of the first 512 bytes of the
|
||||||
|
// file. See [http.DetectContentType] for details.
|
||||||
|
if offsetOut <= 512 {
|
||||||
|
fhOut.contentType = http.DetectContentType(buff[:numBytesRead])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform the write
|
||||||
|
fhOut.dirtyPages.writerPattern.MonitorWriteAt(offsetOut, int(numBytesRead))
|
||||||
|
fhOut.dirtyPages.AddPage(
|
||||||
|
offsetOut,
|
||||||
|
buff[:numBytesRead],
|
||||||
|
fhOut.dirtyPages.writerPattern.IsSequentialMode(),
|
||||||
|
nowUnixNano)
|
||||||
|
|
||||||
|
// Accumulate for the next loop iteration
|
||||||
|
totalCopied += numBytesRead
|
||||||
|
}
|
||||||
|
|
||||||
|
if totalCopied == 0 {
|
||||||
return 0, fuse.OK
|
return 0, fuse.OK
|
||||||
}
|
}
|
||||||
|
|
||||||
// put data at the specified offset in target file
|
fhOut.entry.Attributes.FileSize = uint64(max(
|
||||||
fhOut.dirtyPages.writerPattern.MonitorWriteAt(int64(in.OffOut), int(in.Len))
|
totalCopied+int64(in.OffOut),
|
||||||
|
int64(fhOut.entry.Attributes.FileSize),
|
||||||
|
))
|
||||||
fhOut.entry.Content = nil
|
fhOut.entry.Content = nil
|
||||||
fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode(), time.Now().UnixNano())
|
|
||||||
fhOut.entry.Attributes.FileSize = uint64(max(int64(in.OffOut)+totalRead, int64(fhOut.entry.Attributes.FileSize)))
|
|
||||||
fhOut.dirtyMetadata = true
|
fhOut.dirtyMetadata = true
|
||||||
written = uint32(totalRead)
|
|
||||||
|
|
||||||
// detect mime type
|
|
||||||
if written > 0 && in.OffOut <= 512 {
|
|
||||||
fhOut.contentType = http.DetectContentType(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
written = uint32(totalCopied)
|
||||||
return written, fuse.OK
|
return written, fuse.OK
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user