rename: delete source entry metadata only, skipping hard links
This commit is contained in:
@@ -62,7 +62,7 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro
|
||||
return mc.localStore.InsertEntry(ctx, entry)
|
||||
}
|
||||
|
||||
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
|
||||
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry, shouldDeleteChunks bool) error {
|
||||
//mc.Lock()
|
||||
//defer mc.Unlock()
|
||||
|
||||
@@ -74,8 +74,14 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
|
||||
// leave the update to the following InsertEntry operation
|
||||
} else {
|
||||
glog.V(3).Infof("DeleteEntry %s", oldPath)
|
||||
if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
|
||||
return err
|
||||
if shouldDeleteChunks {
|
||||
if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := mc.localStore.DeleteOneEntrySkipHardlink(ctx, oldPath); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -112,6 +118,12 @@ func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *fi
|
||||
return
|
||||
}
|
||||
|
||||
func (mc *MetaCache) DeleteEntrySkipHardlink(ctx context.Context, fp util.FullPath) (err error) {
|
||||
//mc.Lock()
|
||||
//defer mc.Unlock()
|
||||
return mc.localStore.DeleteOneEntrySkipHardlink(ctx, fp)
|
||||
}
|
||||
|
||||
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
|
||||
//mc.Lock()
|
||||
//defer mc.Unlock()
|
||||
|
||||
@@ -36,7 +36,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
|
||||
glog.V(4).Infof("creating %v", key)
|
||||
newEntry = filer.FromPbEntry(dir, message.NewEntry)
|
||||
}
|
||||
err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
|
||||
err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry, message.DeleteChunks)
|
||||
if err == nil {
|
||||
if message.OldEntry != nil && message.NewEntry != nil {
|
||||
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
|
||||
|
||||
@@ -217,7 +217,7 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR
|
||||
if resp.EventNotification.NewEntry != nil {
|
||||
// with new entry, the old entry name also exists. This is the first step to create new entry
|
||||
newEntry := filer.FromPbEntry(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry)
|
||||
if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry); err != nil {
|
||||
if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -231,7 +231,7 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR
|
||||
|
||||
} else if resp.EventNotification.OldEntry != nil {
|
||||
// without new entry, only old entry name exists. This is the second step to delete old entry
|
||||
if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil {
|
||||
if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil, resp.EventNotification.DeleteChunks); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user