Files
seaweedFS/weed/mount/meta_cache/meta_cache.go
Chris Lu 6442da6f17 mount: efficient file lookup in large directories, skipping directory caching (#7818)
* mount: skip directory caching on file lookup and write

When opening or creating a file in a directory that hasn't been cached yet,
don't list the entire directory. Instead:
- For reads: fetch only the single file's metadata directly from the filer
- For writes: create on filer but skip local cache insertion

This fixes a performance issue where opening a file in a directory
with millions of files would hang because EnsureVisited() had to
list all entries before the open could complete.

The directory will still be cached when explicitly listed (ReadDir),
but individual file operations now bypass the full directory caching.

Key optimizations:
- Extract shared lookupEntry() method to eliminate code duplication
- Skip EnsureVisited on Lookup (file open)
- Skip cache insertion on Mknod, Mkdir, Symlink, Link if dir not cached
- Skip cache update on file sync/flush if dir not cached
- If directory IS cached and entry not found, return ENOENT immediately

Fixes #7145

* mount: add error handling for meta cache insert/update operations

Handle errors from metaCache.InsertEntry and metaCache.UpdateEntry calls
instead of silently ignoring them. This prevents silent cache inconsistencies
and ensures errors are properly propagated.

Files updated:
- filehandle_read.go: handle InsertEntry error in downloadRemoteEntry
- weedfs_file_sync.go: handle InsertEntry error in doFlush
- weedfs_link.go: handle UpdateEntry and InsertEntry errors in Link
- weedfs_symlink.go: handle InsertEntry error in Symlink

* mount: use error wrapping (%w) for consistent error handling

Use %w instead of %v in fmt.Errorf to preserve the original error,
allowing it to be inspected up the call stack with errors.Is/As.
2025-12-18 21:19:15 -08:00

196 lines
5.7 KiB
Go

package meta_cache
import (
"context"
"os"
"sync"
"time"
"golang.org/x/sync/singleflight"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// need to have logic similar to FilerStoreWrapper
// e.g. fill fileId field for chunks
type MetaCache struct {
root util.FullPath
localStore filer.VirtualFilerStore
leveldbStore *leveldb.LevelDBStore // direct reference for batch operations
sync.RWMutex
uidGidMapper *UidGidMapper
markCachedFn func(fullpath util.FullPath)
isCachedFn func(fullpath util.FullPath) bool
invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
visitGroup singleflight.Group // deduplicates concurrent EnsureVisited calls for the same path
}
func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPath,
markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache {
leveldbStore, virtualStore := openMetaStore(dbFolder)
return &MetaCache{
root: root,
localStore: virtualStore,
leveldbStore: leveldbStore,
markCachedFn: markCachedFn,
isCachedFn: isCachedFn,
uidGidMapper: uidGidMapper,
invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) {
invalidateFunc(fullpath, entry)
},
}
}
func openMetaStore(dbFolder string) (*leveldb.LevelDBStore, filer.VirtualFilerStore) {
os.RemoveAll(dbFolder)
os.MkdirAll(dbFolder, 0755)
store := &leveldb.LevelDBStore{}
config := &cacheConfig{
dir: dbFolder,
}
if err := store.Initialize(config, ""); err != nil {
glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
}
return store, filer.NewFilerStoreWrapper(store)
}
func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
mc.Lock()
defer mc.Unlock()
return mc.doInsertEntry(ctx, entry)
}
func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
return mc.localStore.InsertEntry(ctx, entry)
}
// doBatchInsertEntries inserts multiple entries using LevelDB's batch write.
// This is more efficient than inserting entries one by one.
func (mc *MetaCache) doBatchInsertEntries(ctx context.Context, entries []*filer.Entry) error {
return mc.leveldbStore.BatchInsertEntries(ctx, entries)
}
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
mc.Lock()
defer mc.Unlock()
entry, err := mc.localStore.FindEntry(ctx, oldPath)
if err != nil && err != filer_pb.ErrNotFound {
glog.Errorf("Metacache: find entry error: %v", err)
return err
}
if entry != nil {
if oldPath != "" {
if newEntry != nil && oldPath == newEntry.FullPath {
// skip the unnecessary deletion
// leave the update to the following InsertEntry operation
} else {
ctx = context.WithValue(ctx, "OP", "MV")
glog.V(3).Infof("DeleteEntry %s", oldPath)
if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
return err
}
}
}
} else {
// println("unknown old directory:", oldDir)
}
if newEntry != nil {
newDir, _ := newEntry.DirAndName()
if mc.isCachedFn(util.FullPath(newDir)) {
glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
return err
}
}
}
return nil
}
func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
mc.Lock()
defer mc.Unlock()
return mc.localStore.UpdateEntry(ctx, entry)
}
func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
mc.RLock()
defer mc.RUnlock()
entry, err = mc.localStore.FindEntry(ctx, fp)
if err != nil {
return nil, err
}
if entry.TtlSec > 0 && entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
return nil, filer_pb.ErrNotFound
}
mc.mapIdFromFilerToLocal(entry)
return
}
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
mc.Lock()
defer mc.Unlock()
return mc.localStore.DeleteEntry(ctx, fp)
}
func (mc *MetaCache) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
mc.Lock()
defer mc.Unlock()
return mc.localStore.DeleteFolderChildren(ctx, fp)
}
func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
mc.RLock()
defer mc.RUnlock()
if !mc.isCachedFn(dirPath) {
// if this request comes after renaming, it should be fine
glog.Warningf("unsynchronized dir: %v", dirPath)
}
_, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) (bool, error) {
if entry.TtlSec > 0 && entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
return true, nil
}
mc.mapIdFromFilerToLocal(entry)
return eachEntryFunc(entry)
})
if err != nil {
return err
}
return err
}
func (mc *MetaCache) Shutdown() {
mc.Lock()
defer mc.Unlock()
mc.localStore.Shutdown()
}
func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) {
entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid)
}
func (mc *MetaCache) Debug() {
if debuggable, ok := mc.localStore.(filer.Debuggable); ok {
println("start debugging")
debuggable.Debug(os.Stderr)
}
}
// IsDirectoryCached returns true if the directory has been fully cached
// (i.e., all entries have been loaded via EnsureVisited or ReadDir).
func (mc *MetaCache) IsDirectoryCached(dirPath util.FullPath) bool {
return mc.isCachedFn(dirPath)
}