mount: refresh and evict hot dir cache (#8174)
* mount: refresh and evict hot dir cache * mount: guard dir update window and extend TTL * mount: reuse timestamp for cache mark * Apply suggestion from @gemini-code-assist[bot] Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * mount: make dir cache tuning configurable * mount: dedupe dir update notices * mount: restore invalidate-all cache helper * mount: keep hot dir tuning constants * mount: centralize cache state reset * mount: mark refresh completion time * mount: allow disabling idle eviction --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
@@ -283,17 +283,17 @@ func (s *AdminServer) generateBreadcrumbs(dir string) []BreadcrumbItem {
|
|||||||
}
|
}
|
||||||
currentPath += "/" + part
|
currentPath += "/" + part
|
||||||
|
|
||||||
// Special handling for bucket paths
|
// Special handling for bucket paths
|
||||||
displayName := part
|
displayName := part
|
||||||
if len(breadcrumbs) == 1 && part == "buckets" {
|
if len(breadcrumbs) == 1 && part == "buckets" {
|
||||||
displayName = "Object Store Buckets"
|
displayName = "Object Store Buckets"
|
||||||
} else if len(breadcrumbs) == 1 && part == "table-buckets" {
|
} else if len(breadcrumbs) == 1 && part == "table-buckets" {
|
||||||
displayName = "Table Buckets"
|
displayName = "Table Buckets"
|
||||||
} else if len(breadcrumbs) == 2 && strings.HasPrefix(dir, "/buckets/") {
|
} else if len(breadcrumbs) == 2 && strings.HasPrefix(dir, "/buckets/") {
|
||||||
displayName = "📦 " + part // Add bucket icon to bucket name
|
displayName = "📦 " + part // Add bucket icon to bucket name
|
||||||
} else if len(breadcrumbs) == 2 && strings.HasPrefix(dir, "/table-buckets/") {
|
} else if len(breadcrumbs) == 2 && strings.HasPrefix(dir, "/table-buckets/") {
|
||||||
displayName = "🧊 " + part
|
displayName = "🧊 " + part
|
||||||
}
|
}
|
||||||
|
|
||||||
breadcrumbs = append(breadcrumbs, BreadcrumbItem{
|
breadcrumbs = append(breadcrumbs, BreadcrumbItem{
|
||||||
Name: displayName,
|
Name: displayName,
|
||||||
|
|||||||
@@ -157,6 +157,13 @@ func runFuse(cmd *Command, args []string) bool {
|
|||||||
} else {
|
} else {
|
||||||
panic(fmt.Errorf("cacheMetaTtlSec: %s", err))
|
panic(fmt.Errorf("cacheMetaTtlSec: %s", err))
|
||||||
}
|
}
|
||||||
|
case "dirIdleEvictSec":
|
||||||
|
if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
|
||||||
|
intValue := int(parsed)
|
||||||
|
mountOptions.dirIdleEvictSec = &intValue
|
||||||
|
} else {
|
||||||
|
panic(fmt.Errorf("dirIdleEvictSec: %s", err))
|
||||||
|
}
|
||||||
case "concurrentWriters":
|
case "concurrentWriters":
|
||||||
i++
|
i++
|
||||||
if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
|
if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
|
||||||
|
|||||||
@@ -48,6 +48,8 @@ type MountOptions struct {
|
|||||||
rdmaMaxConcurrent *int
|
rdmaMaxConcurrent *int
|
||||||
rdmaTimeoutMs *int
|
rdmaTimeoutMs *int
|
||||||
|
|
||||||
|
dirIdleEvictSec *int
|
||||||
|
|
||||||
// FUSE performance options
|
// FUSE performance options
|
||||||
writebackCache *bool
|
writebackCache *bool
|
||||||
asyncDio *bool
|
asyncDio *bool
|
||||||
@@ -107,6 +109,8 @@ func init() {
|
|||||||
mountOptions.rdmaMaxConcurrent = cmdMount.Flag.Int("rdma.maxConcurrent", 64, "max concurrent RDMA operations")
|
mountOptions.rdmaMaxConcurrent = cmdMount.Flag.Int("rdma.maxConcurrent", 64, "max concurrent RDMA operations")
|
||||||
mountOptions.rdmaTimeoutMs = cmdMount.Flag.Int("rdma.timeoutMs", 5000, "RDMA operation timeout in milliseconds")
|
mountOptions.rdmaTimeoutMs = cmdMount.Flag.Int("rdma.timeoutMs", 5000, "RDMA operation timeout in milliseconds")
|
||||||
|
|
||||||
|
mountOptions.dirIdleEvictSec = cmdMount.Flag.Int("dirIdleEvictSec", 600, "seconds to evict idle cached directories (0 to disable)")
|
||||||
|
|
||||||
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
|
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
|
||||||
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
|
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
|
||||||
mountReadRetryTime = cmdMount.Flag.Duration("readRetryTime", 6*time.Second, "maximum read retry wait time")
|
mountReadRetryTime = cmdMount.Flag.Duration("readRetryTime", 6*time.Second, "maximum read retry wait time")
|
||||||
|
|||||||
@@ -270,6 +270,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
|
|||||||
RdmaReadOnly: *option.rdmaReadOnly,
|
RdmaReadOnly: *option.rdmaReadOnly,
|
||||||
RdmaMaxConcurrent: *option.rdmaMaxConcurrent,
|
RdmaMaxConcurrent: *option.rdmaMaxConcurrent,
|
||||||
RdmaTimeoutMs: *option.rdmaTimeoutMs,
|
RdmaTimeoutMs: *option.rdmaTimeoutMs,
|
||||||
|
DirIdleEvictSec: *option.dirIdleEvictSec,
|
||||||
})
|
})
|
||||||
|
|
||||||
// create mount root
|
// create mount root
|
||||||
|
|||||||
@@ -22,6 +22,19 @@ type InodeEntry struct {
|
|||||||
isDirectory bool
|
isDirectory bool
|
||||||
isChildrenCached bool
|
isChildrenCached bool
|
||||||
cachedExpiresTime time.Time
|
cachedExpiresTime time.Time
|
||||||
|
lastAccess time.Time
|
||||||
|
lastRefresh time.Time
|
||||||
|
updateWindowStart time.Time
|
||||||
|
updateCount int
|
||||||
|
needsRefresh bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ie *InodeEntry) resetCacheState() {
|
||||||
|
ie.isChildrenCached = false
|
||||||
|
ie.cachedExpiresTime = time.Time{}
|
||||||
|
ie.needsRefresh = false
|
||||||
|
ie.updateCount = 0
|
||||||
|
ie.updateWindowStart = time.Time{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ie *InodeEntry) removeOnePath(p util.FullPath) bool {
|
func (ie *InodeEntry) removeOnePath(p util.FullPath) bool {
|
||||||
@@ -51,7 +64,12 @@ func NewInodeToPath(root util.FullPath, ttlSec int) *InodeToPath {
|
|||||||
path2inode: make(map[util.FullPath]uint64),
|
path2inode: make(map[util.FullPath]uint64),
|
||||||
cacheMetaTtlSec: time.Second * time.Duration(ttlSec),
|
cacheMetaTtlSec: time.Second * time.Duration(ttlSec),
|
||||||
}
|
}
|
||||||
t.inode2path[1] = &InodeEntry{[]util.FullPath{root}, 1, true, false, time.Time{}}
|
t.inode2path[1] = &InodeEntry{
|
||||||
|
paths: []util.FullPath{root},
|
||||||
|
nlookup: 1,
|
||||||
|
isDirectory: true,
|
||||||
|
lastAccess: time.Now(),
|
||||||
|
}
|
||||||
t.path2inode[root] = 1
|
t.path2inode[root] = 1
|
||||||
|
|
||||||
return t
|
return t
|
||||||
@@ -94,9 +112,16 @@ func (i *InodeToPath) Lookup(path util.FullPath, unixTime int64, isDirectory boo
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if !isLookup {
|
if !isLookup {
|
||||||
i.inode2path[inode] = &InodeEntry{[]util.FullPath{path}, 0, isDirectory, false, time.Time{}}
|
i.inode2path[inode] = &InodeEntry{
|
||||||
|
paths: []util.FullPath{path},
|
||||||
|
isDirectory: isDirectory,
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
i.inode2path[inode] = &InodeEntry{[]util.FullPath{path}, 1, isDirectory, false, time.Time{}}
|
i.inode2path[inode] = &InodeEntry{
|
||||||
|
paths: []util.FullPath{path},
|
||||||
|
nlookup: 1,
|
||||||
|
isDirectory: isDirectory,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -163,8 +188,14 @@ func (i *InodeToPath) MarkChildrenCached(fullpath util.FullPath) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
path.isChildrenCached = true
|
path.isChildrenCached = true
|
||||||
|
now := time.Now()
|
||||||
|
path.lastAccess = now
|
||||||
|
path.lastRefresh = now
|
||||||
|
path.updateCount = 0
|
||||||
|
path.needsRefresh = false
|
||||||
|
path.updateWindowStart = time.Time{}
|
||||||
if i.cacheMetaTtlSec > 0 {
|
if i.cacheMetaTtlSec > 0 {
|
||||||
path.cachedExpiresTime = time.Now().Add(i.cacheMetaTtlSec)
|
path.cachedExpiresTime = now.Add(i.cacheMetaTtlSec)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -200,12 +231,120 @@ func (i *InodeToPath) InvalidateAllChildrenCache() {
|
|||||||
defer i.Unlock()
|
defer i.Unlock()
|
||||||
for _, entry := range i.inode2path {
|
for _, entry := range i.inode2path {
|
||||||
if entry.isDirectory && entry.isChildrenCached {
|
if entry.isDirectory && entry.isChildrenCached {
|
||||||
entry.isChildrenCached = false
|
entry.resetCacheState()
|
||||||
entry.cachedExpiresTime = time.Time{}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *InodeToPath) InvalidateChildrenCache(fullpath util.FullPath) {
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
inode, found := i.path2inode[fullpath]
|
||||||
|
if !found {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry, found := i.inode2path[inode]
|
||||||
|
if !found {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry.resetCacheState()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *InodeToPath) TouchDirectory(fullpath util.FullPath) {
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
inode, found := i.path2inode[fullpath]
|
||||||
|
if !found {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry, found := i.inode2path[inode]
|
||||||
|
if !found || !entry.isDirectory {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry.lastAccess = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *InodeToPath) RecordDirectoryUpdate(fullpath util.FullPath, now time.Time, window time.Duration, threshold int) bool {
|
||||||
|
if threshold <= 0 || window <= 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
inode, found := i.path2inode[fullpath]
|
||||||
|
if !found {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
entry, found := i.inode2path[inode]
|
||||||
|
if !found || !entry.isDirectory || !entry.isChildrenCached {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if entry.updateWindowStart.IsZero() || now.Sub(entry.updateWindowStart) > window {
|
||||||
|
entry.updateWindowStart = now
|
||||||
|
entry.updateCount = 0
|
||||||
|
}
|
||||||
|
entry.updateCount++
|
||||||
|
if entry.updateCount >= threshold {
|
||||||
|
entry.needsRefresh = true
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *InodeToPath) NeedsRefresh(fullpath util.FullPath) bool {
|
||||||
|
i.RLock()
|
||||||
|
defer i.RUnlock()
|
||||||
|
inode, found := i.path2inode[fullpath]
|
||||||
|
if !found {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
entry, found := i.inode2path[inode]
|
||||||
|
if !found || !entry.isDirectory {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return entry.isChildrenCached && entry.needsRefresh
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *InodeToPath) MarkDirectoryRefreshed(fullpath util.FullPath, now time.Time) {
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
inode, found := i.path2inode[fullpath]
|
||||||
|
if !found {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry, found := i.inode2path[inode]
|
||||||
|
if !found || !entry.isDirectory {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry.lastRefresh = now
|
||||||
|
entry.lastAccess = now
|
||||||
|
entry.updateCount = 0
|
||||||
|
entry.needsRefresh = false
|
||||||
|
entry.updateWindowStart = time.Time{}
|
||||||
|
if i.cacheMetaTtlSec > 0 {
|
||||||
|
entry.cachedExpiresTime = now.Add(i.cacheMetaTtlSec)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *InodeToPath) CollectEvictableDirs(now time.Time, idle time.Duration) []util.FullPath {
|
||||||
|
if idle <= 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
var dirs []util.FullPath
|
||||||
|
for _, entry := range i.inode2path {
|
||||||
|
if !entry.isDirectory || !entry.isChildrenCached {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if entry.lastAccess.IsZero() || now.Sub(entry.lastAccess) < idle {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
entry.resetCacheState()
|
||||||
|
dirs = append(dirs, entry.paths...)
|
||||||
|
}
|
||||||
|
return dirs
|
||||||
|
}
|
||||||
|
|
||||||
func (i *InodeToPath) AddPath(inode uint64, path util.FullPath) {
|
func (i *InodeToPath) AddPath(inode uint64, path util.FullPath) {
|
||||||
i.Lock()
|
i.Lock()
|
||||||
defer i.Unlock()
|
defer i.Unlock()
|
||||||
@@ -217,10 +356,9 @@ func (i *InodeToPath) AddPath(inode uint64, path util.FullPath) {
|
|||||||
ie.nlookup++
|
ie.nlookup++
|
||||||
} else {
|
} else {
|
||||||
i.inode2path[inode] = &InodeEntry{
|
i.inode2path[inode] = &InodeEntry{
|
||||||
paths: []util.FullPath{path},
|
paths: []util.FullPath{path},
|
||||||
nlookup: 1,
|
nlookup: 1,
|
||||||
isDirectory: false,
|
isDirectory: false,
|
||||||
isChildrenCached: false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -268,7 +406,7 @@ func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) (sourceInod
|
|||||||
entry.paths[i] = targetPath
|
entry.paths[i] = targetPath
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
entry.isChildrenCached = false
|
entry.resetCacheState()
|
||||||
} else {
|
} else {
|
||||||
glog.Errorf("MovePath %s to %s: sourceInode %d not found", sourcePath, targetPath, sourceInode)
|
glog.Errorf("MovePath %s to %s: sourceInode %d not found", sourcePath, targetPath, sourceInode)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,23 +23,25 @@ type MetaCache struct {
|
|||||||
localStore filer.VirtualFilerStore
|
localStore filer.VirtualFilerStore
|
||||||
leveldbStore *leveldb.LevelDBStore // direct reference for batch operations
|
leveldbStore *leveldb.LevelDBStore // direct reference for batch operations
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
uidGidMapper *UidGidMapper
|
uidGidMapper *UidGidMapper
|
||||||
markCachedFn func(fullpath util.FullPath)
|
markCachedFn func(fullpath util.FullPath)
|
||||||
isCachedFn func(fullpath util.FullPath) bool
|
isCachedFn func(fullpath util.FullPath) bool
|
||||||
invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
|
invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
|
||||||
visitGroup singleflight.Group // deduplicates concurrent EnsureVisited calls for the same path
|
onDirectoryUpdate func(dir util.FullPath)
|
||||||
|
visitGroup singleflight.Group // deduplicates concurrent EnsureVisited calls for the same path
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPath,
|
func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPath,
|
||||||
markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache {
|
markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry), onDirectoryUpdate func(dir util.FullPath)) *MetaCache {
|
||||||
leveldbStore, virtualStore := openMetaStore(dbFolder)
|
leveldbStore, virtualStore := openMetaStore(dbFolder)
|
||||||
return &MetaCache{
|
return &MetaCache{
|
||||||
root: root,
|
root: root,
|
||||||
localStore: virtualStore,
|
localStore: virtualStore,
|
||||||
leveldbStore: leveldbStore,
|
leveldbStore: leveldbStore,
|
||||||
markCachedFn: markCachedFn,
|
markCachedFn: markCachedFn,
|
||||||
isCachedFn: isCachedFn,
|
isCachedFn: isCachedFn,
|
||||||
uidGidMapper: uidGidMapper,
|
uidGidMapper: uidGidMapper,
|
||||||
|
onDirectoryUpdate: onDirectoryUpdate,
|
||||||
invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) {
|
invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) {
|
||||||
invalidateFunc(fullpath, entry)
|
invalidateFunc(fullpath, entry)
|
||||||
},
|
},
|
||||||
@@ -193,3 +195,9 @@ func (mc *MetaCache) Debug() {
|
|||||||
func (mc *MetaCache) IsDirectoryCached(dirPath util.FullPath) bool {
|
func (mc *MetaCache) IsDirectoryCached(dirPath util.FullPath) bool {
|
||||||
return mc.isCachedFn(dirPath)
|
return mc.isCachedFn(dirPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mc *MetaCache) noteDirectoryUpdate(dirPath util.FullPath) {
|
||||||
|
if mc.onDirectoryUpdate != nil {
|
||||||
|
mc.onDirectoryUpdate(dirPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -77,6 +77,24 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
|
|||||||
}
|
}
|
||||||
err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
|
err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
if message.NewEntry != nil || message.OldEntry != nil {
|
||||||
|
dirsToNotify := make(map[util.FullPath]struct{})
|
||||||
|
if oldPath != "" {
|
||||||
|
parent, _ := oldPath.DirAndName()
|
||||||
|
dirsToNotify[util.FullPath(parent)] = struct{}{}
|
||||||
|
}
|
||||||
|
if newEntry != nil {
|
||||||
|
newParent, _ := newEntry.DirAndName()
|
||||||
|
dirsToNotify[util.FullPath(newParent)] = struct{}{}
|
||||||
|
}
|
||||||
|
if message.NewEntry != nil && message.NewEntry.IsDirectory {
|
||||||
|
childPath := util.NewFullPath(dir, message.NewEntry.Name)
|
||||||
|
dirsToNotify[childPath] = struct{}{}
|
||||||
|
}
|
||||||
|
for dirPath := range dirsToNotify {
|
||||||
|
mc.noteDirectoryUpdate(dirPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
if message.OldEntry != nil && message.NewEntry != nil {
|
if message.OldEntry != nil && message.NewEntry != nil {
|
||||||
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
|
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
|
||||||
mc.invalidateFunc(oldKey, message.OldEntry)
|
mc.invalidateFunc(oldKey, message.OldEntry)
|
||||||
|
|||||||
@@ -75,6 +75,9 @@ type Option struct {
|
|||||||
RdmaMaxConcurrent int
|
RdmaMaxConcurrent int
|
||||||
RdmaTimeoutMs int
|
RdmaTimeoutMs int
|
||||||
|
|
||||||
|
// Directory cache refresh/eviction controls
|
||||||
|
DirIdleEvictSec int
|
||||||
|
|
||||||
uniqueCacheDirForRead string
|
uniqueCacheDirForRead string
|
||||||
uniqueCacheDirForWrite string
|
uniqueCacheDirForWrite string
|
||||||
}
|
}
|
||||||
@@ -102,8 +105,19 @@ type WFS struct {
|
|||||||
rdmaClient *RDMAMountClient
|
rdmaClient *RDMAMountClient
|
||||||
FilerConf *filer.FilerConf
|
FilerConf *filer.FilerConf
|
||||||
filerClient *wdclient.FilerClient // Cached volume location client
|
filerClient *wdclient.FilerClient // Cached volume location client
|
||||||
|
refreshMu sync.Mutex
|
||||||
|
refreshingDirs map[util.FullPath]struct{}
|
||||||
|
dirHotWindow time.Duration
|
||||||
|
dirHotThreshold int
|
||||||
|
dirIdleEvict time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultDirHotWindow = 2 * time.Second
|
||||||
|
defaultDirHotThreshold = 64
|
||||||
|
defaultDirIdleEvict = 10 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
func NewSeaweedFileSystem(option *Option) *WFS {
|
func NewSeaweedFileSystem(option *Option) *WFS {
|
||||||
// Only create FilerClient for direct volume access modes
|
// Only create FilerClient for direct volume access modes
|
||||||
// When VolumeServerAccess == "filerProxy", all reads go through filer, so no volume lookup needed
|
// When VolumeServerAccess == "filerProxy", all reads go through filer, so no volume lookup needed
|
||||||
@@ -127,15 +141,28 @@ func NewSeaweedFileSystem(option *Option) *WFS {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dirHotWindow := defaultDirHotWindow
|
||||||
|
dirHotThreshold := defaultDirHotThreshold
|
||||||
|
dirIdleEvict := defaultDirIdleEvict
|
||||||
|
if option.DirIdleEvictSec != 0 {
|
||||||
|
dirIdleEvict = time.Duration(option.DirIdleEvictSec) * time.Second
|
||||||
|
} else {
|
||||||
|
dirIdleEvict = 0
|
||||||
|
}
|
||||||
|
|
||||||
wfs := &WFS{
|
wfs := &WFS{
|
||||||
RawFileSystem: fuse.NewDefaultRawFileSystem(),
|
RawFileSystem: fuse.NewDefaultRawFileSystem(),
|
||||||
option: option,
|
option: option,
|
||||||
signature: util.RandomInt32(),
|
signature: util.RandomInt32(),
|
||||||
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec),
|
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec),
|
||||||
fhMap: NewFileHandleToInode(),
|
fhMap: NewFileHandleToInode(),
|
||||||
dhMap: NewDirectoryHandleToInode(),
|
dhMap: NewDirectoryHandleToInode(),
|
||||||
filerClient: filerClient, // nil for proxy mode, initialized for direct access
|
filerClient: filerClient, // nil for proxy mode, initialized for direct access
|
||||||
fhLockTable: util.NewLockTable[FileHandleId](),
|
fhLockTable: util.NewLockTable[FileHandleId](),
|
||||||
|
refreshingDirs: make(map[util.FullPath]struct{}),
|
||||||
|
dirHotWindow: dirHotWindow,
|
||||||
|
dirHotThreshold: dirHotThreshold,
|
||||||
|
dirIdleEvict: dirIdleEvict,
|
||||||
}
|
}
|
||||||
|
|
||||||
wfs.option.filerIndex = int32(rand.IntN(len(option.FilerAddresses)))
|
wfs.option.filerIndex = int32(rand.IntN(len(option.FilerAddresses)))
|
||||||
@@ -171,6 +198,10 @@ func NewSeaweedFileSystem(option *Option) *WFS {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}, func(dirPath util.FullPath) {
|
||||||
|
if wfs.inodeToPath.RecordDirectoryUpdate(dirPath, time.Now(), wfs.dirHotWindow, wfs.dirHotThreshold) {
|
||||||
|
wfs.maybeRefreshDirectory(dirPath)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
grace.OnInterrupt(func() {
|
grace.OnInterrupt(func() {
|
||||||
wfs.metaCache.Shutdown()
|
wfs.metaCache.Shutdown()
|
||||||
@@ -224,6 +255,7 @@ func (wfs *WFS) StartBackgroundTasks() error {
|
|||||||
}, follower)
|
}, follower)
|
||||||
go wfs.loopCheckQuota()
|
go wfs.loopCheckQuota()
|
||||||
go wfs.loopFlushDirtyMetadata()
|
go wfs.loopFlushDirtyMetadata()
|
||||||
|
go wfs.loopEvictIdleDirCache()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -339,6 +371,49 @@ func (wfs *WFS) ClearCacheDir() {
|
|||||||
os.RemoveAll(wfs.option.getUniqueCacheDirForRead())
|
os.RemoveAll(wfs.option.getUniqueCacheDirForRead())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (wfs *WFS) maybeRefreshDirectory(dirPath util.FullPath) {
|
||||||
|
if !wfs.inodeToPath.NeedsRefresh(dirPath) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
wfs.refreshMu.Lock()
|
||||||
|
if _, exists := wfs.refreshingDirs[dirPath]; exists {
|
||||||
|
wfs.refreshMu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
wfs.refreshingDirs[dirPath] = struct{}{}
|
||||||
|
wfs.refreshMu.Unlock()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
wfs.refreshMu.Lock()
|
||||||
|
delete(wfs.refreshingDirs, dirPath)
|
||||||
|
wfs.refreshMu.Unlock()
|
||||||
|
}()
|
||||||
|
wfs.inodeToPath.InvalidateChildrenCache(dirPath)
|
||||||
|
if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
|
||||||
|
glog.Warningf("refresh dir cache %s: %v", dirPath, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
wfs.inodeToPath.MarkDirectoryRefreshed(dirPath, time.Now())
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wfs *WFS) loopEvictIdleDirCache() {
|
||||||
|
if wfs.dirIdleEvict <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ticker := time.NewTicker(wfs.dirIdleEvict / 2)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for range ticker.C {
|
||||||
|
dirs := wfs.inodeToPath.CollectEvictableDirs(time.Now(), wfs.dirIdleEvict)
|
||||||
|
for _, dir := range dirs {
|
||||||
|
if err := wfs.metaCache.DeleteFolderChildren(context.Background(), dir); err != nil {
|
||||||
|
glog.V(2).Infof("evict dir cache %s: %v", dir, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (option *Option) setupUniqueCacheDirectory() {
|
func (option *Option) setupUniqueCacheDirectory() {
|
||||||
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + version.Version()))[0:8]
|
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + version.Version()))[0:8]
|
||||||
option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
|
option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
|
||||||
|
|||||||
@@ -71,6 +71,7 @@ func (wfs *WFS) Mkdir(cancel <-chan struct{}, in *fuse.MkdirIn, name string, out
|
|||||||
// Only cache the entry if the parent directory is already cached.
|
// Only cache the entry if the parent directory is already cached.
|
||||||
// This avoids polluting the cache with partial directory data.
|
// This avoids polluting the cache with partial directory data.
|
||||||
if wfs.metaCache.IsDirectoryCached(dirFullPath) {
|
if wfs.metaCache.IsDirectoryCached(dirFullPath) {
|
||||||
|
wfs.inodeToPath.TouchDirectory(dirFullPath)
|
||||||
if err := wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
|
if err := wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
|
||||||
return fmt.Errorf("local mkdir dir %s: %w", entryFullPath, err)
|
return fmt.Errorf("local mkdir dir %s: %w", entryFullPath, err)
|
||||||
}
|
}
|
||||||
@@ -122,6 +123,7 @@ func (wfs *WFS) Rmdir(cancel <-chan struct{}, header *fuse.InHeader, name string
|
|||||||
|
|
||||||
wfs.metaCache.DeleteEntry(context.Background(), entryFullPath)
|
wfs.metaCache.DeleteEntry(context.Background(), entryFullPath)
|
||||||
wfs.inodeToPath.RemovePath(entryFullPath)
|
wfs.inodeToPath.RemovePath(entryFullPath)
|
||||||
|
wfs.inodeToPath.TouchDirectory(dirFullPath)
|
||||||
|
|
||||||
return fuse.OK
|
return fuse.OK
|
||||||
|
|
||||||
|
|||||||
@@ -163,6 +163,8 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
|
|||||||
if code != fuse.OK {
|
if code != fuse.OK {
|
||||||
return code
|
return code
|
||||||
}
|
}
|
||||||
|
wfs.inodeToPath.TouchDirectory(dirPath)
|
||||||
|
wfs.maybeRefreshDirectory(dirPath)
|
||||||
|
|
||||||
var dirEntry fuse.DirEntry
|
var dirEntry fuse.DirEntry
|
||||||
|
|
||||||
|
|||||||
@@ -91,6 +91,7 @@ func (wfs *WFS) Mknod(cancel <-chan struct{}, in *fuse.MknodIn, name string, out
|
|||||||
// Only cache the entry if the parent directory is already cached.
|
// Only cache the entry if the parent directory is already cached.
|
||||||
// This avoids polluting the cache with partial directory data.
|
// This avoids polluting the cache with partial directory data.
|
||||||
if wfs.metaCache.IsDirectoryCached(dirFullPath) {
|
if wfs.metaCache.IsDirectoryCached(dirFullPath) {
|
||||||
|
wfs.inodeToPath.TouchDirectory(dirFullPath)
|
||||||
if err := wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
|
if err := wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
|
||||||
return fmt.Errorf("local mknod %s: %w", entryFullPath, err)
|
return fmt.Errorf("local mknod %s: %w", entryFullPath, err)
|
||||||
}
|
}
|
||||||
@@ -153,6 +154,7 @@ func (wfs *WFS) Unlink(cancel <-chan struct{}, header *fuse.InHeader, name strin
|
|||||||
glog.V(3).Infof("local DeleteEntry %s: %v", entryFullPath, err)
|
glog.V(3).Infof("local DeleteEntry %s: %v", entryFullPath, err)
|
||||||
return fuse.EIO
|
return fuse.EIO
|
||||||
}
|
}
|
||||||
|
wfs.inodeToPath.TouchDirectory(dirFullPath)
|
||||||
|
|
||||||
wfs.inodeToPath.RemovePath(entryFullPath)
|
wfs.inodeToPath.RemovePath(entryFullPath)
|
||||||
|
|
||||||
|
|||||||
@@ -220,6 +220,8 @@ func (wfs *WFS) Rename(cancel <-chan struct{}, in *fuse.RenameIn, oldName string
|
|||||||
glog.V(0).Infof("Link: %v", err)
|
glog.V(0).Infof("Link: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
wfs.inodeToPath.TouchDirectory(oldDir)
|
||||||
|
wfs.inodeToPath.TouchDirectory(newDir)
|
||||||
|
|
||||||
return fuse.OK
|
return fuse.OK
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user