listen for metadata updates
This commit is contained in:
32
weed/mount/meta_cache/cache_config.go
Normal file
32
weed/mount/meta_cache/cache_config.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package meta_cache
|
||||
|
||||
import "github.com/chrislusf/seaweedfs/weed/util"
|
||||
|
||||
var (
|
||||
_ = util.Configuration(&cacheConfig{})
|
||||
)
|
||||
|
||||
// implementing util.Configuraion
|
||||
type cacheConfig struct {
|
||||
dir string
|
||||
}
|
||||
|
||||
func (c cacheConfig) GetString(key string) string {
|
||||
return c.dir
|
||||
}
|
||||
|
||||
func (c cacheConfig) GetBool(key string) bool {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c cacheConfig) GetInt(key string) int {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c cacheConfig) GetStringSlice(key string) []string {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c cacheConfig) SetDefault(key string, value interface{}) {
|
||||
panic("implement me")
|
||||
}
|
||||
101
weed/mount/meta_cache/id_mapper.go
Normal file
101
weed/mount/meta_cache/id_mapper.go
Normal file
@@ -0,0 +1,101 @@
|
||||
package meta_cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type UidGidMapper struct {
|
||||
uidMapper *IdMapper
|
||||
gidMapper *IdMapper
|
||||
}
|
||||
|
||||
type IdMapper struct {
|
||||
localToFiler map[uint32]uint32
|
||||
filerToLocal map[uint32]uint32
|
||||
}
|
||||
|
||||
// UidGidMapper translates local uid/gid to filer uid/gid
|
||||
// The local storage always persists the same as the filer.
|
||||
// The local->filer translation happens when updating the filer first and later saving to meta_cache.
|
||||
// And filer->local happens when reading from the meta_cache.
|
||||
func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) {
|
||||
uidMapper, err := newIdMapper(uidPairsStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gidMapper, err := newIdMapper(gidPairStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &UidGidMapper{
|
||||
uidMapper: uidMapper,
|
||||
gidMapper: gidMapper,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) {
|
||||
return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid)
|
||||
}
|
||||
func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) {
|
||||
return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid)
|
||||
}
|
||||
|
||||
func (m *IdMapper) LocalToFiler(id uint32) uint32 {
|
||||
value, found := m.localToFiler[id]
|
||||
if found {
|
||||
return value
|
||||
}
|
||||
return id
|
||||
}
|
||||
func (m *IdMapper) FilerToLocal(id uint32) uint32 {
|
||||
value, found := m.filerToLocal[id]
|
||||
if found {
|
||||
return value
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
func newIdMapper(pairsStr string) (*IdMapper, error) {
|
||||
|
||||
localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &IdMapper{
|
||||
localToFiler: localToFiler,
|
||||
filerToLocal: filerToLocal,
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) {
|
||||
|
||||
if pairsStr == "" {
|
||||
return
|
||||
}
|
||||
|
||||
localToFiler = make(map[uint32]uint32)
|
||||
filerToLocal = make(map[uint32]uint32)
|
||||
for _, pairStr := range strings.Split(pairsStr, ",") {
|
||||
pair := strings.Split(pairStr, ":")
|
||||
localUidStr, filerUidStr := pair[0], pair[1]
|
||||
localUid, localUidErr := strconv.Atoi(localUidStr)
|
||||
if localUidErr != nil {
|
||||
err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr)
|
||||
return
|
||||
}
|
||||
filerUid, filerUidErr := strconv.Atoi(filerUidStr)
|
||||
if filerUidErr != nil {
|
||||
err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr)
|
||||
return
|
||||
}
|
||||
localToFiler[uint32(localUid)] = uint32(filerUid)
|
||||
filerToLocal[uint32(filerUid)] = uint32(localUid)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
160
weed/mount/meta_cache/meta_cache.go
Normal file
160
weed/mount/meta_cache/meta_cache.go
Normal file
@@ -0,0 +1,160 @@
|
||||
package meta_cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||
"github.com/chrislusf/seaweedfs/weed/filer/leveldb"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"os"
|
||||
)
|
||||
|
||||
// need to have logic similar to FilerStoreWrapper
|
||||
// e.g. fill fileId field for chunks
|
||||
|
||||
type MetaCache struct {
|
||||
localStore filer.VirtualFilerStore
|
||||
// sync.RWMutex
|
||||
uidGidMapper *UidGidMapper
|
||||
markCachedFn func(fullpath util.FullPath)
|
||||
isCachedFn func(fullpath util.FullPath) bool
|
||||
invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
|
||||
}
|
||||
|
||||
func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache {
|
||||
return &MetaCache{
|
||||
localStore: openMetaStore(dbFolder),
|
||||
markCachedFn: markCachedFn,
|
||||
isCachedFn: isCachedFn,
|
||||
uidGidMapper: uidGidMapper,
|
||||
invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) {
|
||||
invalidateFunc(fullpath, entry)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func openMetaStore(dbFolder string) filer.VirtualFilerStore {
|
||||
|
||||
os.RemoveAll(dbFolder)
|
||||
os.MkdirAll(dbFolder, 0755)
|
||||
|
||||
store := &leveldb.LevelDBStore{}
|
||||
config := &cacheConfig{
|
||||
dir: dbFolder,
|
||||
}
|
||||
|
||||
if err := store.Initialize(config, ""); err != nil {
|
||||
glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
|
||||
}
|
||||
|
||||
return filer.NewFilerStoreWrapper(store)
|
||||
|
||||
}
|
||||
|
||||
func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
|
||||
//mc.Lock()
|
||||
//defer mc.Unlock()
|
||||
return mc.doInsertEntry(ctx, entry)
|
||||
}
|
||||
|
||||
func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
|
||||
return mc.localStore.InsertEntry(ctx, entry)
|
||||
}
|
||||
|
||||
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
|
||||
//mc.Lock()
|
||||
//defer mc.Unlock()
|
||||
|
||||
oldDir, _ := oldPath.DirAndName()
|
||||
if mc.isCachedFn(util.FullPath(oldDir)) {
|
||||
if oldPath != "" {
|
||||
if newEntry != nil && oldPath == newEntry.FullPath {
|
||||
// skip the unnecessary deletion
|
||||
// leave the update to the following InsertEntry operation
|
||||
} else {
|
||||
glog.V(3).Infof("DeleteEntry %s", oldPath)
|
||||
if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// println("unknown old directory:", oldDir)
|
||||
}
|
||||
|
||||
if newEntry != nil {
|
||||
newDir, _ := newEntry.DirAndName()
|
||||
if mc.isCachedFn(util.FullPath(newDir)) {
|
||||
glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
|
||||
if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
|
||||
//mc.Lock()
|
||||
//defer mc.Unlock()
|
||||
return mc.localStore.UpdateEntry(ctx, entry)
|
||||
}
|
||||
|
||||
func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
|
||||
//mc.RLock()
|
||||
//defer mc.RUnlock()
|
||||
entry, err = mc.localStore.FindEntry(ctx, fp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mc.mapIdFromFilerToLocal(entry)
|
||||
return
|
||||
}
|
||||
|
||||
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
|
||||
//mc.Lock()
|
||||
//defer mc.Unlock()
|
||||
return mc.localStore.DeleteEntry(ctx, fp)
|
||||
}
|
||||
func (mc *MetaCache) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
|
||||
//mc.Lock()
|
||||
//defer mc.Unlock()
|
||||
return mc.localStore.DeleteFolderChildren(ctx, fp)
|
||||
}
|
||||
|
||||
func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
|
||||
//mc.RLock()
|
||||
//defer mc.RUnlock()
|
||||
|
||||
if !mc.isCachedFn(dirPath) {
|
||||
// if this request comes after renaming, it should be fine
|
||||
glog.Warningf("unsynchronized dir: %v", dirPath)
|
||||
}
|
||||
|
||||
_, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
|
||||
mc.mapIdFromFilerToLocal(entry)
|
||||
return eachEntryFunc(entry)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (mc *MetaCache) Shutdown() {
|
||||
//mc.Lock()
|
||||
//defer mc.Unlock()
|
||||
mc.localStore.Shutdown()
|
||||
}
|
||||
|
||||
func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) {
|
||||
entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid)
|
||||
}
|
||||
|
||||
func (mc *MetaCache) Debug() {
|
||||
if debuggable, ok := mc.localStore.(filer.Debuggable); ok {
|
||||
println("start debugging")
|
||||
debuggable.Debug(os.Stderr)
|
||||
}
|
||||
}
|
||||
67
weed/mount/meta_cache/meta_cache_init.go
Normal file
67
weed/mount/meta_cache/meta_cache_init.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package meta_cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
|
||||
|
||||
for {
|
||||
|
||||
// the directory children are already cached
|
||||
// so no need for this and upper directories
|
||||
if mc.isCachedFn(dirPath) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := doEnsureVisited(mc, client, dirPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// continue to parent directory
|
||||
if dirPath != "/" {
|
||||
parent, _ := dirPath.DirAndName()
|
||||
dirPath = util.FullPath(parent)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
|
||||
|
||||
glog.V(4).Infof("ReadDirAllEntries %s ...", path)
|
||||
|
||||
err := util.Retry("ReadDirAllEntries", func() error {
|
||||
return filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
|
||||
entry := filer.FromPbEntry(string(path), pbEntry)
|
||||
if IsHiddenSystemEntry(string(path), entry.Name()) {
|
||||
return nil
|
||||
}
|
||||
if err := mc.doInsertEntry(context.Background(), entry); err != nil {
|
||||
glog.V(0).Infof("read %s: %v", entry.FullPath, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
err = fmt.Errorf("list %s: %v", path, err)
|
||||
}
|
||||
mc.markCachedFn(path)
|
||||
return err
|
||||
}
|
||||
|
||||
func IsHiddenSystemEntry(dir, name string) bool {
|
||||
return dir == "/" && (name == "topics" || name == "etc")
|
||||
}
|
||||
68
weed/mount/meta_cache/meta_cache_subscribe.go
Normal file
68
weed/mount/meta_cache/meta_cache_subscribe.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package meta_cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/chrislusf/seaweedfs/weed/filer"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
|
||||
|
||||
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
||||
message := resp.EventNotification
|
||||
|
||||
for _, sig := range message.Signatures {
|
||||
if sig == selfSignature && selfSignature != 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
dir := resp.Directory
|
||||
var oldPath util.FullPath
|
||||
var newEntry *filer.Entry
|
||||
if message.OldEntry != nil {
|
||||
oldPath = util.NewFullPath(dir, message.OldEntry.Name)
|
||||
glog.V(4).Infof("deleting %v", oldPath)
|
||||
}
|
||||
|
||||
if message.NewEntry != nil {
|
||||
if message.NewParentPath != "" {
|
||||
dir = message.NewParentPath
|
||||
}
|
||||
key := util.NewFullPath(dir, message.NewEntry.Name)
|
||||
glog.V(4).Infof("creating %v", key)
|
||||
newEntry = filer.FromPbEntry(dir, message.NewEntry)
|
||||
}
|
||||
err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
|
||||
if err == nil {
|
||||
if message.OldEntry != nil && message.NewEntry != nil {
|
||||
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
|
||||
mc.invalidateFunc(oldKey, message.OldEntry)
|
||||
if message.OldEntry.Name != message.NewEntry.Name {
|
||||
newKey := util.NewFullPath(dir, message.NewEntry.Name)
|
||||
mc.invalidateFunc(newKey, message.NewEntry)
|
||||
}
|
||||
} else if message.OldEntry == nil && message.NewEntry != nil {
|
||||
// no need to invaalidate
|
||||
} else if message.OldEntry != nil && message.NewEntry == nil {
|
||||
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
|
||||
mc.invalidateFunc(oldKey, message.OldEntry)
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
util.RetryForever("followMetaUpdates", func() error {
|
||||
return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, selfSignature, processEventFn, true)
|
||||
}, func(err error) bool {
|
||||
glog.Errorf("follow metadata updates: %v", err)
|
||||
return true
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user