mount: invalidate meta cache on follow retry (#8173)
* mount: invalidate meta cache on follow retry * mount: clear cache under mount root on retry
This commit is contained in:
@@ -195,6 +195,17 @@ func (i *InodeToPath) HasInode(inode uint64) bool {
|
|||||||
return found
|
return found
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *InodeToPath) InvalidateAllChildrenCache() {
|
||||||
|
i.Lock()
|
||||||
|
defer i.Unlock()
|
||||||
|
for _, entry := range i.inode2path {
|
||||||
|
if entry.isDirectory && entry.isChildrenCached {
|
||||||
|
entry.isChildrenCached = false
|
||||||
|
entry.cachedExpiresTime = time.Time{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (i *InodeToPath) AddPath(inode uint64, path util.FullPath) {
|
func (i *InodeToPath) AddPath(inode uint64, path util.FullPath) {
|
||||||
i.Lock()
|
i.Lock()
|
||||||
defer i.Unlock()
|
defer i.Unlock()
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ func mergeProcessors(mainProcessor func(resp *filer_pb.SubscribeMetadataResponse
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, followers ...*MetadataFollower) error {
|
func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, onRetry func(lastTsNs int64, err error), followers ...*MetadataFollower) error {
|
||||||
|
|
||||||
var prefixes []string
|
var prefixes []string
|
||||||
for _, follower := range followers {
|
for _, follower := range followers {
|
||||||
@@ -117,6 +117,9 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
|
|||||||
metadataFollowOption.ClientEpoch++
|
metadataFollowOption.ClientEpoch++
|
||||||
return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, mergeProcessors(processEventFn, followers...))
|
return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, mergeProcessors(processEventFn, followers...))
|
||||||
}, func(err error) bool {
|
}, func(err error) bool {
|
||||||
|
if onRetry != nil {
|
||||||
|
onRetry(metadataFollowOption.StartTsNs, err)
|
||||||
|
}
|
||||||
glog.Errorf("follow metadata updates: %v", err)
|
glog.Errorf("follow metadata updates: %v", err)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -215,7 +215,13 @@ func (wfs *WFS) StartBackgroundTasks() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), follower)
|
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), func(lastTsNs int64, err error) {
|
||||||
|
glog.Warningf("meta events follow retry from %v: %v", time.Unix(0, lastTsNs), err)
|
||||||
|
if deleteErr := wfs.metaCache.DeleteFolderChildren(context.Background(), util.FullPath(wfs.option.FilerMountRootPath)); deleteErr != nil {
|
||||||
|
glog.Warningf("meta cache cleanup failed: %v", deleteErr)
|
||||||
|
}
|
||||||
|
wfs.inodeToPath.InvalidateAllChildrenCache()
|
||||||
|
}, follower)
|
||||||
go wfs.loopCheckQuota()
|
go wfs.loopCheckQuota()
|
||||||
go wfs.loopFlushDirtyMetadata()
|
go wfs.loopFlushDirtyMetadata()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user