filer: propagate lazy metadata deletes to remote mounts (#8522)
* filer: propagate lazy metadata deletes to remote mounts Delete operations now call the remote backend for mounted remote-only entries before removing filer metadata, keeping remote state aligned and preserving retry semantics on remote failures. Made-with: Cursor * filer: harden remote delete metadata recovery Persist remote-delete metadata pendings so local entry removal can be retried after failures, and return explicit errors when remote client resolution fails to prevent silent local-only deletes. Made-with: Cursor * filer: streamline remote delete client lookup and logging Avoid a redundant mount trie traversal by resolving the remote client directly from the matched mount location, and add parity logging for successful remote directory deletions. Made-with: Cursor * filer: harden pending remote metadata deletion flow Retry pending-marker writes before local delete, fail closed when marking cannot be persisted, and start remote pending reconciliation only after the filer store is initialised to avoid nil store access. Made-with: Cursor * filer: avoid lazy fetch in pending metadata reconciliation Use a local-only entry lookup during pending remote metadata reconciliation so cache misses do not trigger remote lazy fetches. Made-with: Cursor * filer: serialise concurrent index read-modify-write in pending metadata deletion Add remoteMetadataDeletionIndexMu to Filer and acquire it for the full read→mutate→commit sequence in markRemoteMetadataDeletionPending and clearRemoteMetadataDeletionPending, preventing concurrent goroutines from overwriting each other's index updates. Made-with: Cursor * filer: start remote deletion reconciliation loop in NewFiler Move the background goroutine for pending remote metadata deletion reconciliation from SetStore (where it was gated by sync.Once) to NewFiler alongside the existing loopProcessingDeletion goroutine. The sync.Once approach was problematic: it buried a goroutine launch as a side effect of a setter, was unrecoverable if the goroutine panicked, could race with store initialisation, and coupled its lifecycle to unrelated shutdown machinery. The existing nil-store guard in reconcilePendingRemoteMetadataDeletions handles the window before SetStore is called. * filer: skip remote delete for replicated deletes from other filers When isFromOtherCluster is true the delete was already propagated to the remote backend by the originating filer. Repeating the remote delete on every replica doubles API calls, and a transient remote failure on the replica would block local metadata cleanup — leaving filers inconsistent. * filer: skip pending marking for directory remote deletes Directory remote deletes are idempotent and do not need the pending/reconcile machinery that was designed for file deletes where the local metadata delete might fail after the remote object is already removed. * filer: propagate remote deletes for children in recursive folder deletion doBatchDeleteFolderMetaAndData iterated child files but only called NotifyUpdateEvent and collected chunks — it never called maybeDeleteFromRemote for individual children. This left orphaned objects in the remote backend when a directory containing remote-only files was recursively deleted. Also fix isFromOtherCluster being hardcoded to false in the recursive call to doBatchDeleteFolderMetaAndData for subdirectories. * filer: simplify pending remote deletion tracking to single index key Replace the double-bookkeeping scheme (individual KV entry per path + newline-delimited index key) with a single index key that stores paths directly. This removes the per-path KV writes/deletes, the base64 encoding round-trip, and the transaction overhead that was only needed to keep the two representations in sync. * filer: address review feedback on remote deletion flow - Distinguish missing remote config from client initialization failure in maybeDeleteFromRemote error messages. - Use a detached context (30s timeout) for pending-mark and pending-clear KV writes so they survive request cancellation after the remote object has already been deleted. - Emit NotifyUpdateEvent in reconcilePendingRemoteMetadataDeletions after a successful retry deletion so downstream watchers and replicas learn about the eventual metadata removal. * filer: remove background reconciliation for pending remote deletions The pending-mark/reconciliation machinery (KV index, mutex, background loop, detached contexts) handled the narrow case where the remote object was deleted but the subsequent local metadata delete failed. The client already receives the error and can retry — on retry the remote not-found is treated as success and the local delete proceeds normally. The added complexity (and new edge cases around NotifyUpdateEvent, multi-filer consistency during reconciliation, and context lifetime) is not justified for a transient store failure the caller already handles. Remove: loopProcessingRemoteMetadataDeletionPending, reconcilePendingRemoteMetadataDeletions, markRemoteMetadataDeletionPending, clearRemoteMetadataDeletionPending, listPendingRemoteMetadataDeletionPaths, encodePendingRemoteMetadataDeletionIndex, FindEntryLocal, and all associated constants, fields, and test infrastructure. * filer: fix test stubs and add early exit on child remote delete error - Refactor stubFilerStore to release lock before invoking callbacks and propagate callback errors, preventing potential deadlocks in tests - Implement ListDirectoryPrefixedEntries with proper prefix filtering instead of delegating to the unfiltered ListDirectoryEntries - Add continue after setting err on child remote delete failure in doBatchDeleteFolderMetaAndData to skip further processing of the failed entry * filer: propagate child remote delete error instead of silently continuing Replace `continue` with early `break` when maybeDeleteFromRemote fails for a child entry during recursive folder deletion. The previous `continue` skipped the error check at the end of the loop body, so a subsequent successful entry would overwrite err and the remote delete error was silently lost. Now the loop breaks, the existing error check returns the error, and NotifyUpdateEvent / chunk collection are correctly skipped for the failed entry. * filer: delete remote file when entry has Remote pointer, not only when remote-only Replace IsInRemoteOnly() guard with entry.Remote == nil check in maybeDeleteFromRemote. IsInRemoteOnly() requires zero local chunks and RemoteSize > 0, which incorrectly skips remote deletion for cached files (local chunks exist) and zero-byte remote objects (RemoteSize 0). The correct condition is whether the entry has a remote backing object at all. --------- Co-authored-by: Chris Lu <chris.lu@gmail.com>
This commit is contained in:
@@ -91,8 +91,19 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
|
|||||||
lastFileName = sub.Name()
|
lastFileName = sub.Name()
|
||||||
if sub.IsDirectory() {
|
if sub.IsDirectory() {
|
||||||
subIsDeletingBucket := f.IsBucket(sub)
|
subIsDeletingBucket := f.IsBucket(sub)
|
||||||
err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, subIsDeletingBucket, false, nil, onHardLinkIdsFn)
|
err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, subIsDeletingBucket, isFromOtherCluster, nil, onHardLinkIdsFn)
|
||||||
} else {
|
} else {
|
||||||
|
if !isFromOtherCluster {
|
||||||
|
if _, remoteErr := f.maybeDeleteFromRemote(ctx, sub); remoteErr != nil {
|
||||||
|
glog.Warningf("remote delete child %s: %v", sub.FullPath, remoteErr)
|
||||||
|
if !ignoreRecursiveError {
|
||||||
|
err = remoteErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil && !ignoreRecursiveError {
|
||||||
|
break
|
||||||
|
}
|
||||||
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
|
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
|
||||||
if len(sub.HardLinkId) != 0 {
|
if len(sub.HardLinkId) != 0 {
|
||||||
// hard link chunk data are deleted separately
|
// hard link chunk data are deleted separately
|
||||||
@@ -130,9 +141,16 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
|
|||||||
|
|
||||||
glog.V(3).InfofCtx(ctx, "deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
|
glog.V(3).InfofCtx(ctx, "deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
|
||||||
|
|
||||||
|
if !isFromOtherCluster {
|
||||||
|
if _, remoteDeletionErr := f.maybeDeleteFromRemote(ctx, entry); remoteDeletionErr != nil {
|
||||||
|
return remoteDeletionErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
|
if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
|
||||||
return fmt.Errorf("filer store delete: %w", storeDeletionErr)
|
return fmt.Errorf("filer store delete: %w", storeDeletionErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !entry.IsDirectory() {
|
if !entry.IsDirectory() {
|
||||||
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
|
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -114,3 +114,49 @@ func (f *Filer) maybeLazyFetchFromRemote(ctx context.Context, p util.FullPath) (
|
|||||||
}
|
}
|
||||||
return result.entry, nil
|
return result.entry, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *Filer) maybeDeleteFromRemote(ctx context.Context, entry *Entry) (bool, error) {
|
||||||
|
if entry == nil || f.RemoteStorage == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
mountDir, remoteLoc := f.RemoteStorage.FindMountDirectory(entry.FullPath)
|
||||||
|
if remoteLoc == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
client, _, found := f.RemoteStorage.GetRemoteStorageClient(remoteLoc.Name)
|
||||||
|
if !found {
|
||||||
|
return false, fmt.Errorf("resolve remote storage client for %s: not found", entry.FullPath)
|
||||||
|
}
|
||||||
|
if client == nil {
|
||||||
|
return false, fmt.Errorf("resolve remote storage client for %s: initialization failed", entry.FullPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
objectLoc := MapFullPathToRemoteStorageLocation(mountDir, remoteLoc, entry.FullPath)
|
||||||
|
|
||||||
|
if entry.IsDirectory() {
|
||||||
|
if err := client.RemoveDirectory(objectLoc); err != nil {
|
||||||
|
if errors.Is(err, remote_storage.ErrRemoteObjectNotFound) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, fmt.Errorf("remove remote directory %s: %w", entry.FullPath, err)
|
||||||
|
}
|
||||||
|
glog.V(3).InfofCtx(ctx, "maybeDeleteFromRemote: deleted directory %s from remote", entry.FullPath)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry.Remote == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := client.DeleteFile(objectLoc); err != nil {
|
||||||
|
if errors.Is(err, remote_storage.ErrRemoteObjectNotFound) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, fmt.Errorf("delete remote file %s: %w", entry.FullPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(3).InfofCtx(ctx, "maybeDeleteFromRemote: deleted %s from remote", entry.FullPath)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,6 +5,9 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -28,11 +31,17 @@ import (
|
|||||||
type stubFilerStore struct {
|
type stubFilerStore struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
entries map[string]*Entry
|
entries map[string]*Entry
|
||||||
|
kv map[string][]byte
|
||||||
insertErr error
|
insertErr error
|
||||||
|
deleteErrByPath map[string]error
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStubFilerStore() *stubFilerStore {
|
func newStubFilerStore() *stubFilerStore {
|
||||||
return &stubFilerStore{entries: make(map[string]*Entry)}
|
return &stubFilerStore{
|
||||||
|
entries: make(map[string]*Entry),
|
||||||
|
kv: make(map[string][]byte),
|
||||||
|
deleteErrByPath: make(map[string]error),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stubFilerStore) GetName() string { return "stub" }
|
func (s *stubFilerStore) GetName() string { return "stub" }
|
||||||
@@ -43,17 +52,105 @@ func (s *stubFilerStore) BeginTransaction(ctx context.Context) (context.Context,
|
|||||||
}
|
}
|
||||||
func (s *stubFilerStore) CommitTransaction(context.Context) error { return nil }
|
func (s *stubFilerStore) CommitTransaction(context.Context) error { return nil }
|
||||||
func (s *stubFilerStore) RollbackTransaction(context.Context) error { return nil }
|
func (s *stubFilerStore) RollbackTransaction(context.Context) error { return nil }
|
||||||
func (s *stubFilerStore) KvPut(context.Context, []byte, []byte) error { return nil }
|
func (s *stubFilerStore) KvPut(_ context.Context, key []byte, value []byte) error {
|
||||||
func (s *stubFilerStore) KvGet(context.Context, []byte) ([]byte, error) {
|
s.mu.Lock()
|
||||||
return nil, ErrKvNotFound
|
defer s.mu.Unlock()
|
||||||
|
s.kv[string(key)] = append([]byte(nil), value...)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
func (s *stubFilerStore) KvDelete(context.Context, []byte) error { return nil }
|
func (s *stubFilerStore) KvGet(_ context.Context, key []byte) ([]byte, error) {
|
||||||
func (s *stubFilerStore) DeleteFolderChildren(context.Context, util.FullPath) error { return nil }
|
s.mu.Lock()
|
||||||
func (s *stubFilerStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
|
defer s.mu.Unlock()
|
||||||
return "", nil
|
value, found := s.kv[string(key)]
|
||||||
|
if !found {
|
||||||
|
return nil, ErrKvNotFound
|
||||||
|
}
|
||||||
|
return append([]byte(nil), value...), nil
|
||||||
}
|
}
|
||||||
func (s *stubFilerStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) {
|
func (s *stubFilerStore) KvDelete(_ context.Context, key []byte) error {
|
||||||
return "", nil
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
delete(s.kv, string(key))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (s *stubFilerStore) DeleteFolderChildren(_ context.Context, dirPath util.FullPath) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
prefix := string(dirPath) + "/"
|
||||||
|
for k := range s.entries {
|
||||||
|
if strings.HasPrefix(k, prefix) {
|
||||||
|
delete(s.entries, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (s *stubFilerStore) listDirectoryChildNames(dirPath util.FullPath, startFileName string, includeStartFile bool, namePrefix string) []string {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
dirPrefix := string(dirPath) + "/"
|
||||||
|
var names []string
|
||||||
|
for k := range s.entries {
|
||||||
|
if !strings.HasPrefix(k, dirPrefix) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rest := k[len(dirPrefix):]
|
||||||
|
if strings.Contains(rest, "/") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if namePrefix != "" && !strings.HasPrefix(rest, namePrefix) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if rest > startFileName || (includeStartFile && rest == startFileName) {
|
||||||
|
names = append(names, rest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sort.Strings(names)
|
||||||
|
return names
|
||||||
|
}
|
||||||
|
func (s *stubFilerStore) getEntry(path string) *Entry {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
return s.entries[path]
|
||||||
|
}
|
||||||
|
func (s *stubFilerStore) ListDirectoryEntries(_ context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
|
||||||
|
names := s.listDirectoryChildNames(dirPath, startFileName, includeStartFile, "")
|
||||||
|
dirPrefix := string(dirPath) + "/"
|
||||||
|
lastFileName := ""
|
||||||
|
for i, name := range names {
|
||||||
|
if int64(i) >= limit {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
entry := s.getEntry(dirPrefix + name)
|
||||||
|
cont, err := eachEntryFunc(entry)
|
||||||
|
if err != nil {
|
||||||
|
return lastFileName, err
|
||||||
|
}
|
||||||
|
lastFileName = name
|
||||||
|
if !cont {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return lastFileName, nil
|
||||||
|
}
|
||||||
|
func (s *stubFilerStore) ListDirectoryPrefixedEntries(_ context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) {
|
||||||
|
names := s.listDirectoryChildNames(dirPath, startFileName, includeStartFile, prefix)
|
||||||
|
dirPrefix := string(dirPath) + "/"
|
||||||
|
lastFileName := ""
|
||||||
|
for i, name := range names {
|
||||||
|
if int64(i) >= limit {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
entry := s.getEntry(dirPrefix + name)
|
||||||
|
cont, err := eachEntryFunc(entry)
|
||||||
|
if err != nil {
|
||||||
|
return lastFileName, err
|
||||||
|
}
|
||||||
|
lastFileName = name
|
||||||
|
if !cont {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return lastFileName, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stubFilerStore) InsertEntry(_ context.Context, entry *Entry) error {
|
func (s *stubFilerStore) InsertEntry(_ context.Context, entry *Entry) error {
|
||||||
@@ -85,6 +182,9 @@ func (s *stubFilerStore) FindEntry(_ context.Context, p util.FullPath) (*Entry,
|
|||||||
func (s *stubFilerStore) DeleteEntry(_ context.Context, p util.FullPath) error {
|
func (s *stubFilerStore) DeleteEntry(_ context.Context, p util.FullPath) error {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
if deleteErr, found := s.deleteErrByPath[string(p)]; found && deleteErr != nil {
|
||||||
|
return deleteErr
|
||||||
|
}
|
||||||
delete(s.entries, string(p))
|
delete(s.entries, string(p))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -94,6 +194,11 @@ func (s *stubFilerStore) DeleteEntry(_ context.Context, p util.FullPath) error {
|
|||||||
type stubRemoteClient struct {
|
type stubRemoteClient struct {
|
||||||
statResult *filer_pb.RemoteEntry
|
statResult *filer_pb.RemoteEntry
|
||||||
statErr error
|
statErr error
|
||||||
|
deleteErr error
|
||||||
|
removeErr error
|
||||||
|
|
||||||
|
deleteCalls []*remote_pb.RemoteStorageLocation
|
||||||
|
removeCalls []*remote_pb.RemoteStorageLocation
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *stubRemoteClient) StatFile(*remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
|
func (c *stubRemoteClient) StatFile(*remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
|
||||||
@@ -108,14 +213,28 @@ func (c *stubRemoteClient) ReadFile(*remote_pb.RemoteStorageLocation, int64, int
|
|||||||
func (c *stubRemoteClient) WriteDirectory(*remote_pb.RemoteStorageLocation, *filer_pb.Entry) error {
|
func (c *stubRemoteClient) WriteDirectory(*remote_pb.RemoteStorageLocation, *filer_pb.Entry) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (c *stubRemoteClient) RemoveDirectory(*remote_pb.RemoteStorageLocation) error { return nil }
|
func (c *stubRemoteClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) error {
|
||||||
|
c.removeCalls = append(c.removeCalls, &remote_pb.RemoteStorageLocation{
|
||||||
|
Name: loc.Name,
|
||||||
|
Bucket: loc.Bucket,
|
||||||
|
Path: loc.Path,
|
||||||
|
})
|
||||||
|
return c.removeErr
|
||||||
|
}
|
||||||
func (c *stubRemoteClient) WriteFile(*remote_pb.RemoteStorageLocation, *filer_pb.Entry, io.Reader) (*filer_pb.RemoteEntry, error) {
|
func (c *stubRemoteClient) WriteFile(*remote_pb.RemoteStorageLocation, *filer_pb.Entry, io.Reader) (*filer_pb.RemoteEntry, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
func (c *stubRemoteClient) UpdateFileMetadata(*remote_pb.RemoteStorageLocation, *filer_pb.Entry, *filer_pb.Entry) error {
|
func (c *stubRemoteClient) UpdateFileMetadata(*remote_pb.RemoteStorageLocation, *filer_pb.Entry, *filer_pb.Entry) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (c *stubRemoteClient) DeleteFile(*remote_pb.RemoteStorageLocation) error { return nil }
|
func (c *stubRemoteClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) error {
|
||||||
|
c.deleteCalls = append(c.deleteCalls, &remote_pb.RemoteStorageLocation{
|
||||||
|
Name: loc.Name,
|
||||||
|
Bucket: loc.Bucket,
|
||||||
|
Path: loc.Path,
|
||||||
|
})
|
||||||
|
return c.deleteErr
|
||||||
|
}
|
||||||
func (c *stubRemoteClient) ListBuckets() ([]*remote_storage.Bucket, error) { return nil, nil }
|
func (c *stubRemoteClient) ListBuckets() ([]*remote_storage.Bucket, error) { return nil, nil }
|
||||||
func (c *stubRemoteClient) CreateBucket(string) error { return nil }
|
func (c *stubRemoteClient) CreateBucket(string) error { return nil }
|
||||||
func (c *stubRemoteClient) DeleteBucket(string) error { return nil }
|
func (c *stubRemoteClient) DeleteBucket(string) error { return nil }
|
||||||
@@ -144,9 +263,11 @@ func newTestFiler(t *testing.T, store *stubFilerStore, rs *FilerRemoteStorage) *
|
|||||||
f := &Filer{
|
f := &Filer{
|
||||||
RemoteStorage: rs,
|
RemoteStorage: rs,
|
||||||
Store: NewFilerStoreWrapper(store),
|
Store: NewFilerStoreWrapper(store),
|
||||||
|
FilerConf: NewFilerConf(),
|
||||||
MaxFilenameLength: 255,
|
MaxFilenameLength: 255,
|
||||||
MasterClient: mc,
|
MasterClient: mc,
|
||||||
fileIdDeletionQueue: util.NewUnboundedQueue(),
|
fileIdDeletionQueue: util.NewUnboundedQueue(),
|
||||||
|
deletionQuit: make(chan struct{}),
|
||||||
LocalMetaLogBuffer: log_buffer.NewLogBuffer("test", time.Minute,
|
LocalMetaLogBuffer: log_buffer.NewLogBuffer("test", time.Minute,
|
||||||
func(*log_buffer.LogBuffer, time.Time, time.Time, []byte, int64, int64) {}, nil, func() {}),
|
func(*log_buffer.LogBuffer, time.Time, time.Time, []byte, int64, int64) {}, nil, func() {}),
|
||||||
}
|
}
|
||||||
@@ -368,3 +489,342 @@ func TestFindEntry_LazyFetchOnMiss(t *testing.T) {
|
|||||||
require.NotNil(t, entry2)
|
require.NotNil(t, entry2)
|
||||||
assert.Equal(t, uint64(999), entry2.FileSize)
|
assert.Equal(t, uint64(999), entry2.FileSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeleteEntryMetaAndData_RemoteOnlyFileDeletesRemoteAndMetadata(t *testing.T) {
|
||||||
|
const storageType = "stub_lazy_delete_file"
|
||||||
|
stub := &stubRemoteClient{}
|
||||||
|
defer registerStubMaker(t, storageType, stub)()
|
||||||
|
|
||||||
|
conf := &remote_pb.RemoteConf{Name: "deletestore", Type: storageType}
|
||||||
|
rs := NewFilerRemoteStorage()
|
||||||
|
rs.storageNameToConf[conf.Name] = conf
|
||||||
|
rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
|
||||||
|
Name: "deletestore",
|
||||||
|
Bucket: "mybucket",
|
||||||
|
Path: "/",
|
||||||
|
})
|
||||||
|
|
||||||
|
store := newStubFilerStore()
|
||||||
|
filePath := util.FullPath("/buckets/mybucket/file.txt")
|
||||||
|
store.entries[string(filePath)] = &Entry{
|
||||||
|
FullPath: filePath,
|
||||||
|
Attr: Attr{
|
||||||
|
Mtime: time.Unix(1700000000, 0),
|
||||||
|
Crtime: time.Unix(1700000000, 0),
|
||||||
|
Mode: 0644,
|
||||||
|
FileSize: 64,
|
||||||
|
},
|
||||||
|
Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 64},
|
||||||
|
}
|
||||||
|
f := newTestFiler(t, store, rs)
|
||||||
|
|
||||||
|
err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, findErr := store.FindEntry(context.Background(), filePath)
|
||||||
|
require.ErrorIs(t, findErr, filer_pb.ErrNotFound)
|
||||||
|
require.Len(t, stub.deleteCalls, 1)
|
||||||
|
assert.Equal(t, "deletestore", stub.deleteCalls[0].Name)
|
||||||
|
assert.Equal(t, "mybucket", stub.deleteCalls[0].Bucket)
|
||||||
|
assert.Equal(t, "/file.txt", stub.deleteCalls[0].Path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteEntryMetaAndData_IsFromOtherClusterSkipsRemoteDelete(t *testing.T) {
|
||||||
|
const storageType = "stub_lazy_delete_other_cluster"
|
||||||
|
stub := &stubRemoteClient{}
|
||||||
|
defer registerStubMaker(t, storageType, stub)()
|
||||||
|
|
||||||
|
conf := &remote_pb.RemoteConf{Name: "othercluster", Type: storageType}
|
||||||
|
rs := NewFilerRemoteStorage()
|
||||||
|
rs.storageNameToConf[conf.Name] = conf
|
||||||
|
rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
|
||||||
|
Name: "othercluster",
|
||||||
|
Bucket: "mybucket",
|
||||||
|
Path: "/",
|
||||||
|
})
|
||||||
|
|
||||||
|
store := newStubFilerStore()
|
||||||
|
filePath := util.FullPath("/buckets/mybucket/replicated.txt")
|
||||||
|
store.entries[string(filePath)] = &Entry{
|
||||||
|
FullPath: filePath,
|
||||||
|
Attr: Attr{
|
||||||
|
Mtime: time.Unix(1700000000, 0),
|
||||||
|
Crtime: time.Unix(1700000000, 0),
|
||||||
|
Mode: 0644,
|
||||||
|
FileSize: 64,
|
||||||
|
},
|
||||||
|
Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 64},
|
||||||
|
}
|
||||||
|
f := newTestFiler(t, store, rs)
|
||||||
|
|
||||||
|
// isFromOtherCluster=true simulates a replicated delete from another filer
|
||||||
|
err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, true, nil, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Local metadata should be deleted
|
||||||
|
_, findErr := store.FindEntry(context.Background(), filePath)
|
||||||
|
require.ErrorIs(t, findErr, filer_pb.ErrNotFound)
|
||||||
|
// Remote should NOT have been called — the originating filer handles that
|
||||||
|
require.Len(t, stub.deleteCalls, 0)
|
||||||
|
require.Len(t, stub.removeCalls, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteEntryMetaAndData_RemoteOnlyFileNotUnderMountSkipsRemoteDelete(t *testing.T) {
|
||||||
|
const storageType = "stub_lazy_delete_not_under_mount"
|
||||||
|
stub := &stubRemoteClient{}
|
||||||
|
defer registerStubMaker(t, storageType, stub)()
|
||||||
|
|
||||||
|
conf := &remote_pb.RemoteConf{Name: "notundermount", Type: storageType}
|
||||||
|
rs := NewFilerRemoteStorage()
|
||||||
|
rs.storageNameToConf[conf.Name] = conf
|
||||||
|
rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
|
||||||
|
Name: "notundermount",
|
||||||
|
Bucket: "mybucket",
|
||||||
|
Path: "/",
|
||||||
|
})
|
||||||
|
|
||||||
|
store := newStubFilerStore()
|
||||||
|
filePath := util.FullPath("/no/mount/file.txt")
|
||||||
|
store.entries[string(filePath)] = &Entry{
|
||||||
|
FullPath: filePath,
|
||||||
|
Attr: Attr{
|
||||||
|
Mtime: time.Unix(1700000000, 0),
|
||||||
|
Crtime: time.Unix(1700000000, 0),
|
||||||
|
Mode: 0644,
|
||||||
|
FileSize: 99,
|
||||||
|
},
|
||||||
|
Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 99},
|
||||||
|
}
|
||||||
|
f := newTestFiler(t, store, rs)
|
||||||
|
|
||||||
|
err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, stub.deleteCalls, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteEntryMetaAndData_RemoteMountWithoutClientResolutionKeepsMetadata(t *testing.T) {
|
||||||
|
rs := NewFilerRemoteStorage()
|
||||||
|
rs.storageNameToConf["missingclient"] = &remote_pb.RemoteConf{Name: "missingclient", Type: "stub_missing_client"}
|
||||||
|
rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
|
||||||
|
Name: "missingclient",
|
||||||
|
Bucket: "mybucket",
|
||||||
|
Path: "/",
|
||||||
|
})
|
||||||
|
|
||||||
|
store := newStubFilerStore()
|
||||||
|
filePath := util.FullPath("/buckets/mybucket/no-client.txt")
|
||||||
|
store.entries[string(filePath)] = &Entry{
|
||||||
|
FullPath: filePath,
|
||||||
|
Attr: Attr{
|
||||||
|
Mtime: time.Unix(1700000000, 0),
|
||||||
|
Crtime: time.Unix(1700000000, 0),
|
||||||
|
Mode: 0644,
|
||||||
|
FileSize: 51,
|
||||||
|
},
|
||||||
|
Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 51},
|
||||||
|
}
|
||||||
|
f := newTestFiler(t, store, rs)
|
||||||
|
|
||||||
|
err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.ErrorContains(t, err, "resolve remote storage client")
|
||||||
|
require.ErrorContains(t, err, string(filePath))
|
||||||
|
|
||||||
|
stored, findErr := store.FindEntry(context.Background(), filePath)
|
||||||
|
require.NoError(t, findErr)
|
||||||
|
require.NotNil(t, stored)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteEntryMetaAndData_LocalDeleteFailurePreservesMetadata(t *testing.T) {
|
||||||
|
const storageType = "stub_lazy_delete_local_fail"
|
||||||
|
stub := &stubRemoteClient{}
|
||||||
|
defer registerStubMaker(t, storageType, stub)()
|
||||||
|
|
||||||
|
conf := &remote_pb.RemoteConf{Name: "localfail", Type: storageType}
|
||||||
|
rs := NewFilerRemoteStorage()
|
||||||
|
rs.storageNameToConf[conf.Name] = conf
|
||||||
|
rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
|
||||||
|
Name: "localfail",
|
||||||
|
Bucket: "mybucket",
|
||||||
|
Path: "/",
|
||||||
|
})
|
||||||
|
|
||||||
|
store := newStubFilerStore()
|
||||||
|
filePath := util.FullPath("/buckets/mybucket/localfail.txt")
|
||||||
|
store.entries[string(filePath)] = &Entry{
|
||||||
|
FullPath: filePath,
|
||||||
|
Attr: Attr{
|
||||||
|
Mtime: time.Unix(1700000000, 0),
|
||||||
|
Crtime: time.Unix(1700000000, 0),
|
||||||
|
Mode: 0644,
|
||||||
|
FileSize: 80,
|
||||||
|
},
|
||||||
|
Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 80},
|
||||||
|
}
|
||||||
|
store.deleteErrByPath[string(filePath)] = errors.New("simulated local delete failure")
|
||||||
|
f := newTestFiler(t, store, rs)
|
||||||
|
|
||||||
|
err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.ErrorContains(t, err, "filer store delete")
|
||||||
|
require.Len(t, stub.deleteCalls, 1)
|
||||||
|
|
||||||
|
// Local metadata should still exist since local delete failed
|
||||||
|
stored, findErr := store.FindEntry(context.Background(), filePath)
|
||||||
|
require.NoError(t, findErr)
|
||||||
|
require.NotNil(t, stored)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteEntryMetaAndData_RemoteDeleteNotFoundStillDeletesMetadata(t *testing.T) {
|
||||||
|
const storageType = "stub_lazy_delete_notfound"
|
||||||
|
stub := &stubRemoteClient{deleteErr: remote_storage.ErrRemoteObjectNotFound}
|
||||||
|
defer registerStubMaker(t, storageType, stub)()
|
||||||
|
|
||||||
|
conf := &remote_pb.RemoteConf{Name: "deletenotfound", Type: storageType}
|
||||||
|
rs := NewFilerRemoteStorage()
|
||||||
|
rs.storageNameToConf[conf.Name] = conf
|
||||||
|
rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
|
||||||
|
Name: "deletenotfound",
|
||||||
|
Bucket: "mybucket",
|
||||||
|
Path: "/",
|
||||||
|
})
|
||||||
|
|
||||||
|
store := newStubFilerStore()
|
||||||
|
filePath := util.FullPath("/buckets/mybucket/notfound.txt")
|
||||||
|
store.entries[string(filePath)] = &Entry{
|
||||||
|
FullPath: filePath,
|
||||||
|
Attr: Attr{
|
||||||
|
Mtime: time.Unix(1700000000, 0),
|
||||||
|
Crtime: time.Unix(1700000000, 0),
|
||||||
|
Mode: 0644,
|
||||||
|
FileSize: 23,
|
||||||
|
},
|
||||||
|
Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 23},
|
||||||
|
}
|
||||||
|
f := newTestFiler(t, store, rs)
|
||||||
|
|
||||||
|
err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, findErr := store.FindEntry(context.Background(), filePath)
|
||||||
|
require.ErrorIs(t, findErr, filer_pb.ErrNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteEntryMetaAndData_RemoteDeleteErrorKeepsMetadata(t *testing.T) {
|
||||||
|
const storageType = "stub_lazy_delete_error"
|
||||||
|
stub := &stubRemoteClient{deleteErr: errors.New("remote delete failed")}
|
||||||
|
defer registerStubMaker(t, storageType, stub)()
|
||||||
|
|
||||||
|
conf := &remote_pb.RemoteConf{Name: "deleteerr", Type: storageType}
|
||||||
|
rs := NewFilerRemoteStorage()
|
||||||
|
rs.storageNameToConf[conf.Name] = conf
|
||||||
|
rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
|
||||||
|
Name: "deleteerr",
|
||||||
|
Bucket: "mybucket",
|
||||||
|
Path: "/",
|
||||||
|
})
|
||||||
|
|
||||||
|
store := newStubFilerStore()
|
||||||
|
filePath := util.FullPath("/buckets/mybucket/error.txt")
|
||||||
|
store.entries[string(filePath)] = &Entry{
|
||||||
|
FullPath: filePath,
|
||||||
|
Attr: Attr{
|
||||||
|
Mtime: time.Unix(1700000000, 0),
|
||||||
|
Crtime: time.Unix(1700000000, 0),
|
||||||
|
Mode: 0644,
|
||||||
|
FileSize: 77,
|
||||||
|
},
|
||||||
|
Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 77},
|
||||||
|
}
|
||||||
|
f := newTestFiler(t, store, rs)
|
||||||
|
|
||||||
|
err := f.DeleteEntryMetaAndData(context.Background(), filePath, false, false, false, false, nil, 0)
|
||||||
|
require.Error(t, err)
|
||||||
|
stored, findErr := store.FindEntry(context.Background(), filePath)
|
||||||
|
require.NoError(t, findErr)
|
||||||
|
require.NotNil(t, stored)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteEntryMetaAndData_DirectoryUnderMountDeletesRemoteDirectory(t *testing.T) {
|
||||||
|
const storageType = "stub_lazy_delete_dir"
|
||||||
|
stub := &stubRemoteClient{}
|
||||||
|
defer registerStubMaker(t, storageType, stub)()
|
||||||
|
|
||||||
|
conf := &remote_pb.RemoteConf{Name: "dirstore", Type: storageType}
|
||||||
|
rs := NewFilerRemoteStorage()
|
||||||
|
rs.storageNameToConf[conf.Name] = conf
|
||||||
|
rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
|
||||||
|
Name: "dirstore",
|
||||||
|
Bucket: "mybucket",
|
||||||
|
Path: "/",
|
||||||
|
})
|
||||||
|
|
||||||
|
store := newStubFilerStore()
|
||||||
|
dirPath := util.FullPath("/buckets/mybucket/dir")
|
||||||
|
store.entries[string(dirPath)] = &Entry{
|
||||||
|
FullPath: dirPath,
|
||||||
|
Attr: Attr{
|
||||||
|
Mtime: time.Unix(1700000000, 0),
|
||||||
|
Crtime: time.Unix(1700000000, 0),
|
||||||
|
Mode: os.ModeDir | 0755,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
f := newTestFiler(t, store, rs)
|
||||||
|
|
||||||
|
err := f.doDeleteEntryMetaAndData(context.Background(), store.entries[string(dirPath)], false, false, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Len(t, stub.removeCalls, 1)
|
||||||
|
assert.Equal(t, "dirstore", stub.removeCalls[0].Name)
|
||||||
|
assert.Equal(t, "mybucket", stub.removeCalls[0].Bucket)
|
||||||
|
assert.Equal(t, "/dir", stub.removeCalls[0].Path)
|
||||||
|
_, findErr := store.FindEntry(context.Background(), dirPath)
|
||||||
|
require.ErrorIs(t, findErr, filer_pb.ErrNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteEntryMetaAndData_RecursiveFolderDeleteRemotesChildren(t *testing.T) {
|
||||||
|
const storageType = "stub_lazy_delete_folder_children"
|
||||||
|
stub := &stubRemoteClient{}
|
||||||
|
defer registerStubMaker(t, storageType, stub)()
|
||||||
|
|
||||||
|
conf := &remote_pb.RemoteConf{Name: "childstore", Type: storageType}
|
||||||
|
rs := NewFilerRemoteStorage()
|
||||||
|
rs.storageNameToConf[conf.Name] = conf
|
||||||
|
rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{
|
||||||
|
Name: "childstore",
|
||||||
|
Bucket: "mybucket",
|
||||||
|
Path: "/",
|
||||||
|
})
|
||||||
|
|
||||||
|
store := newStubFilerStore()
|
||||||
|
dirPath := util.FullPath("/buckets/mybucket/subdir")
|
||||||
|
store.entries[string(dirPath)] = &Entry{
|
||||||
|
FullPath: dirPath,
|
||||||
|
Attr: Attr{
|
||||||
|
Mtime: time.Unix(1700000000, 0),
|
||||||
|
Crtime: time.Unix(1700000000, 0),
|
||||||
|
Mode: os.ModeDir | 0755,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
childPath := util.FullPath("/buckets/mybucket/subdir/child.txt")
|
||||||
|
store.entries[string(childPath)] = &Entry{
|
||||||
|
FullPath: childPath,
|
||||||
|
Attr: Attr{
|
||||||
|
Mtime: time.Unix(1700000000, 0),
|
||||||
|
Crtime: time.Unix(1700000000, 0),
|
||||||
|
Mode: 0644,
|
||||||
|
FileSize: 50,
|
||||||
|
},
|
||||||
|
Remote: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 50},
|
||||||
|
}
|
||||||
|
f := newTestFiler(t, store, rs)
|
||||||
|
|
||||||
|
err := f.DeleteEntryMetaAndData(context.Background(), dirPath, true, false, false, false, nil, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Child file should have been deleted from remote
|
||||||
|
require.Len(t, stub.deleteCalls, 1)
|
||||||
|
assert.Equal(t, "/subdir/child.txt", stub.deleteCalls[0].Path)
|
||||||
|
// Directory itself should also have been deleted from remote
|
||||||
|
require.Len(t, stub.removeCalls, 1)
|
||||||
|
assert.Equal(t, "/subdir", stub.removeCalls[0].Path)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user