* fix(remote_gateway): prevent double-versioning when syncing to versioned central bucket
When a file is uploaded to a versioned bucket on edge, SeaweedFS stores
it internally as {object}.versions/v_{versionId}. The remote_gateway was
syncing this internal path directly to the central S3 endpoint. When
central's bucket also has versioning enabled, this caused central to
apply its own versioning on top, producing corrupt paths like:
object.versions/v_{edgeId}.versions/v_{centralId}
Fix: rewrite internal .versions/v_{id} paths to the original S3 object
key before uploading to the remote. Skip version file delete/update
events that are internal bookkeeping.
Fixes https://github.com/seaweedfs/seaweedfs/discussions/8481#discussioncomment-16209342
* fix(remote_gateway): propagate delete markers to remote as deletions
Delete markers are zero-content version entries (ExtDeleteMarkerKey=true)
created by S3 DELETE on a versioned bucket. Previously they were silently
dropped by the HasData() filter, so deletions on edge never reached
central.
Now: detect delete markers before the HasData check, rewrite the
.versions path to the original S3 key, and issue client.DeleteFile()
on the remote.
* fix(remote_gateway): tighten isVersionedPath to avoid false positives
Address PR review feedback:
- Add isDir parameter to isVersionedPath so it only matches the exact
internal shapes: directories whose name ends with .versions (isDir=true),
and files with the v_ prefix inside a .versions parent (isDir=false).
Previously the function was too broad and could match user-created paths
like "my.versions/data.txt".
- Update all 4 call sites to pass the entry's IsDirectory field.
- Rename TestVersionedDirectoryNotFilteredByHasData to
TestVersionsDirectoryFilteredByHasData so the name reflects the
actual assertion (directories ARE filtered by HasData).
- Expand TestIsVersionedPath with isDir cases and false-positive checks.
* fix(remote_gateway): persist sync marker after delete-marker propagation
The delete-marker branch was calling client.DeleteFile() and returning
without updating the local entry, making event replay re-issue the
remote delete. Now call updateLocalEntry after a successful DeleteFile
to stamp the delete-marker entry with a RemoteEntry, matching the
pattern used by the normal create path.
* refactor(remote_gateway): extract syncDeleteMarker and fix root path edge case
- Extract syncDeleteMarker() shared helper used by both bucketed and
mounted-dir event processors, replacing the duplicated delete + persist
local marker logic.
- Fix rewriteVersionedSourcePath for root-level objects: when lastSlash
is 0 (e.g. "/file.xml.versions"), return "/" as the parent dir instead
of an empty string.
- The strings.Contains(dir, ".versions/") condition flagged in review was
already removed in a prior commit that tightened isVersionedPath.
* fix(remote_gateway): skip updateLocalEntry for versioned path rewrites
After rewriting a .versions/v_{id} path to the logical S3 key and
uploading, the code was calling updateLocalEntry on the original v_*
entry, stamping it with a RemoteEntry for the logical key. This is
semantically wrong: the logical object has no filer entry in versioned
buckets, and the internal v_* entry should not carry a RemoteEntry for
a different path.
Skip updateLocalEntry when the path was rewritten from a versioned
source. Replay safety is preserved because S3 PutObject is idempotent.
* fix(remote_gateway): scope versioning checks to /buckets/ namespace
isVersionedPath and rewriteVersionedSourcePath could wrongly match
paths in non-bucket mounts (e.g. /mnt/remote/file.xml.versions).
Add the same /buckets/ prefix guard used by isMultipartUploadDir so
the .versions / v_ logic only applies within the bucket namespace.
478 lines · 18 KiB · Go
package command
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"math/rand"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/remote_storage"
|
|
"github.com/seaweedfs/seaweedfs/weed/replication/source"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// followBucketUpdatesAndUploadToRemote tails filer metadata events under the
// buckets directory and replicates them to the mounted remote storage.
// It resumes from the last persisted sync offset and periodically persists
// progress so that a restart does not re-process already-synced events.
func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {

	// read filer remote storage mount mappings
	if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
		return fmt.Errorf("read mount info: %w", detectErr)
	}

	eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
	if err != nil {
		return err
	}

	// resume from the stored offset (or *option.timeAgo back when none exists)
	lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
	processor := NewMetadataProcessor(eachEntryFunc, 128, lastOffsetTs.UnixNano())

	var lastLogTsNs = time.Now().UnixNano()
	// Every 3 seconds: log sync throughput and persist the processed watermark
	// back to the filer so the next run can resume from it.
	processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
		processor.AddSyncJob(resp)
		return nil
	}, 3*time.Second, func(counter int64, lastTsNs int64) error {
		offsetTsNs := processor.processedTsWatermark.Load()
		if offsetTsNs == 0 {
			// nothing processed yet; do not overwrite the stored offset
			return nil
		}
		now := time.Now().UnixNano()
		glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
		lastLogTsNs = now
		return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, offsetTsNs)
	})

	// bump the epoch so the filer can discard stale subscriptions from a prior run
	option.clientEpoch++

	metadataFollowOption := &pb.MetadataFollowOption{
		ClientName:  "filer.remote.sync",
		ClientId:    option.clientId,
		ClientEpoch: option.clientEpoch,
		SelfSignature: 0,
		PathPrefix:    option.bucketsDir + "/",
		// also watch the /etc remote config dir so mapping/conf changes apply live
		AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote},
		DirectoriesToWatch:     nil,
		StartTsNs:              lastOffsetTs.UnixNano(),
		StopTsNs:               0,
		EventErrorType:         pb.RetryForeverOnError,
	}

	return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
}
|
|
|
|
// makeBucketedEventProcessor builds the metadata-event handler used when the
// gateway follows the whole buckets directory. It translates filer events into
// remote storage operations: bucket create/delete, entry create/delete/update
// (including rename), live reloading of remote configs and mount mappings, and
// the S3-versioning special cases (delete markers, internal .versions paths).
func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {

	// handleCreateBucket mirrors a newly created local bucket directory to the
	// remote storage chosen via *option.createBucketAt, then records the mount
	// mapping so later entry events inside the bucket route to that remote.
	handleCreateBucket := func(entry *filer_pb.Entry) error {
		if !entry.IsDirectory {
			return nil
		}
		if entry.RemoteEntry != nil {
			// this directory is imported from "remote.mount.buckets" or "remote.mount"
			return nil
		}
		// default createBucketAt to the configured primary storage, if any
		if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
			*option.createBucketAt = option.mappings.PrimaryBucketStorageName
			glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
		}
		// otherwise, with exactly one mapping, use its storage as the target
		if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
			for k := range option.mappings.Mappings {
				*option.createBucketAt = k
				glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
			}
		}
		if *option.createBucketAt == "" {
			// no target remote storage resolvable; silently skip
			return nil
		}
		remoteConf, found := option.remoteConfs[*option.createBucketAt]
		if !found {
			return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
		}

		client, err := remote_storage.GetRemoteStorage(remoteConf)
		if err != nil {
			return err
		}

		bucketName := strings.ToLower(entry.Name)
		// apply include/exclude glob filters against the original entry name
		if *option.include != "" {
			if ok, _ := filepath.Match(*option.include, entry.Name); !ok {
				return nil
			}
		}
		if *option.exclude != "" {
			if ok, _ := filepath.Match(*option.exclude, entry.Name); ok {
				return nil
			}
		}

		bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
		remoteLocation, found := option.mappings.Mappings[string(bucketPath)]
		if !found {
			if *option.createBucketRandomSuffix {
				// https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
				// keep total length (name + "-NNNN") within the 63-char S3 limit
				if len(bucketName)+5 > 63 {
					bucketName = bucketName[:58]
				}
				bucketName = fmt.Sprintf("%s-%04d", bucketName, rand.Uint32()%10000)
			}
			remoteLocation = &remote_pb.RemoteStorageLocation{
				Name:   *option.createBucketAt,
				Bucket: bucketName,
				Path:   "/",
			}
			// need to add new mapping here before getting updates from metadata tailing
			option.mappings.Mappings[string(bucketPath)] = remoteLocation
		} else {
			bucketName = remoteLocation.Bucket
		}

		glog.V(0).Infof("create bucket %s", bucketName)
		if err := client.CreateBucket(bucketName); err != nil {
			return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
		}

		// persist the mapping in the filer so other components see the mount
		return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
	}

	// handleDeleteBucket removes the corresponding remote bucket and the
	// local mount mapping when a local bucket directory is deleted.
	handleDeleteBucket := func(entry *filer_pb.Entry) error {
		if !entry.IsDirectory {
			return nil
		}

		client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
		if err != nil {
			return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
		}

		glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
		if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
			return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
		}

		bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)

		return filer.DeleteMountMapping(option, string(bucketPath))
	}

	// handleEtcRemoteChanges hot-reloads mount mappings and remote storage
	// configurations when files under filer.DirectoryEtcRemote change.
	handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		if message.NewEntry != nil {
			// update
			if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
				newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
				if readErr != nil {
					return fmt.Errorf("unmarshal mappings: %w", readErr)
				}
				option.mappings = newMappings
			}
			if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
				conf := &remote_pb.RemoteConf{}
				if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
					return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
				}
				option.remoteConfs[conf.Name] = conf
			}
		} else if message.OldEntry != nil {
			// deletion
			if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
				conf := &remote_pb.RemoteConf{}
				if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
					return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
				}
				delete(option.remoteConfs, conf.Name)
			}
		}

		return nil
	}

	// eachEntryFunc dispatches each metadata event to the proper handler.
	eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
		message := resp.EventNotification
		if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
			return handleEtcRemoteChanges(resp)
		}

		if filer_pb.IsEmpty(resp) {
			return nil
		}
		if filer_pb.IsCreate(resp) {
			// a new directory directly under bucketsDir is a new bucket
			if message.NewParentPath == option.bucketsDir {
				return handleCreateBucket(message.NewEntry)
			}
			// multipart upload parts are internal bookkeeping; never sync them
			if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
				return nil
			}
			// Propagate delete markers as deletions on the remote.
			// Delete markers are zero-content version entries, so they
			// would be filtered out by the HasData check below.
			if isDeleteMarker(message.NewEntry) {
				if newParent, newName, ok := rewriteVersionedSourcePath(message.NewParentPath, message.NewEntry.Name); ok {
					bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(newParent)
					if !ok {
						return nil
					}
					client, err := remote_storage.GetRemoteStorage(remoteStorage)
					if err != nil {
						return err
					}
					dest := toRemoteStorageLocation(bucket, util.NewFullPath(newParent, newName), remoteStorageMountLocation)
					return syncDeleteMarker(client, option, message, dest)
				}
				return nil
			}
			if !filer.HasData(message.NewEntry) {
				return nil
			}
			bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
			if !ok {
				return nil
			}
			client, err := remote_storage.GetRemoteStorage(remoteStorage)
			if err != nil {
				return err
			}
			glog.V(2).Infof("create: %+v", resp)
			if !shouldSendToRemote(message.NewEntry) {
				glog.V(2).Infof("skipping creating: %+v", resp)
				return nil
			}
			// Rewrite internal versioning paths to the original S3 key
			// to prevent double-versioning when central also has versioning enabled
			parentPath, entryName := message.NewParentPath, message.NewEntry.Name
			isRewrittenVersion := false
			if newParent, newName, ok := rewriteVersionedSourcePath(parentPath, entryName); ok {
				glog.V(0).Infof("rewrite versioned path %s/%s -> %s/%s", parentPath, entryName, newParent, newName)
				parentPath, entryName = newParent, newName
				isRewrittenVersion = true
			}
			dest := toRemoteStorageLocation(bucket, util.NewFullPath(parentPath, entryName), remoteStorageMountLocation)
			if message.NewEntry.IsDirectory {
				glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
				return client.WriteDirectory(dest, message.NewEntry)
			}
			glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
			remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
			if writeErr != nil {
				return writeErr
			}
			// Skip updateLocalEntry for versioned rewrites: the logical
			// object (e.g. file.xml) has no filer entry in versioned
			// buckets, and stamping the internal v_* entry with a
			// RemoteEntry for the logical key is semantically wrong.
			// Replay is safe because S3 PutObject is idempotent.
			if isRewrittenVersion {
				return nil
			}
			return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
		}
		if filer_pb.IsDelete(resp) {
			// a directory deleted directly under bucketsDir is a bucket removal
			if resp.Directory == option.bucketsDir {
				return handleDeleteBucket(message.OldEntry)
			}
			// Skip deletion of internal version files; individual version
			// deletes should not propagate to the remote object
			if isVersionedPath(resp.Directory, message.OldEntry.Name, message.OldEntry.IsDirectory) {
				glog.V(2).Infof("skipping delete of internal version path: %s/%s", resp.Directory, message.OldEntry.Name)
				return nil
			}
			bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
			if !ok {
				return nil
			}
			client, err := remote_storage.GetRemoteStorage(remoteStorage)
			if err != nil {
				return err
			}
			glog.V(2).Infof("delete: %+v", resp)
			dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
			if message.OldEntry.IsDirectory {
				glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
				return client.RemoveDirectory(dest)
			}
			glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
			return client.DeleteFile(dest)
		}
		// update (possibly a rename: old and new entries both present)
		if message.OldEntry != nil && message.NewEntry != nil {
			if resp.Directory == option.bucketsDir {
				if message.NewParentPath == option.bucketsDir {
					if message.OldEntry.Name == message.NewEntry.Name {
						return nil
					}
					// bucket renamed: create the new remote bucket, then drop the old
					if err := handleCreateBucket(message.NewEntry); err != nil {
						return err
					}
					if err := handleDeleteBucket(message.OldEntry); err != nil {
						return err
					}
				}
			}
			if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
				return nil
			}
			// Skip updates to internal version paths
			if isVersionedPath(message.NewParentPath, message.NewEntry.Name, message.NewEntry.IsDirectory) {
				glog.V(2).Infof("skipping update of internal version path: %s/%s", message.NewParentPath, message.NewEntry.Name)
				return nil
			}
			oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
			newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
			if oldOk && newOk {
				if !shouldSendToRemote(message.NewEntry) {
					glog.V(2).Infof("skipping updating: %+v", resp)
					return nil
				}
				client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
				if err != nil {
					return err
				}
				if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
					// update the same entry
					if message.NewEntry.IsDirectory {
						// update directory property
						return nil
					}
					if message.OldEntry.RemoteEntry != nil && filer.IsSameData(message.OldEntry, message.NewEntry) {
						// content unchanged: only push the metadata delta
						glog.V(2).Infof("update meta: %+v", resp)
						oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
						return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
					} else {
						newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
						remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
						if writeErr != nil {
							return writeErr
						}
						return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
					}
				}
			}

			// the following is entry rename
			if oldOk {
				// delete the entry at its old remote location
				client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
				if err != nil {
					return err
				}
				oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
				if message.OldEntry.IsDirectory {
					return client.RemoveDirectory(oldDest)
				}
				glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
				if err := client.DeleteFile(oldDest); err != nil {
					return err
				}
			}
			if newOk {
				// write the entry at its new remote location
				if !shouldSendToRemote(message.NewEntry) {
					glog.V(2).Infof("skipping updating: %+v", resp)
					return nil
				}
				client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
				if err != nil {
					return err
				}
				newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
				if message.NewEntry.IsDirectory {
					return client.WriteDirectory(newDest, message.NewEntry)
				}
				remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
				if writeErr != nil {
					return writeErr
				}
				return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
			}
		}

		return nil
	}
	return eachEntryFunc, nil
}
|
|
|
|
func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
|
|
bucket := util.FullPath(option.bucketsDir).Child(bucketName)
|
|
|
|
var isMounted bool
|
|
remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
|
|
if !isMounted {
|
|
return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
|
|
}
|
|
remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
|
|
if !hasClient {
|
|
return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
|
|
}
|
|
|
|
client, err = remote_storage.GetRemoteStorage(remoteConf)
|
|
if err != nil {
|
|
return nil, remoteStorageMountLocation, err
|
|
}
|
|
return client, remoteStorageMountLocation, nil
|
|
}
|
|
|
|
func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
|
|
bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
|
|
if !ok {
|
|
return "", nil, nil, false
|
|
}
|
|
var isMounted bool
|
|
remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
|
|
if !isMounted {
|
|
glog.Warningf("%s is not mounted", bucket)
|
|
return "", nil, nil, false
|
|
}
|
|
var hasClient bool
|
|
remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
|
|
if !hasClient {
|
|
glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
|
|
return "", nil, nil, false
|
|
}
|
|
return bucket, remoteStorageMountLocation, remoteConf, true
|
|
}
|
|
|
|
func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
|
|
if bucketPath, ok := util.ExtractBucketPath(bucketsDir, dir, false); ok {
|
|
return util.FullPath(bucketPath), true
|
|
}
|
|
return "", false
|
|
}
|
|
|
|
func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) {
|
|
|
|
if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil {
|
|
if err == filer_pb.ErrNotFound {
|
|
return fmt.Errorf("remote storage is not configured in filer server")
|
|
}
|
|
return err
|
|
} else {
|
|
option.mappings = mappings
|
|
}
|
|
|
|
option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
|
|
var lastConfName string
|
|
err = filer_pb.List(context.Background(), option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
|
|
if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
|
|
return nil
|
|
}
|
|
conf := &remote_pb.RemoteConf{}
|
|
if err := proto.Unmarshal(entry.Content, conf); err != nil {
|
|
return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
|
|
}
|
|
option.remoteConfs[conf.Name] = conf
|
|
lastConfName = conf.Name
|
|
return nil
|
|
}, "", false, math.MaxUint32)
|
|
|
|
if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
|
|
glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
|
|
option.mappings.PrimaryBucketStorageName = lastConfName
|
|
}
|
|
|
|
return
|
|
}
|