Files
seaweedFS/weed/replication/sink/gcssink/gcs_sink.go
Chris Lu 72eb93919c fix(gcssink): prevent empty object finalization on write failure (#8933)
* fix(gcssink): prevent empty object finalization on write failure

The GCS writer was created unconditionally with defer wc.Close(),
which finalizes the upload even when content decryption or copy
fails. This silently overwrites valid objects with empty data.
Remove the unconditional defer, explicitly close on success to
propagate errors, and delete the object on write failure.

* fix(gcssink): use context cancellation instead of obj.Delete on failure

obj.Delete() after a failed write would delete the existing object at
that key, causing data loss on updates. Use a cancelable context
instead — cancelling before Close() aborts the GCS upload without
touching any pre-existing object.
2026-04-05 16:07:49 -07:00

150 lines
4.0 KiB
Go

package gcssink
import (
"context"
"fmt"
"os"
"strings"
"cloud.google.com/go/storage"
"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// GcsSink replicates filer entries into a Google Cloud Storage bucket.
// It is registered as a replication sink in init() below.
type GcsSink struct {
	client        *storage.Client      // GCS API client created in initialize()
	bucket        string               // destination bucket name
	dir           string               // destination directory prefix inside the bucket
	filerSource   *source.FilerSource  // source filer used to look up chunk data
	isIncremental bool                 // whether this sink replicates incrementally (from config)
}
// init registers GcsSink in the global list of available replication sinks.
func init() {
	sink.Sinks = append(sink.Sinks, new(GcsSink))
}
// GetName returns the configuration identifier of this sink.
func (g *GcsSink) GetName() string {
	const sinkName = "google_cloud_storage"
	return sinkName
}
// GetSinkToDirectory returns the destination directory prefix inside the bucket.
func (g *GcsSink) GetSinkToDirectory() string {
	destination := g.dir
	return destination
}
// IsIncremental reports whether this sink was configured for incremental replication.
func (g *GcsSink) IsIncremental() bool {
	incremental := g.isIncremental
	return incremental
}
// Initialize reads the sink settings under the given configuration prefix
// and sets up the GCS client via initialize().
func (g *GcsSink) Initialize(configuration util.Configuration, prefix string) error {
	g.isIncremental = configuration.GetBool(prefix + "is_incremental")
	credentials := configuration.GetString(prefix + "google_application_credentials")
	bucket := configuration.GetString(prefix + "bucket")
	directory := configuration.GetString(prefix + "directory")
	return g.initialize(credentials, bucket, directory)
}
// SetSourceFiler wires in the filer source that CreateEntry uses to look up
// and stream file chunk data during replication.
func (g *GcsSink) SetSourceFiler(s *source.FilerSource) {
	g.filerSource = s
}
// initialize stores the target bucket/directory and creates the GCS client.
// googleApplicationCredentials may be raw JSON credentials (starting with
// "{"), a path to a credentials file, or empty — in which case the storage
// client performs its own default credential discovery (e.g. via the
// GOOGLE_APPLICATION_CREDENTIALS environment variable).
//
// Fixes: the parameter name used snake_case (non-idiomatic Go), and errors
// were wrapped with %v, which breaks errors.Is/errors.As unwrapping; %w
// renders identically but preserves the error chain.
func (g *GcsSink) initialize(googleApplicationCredentials, bucketName, dir string) error {
	g.bucket = bucketName
	g.dir = dir

	ctx := context.Background()

	// Build client options from explicitly supplied credentials, if any.
	var clientOpts []option.ClientOption
	if googleApplicationCredentials != "" {
		var data []byte
		var err error
		if strings.HasPrefix(googleApplicationCredentials, "{") {
			// Inline JSON credentials.
			data = []byte(googleApplicationCredentials)
		} else {
			// Otherwise treat the value as a path to a credentials file.
			googleCredentialsPath := util.ResolvePath(googleApplicationCredentials)
			data, err = os.ReadFile(googleCredentialsPath)
			if err != nil {
				return fmt.Errorf("failed to read credentials file %s: %w", googleCredentialsPath, err)
			}
		}
		creds, err := google.CredentialsFromJSON(ctx, data, storage.ScopeFullControl)
		if err != nil {
			return fmt.Errorf("failed to parse credentials: %w", err)
		}
		// Supply a pre-authenticated HTTP client so the storage client does
		// not attempt its own credential discovery.
		httpClient := oauth2.NewClient(ctx, creds.TokenSource)
		clientOpts = append(clientOpts, option.WithHTTPClient(httpClient), option.WithoutAuthentication())
	}

	client, err := storage.NewClient(ctx, clientOpts...)
	if err != nil {
		return fmt.Errorf("failed to create client with credentials \"%s\" env \"%s\": %w",
			googleApplicationCredentials, os.Getenv("GOOGLE_APPLICATION_CREDENTIALS"), err)
	}
	g.client = client
	return nil
}
// DeleteEntry removes the object at key from the bucket. Directory entries
// are stored under a trailing-slash key, so the key is adjusted first.
func (g *GcsSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
	if isDirectory {
		key += "/"
	}
	obj := g.client.Bucket(g.bucket).Object(key)
	err := obj.Delete(context.Background())
	if err != nil {
		return fmt.Errorf("gcs delete %s/%s: %v", g.bucket, key, err)
	}
	return nil
}
// CreateEntry uploads the file content for key to the GCS bucket.
// Directory entries are a no-op. Small files carry their content inline in
// the filer entry; larger files are streamed chunk by chunk from the filer
// source. The upload is only committed when the final Close() succeeds.
func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
	if entry.IsDirectory {
		return nil
	}
	totalSize := filer.FileSize(entry)
	chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
	// A cancelable context lets us abort the upload on write failure:
	// cancelling before Close() prevents the writer from finalizing, so a
	// pre-existing object at this key is never overwritten with partial or
	// empty data (see commit history of this file).
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	wc := g.client.Bucket(g.bucket).Object(key).NewWriter(ctx)
	writeFunc := func(data []byte) error {
		_, writeErr := wc.Write(data)
		return writeErr
	}
	var writeErr error
	if len(entry.Content) > 0 {
		// Inline content path: the entry itself holds the full file data.
		writeErr = writeFunc(entry.Content)
	} else {
		// Chunked path: stream each chunk view from the source filer.
		writeErr = repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc)
	}
	if writeErr != nil {
		// Cancel the context to abort the GCS upload without touching
		// any existing object at this key.
		cancel()
		wc.Close() // Close error deliberately ignored: the upload is already aborted.
		return writeErr
	}
	// Close finalizes the upload; its error must be propagated because the
	// object only becomes visible when Close succeeds.
	return wc.Close()
}
// UpdateEntry replaces the object at key by re-uploading the new entry's
// content via CreateEntry; a GCS upload to an existing key overwrites it on
// finalize. It always reports the existing entry as found.
func (g *GcsSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
	return true, g.CreateEntry(key, newEntry, signatures)
}