mount: add option to show system entries (#8829)
* mount: add option to show system entries * address gemini code review's suggested changes * rename flag from -showSystemEntries to -includeSystemEntries * meta_cache: purge hidden system entries on filer events --------- Co-authored-by: Chris Lu <chris.lu@gmail.com>
This commit is contained in:
@@ -27,18 +27,19 @@ type MetaCache struct {
|
||||
localStore filer.VirtualFilerStore
|
||||
leveldbStore *leveldb.LevelDBStore // direct reference for batch operations
|
||||
sync.RWMutex
|
||||
uidGidMapper *UidGidMapper
|
||||
markCachedFn func(fullpath util.FullPath)
|
||||
isCachedFn func(fullpath util.FullPath) bool
|
||||
invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
|
||||
onDirectoryUpdate func(dir util.FullPath)
|
||||
visitGroup singleflight.Group // deduplicates concurrent EnsureVisited calls for the same path
|
||||
applyCh chan metadataApplyRequest
|
||||
applyDone chan struct{}
|
||||
applyStateMu sync.Mutex
|
||||
applyClosed bool
|
||||
buildingDirs map[util.FullPath]*directoryBuildState
|
||||
dedupRing dedupRingBuffer
|
||||
uidGidMapper *UidGidMapper
|
||||
markCachedFn func(fullpath util.FullPath)
|
||||
isCachedFn func(fullpath util.FullPath) bool
|
||||
invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
|
||||
onDirectoryUpdate func(dir util.FullPath)
|
||||
visitGroup singleflight.Group // deduplicates concurrent EnsureVisited calls for the same path
|
||||
applyCh chan metadataApplyRequest
|
||||
applyDone chan struct{}
|
||||
applyStateMu sync.Mutex
|
||||
applyClosed bool
|
||||
buildingDirs map[util.FullPath]*directoryBuildState
|
||||
dedupRing dedupRingBuffer
|
||||
includeSystemEntries bool
|
||||
}
|
||||
|
||||
var errMetaCacheClosed = errors.New("metadata cache is shut down")
|
||||
@@ -84,17 +85,18 @@ type metadataApplyRequest struct {
|
||||
done chan error
|
||||
}
|
||||
|
||||
func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPath,
|
||||
func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPath, includeSystemEntries bool,
|
||||
markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry), onDirectoryUpdate func(dir util.FullPath)) *MetaCache {
|
||||
leveldbStore, virtualStore := openMetaStore(dbFolder)
|
||||
mc := &MetaCache{
|
||||
root: root,
|
||||
localStore: virtualStore,
|
||||
leveldbStore: leveldbStore,
|
||||
markCachedFn: markCachedFn,
|
||||
isCachedFn: isCachedFn,
|
||||
uidGidMapper: uidGidMapper,
|
||||
onDirectoryUpdate: onDirectoryUpdate,
|
||||
root: root,
|
||||
localStore: virtualStore,
|
||||
leveldbStore: leveldbStore,
|
||||
markCachedFn: markCachedFn,
|
||||
isCachedFn: isCachedFn,
|
||||
uidGidMapper: uidGidMapper,
|
||||
onDirectoryUpdate: onDirectoryUpdate,
|
||||
includeSystemEntries: includeSystemEntries,
|
||||
invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) {
|
||||
invalidateFunc(fullpath, entry)
|
||||
},
|
||||
@@ -182,6 +184,29 @@ func (mc *MetaCache) atomicUpdateEntryFromFilerLocked(ctx context.Context, oldPa
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *MetaCache) shouldHideEntry(fullpath util.FullPath) bool {
|
||||
if mc.includeSystemEntries {
|
||||
return false
|
||||
}
|
||||
dir, name := fullpath.DirAndName()
|
||||
return IsHiddenSystemEntry(dir, name)
|
||||
}
|
||||
|
||||
func (mc *MetaCache) purgeEntryLocked(ctx context.Context, fullpath util.FullPath, isDirectory bool) error {
|
||||
if fullpath == "" {
|
||||
return nil
|
||||
}
|
||||
if err := mc.localStore.DeleteEntry(ctx, fullpath); err != nil {
|
||||
return err
|
||||
}
|
||||
if isDirectory {
|
||||
if err := mc.localStore.DeleteFolderChildren(ctx, fullpath); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *MetaCache) ApplyMetadataResponse(ctx context.Context, resp *filer_pb.SubscribeMetadataResponse, options MetadataResponseApplyOptions) error {
|
||||
if resp == nil || resp.EventNotification == nil {
|
||||
return nil
|
||||
@@ -520,7 +545,9 @@ func (mc *MetaCache) applyMetadataResponseLocked(ctx context.Context, resp *file
|
||||
}
|
||||
|
||||
var oldPath util.FullPath
|
||||
var newPath util.FullPath
|
||||
var newEntry *filer.Entry
|
||||
hideNewPath := false
|
||||
if message.OldEntry != nil {
|
||||
oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
|
||||
}
|
||||
@@ -530,11 +557,20 @@ func (mc *MetaCache) applyMetadataResponseLocked(ctx context.Context, resp *file
|
||||
if message.NewParentPath != "" {
|
||||
dir = message.NewParentPath
|
||||
}
|
||||
newEntry = filer.FromPbEntry(dir, message.NewEntry)
|
||||
newPath = util.NewFullPath(dir, message.NewEntry.Name)
|
||||
hideNewPath = mc.shouldHideEntry(newPath)
|
||||
if !hideNewPath {
|
||||
newEntry = filer.FromPbEntry(dir, message.NewEntry)
|
||||
}
|
||||
}
|
||||
|
||||
mc.Lock()
|
||||
err := mc.atomicUpdateEntryFromFilerLocked(ctx, oldPath, newEntry, allowUncachedInsert)
|
||||
if err == nil && hideNewPath {
|
||||
if purgeErr := mc.purgeEntryLocked(ctx, newPath, message.NewEntry.IsDirectory); purgeErr != nil {
|
||||
err = purgeErr
|
||||
}
|
||||
}
|
||||
// When a directory is deleted or moved, remove its cached descendants
|
||||
// so stale children cannot be served from the local cache.
|
||||
if err == nil && oldPath != "" && message.OldEntry != nil && message.OldEntry.IsDirectory {
|
||||
|
||||
@@ -2,6 +2,7 @@ package meta_cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
@@ -296,6 +297,114 @@ func TestApplyMetadataResponseDeduplicatesRepeatedFilerEvent(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyMetadataResponseSkipsHiddenSystemEntryWhenDisabled(t *testing.T) {
|
||||
mc, _, _, _ := newTestMetaCache(t, map[util.FullPath]bool{
|
||||
"/": true,
|
||||
})
|
||||
defer mc.Shutdown()
|
||||
|
||||
createResp := &filer_pb.SubscribeMetadataResponse{
|
||||
Directory: "/",
|
||||
EventNotification: &filer_pb.EventNotification{
|
||||
NewEntry: &filer_pb.Entry{
|
||||
Name: "topics",
|
||||
Attributes: &filer_pb.FuseAttributes{
|
||||
Crtime: 1,
|
||||
Mtime: 1,
|
||||
FileMode: uint32(os.ModeDir | 0o755),
|
||||
},
|
||||
IsDirectory: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if err := mc.ApplyMetadataResponse(context.Background(), createResp, SubscriberMetadataResponseApplyOptions); err != nil {
|
||||
t.Fatalf("apply create: %v", err)
|
||||
}
|
||||
|
||||
entry, err := mc.FindEntry(context.Background(), util.FullPath("/topics"))
|
||||
if err != filer_pb.ErrNotFound {
|
||||
t.Fatalf("find hidden entry error = %v, want %v", err, filer_pb.ErrNotFound)
|
||||
}
|
||||
if entry != nil {
|
||||
t.Fatalf("hidden entry still cached: %+v", entry)
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyMetadataResponsePurgesHiddenDestinationPath(t *testing.T) {
|
||||
mc, _, _, _ := newTestMetaCache(t, map[util.FullPath]bool{
|
||||
"/": true,
|
||||
"/src": true,
|
||||
})
|
||||
defer mc.Shutdown()
|
||||
|
||||
if err := mc.InsertEntry(context.Background(), &filer.Entry{
|
||||
FullPath: "/topics",
|
||||
Attr: filer.Attr{
|
||||
Crtime: time.Unix(1, 0),
|
||||
Mtime: time.Unix(1, 0),
|
||||
Mode: os.ModeDir | 0o755,
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatalf("insert stale hidden dir: %v", err)
|
||||
}
|
||||
if err := mc.InsertEntry(context.Background(), &filer.Entry{
|
||||
FullPath: "/topics/leaked.txt",
|
||||
Attr: filer.Attr{
|
||||
Crtime: time.Unix(1, 0),
|
||||
Mtime: time.Unix(1, 0),
|
||||
Mode: 0o644,
|
||||
FileSize: 7,
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatalf("insert leaked hidden child: %v", err)
|
||||
}
|
||||
if err := mc.InsertEntry(context.Background(), &filer.Entry{
|
||||
FullPath: "/src/visible",
|
||||
Attr: filer.Attr{
|
||||
Crtime: time.Unix(1, 0),
|
||||
Mtime: time.Unix(1, 0),
|
||||
Mode: os.ModeDir | 0o755,
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatalf("insert source dir: %v", err)
|
||||
}
|
||||
|
||||
renameResp := &filer_pb.SubscribeMetadataResponse{
|
||||
Directory: "/src",
|
||||
EventNotification: &filer_pb.EventNotification{
|
||||
OldEntry: &filer_pb.Entry{
|
||||
Name: "visible",
|
||||
IsDirectory: true,
|
||||
},
|
||||
NewEntry: &filer_pb.Entry{
|
||||
Name: "topics",
|
||||
Attributes: &filer_pb.FuseAttributes{
|
||||
Crtime: 2,
|
||||
Mtime: 2,
|
||||
FileMode: uint32(os.ModeDir | 0o755),
|
||||
},
|
||||
IsDirectory: true,
|
||||
},
|
||||
NewParentPath: "/",
|
||||
},
|
||||
}
|
||||
|
||||
if err := mc.ApplyMetadataResponse(context.Background(), renameResp, SubscriberMetadataResponseApplyOptions); err != nil {
|
||||
t.Fatalf("apply rename: %v", err)
|
||||
}
|
||||
|
||||
if entry, err := mc.FindEntry(context.Background(), util.FullPath("/src/visible")); err != filer_pb.ErrNotFound || entry != nil {
|
||||
t.Fatalf("source dir after rename = %+v, %v; want nil, %v", entry, err, filer_pb.ErrNotFound)
|
||||
}
|
||||
if entry, err := mc.FindEntry(context.Background(), util.FullPath("/topics")); err != filer_pb.ErrNotFound || entry != nil {
|
||||
t.Fatalf("hidden destination after rename = %+v, %v; want nil, %v", entry, err, filer_pb.ErrNotFound)
|
||||
}
|
||||
if entry, err := mc.FindEntry(context.Background(), util.FullPath("/topics/leaked.txt")); err != filer_pb.ErrNotFound || entry != nil {
|
||||
t.Fatalf("hidden child after rename = %+v, %v; want nil, %v", entry, err, filer_pb.ErrNotFound)
|
||||
}
|
||||
}
|
||||
|
||||
func newTestMetaCache(t *testing.T, cached map[util.FullPath]bool) (*MetaCache, map[util.FullPath]bool, *recordedPaths, *recordedPaths) {
|
||||
t.Helper()
|
||||
|
||||
@@ -312,6 +421,7 @@ func newTestMetaCache(t *testing.T, cached map[util.FullPath]bool) (*MetaCache,
|
||||
filepath.Join(t.TempDir(), "meta"),
|
||||
mapper,
|
||||
util.FullPath("/"),
|
||||
false,
|
||||
func(path util.FullPath) {
|
||||
cachedMu.Lock()
|
||||
defer cachedMu.Unlock()
|
||||
|
||||
@@ -107,7 +107,7 @@ func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerCl
|
||||
var err error
|
||||
snapshotTsNs, err = filer_pb.ReadDirAllEntriesWithSnapshot(ctx, client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
|
||||
entry := filer.FromPbEntry(string(path), pbEntry)
|
||||
if IsHiddenSystemEntry(string(path), entry.Name()) {
|
||||
if !mc.includeSystemEntries && IsHiddenSystemEntry(string(path), entry.Name()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user