diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index f7b969453..d782281e0 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -128,10 +128,8 @@ func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefi } func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error { - // Check if context was already cancelled before ignoring cancellation - // This prevents writing metadata for operations that have already failed - if ctx.Err() != nil { - return ctx.Err() + if err := ctx.Err(); err != nil { + return err } ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(entry.FullPath) @@ -160,9 +158,8 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err // InsertEntryKnownAbsent skips the pre-insert FindEntry path when the caller has // already established that the target path does not exist. func (fsw *FilerStoreWrapper) InsertEntryKnownAbsent(ctx context.Context, entry *Entry) error { - - if ctx.Err() != nil { - return ctx.Err() + if err := ctx.Err(); err != nil { + return err } ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(entry.FullPath) @@ -187,9 +184,8 @@ func (fsw *FilerStoreWrapper) InsertEntryKnownAbsent(ctx context.Context, entry } func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error { - - if ctx.Err() != nil { - return ctx.Err() + if err := ctx.Err(); err != nil { + return err } ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(entry.FullPath) @@ -249,9 +245,8 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) ( } func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { - - if ctx.Err() != nil { - return ctx.Err() + if err := ctx.Err(); err != nil { + return err } ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(fp) @@ -281,9 +276,8 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) } func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) { - - if ctx.Err() != nil { - return ctx.Err() + if err := ctx.Err(); err != nil { + return err } ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(existingEntry.FullPath) @@ -309,9 +303,8 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry } func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { - - if ctx.Err() != nil { - return ctx.Err() + if err := ctx.Err(); err != nil { + return err } ctx = context.WithoutCancel(ctx) actualStore := fsw.getActualStore(fp + "/") @@ -419,28 +412,22 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u } func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) { - - if ctx.Err() != nil { - return nil, ctx.Err() + if err := ctx.Err(); err != nil { + return nil, err } ctx = context.WithoutCancel(ctx) return fsw.getDefaultStore().BeginTransaction(ctx) } func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error { - - if ctx.Err() != nil { - return ctx.Err() + if err := ctx.Err(); err != nil { + return err } ctx = context.WithoutCancel(ctx) return fsw.getDefaultStore().CommitTransaction(ctx) } func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error { - - if ctx.Err() != nil { - return ctx.Err() - } ctx = context.WithoutCancel(ctx) return fsw.getDefaultStore().RollbackTransaction(ctx) } @@ -450,9 +437,8 @@ func (fsw *FilerStoreWrapper) Shutdown() { } func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - - if ctx.Err() != nil { - return ctx.Err() + if err := ctx.Err(); err != nil { + return err } ctx = context.WithoutCancel(ctx) return fsw.getDefaultStore().KvPut(ctx, key, value) @@ -462,9 +448,8 @@ func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []by return fsw.getDefaultStore().KvGet(ctx, key) } func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) { - - if ctx.Err() != nil { - return ctx.Err() + if err := ctx.Err(); err != nil { + return err } ctx = context.WithoutCancel(ctx) return fsw.getDefaultStore().KvDelete(ctx, key) diff --git a/weed/filer/filerstore_wrapper_test.go b/weed/filer/filerstore_wrapper_test.go index 0474e7e10..0b180f63e 100644 --- a/weed/filer/filerstore_wrapper_test.go +++ b/weed/filer/filerstore_wrapper_test.go @@ -5,6 +5,7 @@ import ( "errors" "os" "testing" + "time" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/stretchr/testify/assert" @@ -71,238 +72,136 @@ func TestFilerStoreWrapperMimeNormalization(t *testing.T) { } } -// TestFilerStoreWrapperContextCancellation verifies that write operations -// respect context cancellation to prevent orphaned metadata -func TestFilerStoreWrapperContextCancellation(t *testing.T) { - tests := []struct { - name string - cancelContext bool - expectError bool - isWriteOp bool - run func(*FilerStoreWrapper, context.Context, *Entry) error - }{ - { - name: "InsertEntry with cancelled context fails", - cancelContext: true, - expectError: true, - isWriteOp: true, - run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { - return fsw.InsertEntry(ctx, entry) - }, - }, - { - name: "InsertEntry with active context succeeds", - cancelContext: false, - expectError: false, - isWriteOp: true, - run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { - return fsw.InsertEntry(ctx, entry) - }, - }, - { - name: "InsertEntryKnownAbsent with cancelled context fails", - cancelContext: true, - expectError: true, - isWriteOp: true, - run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { - return fsw.InsertEntryKnownAbsent(ctx, entry) - }, - }, - { - name: "UpdateEntry with cancelled context fails", - cancelContext: true, - expectError: true, - isWriteOp: true, - run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { - _ = fsw.InsertEntry(context.Background(), entry) - entry.Attr.Mime = "updated" - return fsw.UpdateEntry(ctx, entry) - }, - }, - { - name: "DeleteEntry with cancelled context fails", - cancelContext: true, - expectError: true, - isWriteOp: true, - run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { - _ = fsw.InsertEntry(context.Background(), entry) - return fsw.DeleteEntry(ctx, entry.FullPath) - }, - }, - { - name: "DeleteOneEntry with cancelled context fails", - cancelContext: true, - expectError: true, - isWriteOp: true, - run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { - _ = fsw.InsertEntry(context.Background(), entry) - return fsw.DeleteOneEntry(ctx, entry) - }, - }, - { - name: "DeleteFolderChildren with cancelled context fails", - cancelContext: true, - expectError: true, - isWriteOp: true, - run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { - childEntry := &Entry{ - FullPath: util.FullPath("/test/folder/child"), - Attr: Attr{Mode: 0o660}, - } - _ = fsw.InsertEntry(context.Background(), childEntry) - return fsw.DeleteFolderChildren(ctx, util.FullPath("/test/folder")) - }, - }, - { - name: "KvPut with cancelled context fails", - cancelContext: true, - expectError: true, - isWriteOp: true, - run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { - return fsw.KvPut(ctx, []byte("test-key"), []byte("test-value")) - }, - }, - { - name: "KvDelete with cancelled context fails", - cancelContext: true, - expectError: true, - isWriteOp: true, - run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { - _ = fsw.KvPut(context.Background(), []byte("test-key"), []byte("test-value")) - return fsw.KvDelete(ctx, []byte("test-key")) - }, - }, - { - name: "FindEntry with cancelled context succeeds (read operation)", - cancelContext: true, - expectError: false, - isWriteOp: false, - run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { - _ = fsw.InsertEntry(context.Background(), entry) - _, err := fsw.FindEntry(ctx, entry.FullPath) - return err - }, - }, - { - name: "KvGet with cancelled context succeeds (read operation)", - cancelContext: true, - expectError: false, - isWriteOp: false, - run: func(fsw *FilerStoreWrapper, ctx context.Context, entry *Entry) error { - _ = fsw.KvPut(context.Background(), []byte("test-key"), []byte("test-value")) - _, err := fsw.KvGet(ctx, []byte("test-key")) - return err - }, - }, +// cancelledCtx returns a context that is already cancelled. +func cancelledCtx() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx +} + +// expiredCtx returns a context whose deadline has already passed. +func expiredCtx() context.Context { + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Second)) + cancel() // release resources immediately as it's already expired + return ctx +} + +func TestFilerStoreWrapperWriteOpsRejectCancelledContext(t *testing.T) { + newEntry := func(path string) *Entry { + return &Entry{ + FullPath: util.FullPath(path), + Attr: Attr{Mode: 0o660, Mime: "application/octet-stream"}, + } } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - store := newStubFilerStore() - wrapper := NewFilerStoreWrapper(store) - entry := &Entry{ - FullPath: util.FullPath("/test/object"), - Attr: Attr{ - Mode: 0o660, - Mime: "application/octet-stream", - }, - } + // Each write operation that should be guarded. + writeOps := []struct { + name string + run func(*FilerStoreWrapper, context.Context) error + }{ + {"InsertEntry", func(fsw *FilerStoreWrapper, ctx context.Context) error { + return fsw.InsertEntry(ctx, newEntry("/test/a")) + }}, + {"InsertEntryKnownAbsent", func(fsw *FilerStoreWrapper, ctx context.Context) error { + return fsw.InsertEntryKnownAbsent(ctx, newEntry("/test/b")) + }}, + {"UpdateEntry", func(fsw *FilerStoreWrapper, ctx context.Context) error { + _ = fsw.InsertEntry(context.Background(), newEntry("/test/c")) + return fsw.UpdateEntry(ctx, newEntry("/test/c")) + }}, + {"DeleteEntry", func(fsw *FilerStoreWrapper, ctx context.Context) error { + _ = fsw.InsertEntry(context.Background(), newEntry("/test/d")) + return fsw.DeleteEntry(ctx, "/test/d") + }}, + {"DeleteOneEntry", func(fsw *FilerStoreWrapper, ctx context.Context) error { + e := newEntry("/test/e") + _ = fsw.InsertEntry(context.Background(), e) + return fsw.DeleteOneEntry(ctx, e) + }}, + {"DeleteFolderChildren", func(fsw *FilerStoreWrapper, ctx context.Context) error { + _ = fsw.InsertEntry(context.Background(), newEntry("/test/folder/child")) + return fsw.DeleteFolderChildren(ctx, "/test/folder") + }}, + {"BeginTransaction", func(fsw *FilerStoreWrapper, ctx context.Context) error { + _, err := fsw.BeginTransaction(ctx) + return err + }}, + {"CommitTransaction", func(fsw *FilerStoreWrapper, ctx context.Context) error { + return fsw.CommitTransaction(ctx) + }}, + {"KvPut", func(fsw *FilerStoreWrapper, ctx context.Context) error { + return fsw.KvPut(ctx, []byte("k"), []byte("v")) + }}, + {"KvDelete", func(fsw *FilerStoreWrapper, ctx context.Context) error { + _ = fsw.KvPut(context.Background(), []byte("k"), []byte("v")) + return fsw.KvDelete(ctx, []byte("k")) + }}, + } - ctx, cancel := context.WithCancel(context.Background()) - if tt.cancelContext { - cancel() - } else { - defer cancel() - } + badContexts := []struct { + name string + ctx context.Context + wantError error + }{ + {"cancelled", cancelledCtx(), context.Canceled}, + {"deadline exceeded", expiredCtx(), context.DeadlineExceeded}, + } - err := tt.run(wrapper, ctx, entry) - - if tt.expectError { + for _, op := range writeOps { + for _, bc := range badContexts { + t.Run(op.name+"/"+bc.name, func(t *testing.T) { + wrapper := NewFilerStoreWrapper(newStubFilerStore()) + err := op.run(wrapper, bc.ctx) require.Error(t, err) - if tt.isWriteOp { - assert.True(t, errors.Is(err, context.Canceled)) - } - } else { - require.NoError(t, err) - } - }) + assert.True(t, errors.Is(err, bc.wantError), "got %v, want %v", err, bc.wantError) + }) + } } } -// TestFilerStoreWrapperTransactionContextCancellation verifies that transaction -// operations respect context cancellation. -func TestFilerStoreWrapperTransactionContextCancellation(t *testing.T) { - tests := []struct { - name string - cancelContext bool - run func(*FilerStoreWrapper, context.Context) error - }{ - { - name: "BeginTransaction with cancelled context fails", - cancelContext: true, - run: func(fsw *FilerStoreWrapper, ctx context.Context) error { - _, err := fsw.BeginTransaction(ctx) - return err - }, - }, - { - name: "BeginTransaction with active context succeeds", - cancelContext: false, - run: func(fsw *FilerStoreWrapper, ctx context.Context) error { - _, err := fsw.BeginTransaction(ctx) - return err - }, - }, - { - name: "CommitTransaction with cancelled context fails", - cancelContext: true, - run: func(fsw *FilerStoreWrapper, ctx context.Context) error { - return fsw.CommitTransaction(ctx) - }, - }, - { - name: "CommitTransaction with active context succeeds", - cancelContext: false, - run: func(fsw *FilerStoreWrapper, ctx context.Context) error { - return fsw.CommitTransaction(ctx) - }, - }, - { - name: "RollbackTransaction with cancelled context fails", - cancelContext: true, - run: func(fsw *FilerStoreWrapper, ctx context.Context) error { - return fsw.RollbackTransaction(ctx) - }, - }, - { - name: "RollbackTransaction with active context succeeds", - cancelContext: false, - run: func(fsw *FilerStoreWrapper, ctx context.Context) error { - return fsw.RollbackTransaction(ctx) - }, - }, +func TestFilerStoreWrapperWriteOpsSucceedWithActiveContext(t *testing.T) { + wrapper := NewFilerStoreWrapper(newStubFilerStore()) + ctx := context.Background() + entry := &Entry{ + FullPath: util.FullPath("/test/obj"), + Attr: Attr{Mode: 0o660}, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - store := newStubFilerStore() - wrapper := NewFilerStoreWrapper(store) + require.NoError(t, wrapper.InsertEntry(ctx, entry)) + require.NoError(t, wrapper.UpdateEntry(ctx, entry)) + require.NoError(t, wrapper.DeleteOneEntry(ctx, entry)) + require.NoError(t, wrapper.InsertEntryKnownAbsent(ctx, entry)) + require.NoError(t, wrapper.DeleteEntry(ctx, entry.FullPath)) + require.NoError(t, wrapper.KvPut(ctx, []byte("k"), []byte("v"))) + require.NoError(t, wrapper.KvDelete(ctx, []byte("k"))) - ctx, cancel := context.WithCancel(context.Background()) - if tt.cancelContext { - cancel() - } else { - defer cancel() - } - - err := tt.run(wrapper, ctx) - - if tt.cancelContext { - require.Error(t, err) - assert.True(t, errors.Is(err, context.Canceled)) - } else { - require.NoError(t, err) - } - }) - } + txCtx, err := wrapper.BeginTransaction(ctx) + require.NoError(t, err) + require.NoError(t, wrapper.CommitTransaction(txCtx)) +} + +func TestFilerStoreWrapperReadOpsSucceedWithCancelledContext(t *testing.T) { + wrapper := NewFilerStoreWrapper(newStubFilerStore()) + entry := &Entry{ + FullPath: util.FullPath("/test/readable"), + Attr: Attr{Mode: 0o660}, + } + require.NoError(t, wrapper.InsertEntry(context.Background(), entry)) + require.NoError(t, wrapper.KvPut(context.Background(), []byte("rk"), []byte("rv"))) + + ctx := cancelledCtx() + + _, err := wrapper.FindEntry(ctx, entry.FullPath) + assert.NoError(t, err) + + _, err = wrapper.KvGet(ctx, []byte("rk")) + assert.NoError(t, err) +} + +// RollbackTransaction must succeed even when the context is cancelled or +// expired, because it is a cleanup operation called after failures. +func TestFilerStoreWrapperRollbackSucceedsWithCancelledContext(t *testing.T) { + wrapper := NewFilerStoreWrapper(newStubFilerStore()) + assert.NoError(t, wrapper.RollbackTransaction(cancelledCtx())) + assert.NoError(t, wrapper.RollbackTransaction(expiredCtx())) }