read from meta cache
meta cache is not initialized
This commit is contained in:
@@ -206,7 +206,11 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
|
|||||||
entry := dir.wfs.cacheGet(fullFilePath)
|
entry := dir.wfs.cacheGet(fullFilePath)
|
||||||
|
|
||||||
if dir.wfs.option.AsyncMetaDataCaching {
|
if dir.wfs.option.AsyncMetaDataCaching {
|
||||||
|
cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
|
||||||
|
if cacheErr == filer_pb.ErrNotFound {
|
||||||
|
return nil, fuse.ENOENT
|
||||||
|
}
|
||||||
|
entry = cachedEntry.ToProtoEntry()
|
||||||
}
|
}
|
||||||
|
|
||||||
if entry == nil {
|
if entry == nil {
|
||||||
@@ -248,13 +252,8 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
|
|||||||
|
|
||||||
glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
|
glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
|
||||||
|
|
||||||
if dir.wfs.option.AsyncMetaDataCaching {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
cacheTtl := 5 * time.Minute
|
cacheTtl := 5 * time.Minute
|
||||||
|
processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) {
|
||||||
readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", func(entry *filer_pb.Entry, isLast bool) {
|
|
||||||
fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
|
fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
|
||||||
inode := fullpath.AsInode()
|
inode := fullpath.AsInode()
|
||||||
if entry.IsDirectory {
|
if entry.IsDirectory {
|
||||||
@@ -265,7 +264,21 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
|
|||||||
ret = append(ret, dirent)
|
ret = append(ret, dirent)
|
||||||
}
|
}
|
||||||
dir.wfs.cacheSet(fullpath, entry, cacheTtl)
|
dir.wfs.cacheSet(fullpath, entry, cacheTtl)
|
||||||
})
|
}
|
||||||
|
|
||||||
|
if dir.wfs.option.AsyncMetaDataCaching {
|
||||||
|
listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit))
|
||||||
|
if listErr != nil {
|
||||||
|
glog.Errorf("list meta cache: %v", listErr)
|
||||||
|
return nil, fuse.EIO
|
||||||
|
}
|
||||||
|
for _, cachedEntry := range listedEntries {
|
||||||
|
processEachEntryFn(cachedEntry.ToProtoEntry(), false)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn)
|
||||||
if readErr != nil {
|
if readErr != nil {
|
||||||
glog.V(0).Infof("list %s: %v", dir.FullPath(), err)
|
glog.V(0).Infof("list %s: %v", dir.FullPath(), err)
|
||||||
return ret, fuse.EIO
|
return ret, fuse.EIO
|
||||||
|
|||||||
32
weed/filesys/meta_cache/cache_config.go
Normal file
32
weed/filesys/meta_cache/cache_config.go
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
package meta_cache
|
||||||
|
|
||||||
|
import "github.com/chrislusf/seaweedfs/weed/util"
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ = util.Configuration(&cacheConfig{})
|
||||||
|
)
|
||||||
|
|
||||||
|
// implementing util.Configuraion
|
||||||
|
type cacheConfig struct {
|
||||||
|
dir string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cacheConfig) GetString(key string) string {
|
||||||
|
return c.dir
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cacheConfig) GetBool(key string) bool {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cacheConfig) GetInt(key string) int {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cacheConfig) GetStringSlice(key string) []string {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cacheConfig) SetDefault(key string, value interface{}) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
34
weed/filesys/meta_cache/meta_cache.go
Normal file
34
weed/filesys/meta_cache/meta_cache.go
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
package meta_cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/filer2"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MetaCache struct {
|
||||||
|
filer2.FilerStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMetaCache(dbFolder string) *MetaCache {
|
||||||
|
return &MetaCache{
|
||||||
|
FilerStore: OpenMetaStore(dbFolder),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func OpenMetaStore(dbFolder string) filer2.FilerStore {
|
||||||
|
|
||||||
|
os.MkdirAll(dbFolder, 0755)
|
||||||
|
|
||||||
|
store := &leveldb.LevelDBStore{}
|
||||||
|
config := &cacheConfig{}
|
||||||
|
|
||||||
|
if err := store.Initialize(config, ""); err != nil {
|
||||||
|
glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return store
|
||||||
|
|
||||||
|
}
|
||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -12,6 +13,7 @@ import (
|
|||||||
"github.com/karlseguin/ccache"
|
"github.com/karlseguin/ccache"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
@@ -67,6 +69,7 @@ type WFS struct {
|
|||||||
fsNodeCache *FsCache
|
fsNodeCache *FsCache
|
||||||
|
|
||||||
chunkCache *chunk_cache.ChunkCache
|
chunkCache *chunk_cache.ChunkCache
|
||||||
|
metaCache *meta_cache.MetaCache
|
||||||
}
|
}
|
||||||
type statsCache struct {
|
type statsCache struct {
|
||||||
filer_pb.StatisticsResponse
|
filer_pb.StatisticsResponse
|
||||||
@@ -90,6 +93,9 @@ func NewSeaweedFileSystem(option *Option) *WFS {
|
|||||||
wfs.chunkCache.Shutdown()
|
wfs.chunkCache.Shutdown()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
if wfs.option.AsyncMetaDataCaching {
|
||||||
|
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
|
||||||
|
}
|
||||||
|
|
||||||
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
|
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
|
||||||
wfs.fsNodeCache = newFsCache(wfs.root)
|
wfs.fsNodeCache = newFsCache(wfs.root)
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
package filesys
|
package filesys
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||||
"github.com/chrislusf/seaweedfs/weed/util"
|
"github.com/chrislusf/seaweedfs/weed/util"
|
||||||
@@ -114,8 +116,13 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err err
|
|||||||
}
|
}
|
||||||
// glog.V(3).Infof("read entry cache miss %s", fullpath)
|
// glog.V(3).Infof("read entry cache miss %s", fullpath)
|
||||||
|
|
||||||
|
// read from async meta cache
|
||||||
if wfs.option.AsyncMetaDataCaching {
|
if wfs.option.AsyncMetaDataCaching {
|
||||||
|
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
|
||||||
|
if cacheErr == filer_pb.ErrNotFound {
|
||||||
|
return nil, fuse.ENOENT
|
||||||
|
}
|
||||||
|
return cachedEntry.ToProtoEntry(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|||||||
Reference in New Issue
Block a user