diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 8eb2218e7..1c2eb7944 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -138,6 +138,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] _, err := appendBlobClient.Create(ctxCreate, nil) needsWrite := true + freshlyCreated := false if err != nil { if bloberror.HasCode(err, bloberror.BlobAlreadyExists) { // Handle existing blob - check if overwrite is needed and perform it if necessary @@ -146,9 +147,13 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] if handleErr != nil { return handleErr } + // handleExistingBlob recreates the blob when needsWrite is true + freshlyCreated = needsWrite } else { return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err) } + } else { + freshlyCreated = true } // If we don't need to write (blob is up-to-date), return early @@ -156,6 +161,23 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] return nil } + // cleanupOnError deletes a freshly created blob when content write fails, + // preventing empty blobs from being left behind. + cleanupOnError := func(writeErr error) error { + if !freshlyCreated { + return writeErr + } + glog.Warningf("azure sink: cleaning up empty blob %s/%s after write failure: %v", g.container, key, writeErr) + ctxCleanup, cancelCleanup := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) + defer cancelCleanup() + if _, delErr := appendBlobClient.Delete(ctxCleanup, nil); delErr != nil { + if !bloberror.HasCode(delErr, bloberror.BlobNotFound) { + glog.Warningf("azure sink: failed to clean up blob %s/%s: %v", g.container, key, delErr) + } + } + return writeErr + } + writeFunc := func(data []byte) error { ctxWrite, cancelWrite := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout) defer cancelWrite() @@ -164,11 +186,14 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [] } if len(entry.Content) > 0 { - return writeFunc(entry.Content) + if err := writeFunc(entry.Content); err != nil { + return cleanupOnError(err) + } + return nil } if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil { - return err + return cleanupOnError(err) } return nil