* fix listing objects * add more list testing * address comments * fix next marker * fix isTruncated in listing * fix tests * address tests * Update s3api_object_handlers_multipart.go * fixes * store json into bucket content, for tagging and cors * switch bucket metadata from json to proto * fix * Update s3api_bucket_config.go * fix test issue * fix test_bucket_listv2_delimiter_prefix * Update cors.go * skip special characters * passing listing * fix test_bucket_list_delimiter_prefix * ok. fix the xsd generated go code now * fix cors tests * fix test * fix test_bucket_list_unordered and test_bucket_listv2_unordered do not accept the allow-unordered and delimiter parameter combination * fix test_bucket_list_objects_anonymous and test_bucket_listv2_objects_anonymous The tests test_bucket_list_objects_anonymous and test_bucket_listv2_objects_anonymous were failing because they try to set bucket ACL to public-read, but SeaweedFS only supported private ACL. Updated PutBucketAclHandler to use the existing ExtractAcl function which already supports all standard S3 canned ACLs Replaced the hardcoded check for only private ACL with proper ACL parsing that handles public-read, public-read-write, authenticated-read, bucket-owner-read, bucket-owner-full-control, etc. Added unit tests to verify all standard canned ACLs are accepted * fix list unordered The test is expecting the error code to be InvalidArgument instead of InvalidRequest * allow anonymous listing( and head, get) * fix test_bucket_list_maxkeys_invalid Invalid values: max-keys=blah → Returns ErrInvalidMaxKeys (HTTP 400) * updating IsPublicRead when parsing acl * more logs * CORS Test Fix * fix test_bucket_list_return_data * default to private * fix test_bucket_list_delimiter_not_skip_special * default no acl * add debug logging * more logs * use basic http client remove logs also * fixes * debug * Update stats.go * debugging * fix anonymous test expectation anonymous user can read, as configured in s3 json.
601 lines
19 KiB
Go
601 lines
19 KiB
Go
package s3api
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go/service/s3"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/cors"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
|
)
|
|
|
|
// BucketConfig represents cached bucket configuration
|
|
type BucketConfig struct {
|
|
Name string
|
|
Versioning string // "Enabled", "Suspended", or ""
|
|
Ownership string
|
|
ACL []byte
|
|
Owner string
|
|
IsPublicRead bool // Cached flag to avoid JSON parsing on every request
|
|
CORS *cors.CORSConfiguration
|
|
ObjectLockConfig *ObjectLockConfiguration // Cached parsed Object Lock configuration
|
|
LastModified time.Time
|
|
Entry *filer_pb.Entry
|
|
}
|
|
|
|
// BucketConfigCache provides caching for bucket configurations
|
|
// Cache entries are automatically updated/invalidated through metadata subscription events,
|
|
// so TTL serves as a safety fallback rather than the primary consistency mechanism
|
|
type BucketConfigCache struct {
|
|
cache map[string]*BucketConfig
|
|
mutex sync.RWMutex
|
|
ttl time.Duration // Safety fallback TTL; real-time consistency maintained via events
|
|
}
|
|
|
|
// NewBucketConfigCache creates a new bucket configuration cache
|
|
// TTL can be set to a longer duration since cache consistency is maintained
|
|
// through real-time metadata subscription events rather than TTL expiration
|
|
func NewBucketConfigCache(ttl time.Duration) *BucketConfigCache {
|
|
return &BucketConfigCache{
|
|
cache: make(map[string]*BucketConfig),
|
|
ttl: ttl,
|
|
}
|
|
}
|
|
|
|
// Get retrieves bucket configuration from cache
|
|
func (bcc *BucketConfigCache) Get(bucket string) (*BucketConfig, bool) {
|
|
bcc.mutex.RLock()
|
|
defer bcc.mutex.RUnlock()
|
|
|
|
config, exists := bcc.cache[bucket]
|
|
if !exists {
|
|
return nil, false
|
|
}
|
|
|
|
// Check if cache entry is expired (safety fallback; entries are normally updated via events)
|
|
if time.Since(config.LastModified) > bcc.ttl {
|
|
return nil, false
|
|
}
|
|
|
|
return config, true
|
|
}
|
|
|
|
// Set stores bucket configuration in cache
|
|
func (bcc *BucketConfigCache) Set(bucket string, config *BucketConfig) {
|
|
bcc.mutex.Lock()
|
|
defer bcc.mutex.Unlock()
|
|
|
|
config.LastModified = time.Now()
|
|
bcc.cache[bucket] = config
|
|
}
|
|
|
|
// Remove removes bucket configuration from cache
|
|
func (bcc *BucketConfigCache) Remove(bucket string) {
|
|
bcc.mutex.Lock()
|
|
defer bcc.mutex.Unlock()
|
|
|
|
delete(bcc.cache, bucket)
|
|
}
|
|
|
|
// Clear clears all cached configurations
|
|
func (bcc *BucketConfigCache) Clear() {
|
|
bcc.mutex.Lock()
|
|
defer bcc.mutex.Unlock()
|
|
|
|
bcc.cache = make(map[string]*BucketConfig)
|
|
}
|
|
|
|
// getBucketConfig retrieves bucket configuration with caching
|
|
func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.ErrorCode) {
|
|
// Try cache first
|
|
if config, found := s3a.bucketConfigCache.Get(bucket); found {
|
|
return config, s3err.ErrNone
|
|
}
|
|
|
|
// Try to get from filer
|
|
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
// Bucket doesn't exist
|
|
return nil, s3err.ErrNoSuchBucket
|
|
}
|
|
glog.Errorf("getBucketConfig: failed to get bucket entry for %s: %v", bucket, err)
|
|
return nil, s3err.ErrInternalError
|
|
}
|
|
|
|
config := &BucketConfig{
|
|
Name: bucket,
|
|
Entry: entry,
|
|
IsPublicRead: false, // Explicitly default to false for private buckets
|
|
}
|
|
|
|
// Extract configuration from extended attributes
|
|
if entry.Extended != nil {
|
|
if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists {
|
|
config.Versioning = string(versioning)
|
|
}
|
|
if ownership, exists := entry.Extended[s3_constants.ExtOwnershipKey]; exists {
|
|
config.Ownership = string(ownership)
|
|
}
|
|
if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists {
|
|
config.ACL = acl
|
|
// Parse ACL once and cache public-read status
|
|
config.IsPublicRead = parseAndCachePublicReadStatus(acl)
|
|
} else {
|
|
// No ACL means private bucket
|
|
config.IsPublicRead = false
|
|
}
|
|
if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
|
|
config.Owner = string(owner)
|
|
}
|
|
// Parse Object Lock configuration if present
|
|
if objectLockConfig, found := LoadObjectLockConfigurationFromExtended(entry); found {
|
|
config.ObjectLockConfig = objectLockConfig
|
|
glog.V(2).Infof("getBucketConfig: cached Object Lock configuration for bucket %s", bucket)
|
|
}
|
|
}
|
|
|
|
// Load CORS configuration from bucket directory content
|
|
if corsConfig, err := s3a.loadCORSFromBucketContent(bucket); err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
// Missing metadata is not an error; fall back cleanly
|
|
glog.V(2).Infof("CORS metadata not found for bucket %s, falling back to default behavior", bucket)
|
|
} else {
|
|
// Log parsing or validation errors
|
|
glog.Errorf("Failed to load CORS configuration for bucket %s: %v", bucket, err)
|
|
}
|
|
} else {
|
|
config.CORS = corsConfig
|
|
}
|
|
|
|
// Cache the result
|
|
s3a.bucketConfigCache.Set(bucket, config)
|
|
|
|
return config, s3err.ErrNone
|
|
}
|
|
|
|
// updateBucketConfig updates bucket configuration and invalidates cache
|
|
func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketConfig) error) s3err.ErrorCode {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
return errCode
|
|
}
|
|
|
|
// Apply update function
|
|
if err := updateFn(config); err != nil {
|
|
glog.Errorf("updateBucketConfig: update function failed for bucket %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
// Prepare extended attributes
|
|
if config.Entry.Extended == nil {
|
|
config.Entry.Extended = make(map[string][]byte)
|
|
}
|
|
|
|
// Update extended attributes
|
|
if config.Versioning != "" {
|
|
config.Entry.Extended[s3_constants.ExtVersioningKey] = []byte(config.Versioning)
|
|
}
|
|
if config.Ownership != "" {
|
|
config.Entry.Extended[s3_constants.ExtOwnershipKey] = []byte(config.Ownership)
|
|
}
|
|
if config.ACL != nil {
|
|
config.Entry.Extended[s3_constants.ExtAmzAclKey] = config.ACL
|
|
}
|
|
if config.Owner != "" {
|
|
config.Entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(config.Owner)
|
|
}
|
|
// Update Object Lock configuration
|
|
if config.ObjectLockConfig != nil {
|
|
if err := StoreObjectLockConfigurationInExtended(config.Entry, config.ObjectLockConfig); err != nil {
|
|
glog.Errorf("updateBucketConfig: failed to store Object Lock configuration for bucket %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
}
|
|
|
|
// Save to filer
|
|
err := s3a.updateEntry(s3a.option.BucketsPath, config.Entry)
|
|
if err != nil {
|
|
glog.Errorf("updateBucketConfig: failed to update bucket entry for %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
// Update cache
|
|
s3a.bucketConfigCache.Set(bucket, config)
|
|
|
|
return s3err.ErrNone
|
|
}
|
|
|
|
// isVersioningEnabled checks if versioning is enabled for a bucket (with caching)
|
|
func (s3a *S3ApiServer) isVersioningEnabled(bucket string) (bool, error) {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
if errCode == s3err.ErrNoSuchBucket {
|
|
return false, filer_pb.ErrNotFound
|
|
}
|
|
return false, fmt.Errorf("failed to get bucket config: %v", errCode)
|
|
}
|
|
|
|
// Versioning is enabled if explicitly set to "Enabled" OR if object lock is enabled
|
|
// (since object lock requires versioning to be enabled)
|
|
return config.Versioning == s3_constants.VersioningEnabled || config.ObjectLockConfig != nil, nil
|
|
}
|
|
|
|
// isVersioningConfigured checks if versioning has been configured (either Enabled or Suspended)
|
|
func (s3a *S3ApiServer) isVersioningConfigured(bucket string) (bool, error) {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
if errCode == s3err.ErrNoSuchBucket {
|
|
return false, filer_pb.ErrNotFound
|
|
}
|
|
return false, fmt.Errorf("failed to get bucket config: %v", errCode)
|
|
}
|
|
|
|
// Versioning is configured if explicitly set to either "Enabled" or "Suspended"
|
|
// OR if object lock is enabled (which forces versioning)
|
|
return config.Versioning != "" || config.ObjectLockConfig != nil, nil
|
|
}
|
|
|
|
// getVersioningState returns the detailed versioning state for a bucket
|
|
func (s3a *S3ApiServer) getVersioningState(bucket string) (string, error) {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
if errCode == s3err.ErrNoSuchBucket {
|
|
return "", filer_pb.ErrNotFound
|
|
}
|
|
return "", fmt.Errorf("failed to get bucket config: %v", errCode)
|
|
}
|
|
|
|
// If object lock is enabled, versioning must be enabled regardless of explicit setting
|
|
if config.ObjectLockConfig != nil {
|
|
return s3_constants.VersioningEnabled, nil
|
|
}
|
|
|
|
// Return the explicit versioning status (empty string means never configured)
|
|
return config.Versioning, nil
|
|
}
|
|
|
|
// getBucketVersioningStatus returns the versioning status for a bucket
|
|
func (s3a *S3ApiServer) getBucketVersioningStatus(bucket string) (string, s3err.ErrorCode) {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
return "", errCode
|
|
}
|
|
|
|
// Return exactly what's stored - empty string means versioning was never configured
|
|
// This matches AWS S3 behavior where new buckets have no Status field in GetBucketVersioning response
|
|
return config.Versioning, s3err.ErrNone
|
|
}
|
|
|
|
// setBucketVersioningStatus sets the versioning status for a bucket
|
|
func (s3a *S3ApiServer) setBucketVersioningStatus(bucket, status string) s3err.ErrorCode {
|
|
return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
|
|
config.Versioning = status
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// getBucketOwnership returns the ownership setting for a bucket
|
|
func (s3a *S3ApiServer) getBucketOwnership(bucket string) (string, s3err.ErrorCode) {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
return "", errCode
|
|
}
|
|
|
|
return config.Ownership, s3err.ErrNone
|
|
}
|
|
|
|
// setBucketOwnership sets the ownership setting for a bucket
|
|
func (s3a *S3ApiServer) setBucketOwnership(bucket, ownership string) s3err.ErrorCode {
|
|
return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
|
|
config.Ownership = ownership
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// loadCORSFromBucketContent loads CORS configuration from bucket directory content
|
|
func (s3a *S3ApiServer) loadCORSFromBucketContent(bucket string) (*cors.CORSConfiguration, error) {
|
|
_, corsConfig, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Note: corsConfig can be nil if no CORS configuration is set, which is valid
|
|
return corsConfig, nil
|
|
}
|
|
|
|
// getCORSConfiguration retrieves CORS configuration with caching
|
|
func (s3a *S3ApiServer) getCORSConfiguration(bucket string) (*cors.CORSConfiguration, s3err.ErrorCode) {
|
|
config, errCode := s3a.getBucketConfig(bucket)
|
|
if errCode != s3err.ErrNone {
|
|
return nil, errCode
|
|
}
|
|
|
|
return config.CORS, s3err.ErrNone
|
|
}
|
|
|
|
// updateCORSConfiguration updates the CORS configuration for a bucket
|
|
func (s3a *S3ApiServer) updateCORSConfiguration(bucket string, corsConfig *cors.CORSConfiguration) s3err.ErrorCode {
|
|
// Get existing metadata
|
|
existingTags, _, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
glog.Errorf("updateCORSConfiguration: failed to get bucket metadata for bucket %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
// Update CORS configuration
|
|
updatedCorsConfig := corsConfig
|
|
|
|
// Store updated metadata
|
|
if err := s3a.setBucketMetadata(bucket, existingTags, updatedCorsConfig); err != nil {
|
|
glog.Errorf("updateCORSConfiguration: failed to persist CORS config to bucket content for bucket %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
// Cache will be updated automatically via metadata subscription
|
|
return s3err.ErrNone
|
|
}
|
|
|
|
// removeCORSConfiguration removes the CORS configuration for a bucket
|
|
func (s3a *S3ApiServer) removeCORSConfiguration(bucket string) s3err.ErrorCode {
|
|
// Get existing metadata
|
|
existingTags, _, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
glog.Errorf("removeCORSConfiguration: failed to get bucket metadata for bucket %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
// Remove CORS configuration
|
|
var nilCorsConfig *cors.CORSConfiguration = nil
|
|
|
|
// Store updated metadata
|
|
if err := s3a.setBucketMetadata(bucket, existingTags, nilCorsConfig); err != nil {
|
|
glog.Errorf("removeCORSConfiguration: failed to remove CORS config from bucket content for bucket %s: %v", bucket, err)
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
// Cache will be updated automatically via metadata subscription
|
|
return s3err.ErrNone
|
|
}
|
|
|
|
// Conversion functions between CORS types and protobuf types
|
|
|
|
// corsRuleToProto converts a CORS rule to protobuf format
|
|
func corsRuleToProto(rule cors.CORSRule) *s3_pb.CORSRule {
|
|
return &s3_pb.CORSRule{
|
|
AllowedHeaders: rule.AllowedHeaders,
|
|
AllowedMethods: rule.AllowedMethods,
|
|
AllowedOrigins: rule.AllowedOrigins,
|
|
ExposeHeaders: rule.ExposeHeaders,
|
|
MaxAgeSeconds: int32(getMaxAgeSecondsValue(rule.MaxAgeSeconds)),
|
|
Id: rule.ID,
|
|
}
|
|
}
|
|
|
|
// corsRuleFromProto converts a protobuf CORS rule to standard format
|
|
func corsRuleFromProto(protoRule *s3_pb.CORSRule) cors.CORSRule {
|
|
var maxAge *int
|
|
// Always create the pointer if MaxAgeSeconds is >= 0
|
|
// This prevents nil pointer dereferences in tests and matches AWS behavior
|
|
if protoRule.MaxAgeSeconds >= 0 {
|
|
age := int(protoRule.MaxAgeSeconds)
|
|
maxAge = &age
|
|
}
|
|
// Only leave maxAge as nil if MaxAgeSeconds was explicitly set to a negative value
|
|
|
|
return cors.CORSRule{
|
|
AllowedHeaders: protoRule.AllowedHeaders,
|
|
AllowedMethods: protoRule.AllowedMethods,
|
|
AllowedOrigins: protoRule.AllowedOrigins,
|
|
ExposeHeaders: protoRule.ExposeHeaders,
|
|
MaxAgeSeconds: maxAge,
|
|
ID: protoRule.Id,
|
|
}
|
|
}
|
|
|
|
// corsConfigToProto converts CORS configuration to protobuf format
|
|
func corsConfigToProto(config *cors.CORSConfiguration) *s3_pb.CORSConfiguration {
|
|
if config == nil {
|
|
return nil
|
|
}
|
|
|
|
protoRules := make([]*s3_pb.CORSRule, len(config.CORSRules))
|
|
for i, rule := range config.CORSRules {
|
|
protoRules[i] = corsRuleToProto(rule)
|
|
}
|
|
|
|
return &s3_pb.CORSConfiguration{
|
|
CorsRules: protoRules,
|
|
}
|
|
}
|
|
|
|
// corsConfigFromProto converts protobuf CORS configuration to standard format
|
|
func corsConfigFromProto(protoConfig *s3_pb.CORSConfiguration) *cors.CORSConfiguration {
|
|
if protoConfig == nil {
|
|
return nil
|
|
}
|
|
|
|
rules := make([]cors.CORSRule, len(protoConfig.CorsRules))
|
|
for i, protoRule := range protoConfig.CorsRules {
|
|
rules[i] = corsRuleFromProto(protoRule)
|
|
}
|
|
|
|
return &cors.CORSConfiguration{
|
|
CORSRules: rules,
|
|
}
|
|
}
|
|
|
|
// getMaxAgeSecondsValue safely extracts max age seconds value
|
|
func getMaxAgeSecondsValue(maxAge *int) int {
|
|
if maxAge == nil {
|
|
return 0
|
|
}
|
|
return *maxAge
|
|
}
|
|
|
|
// parseAndCachePublicReadStatus parses the ACL and caches the public-read status
|
|
func parseAndCachePublicReadStatus(acl []byte) bool {
|
|
var grants []*s3.Grant
|
|
if err := json.Unmarshal(acl, &grants); err != nil {
|
|
return false
|
|
}
|
|
|
|
// Check if any grant gives read permission to "AllUsers" group
|
|
for _, grant := range grants {
|
|
if grant.Grantee != nil && grant.Grantee.URI != nil && grant.Permission != nil {
|
|
// Check for AllUsers group with Read permission
|
|
if *grant.Grantee.URI == s3_constants.GranteeGroupAllUsers &&
|
|
(*grant.Permission == s3_constants.PermissionRead || *grant.Permission == s3_constants.PermissionFullControl) {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// getBucketMetadata retrieves bucket metadata from bucket directory content using protobuf
|
|
func (s3a *S3ApiServer) getBucketMetadata(bucket string) (map[string]string, *cors.CORSConfiguration, error) {
|
|
// Validate bucket name to prevent path traversal attacks
|
|
if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
|
|
strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
|
|
return nil, nil, fmt.Errorf("invalid bucket name: %s", bucket)
|
|
}
|
|
|
|
// Clean the bucket name further to prevent any potential path traversal
|
|
bucket = filepath.Clean(bucket)
|
|
if bucket == "." || bucket == ".." {
|
|
return nil, nil, fmt.Errorf("invalid bucket name: %s", bucket)
|
|
}
|
|
|
|
// Get bucket directory entry to access its content
|
|
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
|
|
}
|
|
if entry == nil {
|
|
return nil, nil, fmt.Errorf("bucket directory not found %s", bucket)
|
|
}
|
|
|
|
// If no content, return empty metadata
|
|
if len(entry.Content) == 0 {
|
|
return make(map[string]string), nil, nil
|
|
}
|
|
|
|
// Unmarshal metadata from protobuf
|
|
var protoMetadata s3_pb.BucketMetadata
|
|
if err := proto.Unmarshal(entry.Content, &protoMetadata); err != nil {
|
|
glog.Errorf("getBucketMetadata: failed to unmarshal protobuf metadata for bucket %s: %v", bucket, err)
|
|
return make(map[string]string), nil, nil // Return empty metadata on error, don't fail
|
|
}
|
|
|
|
// Convert protobuf CORS to standard CORS
|
|
corsConfig := corsConfigFromProto(protoMetadata.Cors)
|
|
|
|
return protoMetadata.Tags, corsConfig, nil
|
|
}
|
|
|
|
// setBucketMetadata stores bucket metadata in bucket directory content using protobuf
|
|
func (s3a *S3ApiServer) setBucketMetadata(bucket string, tags map[string]string, corsConfig *cors.CORSConfiguration) error {
|
|
// Validate bucket name to prevent path traversal attacks
|
|
if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
|
|
strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
|
|
return fmt.Errorf("invalid bucket name: %s", bucket)
|
|
}
|
|
|
|
// Clean the bucket name further to prevent any potential path traversal
|
|
bucket = filepath.Clean(bucket)
|
|
if bucket == "." || bucket == ".." {
|
|
return fmt.Errorf("invalid bucket name: %s", bucket)
|
|
}
|
|
|
|
// Create protobuf metadata
|
|
protoMetadata := &s3_pb.BucketMetadata{
|
|
Tags: tags,
|
|
Cors: corsConfigToProto(corsConfig),
|
|
}
|
|
|
|
// Marshal metadata to protobuf
|
|
metadataBytes, err := proto.Marshal(protoMetadata)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal bucket metadata to protobuf: %w", err)
|
|
}
|
|
|
|
// Update the bucket entry with new content
|
|
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
// Get current bucket entry
|
|
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
|
if err != nil {
|
|
return fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
|
|
}
|
|
if entry == nil {
|
|
return fmt.Errorf("bucket directory not found %s", bucket)
|
|
}
|
|
|
|
// Update content with metadata
|
|
entry.Content = metadataBytes
|
|
|
|
request := &filer_pb.UpdateEntryRequest{
|
|
Directory: s3a.option.BucketsPath,
|
|
Entry: entry,
|
|
}
|
|
|
|
_, err = client.UpdateEntry(context.Background(), request)
|
|
return err
|
|
})
|
|
return err
|
|
}
|
|
|
|
// getBucketTags retrieves bucket tags from bucket directory content
|
|
func (s3a *S3ApiServer) getBucketTags(bucket string) (map[string]string, error) {
|
|
tags, _, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(tags) == 0 {
|
|
return nil, fmt.Errorf("no tags configuration found")
|
|
}
|
|
|
|
return tags, nil
|
|
}
|
|
|
|
// setBucketTags stores bucket tags in bucket directory content
|
|
func (s3a *S3ApiServer) setBucketTags(bucket string, tags map[string]string) error {
|
|
// Get existing metadata
|
|
_, existingCorsConfig, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Store updated metadata with new tags
|
|
err = s3a.setBucketMetadata(bucket, tags, existingCorsConfig)
|
|
return err
|
|
}
|
|
|
|
// deleteBucketTags removes bucket tags from bucket directory content
|
|
func (s3a *S3ApiServer) deleteBucketTags(bucket string) error {
|
|
// Get existing metadata
|
|
_, existingCorsConfig, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Store updated metadata with empty tags
|
|
emptyTags := make(map[string]string)
|
|
err = s3a.setBucketMetadata(bucket, emptyTags, existingCorsConfig)
|
|
return err
|
|
}
|