fix nfs list with prefix batch scan (#7694)
* fix nfs list with prefix batch scan * remove else branch
This commit is contained in:
@@ -2,7 +2,6 @@ package mount
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"github.com/hanwen/go-fuse/v2/fuse"
|
||||
@@ -16,6 +15,7 @@ type DirectoryHandleId uint64
|
||||
|
||||
const (
|
||||
directoryStreamBaseOffset = 2 // . & ..
|
||||
batchSize = 1000
|
||||
)
|
||||
|
||||
// DirectoryHandle represents an open directory handle.
|
||||
@@ -166,11 +166,17 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
|
||||
}
|
||||
|
||||
var dirEntry fuse.DirEntry
|
||||
processEachEntryFn := func(entry *filer.Entry) bool {
|
||||
|
||||
// index is the position in entryStream, used to calculate the offset for next readdir
|
||||
processEachEntryFn := func(entry *filer.Entry, index int64) bool {
|
||||
dirEntry.Name = entry.Name()
|
||||
dirEntry.Mode = toSyscallMode(entry.Mode)
|
||||
inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, false)
|
||||
dirEntry.Ino = inode
|
||||
|
||||
// Set Off to the next offset so client can resume from correct position
|
||||
dirEntry.Off = dh.entryStreamOffset + uint64(index) + 1
|
||||
|
||||
if !isPlusMode {
|
||||
if !out.AddDirEntry(dirEntry) {
|
||||
isEarlyTerminated = true
|
||||
@@ -195,59 +201,92 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
|
||||
if input.Offset < directoryStreamBaseOffset {
|
||||
if !isPlusMode {
|
||||
if input.Offset == 0 {
|
||||
out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
|
||||
out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".", Off: 1})
|
||||
}
|
||||
out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
|
||||
out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "..", Off: 2})
|
||||
} else {
|
||||
if input.Offset == 0 {
|
||||
out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
|
||||
out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".", Off: 1})
|
||||
}
|
||||
out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
|
||||
out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "..", Off: 2})
|
||||
}
|
||||
input.Offset = directoryStreamBaseOffset
|
||||
}
|
||||
|
||||
var lastEntryName string
|
||||
|
||||
// Read from cache first, then load next batch if needed
|
||||
if input.Offset >= dh.entryStreamOffset {
|
||||
// Handle case: new handle with non-zero offset but empty cache
|
||||
// This happens when NFS-Ganesha opens multiple directory handles
|
||||
if len(dh.entryStream) == 0 && input.Offset > dh.entryStreamOffset {
|
||||
skipCount := int64(input.Offset - dh.entryStreamOffset)
|
||||
|
||||
if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
|
||||
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
|
||||
return fuse.EIO
|
||||
}
|
||||
|
||||
// Load entries from beginning to fill cache up to the requested offset
|
||||
loadErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, skipCount+int64(batchSize), func(entry *filer.Entry) (bool, error) {
|
||||
dh.entryStream = append(dh.entryStream, entry)
|
||||
return true, nil
|
||||
})
|
||||
if loadErr != nil {
|
||||
glog.Errorf("list meta cache: %v", loadErr)
|
||||
return fuse.EIO
|
||||
}
|
||||
}
|
||||
|
||||
if input.Offset > dh.entryStreamOffset {
|
||||
entryPreviousIndex := (input.Offset - dh.entryStreamOffset) - 1
|
||||
if uint64(len(dh.entryStream)) > entryPreviousIndex {
|
||||
lastEntryName = dh.entryStream[entryPreviousIndex].Name()
|
||||
dh.entryStream = dh.entryStream[entryPreviousIndex:]
|
||||
dh.entryStreamOffset = input.Offset - 1
|
||||
}
|
||||
}
|
||||
entryCurrentIndex := input.Offset - dh.entryStreamOffset
|
||||
for uint64(len(dh.entryStream)) > entryCurrentIndex {
|
||||
|
||||
entryCurrentIndex := int64(input.Offset - dh.entryStreamOffset)
|
||||
for int64(len(dh.entryStream)) > entryCurrentIndex {
|
||||
entry := dh.entryStream[entryCurrentIndex]
|
||||
if processEachEntryFn(entry) {
|
||||
if processEachEntryFn(entry, entryCurrentIndex) {
|
||||
lastEntryName = entry.Name()
|
||||
entryCurrentIndex++
|
||||
} else {
|
||||
// early terminated
|
||||
return fuse.OK
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
|
||||
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
|
||||
return fuse.EIO
|
||||
}
|
||||
listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) (bool, error) {
|
||||
dh.entryStream = append(dh.entryStream, entry)
|
||||
if !processEachEntryFn(entry) {
|
||||
return false, nil
|
||||
// Cache exhausted, load next batch
|
||||
if !isEarlyTerminated {
|
||||
if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
|
||||
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
|
||||
return fuse.EIO
|
||||
}
|
||||
|
||||
// Batch loading: fetch batchSize entries starting from lastEntryName
|
||||
loadedCount := 0
|
||||
bufferFull := false
|
||||
loadErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(batchSize), func(entry *filer.Entry) (bool, error) {
|
||||
currentIndex := int64(len(dh.entryStream))
|
||||
dh.entryStream = append(dh.entryStream, entry)
|
||||
loadedCount++
|
||||
if !processEachEntryFn(entry, currentIndex) {
|
||||
bufferFull = true
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if loadErr != nil {
|
||||
glog.Errorf("list meta cache: %v", loadErr)
|
||||
return fuse.EIO
|
||||
}
|
||||
|
||||
// Mark finished only when loading completed normally (not buffer full)
|
||||
// and we got fewer entries than requested
|
||||
if !bufferFull && loadedCount < batchSize {
|
||||
dh.isFinished = true
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if listErr != nil {
|
||||
glog.Errorf("list meta cache: %v", listErr)
|
||||
return fuse.EIO
|
||||
}
|
||||
|
||||
if !isEarlyTerminated {
|
||||
dh.isFinished = true
|
||||
}
|
||||
|
||||
return fuse.OK
|
||||
|
||||
Reference in New Issue
Block a user