From 4fd974b16b3ceed9dee3d05daa5cdaf191e44ef3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 5 Apr 2026 16:07:34 -0700 Subject: [PATCH] fix(azuresink): delete freshly created blob on write failure (#8934) * fix(azuresink): delete freshly created blob on write failure appendBlobClient.Create() runs before content decryption and copy. If MaybeDecryptContent or CopyFromChunkViews fails, an empty blob is left behind, silently replacing any previous valid data. Add cleanup that deletes the blob on content write errors when we were the ones who created it. * fix(azuresink): track recreated blobs for cleanup on write failure handleExistingBlob deletes and recreates the blob when overwrite is needed, but freshlyCreated was only set on the initial Create success path. Set freshlyCreated = needsWrite after handleExistingBlob so recreated blobs are also cleaned up on content write failure. --- weed/replication/sink/azuresink/azure_sink.go | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 8eb2218e7..1c2eb7944 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -138,6 +138,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] _, err := appendBlobClient.Create(ctxCreate, nil) needsWrite := true + freshlyCreated := false if err != nil { if bloberror.HasCode(err, bloberror.BlobAlreadyExists) { // Handle existing blob - check if overwrite is needed and perform it if necessary @@ -146,9 +147,13 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] if handleErr != nil { return handleErr } + // handleExistingBlob recreates the blob when needsWrite is true + freshlyCreated = needsWrite } else { return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err) } + } else { + freshlyCreated = true } // If we don't need to write (blob is up-to-date), return early @@ -156,6 +161,23 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] return nil } + // cleanupOnError deletes a freshly created blob when content write fails, + // preventing empty blobs from being left behind. + cleanupOnError := func(writeErr error) error { + if !freshlyCreated { + return writeErr + } + glog.Warningf("azure sink: cleaning up empty blob %s/%s after write failure: %v", g.container, key, writeErr) + ctxCleanup, cancelCleanup := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) + defer cancelCleanup() + if _, delErr := appendBlobClient.Delete(ctxCleanup, nil); delErr != nil { + if !bloberror.HasCode(delErr, bloberror.BlobNotFound) { + glog.Warningf("azure sink: failed to clean up blob %s/%s: %v", g.container, key, delErr) + } + } + return writeErr + } + writeFunc := func(data []byte) error { ctxWrite, cancelWrite := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) defer cancelWrite() @@ -164,11 +186,14 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] } if len(entry.Content) > 0 { - return writeFunc(entry.Content) + if err := writeFunc(entry.Content); err != nil { + return cleanupOnError(err) + } + return nil } if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil { - return err + return cleanupOnError(err) } return nil