Files
seaweedFS/weed/command/filer_remote_gateway_buckets.go
Chris Lu 51ec0d2122 fix(remote_gateway): prevent double-versioning when syncing to versioned central bucket (#8710)
* fix(remote_gateway): prevent double-versioning when syncing to versioned central bucket

When a file is uploaded to a versioned bucket on edge, SeaweedFS stores
it internally as {object}.versions/v_{versionId}. The remote_gateway was
syncing this internal path directly to the central S3 endpoint. When
central's bucket also has versioning enabled, this caused central to
apply its own versioning on top, producing corrupt paths like:
  object.versions/v_{edgeId}.versions/v_{centralId}

Fix: rewrite internal .versions/v_{id} paths to the original S3 object
key before uploading to the remote. Skip version file delete/update
events that are internal bookkeeping.

Fixes https://github.com/seaweedfs/seaweedfs/discussions/8481#discussioncomment-16209342

* fix(remote_gateway): propagate delete markers to remote as deletions

Delete markers are zero-content version entries (ExtDeleteMarkerKey=true)
created by S3 DELETE on a versioned bucket. Previously they were silently
dropped by the HasData() filter, so deletions on edge never reached
central.

Now: detect delete markers before the HasData check, rewrite the
.versions path to the original S3 key, and issue client.DeleteFile()
on the remote.

* fix(remote_gateway): tighten isVersionedPath to avoid false positives

Address PR review feedback:

- Add isDir parameter to isVersionedPath so it only matches the exact
  internal shapes: directories whose name ends with .versions (isDir=true),
  and files with the v_ prefix inside a .versions parent (isDir=false).
  Previously the function was too broad and could match user-created paths
  like "my.versions/data.txt".

- Update all 4 call sites to pass the entry's IsDirectory field.

- Rename TestVersionedDirectoryNotFilteredByHasData to
  TestVersionsDirectoryFilteredByHasData so the name reflects the
  actual assertion (directories ARE filtered by HasData).

- Expand TestIsVersionedPath with isDir cases and false-positive checks.

* fix(remote_gateway): persist sync marker after delete-marker propagation

The delete-marker branch was calling client.DeleteFile() and returning
without updating the local entry, making event replay re-issue the
remote delete. Now call updateLocalEntry after a successful DeleteFile
to stamp the delete-marker entry with a RemoteEntry, matching the
pattern used by the normal create path.

* refactor(remote_gateway): extract syncDeleteMarker and fix root path edge case

- Extract syncDeleteMarker() shared helper used by both bucketed and
  mounted-dir event processors, replacing the duplicated delete + persist
  local marker logic.

- Fix rewriteVersionedSourcePath for root-level objects: when lastSlash
  is 0 (e.g. "/file.xml.versions"), return "/" as the parent dir instead
  of an empty string.

- The strings.Contains(dir, ".versions/") condition flagged in review was
  already removed in a prior commit that tightened isVersionedPath.

* fix(remote_gateway): skip updateLocalEntry for versioned path rewrites

After rewriting a .versions/v_{id} path to the logical S3 key and
uploading, the code was calling updateLocalEntry on the original v_*
entry, stamping it with a RemoteEntry for the logical key. This is
semantically wrong: the logical object has no filer entry in versioned
buckets, and the internal v_* entry should not carry a RemoteEntry for
a different path.

Skip updateLocalEntry when the path was rewritten from a versioned
source. Replay safety is preserved because S3 PutObject is idempotent.

* fix(remote_gateway): scope versioning checks to /buckets/ namespace

isVersionedPath and rewriteVersionedSourcePath could wrongly match
paths in non-bucket mounts (e.g. /mnt/remote/file.xml.versions).

Add the same /buckets/ prefix guard used by isMultipartUploadDir so
the .versions / v_ logic only applies within the bucket namespace.
2026-03-19 21:18:52 -07:00

478 lines
18 KiB
Go

package command
import (
"context"
"fmt"
"math"
"math/rand"
"path/filepath"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
"github.com/seaweedfs/seaweedfs/weed/remote_storage"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/proto"
)
// followBucketUpdatesAndUploadToRemote loads the remote storage configuration,
// builds the bucketed event processor and then follows filer metadata changes
// under the buckets directory and filer.DirectoryEtcRemote.
//
// Every received event is queued on a MetadataProcessor. The second callback
// handed to pb.AddOffsetFunc logs the progress rate and stores the processed
// watermark through remote_storage.SetSyncOffset. The function returns a setup
// error, or whatever pb.FollowMetadata returns.
func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
	// The event processor relies on the mount mappings and remote confs
	// collected here.
	if confErr := option.collectRemoteStorageConf(); confErr != nil {
		return fmt.Errorf("read mount info: %w", confErr)
	}

	handleEvent, buildErr := option.makeBucketedEventProcessor(filerSource)
	if buildErr != nil {
		return buildErr
	}

	resumeFrom := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
	startTsNs := resumeFrom.UnixNano()
	jobs := NewMetadataProcessor(handleEvent, 128, startTsNs)
	previousReportNs := time.Now().UnixNano()

	// enqueue hands each event to the processor and never fails itself.
	enqueue := func(resp *filer_pb.SubscribeMetadataResponse) error {
		jobs.AddSyncJob(resp)
		return nil
	}

	// persistProgress reports throughput since the previous report and saves
	// the processed watermark; a zero watermark is skipped.
	persistProgress := func(counter int64, _ int64) error {
		watermarkNs := jobs.processedTsWatermark.Load()
		if watermarkNs == 0 {
			return nil
		}
		nowNs := time.Now().UnixNano()
		elapsedSeconds := float64(nowNs-previousReportNs) / 1e9
		glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, watermarkNs), float64(counter)/elapsedSeconds)
		previousReportNs = nowNs
		return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, watermarkNs)
	}

	// 3*time.Second is presumably the reporting interval; confirm against
	// pb.AddOffsetFunc.
	processWithOffset := pb.AddOffsetFunc(enqueue, 3*time.Second, persistProgress)

	option.clientEpoch++

	// SelfSignature, DirectoriesToWatch and StopTsNs are left at their zero
	// values.
	followOption := &pb.MetadataFollowOption{
		ClientName:             "filer.remote.sync",
		ClientId:               option.clientId,
		ClientEpoch:            option.clientEpoch,
		PathPrefix:             option.bucketsDir + "/",
		AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote},
		StartTsNs:              startTsNs,
		EventErrorType:         pb.RetryForeverOnError,
	}

	return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, followOption, processWithOffset)
}
// makeBucketedEventProcessor builds the callback that handles one filer
// metadata event for the bucket-synchronizing remote gateway.
//
// The returned function dispatches on the shape of the event:
//   - events whose directory starts with filer.DirectoryEtcRemote refresh the
//     in-memory mount mappings and remote storage configurations;
//   - create events create a remote bucket, propagate a delete marker as a
//     remote deletion, or upload the new entry;
//   - delete events remove a remote bucket, directory or file;
//   - events carrying both an old and a new entry are handled as an in-place
//     update or as a rename.
//
// filerSource is handed to retriedWriteFile whenever entry content is
// uploaded. The error result is always nil in this implementation.
func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
	// handleCreateBucket creates a remote bucket for a new local bucket
	// directory and records the mount mapping. It does nothing for
	// non-directories, for directories that already carry a RemoteEntry, when
	// no target storage can be chosen, or when the name is rejected by the
	// include/exclude patterns.
	handleCreateBucket := func(entry *filer_pb.Entry) error {
		if !entry.IsDirectory {
			return nil
		}
		if entry.RemoteEntry != nil {
			// this directory is imported from "remote.mount.buckets" or "remote.mount"
			return nil
		}
		if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
			*option.createBucketAt = option.mappings.PrimaryBucketStorageName
			glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
		}
		if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
			// Mappings is keyed by the local bucket path; the storage name
			// that remoteConfs is indexed by lives on the value, so take the
			// name from the single mapped location rather than from its key.
			for _, location := range option.mappings.Mappings {
				*option.createBucketAt = location.Name
				glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
			}
		}
		if *option.createBucketAt == "" {
			return nil
		}
		remoteConf, found := option.remoteConfs[*option.createBucketAt]
		if !found {
			return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
		}
		client, err := remote_storage.GetRemoteStorage(remoteConf)
		if err != nil {
			return err
		}
		bucketName := strings.ToLower(entry.Name)
		// The Match error is discarded: a malformed pattern reports no match,
		// so a bad include pattern skips every bucket and a bad exclude
		// pattern excludes none.
		if *option.include != "" {
			if ok, _ := filepath.Match(*option.include, entry.Name); !ok {
				return nil
			}
		}
		if *option.exclude != "" {
			if ok, _ := filepath.Match(*option.exclude, entry.Name); ok {
				return nil
			}
		}
		bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
		remoteLocation, found := option.mappings.Mappings[string(bucketPath)]
		if !found {
			if *option.createBucketRandomSuffix {
				// https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
				// The suffix adds 5 bytes ("-" plus four digits); truncate so
				// the final name is at most 63 bytes.
				if len(bucketName)+5 > 63 {
					bucketName = bucketName[:58]
				}
				bucketName = fmt.Sprintf("%s-%04d", bucketName, rand.Uint32()%10000)
			}
			remoteLocation = &remote_pb.RemoteStorageLocation{
				Name:   *option.createBucketAt,
				Bucket: bucketName,
				Path:   "/",
			}
			// need to add new mapping here before getting updates from metadata tailing
			option.mappings.Mappings[string(bucketPath)] = remoteLocation
		} else {
			// Already mapped: reuse the remote bucket name from the mapping.
			bucketName = remoteLocation.Bucket
		}
		glog.V(0).Infof("create bucket %s", bucketName)
		if err := client.CreateBucket(bucketName); err != nil {
			return fmt.Errorf("create bucket %s in %s: %w", bucketName, remoteConf.Name, err)
		}
		return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
	}

	// handleDeleteBucket deletes the remote bucket mapped to a removed local
	// bucket directory and then drops the mount mapping. Non-directories are
	// ignored; an unmounted or un-configured bucket is reported as an error.
	handleDeleteBucket := func(entry *filer_pb.Entry) error {
		if !entry.IsDirectory {
			return nil
		}
		client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
		if err != nil {
			return fmt.Errorf("findRemoteStorageClient %s: %w", entry.Name, err)
		}
		glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
		if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
			return fmt.Errorf("delete remote bucket %s: %w", remoteStorageMountLocation.Bucket, err)
		}
		bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
		return filer.DeleteMountMapping(option, string(bucketPath))
	}

	// handleEtcRemoteChanges keeps option.mappings and option.remoteConfs in
	// step with changes under filer.DirectoryEtcRemote. A new or updated
	// mount file replaces the mappings; a new or updated conf file is stored
	// by conf name; a deleted conf file is removed by conf name.
	handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		if message.NewEntry != nil {
			// update
			if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
				newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
				if readErr != nil {
					return fmt.Errorf("unmarshal mappings: %w", readErr)
				}
				option.mappings = newMappings
			}
			if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
				conf := &remote_pb.RemoteConf{}
				if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
					return fmt.Errorf("unmarshal %s/%s: %w", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
				}
				option.remoteConfs[conf.Name] = conf
			}
		} else if message.OldEntry != nil {
			// deletion
			if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
				conf := &remote_pb.RemoteConf{}
				if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
					return fmt.Errorf("unmarshal %s/%s: %w", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
				}
				delete(option.remoteConfs, conf.Name)
			}
		}
		return nil
	}

	// eachEntryFunc is the processor returned to the caller.
	eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		// Configuration changes are handled before any bucket logic.
		if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
			return handleEtcRemoteChanges(resp)
		}
		if filer_pb.IsEmpty(resp) {
			return nil
		}
		if filer_pb.IsCreate(resp) {
			// An entry created directly under the buckets directory is a bucket.
			if message.NewParentPath == option.bucketsDir {
				return handleCreateBucket(message.NewEntry)
			}
			if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
				return nil
			}
			// Propagate delete markers as deletions on the remote.
			// Delete markers are zero-content version entries, so they
			// would be filtered out by the HasData check below.
			if isDeleteMarker(message.NewEntry) {
				if newParent, newName, ok := rewriteVersionedSourcePath(message.NewParentPath, message.NewEntry.Name); ok {
					bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(newParent)
					if !ok {
						return nil
					}
					client, err := remote_storage.GetRemoteStorage(remoteStorage)
					if err != nil {
						return err
					}
					dest := toRemoteStorageLocation(bucket, util.NewFullPath(newParent, newName), remoteStorageMountLocation)
					return syncDeleteMarker(client, option, message, dest)
				}
				// A delete marker whose path cannot be rewritten is ignored.
				return nil
			}
			if !filer.HasData(message.NewEntry) {
				return nil
			}
			bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
			if !ok {
				return nil
			}
			client, err := remote_storage.GetRemoteStorage(remoteStorage)
			if err != nil {
				return err
			}
			glog.V(2).Infof("create: %+v", resp)
			if !shouldSendToRemote(message.NewEntry) {
				glog.V(2).Infof("skipping creating: %+v", resp)
				return nil
			}
			// Rewrite internal versioning paths to the original S3 key
			// to prevent double-versioning when central also has versioning enabled
			parentPath, entryName := message.NewParentPath, message.NewEntry.Name
			isRewrittenVersion := false
			if newParent, newName, ok := rewriteVersionedSourcePath(parentPath, entryName); ok {
				glog.V(0).Infof("rewrite versioned path %s/%s -> %s/%s", parentPath, entryName, newParent, newName)
				parentPath, entryName = newParent, newName
				isRewrittenVersion = true
			}
			dest := toRemoteStorageLocation(bucket, util.NewFullPath(parentPath, entryName), remoteStorageMountLocation)
			if message.NewEntry.IsDirectory {
				glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
				return client.WriteDirectory(dest, message.NewEntry)
			}
			glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
			remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
			if writeErr != nil {
				return writeErr
			}
			// Skip updateLocalEntry for versioned rewrites: the logical
			// object (e.g. file.xml) has no filer entry in versioned
			// buckets, and stamping the internal v_* entry with a
			// RemoteEntry for the logical key is semantically wrong.
			// Replay is safe because S3 PutObject is idempotent.
			if isRewrittenVersion {
				return nil
			}
			return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
		}
		if filer_pb.IsDelete(resp) {
			// An entry removed directly under the buckets directory is a bucket.
			if resp.Directory == option.bucketsDir {
				return handleDeleteBucket(message.OldEntry)
			}
			// Skip deletion of internal version files; individual version
			// deletes should not propagate to the remote object
			if isVersionedPath(resp.Directory, message.OldEntry.Name, message.OldEntry.IsDirectory) {
				glog.V(2).Infof("skipping delete of internal version path: %s/%s", resp.Directory, message.OldEntry.Name)
				return nil
			}
			bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
			if !ok {
				return nil
			}
			client, err := remote_storage.GetRemoteStorage(remoteStorage)
			if err != nil {
				return err
			}
			glog.V(2).Infof("delete: %+v", resp)
			dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
			if message.OldEntry.IsDirectory {
				glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
				return client.RemoveDirectory(dest)
			}
			glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
			return client.DeleteFile(dest)
		}
		if message.OldEntry != nil && message.NewEntry != nil {
			// A bucket renamed within the buckets directory is handled as
			// create-new followed by delete-old; processing then continues
			// below rather than returning.
			if resp.Directory == option.bucketsDir {
				if message.NewParentPath == option.bucketsDir {
					if message.OldEntry.Name == message.NewEntry.Name {
						return nil
					}
					if err := handleCreateBucket(message.NewEntry); err != nil {
						return err
					}
					if err := handleDeleteBucket(message.OldEntry); err != nil {
						return err
					}
				}
			}
			if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
				return nil
			}
			// Skip updates to internal version paths
			if isVersionedPath(message.NewParentPath, message.NewEntry.Name, message.NewEntry.IsDirectory) {
				glog.V(2).Infof("skipping update of internal version path: %s/%s", message.NewParentPath, message.NewEntry.Name)
				return nil
			}
			oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
			newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
			if oldOk && newOk {
				if !shouldSendToRemote(message.NewEntry) {
					glog.V(2).Infof("skipping updating: %+v", resp)
					return nil
				}
				client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
				if err != nil {
					return err
				}
				if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
					// update the same entry
					if message.NewEntry.IsDirectory {
						// update directory property
						return nil
					}
					// Same data on an already-synced entry: only push metadata.
					// Otherwise re-upload the content and refresh the local entry.
					if message.OldEntry.RemoteEntry != nil && filer.IsSameData(message.OldEntry, message.NewEntry) {
						glog.V(2).Infof("update meta: %+v", resp)
						oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
						return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
					} else {
						newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
						remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
						if writeErr != nil {
							return writeErr
						}
						return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
					}
				}
			}
			// the following is entry rename
			if oldOk {
				client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
				if err != nil {
					return err
				}
				oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
				if message.OldEntry.IsDirectory {
					// NOTE(review): a directory rename returns here without
					// writing the new remote directory below; confirm intended.
					return client.RemoveDirectory(oldDest)
				}
				glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
				if err := client.DeleteFile(oldDest); err != nil {
					return err
				}
			}
			if newOk {
				if !shouldSendToRemote(message.NewEntry) {
					glog.V(2).Infof("skipping updating: %+v", resp)
					return nil
				}
				client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
				if err != nil {
					return err
				}
				newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
				if message.NewEntry.IsDirectory {
					return client.WriteDirectory(newDest, message.NewEntry)
				}
				remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
				if writeErr != nil {
					return writeErr
				}
				return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
			}
		}
		return nil
	}
	return eachEntryFunc, nil
}
// findRemoteStorageClient resolves the remote storage client and mount
// location for the bucket named bucketName under option.bucketsDir.
//
// It returns an error when the bucket path has no entry in the mount
// mappings, when the mapped storage name has no entry in option.remoteConfs,
// or when remote_storage.GetRemoteStorage fails. On every error the client is
// nil; the mount location is returned as looked up (nil when not mounted).
func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
	bucketPath := util.FullPath(option.bucketsDir).Child(bucketName)

	location, mounted := option.mappings.Mappings[string(bucketPath)]
	if !mounted {
		return nil, location, fmt.Errorf("%s is not mounted", bucketPath)
	}

	conf, configured := option.remoteConfs[location.Name]
	if !configured {
		return nil, location, fmt.Errorf("%s mounted to un-configured %+v", bucketPath, location)
	}

	storageClient, clientErr := remote_storage.GetRemoteStorage(conf)
	if clientErr != nil {
		return nil, location, clientErr
	}
	return storageClient, location, nil
}
// detectBucketInfo maps a directory to its bucket path, the remote mount
// location of that bucket, and the configuration of the mounted storage.
//
// ok is false, with all other results zero, when actualDir yields no bucket
// path, when the bucket has no mount mapping, or when the mapped storage name
// has no configuration. The last two cases also log a warning.
func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
	bucketPath, inBucket := extractBucketPath(option.bucketsDir, actualDir)
	if !inBucket {
		return "", nil, nil, false
	}

	location, mounted := option.mappings.Mappings[string(bucketPath)]
	if !mounted {
		glog.Warningf("%s is not mounted", bucketPath)
		return "", nil, nil, false
	}

	conf, configured := option.remoteConfs[location.Name]
	if !configured {
		glog.Warningf("%s mounted to un-configured %+v", bucketPath, location)
		return "", nil, nil, false
	}

	return bucketPath, location, conf, true
}
// extractBucketPath wraps util.ExtractBucketPath and converts its result to a
// util.FullPath. It returns ("", false) when util.ExtractBucketPath reports
// no bucket path for dir.
func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
	// NOTE(review): the meaning of the trailing false argument is not visible
	// here; verify against util.ExtractBucketPath.
	bucketPath, found := util.ExtractBucketPath(bucketsDir, dir, false)
	if !found {
		return "", false
	}
	return util.FullPath(bucketPath), true
}
// collectRemoteStorageConf loads the remote mount mappings and every remote
// storage configuration from the filer into option.mappings and
// option.remoteConfs.
//
// Configurations are read from the entries in filer.DirectoryEtcRemote whose
// name ends with filer.REMOTE_STORAGE_CONF_SUFFIX and are indexed by conf
// name. When the mappings name no primary bucket storage and exactly one
// configuration was found, that configuration becomes the primary.
//
// It returns an error when the mappings cannot be read (with a dedicated
// message for filer_pb.ErrNotFound), when listing the directory fails, or
// when a configuration entry cannot be unmarshalled.
func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) {
	mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress))
	if err != nil {
		if err == filer_pb.ErrNotFound {
			return fmt.Errorf("remote storage is not configured in filer server")
		}
		return err
	}
	option.mappings = mappings

	option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
	var lastConfName string
	err = filer_pb.List(context.Background(), option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
		if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
			return nil
		}
		conf := &remote_pb.RemoteConf{}
		if unmarshalErr := proto.Unmarshal(entry.Content, conf); unmarshalErr != nil {
			// %w keeps the cause inspectable with errors.Is / errors.As.
			return fmt.Errorf("unmarshal %s/%s: %w", filer.DirectoryEtcRemote, entry.Name, unmarshalErr)
		}
		option.remoteConfs[conf.Name] = conf
		lastConfName = conf.Name
		return nil
	}, "", false, math.MaxUint32)
	if err != nil {
		// Stop before deriving a default from a possibly partial conf map.
		return fmt.Errorf("list %s: %w", filer.DirectoryEtcRemote, err)
	}

	if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
		glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
		option.mappings.PrimaryBucketStorageName = lastConfName
	}
	return nil
}