diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index 1c4ae9e9b..97d9217a2 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -115,23 +115,32 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in totalSize := filer.FileSize(entry) chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize)) - wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background()) - defer wc.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wc := g.client.Bucket(g.bucket).Object(key).NewWriter(ctx) writeFunc := func(data []byte) error { _, writeErr := wc.Write(data) return writeErr } + var writeErr error if len(entry.Content) > 0 { - return writeFunc(entry.Content) + writeErr = writeFunc(entry.Content) + } else { + writeErr = repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc) } - if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil { - return err + if writeErr != nil { + // Cancel the context to abort the GCS upload without touching + // any existing object at this key. + cancel() + wc.Close() + return writeErr } - return nil + return wc.Close() }