filer: optimize for less number of directory lookup
bottom up directory lookup
This commit is contained in:
@@ -134,69 +134,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
dirParts := strings.Split(string(entry.FullPath), "/")
|
oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
|
||||||
|
|
||||||
// fmt.Printf("directory parts: %+v\n", dirParts)
|
|
||||||
|
|
||||||
var lastDirectoryEntry *Entry
|
|
||||||
|
|
||||||
for i := 1; i < len(dirParts); i++ {
|
|
||||||
dirPath := "/" + util.Join(dirParts[:i]...)
|
|
||||||
// fmt.Printf("%d directory: %+v\n", i, dirPath)
|
|
||||||
|
|
||||||
// check the store directly
|
|
||||||
glog.V(4).Infof("find uncached directory: %s", dirPath)
|
|
||||||
dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
|
|
||||||
|
|
||||||
// no such existing directory
|
|
||||||
if dirEntry == nil {
|
|
||||||
|
|
||||||
// create the directory
|
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
dirEntry = &Entry{
|
|
||||||
FullPath: util.FullPath(dirPath),
|
|
||||||
Attr: Attr{
|
|
||||||
Mtime: now,
|
|
||||||
Crtime: now,
|
|
||||||
Mode: os.ModeDir | entry.Mode | 0110,
|
|
||||||
Uid: entry.Uid,
|
|
||||||
Gid: entry.Gid,
|
|
||||||
Collection: entry.Collection,
|
|
||||||
Replication: entry.Replication,
|
|
||||||
UserName: entry.UserName,
|
|
||||||
GroupNames: entry.GroupNames,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
|
|
||||||
mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
|
|
||||||
if mkdirErr != nil {
|
|
||||||
if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
|
|
||||||
glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
|
|
||||||
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
f.maybeAddBucket(dirEntry)
|
|
||||||
f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if !dirEntry.IsDirectory() {
|
|
||||||
glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
|
|
||||||
return fmt.Errorf("%s is a file", dirPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
// remember the direct parent directory entry
|
|
||||||
if i == len(dirParts)-1 {
|
|
||||||
lastDirectoryEntry = dirEntry
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
if lastDirectoryEntry == nil {
|
|
||||||
glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath)
|
|
||||||
return fmt.Errorf("parent folder not found: %v", entry.FullPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
if !hasWritePermission(lastDirectoryEntry, entry) {
|
if !hasWritePermission(lastDirectoryEntry, entry) {
|
||||||
@@ -206,9 +144,13 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
|
|||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
|
|
||||||
|
|
||||||
if oldEntry == nil {
|
if oldEntry == nil {
|
||||||
|
|
||||||
|
dirParts := strings.Split(string(entry.FullPath), "/")
|
||||||
|
if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
|
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
|
||||||
if err := f.Store.InsertEntry(ctx, entry); err != nil {
|
if err := f.Store.InsertEntry(ctx, entry); err != nil {
|
||||||
glog.Errorf("insert entry %s: %v", entry.FullPath, err)
|
glog.Errorf("insert entry %s: %v", entry.FullPath, err)
|
||||||
@@ -236,6 +178,65 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) {
|
||||||
|
|
||||||
|
if level == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
dirPath := "/" + util.Join(dirParts[:level]...)
|
||||||
|
// fmt.Printf("%d directory: %+v\n", i, dirPath)
|
||||||
|
|
||||||
|
// check the store directly
|
||||||
|
glog.V(4).Infof("find uncached directory: %s", dirPath)
|
||||||
|
dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
|
||||||
|
|
||||||
|
// no such existing directory
|
||||||
|
if dirEntry == nil {
|
||||||
|
|
||||||
|
// ensure parent directory
|
||||||
|
if err = f.ensureParentDirecotryEntry(ctx, entry, dirParts, level-1, isFromOtherCluster); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// create the directory
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
dirEntry = &Entry{
|
||||||
|
FullPath: util.FullPath(dirPath),
|
||||||
|
Attr: Attr{
|
||||||
|
Mtime: now,
|
||||||
|
Crtime: now,
|
||||||
|
Mode: os.ModeDir | entry.Mode | 0110,
|
||||||
|
Uid: entry.Uid,
|
||||||
|
Gid: entry.Gid,
|
||||||
|
Collection: entry.Collection,
|
||||||
|
Replication: entry.Replication,
|
||||||
|
UserName: entry.UserName,
|
||||||
|
GroupNames: entry.GroupNames,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
|
||||||
|
mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
|
||||||
|
if mkdirErr != nil {
|
||||||
|
if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
|
||||||
|
glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
|
||||||
|
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
f.maybeAddBucket(dirEntry)
|
||||||
|
f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if !dirEntry.IsDirectory() {
|
||||||
|
glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
|
||||||
|
return fmt.Errorf("%s is a file", dirPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
|
func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
|
||||||
if oldEntry != nil {
|
if oldEntry != nil {
|
||||||
entry.Attr.Crtime = oldEntry.Attr.Crtime
|
entry.Attr.Crtime = oldEntry.Attr.Crtime
|
||||||
|
|||||||
Reference in New Issue
Block a user