From d5128f00f1863246cfd3cbc9d956ca8bd5ee96bb Mon Sep 17 00:00:00 2001 From: os-pradipbabar Date: Sat, 4 Apr 2026 04:52:46 +0530 Subject: [PATCH] fix: Prevent orphaned metadata from cancelled S3 operations (Issue #8908) (#8909) fix(filer): check if context was already cancelled before ignoring cancellation --- weed/filer/filerstore_wrapper.go | 45 +++++ weed/filer/filerstore_wrapper_test.go | 237 ++++++++++++++++++++++++++ 2 files changed, 282 insertions(+) diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index 0c8d692f2..f7b969453 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -128,6 +128,11 @@ func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefi } func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error { + // Check if context was already cancelled before ignoring cancellation + // This prevents writing metadata for operations that have already failed + if ctx.Err() != nil { + return ctx.Err() + } ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(entry.FullPath) stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc() @@ -155,6 +160,10 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err // InsertEntryKnownAbsent skips the pre-insert FindEntry path when the caller has // already established that the target path does not exist. func (fsw *FilerStoreWrapper) InsertEntryKnownAbsent(ctx context.Context, entry *Entry) error { + + if ctx.Err() != nil { + return ctx.Err() + } ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(entry.FullPath) stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc() @@ -178,6 +187,10 @@ func (fsw *FilerStoreWrapper) InsertEntryKnownAbsent(ctx context.Context, entry } func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error { + + if ctx.Err() != nil { + return ctx.Err() + } ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(entry.FullPath) stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc() @@ -236,6 +249,10 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) ( } func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { + + if ctx.Err() != nil { + return ctx.Err() + } ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(fp) stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc() @@ -264,6 +281,10 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) } func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) { + + if ctx.Err() != nil { + return ctx.Err() + } ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(existingEntry.FullPath) stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc() @@ -288,6 +309,10 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry } func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { + + if ctx.Err() != nil { + return ctx.Err() + } ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(fp + "/") stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc() @@ -394,16 +419,28 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u } func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) { + + if ctx.Err() != nil { + return nil, ctx.Err() + } ctx = context.WithoutCancel(ctx) return fsw.getDefaultStore().BeginTransaction(ctx) } func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error { + + if ctx.Err() != nil { + return ctx.Err() + } ctx = context.WithoutCancel(ctx) return fsw.getDefaultStore().CommitTransaction(ctx) } func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error { + + if ctx.Err() != nil { + return ctx.Err() + } ctx = context.WithoutCancel(ctx) return fsw.getDefaultStore().RollbackTransaction(ctx) } @@ -413,6 +450,10 @@ func (fsw *FilerStoreWrapper) Shutdown() { } func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + + if ctx.Err() != nil { + return ctx.Err() + } ctx = context.WithoutCancel(ctx) return fsw.getDefaultStore().KvPut(ctx, key, value) } @@ -421,6 +462,10 @@ func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []by return fsw.getDefaultStore().KvGet(ctx, key) } func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) { + + if ctx.Err() != nil { + return ctx.Err() + } ctx = context.WithoutCancel(ctx) return fsw.getDefaultStore().KvDelete(ctx, key) } diff --git a/weed/filer/filerstore_wrapper_test.go b/weed/filer/filerstore_wrapper_test.go index c3ef32018..0474e7e10 100644 --- a/weed/filer/filerstore_wrapper_test.go +++ b/weed/filer/filerstore_wrapper_test.go @@ -2,6 +2,7 @@ package filer import ( "context" + "errors" "os" "testing" @@ -69,3 +70,239 @@ func TestFilerStoreWrapperMimeNormalization(t *testing.T) { } } } + +// TestFilerStoreWrapperContextCancellation verifies that write operations +// respect context cancellation to prevent orphaned metadata +func TestFilerStoreWrapperContextCancellation(t *testing.T) { + tests := []struct { + name string + cancelContext bool + expectError bool + isWriteOp bool + run func(*FilerStoreWrapper, context.Context, *Entry) error + }{ + { + name: "InsertEntry with cancelled context fails", + cancelContext: true, + expectError: true, + isWriteOp: true, + run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { + return fsw.InsertEntry(ctx, entry) + }, + }, + { + name: "InsertEntry with active context succeeds", + cancelContext: false, + expectError: false, + isWriteOp: true, + run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { + return fsw.InsertEntry(ctx, entry) + }, + }, + { + name: "InsertEntryKnownAbsent with cancelled context fails", + cancelContext: true, + expectError: true, + isWriteOp: true, + run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { + return fsw.InsertEntryKnownAbsent(ctx, entry) + }, + }, + { + name: "UpdateEntry with cancelled context fails", + cancelContext: true, + expectError: true, + isWriteOp: true, + run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { + _ = fsw.InsertEntry(context.Background(), entry) + entry.Attr.Mime = "updated" + return fsw.UpdateEntry(ctx, entry) + }, + }, + { + name: "DeleteEntry with cancelled context fails", + cancelContext: true, + expectError: true, + isWriteOp: true, + run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { + _ = fsw.InsertEntry(context.Background(), entry) + return fsw.DeleteEntry(ctx, entry.FullPath) + }, + }, + { + name: "DeleteOneEntry with cancelled context fails", + cancelContext: true, + expectError: true, + isWriteOp: true, + run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { + _ = fsw.InsertEntry(context.Background(), entry) + return fsw.DeleteOneEntry(ctx, entry) + }, + }, + { + name: "DeleteFolderChildren with cancelled context fails", + cancelContext: true, + expectError: true, + isWriteOp: true, + run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { + childEntry := &Entry{ + FullPath: util.FullPath("/test/folder/child"), + Attr: Attr{Mode: 0o660}, + } + _ = fsw.InsertEntry(context.Background(), childEntry) + return fsw.DeleteFolderChildren(ctx, util.FullPath("/test/folder")) + }, + }, + { + name: "KvPut with cancelled context fails", + cancelContext: true, + expectError: true, + isWriteOp: true, + run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { + return fsw.KvPut(ctx, []byte("test-key"), []byte("test-value")) + }, + }, + { + name: "KvDelete with cancelled context fails", + cancelContext: true, + expectError: true, + isWriteOp: true, + run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { + _ = fsw.KvPut(context.Background(), []byte("test-key"), []byte("test-value")) + return fsw.KvDelete(ctx, []byte("test-key")) + }, + }, + { + name: "FindEntry with cancelled context succeeds (read operation)", + cancelContext: true, + expectError: false, + isWriteOp: false, + run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { + _ = fsw.InsertEntry(context.Background(), entry) + _, err := fsw.FindEntry(ctx, entry.FullPath) + return err + }, + }, + { + name: "KvGet with cancelled context succeeds (read operation)", + cancelContext: true, + expectError: false, + isWriteOp: false, + run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { + _ = fsw.KvPut(context.Background(), []byte("test-key"), []byte("test-value")) + _, err := fsw.KvGet(ctx, []byte("test-key")) + return err + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + store := newStubFilerStore() + wrapper := NewFilerStoreWrapper(store) + entry := &Entry{ + FullPath: util.FullPath("/test/object"), + Attr: Attr{ + Mode: 0o660, + Mime: "application/octet-stream", + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + if tt.cancelContext { + cancel() + } else { + defer cancel() + } + + err := tt.run(wrapper, ctx, entry) + + if tt.expectError { + require.Error(t, err) + if tt.isWriteOp { + assert.True(t, errors.Is(err, context.Canceled)) + } + } else { + require.NoError(t, err) + } + }) + } +} + +// TestFilerStoreWrapperTransactionContextCancellation verifies that transaction +// operations respect context cancellation. +func TestFilerStoreWrapperTransactionContextCancellation(t *testing.T) { + tests := []struct { + name string + cancelContext bool + run func(*FilerStoreWrapper, context.Context) error + }{ + { + name: "BeginTransaction with cancelled context fails", + cancelContext: true, + run: func(fsw *FilerStoreWrapper, ctx context.Context) error { + _, err := fsw.BeginTransaction(ctx) + return err + }, + }, + { + name: "BeginTransaction with active context succeeds", + cancelContext: false, + run: func(fsw *FilerStoreWrapper, ctx context.Context) error { + _, err := fsw.BeginTransaction(ctx) + return err + }, + }, + { + name: "CommitTransaction with cancelled context fails", + cancelContext: true, + run: func(fsw *FilerStoreWrapper, ctx context.Context) error { + return fsw.CommitTransaction(ctx) + }, + }, + { + name: "CommitTransaction with active context succeeds", + cancelContext: false, + run: func(fsw *FilerStoreWrapper, ctx context.Context) error { + return fsw.CommitTransaction(ctx) + }, + }, + { + name: "RollbackTransaction with cancelled context fails", + cancelContext: true, + run: func(fsw *FilerStoreWrapper, ctx context.Context) error { + return fsw.RollbackTransaction(ctx) + }, + }, + { + name: "RollbackTransaction with active context succeeds", + cancelContext: false, + run: func(fsw *FilerStoreWrapper, ctx context.Context) error { + return fsw.RollbackTransaction(ctx) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + store := newStubFilerStore() + wrapper := NewFilerStoreWrapper(store) + + ctx, cancel := context.WithCancel(context.Background()) + if tt.cancelContext { + cancel() + } else { + defer cancel() + } + + err := tt.run(wrapper, ctx) + + if tt.cancelContext { + require.Error(t, err) + assert.True(t, errors.Is(err, context.Canceled)) + } else { + require.NoError(t, err) + } + }) + } +}