package s3api import ( "bytes" "context" "encoding/json" "encoding/xml" "errors" "fmt" "io" "math" "net/http" "sort" "strings" "time" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil" "github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) { glog.V(3).Infof("ListBucketsHandler") // Get authenticated identity from context (set by Auth middleware) // For unauthenticated requests, this returns empty string identityId := s3_constants.GetIdentityNameFromContext(r) // Get the full identity object for permission and ownership checks // This is especially important for JWT users whose identity is not in the identities list // Note: We store the full Identity object in context for simplicity. Future optimization // could use a lightweight, credential-free view (name, account, actions, principal ARN) // for better data minimization. var identity *Identity if s3a.iam.isEnabled() { // Try to get the full identity from context first (works for all auth types including JWT) if identityObj := s3_constants.GetIdentityFromContext(r); identityObj != nil { if id, ok := identityObj.(*Identity); ok { identity = id } else { glog.Warningf("ListBucketsHandler: identity object in context has unexpected type: %T", identityObj) } } // Fallback to looking up by name if not in context (backward compatibility) if identity == nil && identityId != "" { identity = s3a.iam.lookupByIdentityName(identityId) } } var response ListAllMyBucketsResult entries, _, err := s3a.list(s3a.option.BucketsPath, "", "", false, math.MaxInt32) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } var listBuckets ListAllMyBucketsList for _, entry := range entries { if entry.IsDirectory { if strings.HasPrefix(entry.Name, ".") { continue } // Unauthenticated users should not see any buckets if identity == nil { continue } // Check if bucket should be visible to this identity // A bucket is visible if the user owns it OR has explicit permission to list it isOwner := isBucketOwnedByIdentity(entry, identity) // Skip permission check if user is already the owner (optimization) if !isOwner { if errCode := s3a.iam.VerifyActionPermission(r, identity, s3_constants.ACTION_LIST, entry.Name, ""); errCode != s3err.ErrNone { continue } } listBuckets.Bucket = append(listBuckets.Bucket, ListAllMyBucketsEntry{ Name: entry.Name, CreationDate: time.Unix(entry.Attributes.Crtime, 0).UTC(), }) } } response = ListAllMyBucketsResult{ Owner: CanonicalUser{ ID: identityId, DisplayName: identityId, }, Buckets: listBuckets, } glog.V(3).Infof("ListBucketsHandler response: %+v", response) writeSuccessResponseXML(w, r, response) } // isBucketOwnedByIdentity checks if a bucket entry is owned by the given identity. // Returns true if the identity owns the bucket, false otherwise. // // Ownership rules: // - Admin users: considered owners of all buckets // - Non-admin users: own buckets where AmzIdentityId matches identity.Name // - Buckets without owner metadata are not owned by anyone (except admins) func isBucketOwnedByIdentity(entry *filer_pb.Entry, identity *Identity) bool { if !entry.IsDirectory { return false } if identity == nil { return false } // Admin users are considered owners of all buckets if identity.isAdmin() { return true } // Non-admin users with no name cannot own buckets. // This prevents misconfigured identities from matching buckets with empty owner IDs. if identity.Name == "" { return false } // Check ownership via AmzIdentityId metadata id, ok := entry.Extended[s3_constants.AmzIdentityId] if !ok || string(id) != identity.Name { return false } return true } func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) { // collect parameters bucket, _ := s3_constants.GetBucketAndObject(r) // validate the bucket name err := s3bucket.VerifyS3BucketName(bucket) if err != nil { glog.Errorf("put invalid bucket name: %v %v", bucket, err) s3err.WriteErrorResponse(w, r, s3err.ErrInvalidBucketName) return } // Check if bucket already exists and handle ownership/settings // Get authenticated identity from context (secure, cannot be spoofed) currentIdentityId := s3_constants.GetIdentityNameFromContext(r) // Check collection existence first collectionExists := false if s3a.isTableBucket(bucket) { s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists) return } if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { if resp, err := client.CollectionList(context.Background(), &filer_pb.CollectionListRequest{ IncludeEcVolumes: true, IncludeNormalVolumes: true, }); err != nil { glog.Errorf("list collection: %v", err) return fmt.Errorf("list collections: %w", err) } else { for _, c := range resp.Collections { if s3a.getCollectionName(bucket) == c.Name { collectionExists = true break } } } return nil }); err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } // Check bucket directory existence and get metadata if exist, err := s3a.exists(s3a.option.BucketsPath, bucket, true); err == nil && exist { // Bucket exists, check ownership and settings if entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket); err == nil { // Get existing bucket owner var existingOwnerId string if entry.Extended != nil { if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok { existingOwnerId = string(id) } } // Check ownership if existingOwnerId != "" && existingOwnerId != currentIdentityId { // Different owner - always fail with BucketAlreadyExists glog.V(3).Infof("PutBucketHandler: bucket %s owned by %s, requested by %s", bucket, existingOwnerId, currentIdentityId) s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists) return } // Same owner or no owner set - check for conflicting settings objectLockRequested := strings.EqualFold(r.Header.Get(s3_constants.AmzBucketObjectLockEnabled), "true") // Get current bucket configuration bucketConfig, errCode := s3a.getBucketConfig(bucket) if errCode != s3err.ErrNone { glog.Errorf("PutBucketHandler: failed to get bucket config for %s: %v", bucket, errCode) // If we can't get config, assume no conflict and allow recreation } else { // Check for Object Lock conflict currentObjectLockEnabled := bucketConfig.ObjectLockConfig != nil && bucketConfig.ObjectLockConfig.ObjectLockEnabled == s3_constants.ObjectLockEnabled if objectLockRequested != currentObjectLockEnabled { // Conflicting Object Lock settings - fail with BucketAlreadyExists glog.V(3).Infof("PutBucketHandler: bucket %s has conflicting Object Lock settings (requested: %v, current: %v)", bucket, objectLockRequested, currentObjectLockEnabled) s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists) return } } // Bucket already exists - always return BucketAlreadyExists per S3 specification // The S3 tests expect BucketAlreadyExists in all cases, not BucketAlreadyOwnedByYou glog.V(3).Infof("PutBucketHandler: bucket %s already exists", bucket) s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists) return } } // If collection exists but bucket directory doesn't, this is an inconsistent state // that can occur when a previous bucket deletion partially completed (collection // deletion failed but directory deletion succeeded, or volumes were recreated). // Recover by proceeding to create the missing bucket directory. if collectionExists { glog.Warningf("PutBucketHandler: collection exists but bucket directory missing for %s, recovering by creating bucket directory", bucket) } // Check for x-amz-bucket-object-lock-enabled header BEFORE creating bucket // This allows us to create the bucket with Object Lock configuration atomically objectLockEnabled := strings.EqualFold(r.Header.Get(s3_constants.AmzBucketObjectLockEnabled), "true") // Capture any Object Lock configuration error from within the callback // The mkdir callback doesn't support returning errors, so we capture it here var objectLockSetupError error // Create the folder for bucket with all settings atomically // This ensures Object Lock configuration is set in the same CreateEntry call, // preventing race conditions where the bucket exists without Object Lock enabled if err := s3a.mkdir(s3a.option.BucketsPath, bucket, func(entry *filer_pb.Entry) { // Set bucket owner setBucketOwner(r)(entry) // Set Object Lock configuration atomically during bucket creation if objectLockEnabled { glog.V(3).Infof("PutBucketHandler: enabling Object Lock and Versioning for bucket %s atomically", bucket) if entry.Extended == nil { entry.Extended = make(map[string][]byte) } // Enable versioning (required for Object Lock) entry.Extended[s3_constants.ExtVersioningKey] = []byte(s3_constants.VersioningEnabled) // Create and store Object Lock configuration objectLockConfig := &ObjectLockConfiguration{ ObjectLockEnabled: s3_constants.ObjectLockEnabled, } if err := StoreObjectLockConfigurationInExtended(entry, objectLockConfig); err != nil { glog.Errorf("PutBucketHandler: failed to store Object Lock config for bucket %s: %v", bucket, err) objectLockSetupError = err // Note: The entry will still be created, but we'll roll it back below } else { glog.V(3).Infof("PutBucketHandler: set ObjectLockConfig for bucket %s: %+v", bucket, objectLockConfig) } } }); err != nil { // If mkdir failed because another request created the bucket concurrently, // return BucketAlreadyExists instead of InternalError. if exist, checkErr := s3a.exists(s3a.option.BucketsPath, bucket, true); checkErr == nil && exist { glog.V(3).Infof("PutBucketHandler: bucket %s was created concurrently", bucket) s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists) return } glog.Errorf("PutBucketHandler mkdir: %v", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } // If Object Lock setup failed, roll back the bucket creation // This ensures we don't leave a bucket without the requested Object Lock configuration if objectLockSetupError != nil { glog.Errorf("PutBucketHandler: rolling back bucket %s creation due to Object Lock setup failure: %v", bucket, objectLockSetupError) if deleteErr := s3a.rm(s3a.option.BucketsPath, bucket, true, true); deleteErr != nil { glog.Errorf("PutBucketHandler: failed to rollback bucket %s after Object Lock setup failure: %v", bucket, deleteErr) } s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } // Remove bucket from negative cache after successful creation if s3a.bucketConfigCache != nil { s3a.bucketConfigCache.RemoveNegativeCache(bucket) } w.Header().Set("Location", "/"+bucket) writeSuccessResponseEmpty(w, r) } func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) { bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("DeleteBucketHandler %s", bucket) if s3a.isTableBucket(bucket) { s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) return } if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { s3err.WriteErrorResponse(w, r, err) return } // Check if bucket has object lock enabled bucketConfig, errCode := s3a.getBucketConfig(bucket) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } // If object lock is enabled, check for objects with active locks if bucketConfig.ObjectLockConfig != nil { hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(r.Context(), bucket) if checkErr != nil { glog.Errorf("DeleteBucketHandler: failed to check for locked objects in bucket %s: %v", bucket, checkErr) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } if hasLockedObjects { glog.V(3).Infof("DeleteBucketHandler: bucket %s has objects with active object locks, cannot delete", bucket) s3err.WriteErrorResponse(w, r, s3err.ErrBucketNotEmpty) return } } if !s3a.option.AllowDeleteBucketNotEmpty { if hasUserObjects, err := s3a.bucketHasUserObjects(bucket); err != nil { glog.Errorf("failed to list bucket %s: %v", bucket, err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } else if hasUserObjects { s3err.WriteErrorResponse(w, r, s3err.ErrBucketNotEmpty) return } } // Delete bucket directory first, then collection. This order ensures that if // collection deletion fails, the bucket directory is already gone, preventing // the "collection exists but bucket directory missing" inconsistency that blocks // bucket recreation. An orphaned collection is harmless and will be cleaned up // or reused when the bucket is recreated. err := s3a.rm(s3a.option.BucketsPath, bucket, false, true) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{ Collection: s3a.getCollectionName(bucket), } glog.V(1).Infof("delete collection: %v", deleteCollectionRequest) if _, err := client.DeleteCollection(context.Background(), deleteCollectionRequest); err != nil { return fmt.Errorf("delete collection %s: %v", bucket, err) } return nil }) if err != nil { // Log but don't fail — the bucket directory is already removed, so the bucket // is effectively deleted. The orphaned collection will be cleaned up or reused. glog.Errorf("DeleteBucketHandler: failed to delete collection for bucket %s: %v", bucket, err) } // Clean up bucket-related caches, locks, and metrics after successful deletion s3a.invalidateBucketConfigCache(bucket) stats_collect.DeleteBucketMetrics(bucket) s3err.WriteEmptyResponse(w, r, http.StatusNoContent) } // bucketHasUserObjects checks whether a bucket contains any non-special entries. // Special entries (.uploads, *.versions) are internal to S3 and don't count as user objects. func (s3a *S3ApiServer) bucketHasUserObjects(bucket string) (bool, error) { bucketPath := s3a.option.BucketsPath + "/" + bucket startFrom := "" // Start with a small batch — most non-empty buckets have a real object early. // If we only find special entries, switch to larger batches to page through quickly. limit := uint32(10) for { entries, isLast, err := s3a.list(bucketPath, "", startFrom, false, limit) if err != nil { return false, err } for _, entry := range entries { if entry.Name != s3_constants.MultipartUploadsFolder && !strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { return true, nil } startFrom = entry.Name } if isLast { return false, nil } limit = 1000 } } // hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold // Delegates to the shared HasObjectsWithActiveLocks function in object_lock_utils.go func (s3a *S3ApiServer) hasObjectsWithActiveLocks(ctx context.Context, bucket string) (bool, error) { bucketPath := s3a.option.BucketsPath + "/" + bucket var hasLocks bool var checkErr error err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { hasLocks, checkErr = HasObjectsWithActiveLocks(ctx, client, bucketPath) return checkErr }) if err != nil { return false, err } return hasLocks, nil } func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) { bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("HeadBucketHandler %s", bucket) if entry, err := s3a.getBucketEntry(bucket); entry == nil || errors.Is(err, filer_pb.ErrNotFound) { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) return } writeSuccessResponseEmpty(w, r) } func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorCode { // Use cached bucket config instead of direct getEntry call (optimization) config, errCode := s3a.getBucketConfig(bucket) if errCode != s3err.ErrNone { return errCode } //if iam is enabled, the access was already checked before if s3a.iam.isEnabled() { return s3err.ErrNone } if !s3a.hasAccess(r, config.Entry) { return s3err.ErrAccessDenied } return s3err.ErrNone } // ErrAutoCreatePermissionDenied is returned when a user lacks permission to auto-create buckets var ErrAutoCreatePermissionDenied = errors.New("permission denied - requires Admin permission") // ErrInvalidBucketName is returned when a bucket name doesn't meet S3 naming requirements var ErrInvalidBucketName = errors.New("invalid bucket name") // setBucketOwner creates a function that sets the bucket owner from the request context func setBucketOwner(r *http.Request) func(entry *filer_pb.Entry) { currentIdentityId := s3_constants.GetIdentityNameFromContext(r) return func(entry *filer_pb.Entry) { if currentIdentityId != "" { if entry.Extended == nil { entry.Extended = make(map[string][]byte) } entry.Extended[s3_constants.AmzIdentityId] = []byte(currentIdentityId) } } } // autoCreateBucket creates a bucket if it doesn't exist, setting the owner from the request context // Only users with admin permissions are allowed to auto-create buckets func (s3a *S3ApiServer) autoCreateBucket(r *http.Request, bucket string) error { // Validate the bucket name before auto-creating if err := s3bucket.VerifyS3BucketName(bucket); err != nil { return fmt.Errorf("auto-create bucket %s: %w", bucket, errors.Join(ErrInvalidBucketName, err)) } // Check if user has admin permissions if !s3a.isUserAdmin(r) { return fmt.Errorf("auto-create bucket %s: %w", bucket, ErrAutoCreatePermissionDenied) } if err := s3a.mkdir(s3a.option.BucketsPath, bucket, setBucketOwner(r)); err != nil { // In case of a race condition where another request created the bucket // in the meantime, check for existence before returning an error. if exist, err2 := s3a.exists(s3a.option.BucketsPath, bucket, true); err2 != nil { glog.Warningf("autoCreateBucket: failed to check existence for bucket %s: %v", bucket, err2) return fmt.Errorf("failed to auto-create bucket %s: %w", bucket, errors.Join(err, err2)) } else if exist { // The bucket exists, which is fine. However, we should ensure it has an owner. // If it was created by a concurrent request that didn't set an owner, // we'll set it here to ensure consistency. if entry, getErr := s3a.getEntry(s3a.option.BucketsPath, bucket); getErr == nil { if entry.Extended == nil || len(entry.Extended[s3_constants.AmzIdentityId]) == 0 { // No owner set, assign current admin as owner setBucketOwner(r)(entry) if updateErr := s3a.updateEntry(s3a.option.BucketsPath, entry); updateErr != nil { glog.Warningf("autoCreateBucket: failed to set owner for existing bucket %s: %v", bucket, updateErr) } else { glog.V(1).Infof("Set owner for existing bucket %s (created by concurrent request)", bucket) } } } else { glog.Warningf("autoCreateBucket: failed to get entry for existing bucket %s: %v", bucket, getErr) } // Remove bucket from negative cache — it exists now if s3a.bucketConfigCache != nil { s3a.bucketConfigCache.RemoveNegativeCache(bucket) } return nil } return fmt.Errorf("failed to auto-create bucket %s: %w", bucket, err) } // Remove bucket from negative cache after successful creation if s3a.bucketConfigCache != nil { s3a.bucketConfigCache.RemoveNegativeCache(bucket) } glog.V(1).Infof("Auto-created bucket %s", bucket) return nil } // handleAutoCreateBucket attempts to auto-create a bucket and writes appropriate error responses // Returns true if the bucket was created successfully or already exists, false if an error was written func (s3a *S3ApiServer) handleAutoCreateBucket(w http.ResponseWriter, r *http.Request, bucket, handlerName string) bool { if err := s3a.autoCreateBucket(r, bucket); err != nil { glog.Warningf("%s: %v", handlerName, err) // Check for specific errors to return appropriate S3 error codes if errors.Is(err, ErrInvalidBucketName) { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidBucketName) } else if errors.Is(err, ErrAutoCreatePermissionDenied) { s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) } else { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) } return false } return true } func (s3a *S3ApiServer) hasAccess(r *http.Request, entry *filer_pb.Entry) bool { // Check if user is properly authenticated as admin through IAM system if s3a.isUserAdmin(r) { return true } if entry.Extended == nil { return true } // Get authenticated identity from context (secure, cannot be spoofed) identityId := s3_constants.GetIdentityNameFromContext(r) if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok { if identityId != string(id) { glog.V(3).Infof("hasAccess: %s != %s (entry.Extended = %v)", identityId, id, entry.Extended) return false } } return true } // isUserAdmin securely checks if the authenticated user is an admin // This validates admin status through proper IAM authentication, not spoofable headers func (s3a *S3ApiServer) isUserAdmin(r *http.Request) bool { // Use a minimal admin action to authenticate and check admin status adminAction := Action("Admin") identity, errCode := s3a.iam.authRequest(r, adminAction) if errCode != s3err.ErrNone { return false } // Check if the authenticated identity has admin privileges return identity != nil && identity.isAdmin() } // isBucketPublicRead checks if a bucket allows anonymous read access based on its cached ACL status func (s3a *S3ApiServer) isBucketPublicRead(bucket string) bool { // Get bucket configuration which contains cached public-read status config, errCode := s3a.getBucketConfig(bucket) if errCode != s3err.ErrNone { glog.V(4).Infof("isBucketPublicRead: failed to get bucket config for %s: %v", bucket, errCode) return false } glog.V(4).Infof("isBucketPublicRead: bucket=%s, IsPublicRead=%v", bucket, config.IsPublicRead) // Return the cached public-read status (no JSON parsing needed) return config.IsPublicRead } // isPublicReadGrants checks if the grants allow public read access func isPublicReadGrants(grants []*s3.Grant) bool { for _, grant := range grants { if grant.Grantee != nil && grant.Grantee.URI != nil && grant.Permission != nil { // Check for AllUsers group with Read permission if *grant.Grantee.URI == s3_constants.GranteeGroupAllUsers && (*grant.Permission == s3_constants.PermissionRead || *grant.Permission == s3_constants.PermissionFullControl) { return true } } } return false } // buildResourceARN builds a resource ARN from bucket and object // Used by the policy engine wrapper func buildResourceARN(bucket, object string) string { if object == "" || object == "/" { return fmt.Sprintf("arn:aws:s3:::%s", bucket) } // Remove leading slash if present object = strings.TrimPrefix(object, "/") return fmt.Sprintf("arn:aws:s3:::%s/%s", bucket, object) } // AuthWithPublicRead creates an auth wrapper that allows anonymous access for public-read buckets func (s3a *S3ApiServer) AuthWithPublicRead(handler http.HandlerFunc, action Action) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { bucket, object := s3_constants.GetBucketAndObject(r) authType := getRequestAuthType(r) isAnonymous := authType == authTypeAnonymous glog.V(4).Infof("AuthWithPublicRead: bucket=%s, object=%s, authType=%v, isAnonymous=%v", bucket, object, authType, isAnonymous) // For anonymous requests, check if bucket allows public read via ACLs or bucket policies if isAnonymous { // First check ACL-based public access isPublic := s3a.isBucketPublicRead(bucket) glog.V(4).Infof("AuthWithPublicRead: bucket=%s, isPublicACL=%v", bucket, isPublic) if isPublic { glog.V(3).Infof("AuthWithPublicRead: allowing anonymous access to public-read bucket %s (ACL)", bucket) handler(w, r) return } // Check bucket policy for anonymous access using the policy engine principal := "*" // Anonymous principal // Evaluate bucket policy (objectEntry nil - not yet fetched) allowed, evaluated, err := s3a.policyEngine.EvaluatePolicy(bucket, object, string(action), principal, r, nil, nil) if err != nil { // SECURITY: Fail-close on policy evaluation errors // If we can't evaluate the policy, deny access rather than falling through to IAM glog.Errorf("AuthWithPublicRead: error evaluating bucket policy for %s/%s: %v - denying access", bucket, object, err) s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) return } else if evaluated { // A bucket policy exists and was evaluated with a matching statement if allowed { // Policy explicitly allows anonymous access glog.V(3).Infof("AuthWithPublicRead: allowing anonymous access to bucket %s (bucket policy)", bucket) handler(w, r) return } else { // Policy explicitly denies anonymous access glog.V(3).Infof("AuthWithPublicRead: bucket policy explicitly denies anonymous access to %s/%s", bucket, object) s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) return } } // No matching policy statement - fall through to check ACLs and then IAM auth glog.V(3).Infof("AuthWithPublicRead: no bucket policy match for %s, checking ACLs", bucket) } // For all authenticated requests and anonymous requests to non-public buckets, // use normal IAM auth to enforce policies s3a.iam.Auth(handler, action)(w, r) } } // GetBucketAclHandler Get Bucket ACL // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketAcl.html func (s3a *S3ApiServer) GetBucketAclHandler(w http.ResponseWriter, r *http.Request) { // collect parameters bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("GetBucketAclHandler %s", bucket) if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { s3err.WriteErrorResponse(w, r, err) return } amzAccountId := r.Header.Get(s3_constants.AmzAccountId) amzDisplayName := s3a.iam.GetAccountNameById(amzAccountId) response := AccessControlPolicy{ Owner: CanonicalUser{ ID: amzAccountId, DisplayName: amzDisplayName, }, } response.AccessControlList.Grant = append(response.AccessControlList.Grant, Grant{ Grantee: Grantee{ ID: amzAccountId, DisplayName: amzDisplayName, Type: "CanonicalUser", XMLXSI: "CanonicalUser", XMLNS: "http://www.w3.org/2001/XMLSchema-instance"}, Permission: s3.PermissionFullControl, }) writeSuccessResponseXML(w, r, response) } // PutBucketAclHandler Put bucket ACL // https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketAcl.html // func (s3a *S3ApiServer) PutBucketAclHandler(w http.ResponseWriter, r *http.Request) { // collect parameters bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("PutBucketAclHandler %s", bucket) if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { s3err.WriteErrorResponse(w, r, err) return } // Get account information for ACL processing amzAccountId := r.Header.Get(s3_constants.AmzAccountId) // Get bucket ownership settings (these would be used for ownership validation in a full implementation) bucketOwnership := "" // Default/simplified for now - in a full implementation this would be retrieved from bucket config bucketOwnerId := amzAccountId // Simplified - bucket owner is current account // Use the existing ACL parsing logic to handle both canned ACLs and XML body grants, errCode := ExtractAcl(r, s3a.iam, bucketOwnership, bucketOwnerId, amzAccountId, amzAccountId) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } glog.V(3).Infof("PutBucketAclHandler: bucket=%s, extracted %d grants", bucket, len(grants)) isPublic := isPublicReadGrants(grants) glog.V(3).Infof("PutBucketAclHandler: bucket=%s, isPublicReadGrants=%v", bucket, isPublic) // Store the bucket ACL in bucket metadata errCode = s3a.updateBucketConfig(bucket, func(config *BucketConfig) error { if len(grants) > 0 { grantsBytes, err := json.Marshal(grants) if err != nil { glog.Errorf("PutBucketAclHandler: failed to marshal grants: %v", err) return err } config.ACL = grantsBytes // Cache the public-read status to avoid JSON parsing on every request config.IsPublicRead = isPublicReadGrants(grants) glog.V(4).Infof("PutBucketAclHandler: bucket=%s, setting IsPublicRead=%v", bucket, config.IsPublicRead) } else { config.ACL = nil config.IsPublicRead = false } config.Owner = amzAccountId return nil }) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } glog.V(3).Infof("PutBucketAclHandler: Successfully stored ACL for bucket %s with %d grants", bucket, len(grants)) // Small delay to ensure ACL propagation across distributed caches // This prevents race conditions in tests where anonymous access is attempted immediately after ACL change time.Sleep(50 * time.Millisecond) writeSuccessResponseEmpty(w, r) } // GetBucketLifecycleConfigurationHandler Get Bucket Lifecycle configuration // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLifecycleConfiguration.html func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) { // collect parameters bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("GetBucketLifecycleConfigurationHandler %s", bucket) if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { s3err.WriteErrorResponse(w, r, err) return } if lifecycleXML, transitionMinimumObjectSize, found, errCode := s3a.getStoredBucketLifecycleConfiguration(bucket); errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } else if found { w.Header().Set(bucketLifecycleTransitionMinimumObjectSizeHeader, transitionMinimumObjectSize) writeSuccessResponseXMLBytes(w, r, lifecycleXML) return } // ReadFilerConfFromFilers provides multi-filer failover fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil) if err != nil { glog.Errorf("GetBucketLifecycleConfigurationHandler: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } ttls := fc.GetCollectionTtls(s3a.getCollectionName(bucket)) if len(ttls) == 0 { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchLifecycleConfiguration) return } response := Lifecycle{} // Sort locationPrefixes to ensure consistent ordering of lifecycle rules var locationPrefixes []string for locationPrefix := range ttls { locationPrefixes = append(locationPrefixes, locationPrefix) } sort.Strings(locationPrefixes) for _, locationPrefix := range locationPrefixes { internalTtl := ttls[locationPrefix] ttl, _ := needle.ReadTTL(internalTtl) days := int(ttl.Minutes() / 60 / 24) if days == 0 { continue } prefix, found := strings.CutPrefix(locationPrefix, fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)) if !found { continue } response.Rules = append(response.Rules, Rule{ ID: prefix, Status: Enabled, Prefix: Prefix{val: prefix, set: true}, Expiration: Expiration{Days: days, set: true}, }) } if len(response.Rules) > 0 { w.Header().Set(bucketLifecycleTransitionMinimumObjectSizeHeader, defaultLifecycleTransitionMinimumObjectSize) } writeSuccessResponseXML(w, r, response) } // resolveLifecycleDefaultsFromFilerConf returns replication and volumeGrowthCount for use when adding a lifecycle TTL rule. // S3 does not set DataCenter/Rack/DataNode so placement is not pinned to a specific DC/rack. // Precedence: parent path rule first, then filer global. If volumeGrowthCount is 0 but replication is set, // use replication's copy count so the rule is valid (volumeGrowthCount must be divisible by copy count). func resolveLifecycleDefaultsFromFilerConf(fc *filer.FilerConf, filerConfigReplication, bucketsPath, bucket string) (replication string, volumeGrowthCount uint32, err error) { bucketPath := fmt.Sprintf("%s/%s/", bucketsPath, bucket) parentRule := fc.MatchStorageRule(bucketPath) replication = parentRule.Replication if replication == "" { replication = filerConfigReplication } volumeGrowthCount = parentRule.VolumeGrowthCount if volumeGrowthCount == 0 && replication != "" { var rp *super_block.ReplicaPlacement rp, err = super_block.NewReplicaPlacementFromString(replication) if err == nil { volumeGrowthCount = uint32(rp.GetCopyCount()) } } return } // PutBucketLifecycleConfigurationHandler Put Bucket Lifecycle configuration // https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) { // collect parameters bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("PutBucketLifecycleConfigurationHandler %s", bucket) if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { s3err.WriteErrorResponse(w, r, err) return } r.Body = http.MaxBytesReader(w, r.Body, maxBucketLifecycleConfigurationSize) lifecycleXML, err := io.ReadAll(r.Body) if err != nil { glog.Warningf("PutBucketLifecycleConfigurationHandler read body: %s", err) var maxBytesErr *http.MaxBytesError if errors.As(err, &maxBytesErr) { s3err.WriteErrorResponse(w, r, s3err.ErrEntityTooLarge) return } s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest) return } lifeCycleConfig := Lifecycle{} if err := xmlDecoder(bytes.NewReader(lifecycleXML), &lifeCycleConfig, int64(len(lifecycleXML))); err != nil { glog.Warningf("PutBucketLifecycleConfigurationHandler xml decode: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML) return } fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil) if err != nil { glog.Errorf("PutBucketLifecycleConfigurationHandler read filer config: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } // Resolve replication so lifecycle rules do not create filer.conf entries with empty replication. var filerConfigReplication string if filerErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(r.Context(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err } filerConfigReplication = resp.GetReplication() return nil }); filerErr != nil { glog.V(2).Infof("PutBucketLifecycleConfigurationHandler: could not get filer config: %v", filerErr) } defaultReplication, defaultVolumeGrowthCount, err := resolveLifecycleDefaultsFromFilerConf(fc, filerConfigReplication, s3a.option.BucketsPath, bucket) if err != nil { glog.Warningf("PutBucketLifecycleConfigurationHandler bucket %s: invalid replication %q: %v", bucket, defaultReplication, err) } collectionName := s3a.getCollectionName(bucket) collectionTtls := fc.GetCollectionTtls(collectionName) changed := false // Check whether the bucket has versioning enabled. Versioned buckets must // NOT use the TTL fast-path because: // 1. TTL volumes expire as a unit, destroying all data — including // noncurrent versions that should be preserved. // 2. Filer-backend TTL (RocksDB compaction, Redis expire) removes entries // without triggering chunk deletion, leaving orphaned volume data. // 3. On AWS S3, Expiration.Days on a versioned bucket creates a delete // marker — it does not delete data. TTL has no such nuance. // For versioned buckets the lifecycle worker handles all rule evaluation // at scan time, which correctly operates on individual versions. bucketVersioning, versioningErr := s3a.getBucketVersioningStatus(bucket) if versioningErr != s3err.ErrNone { // Fail closed: if we cannot determine versioning status, treat the // bucket as versioned to avoid creating TTL entries that would // destroy noncurrent versions. glog.V(1).Infof("PutBucketLifecycleConfigurationHandler: could not determine versioning status for %s (err %v), skipping TTL fast-path", bucket, versioningErr) } isVersioned := versioningErr != s3err.ErrNone || bucketVersioning == s3_constants.VersioningEnabled || bucketVersioning == s3_constants.VersioningSuspended for _, rule := range lifeCycleConfig.Rules { if rule.Status != Enabled { continue } // Reject Transition rules — they require storage class migration // infrastructure that does not exist yet. if rule.Transition.set || rule.NoncurrentVersionTransition.set { s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented) return } if isVersioned { continue // all rules evaluated by lifecycle worker at scan time } var rulePrefix string switch { case rule.Filter.andSet: rulePrefix = rule.Filter.And.Prefix.val case rule.Filter.Prefix.set: rulePrefix = rule.Filter.Prefix.val case rule.Prefix.set: rulePrefix = rule.Prefix.val } // Only create filer.conf TTL entries for simple Expiration.Days rules // with prefix-only filters (the fast path handled by RocksDB compaction // filter). Rules with tag or size filters must be evaluated at scan time // by the lifecycle worker, because TTL applies to all objects under the // prefix regardless of tags or size. if rule.Expiration.Days == 0 { continue } hasTagOrSizeFilter := rule.Filter.tagSet || rule.Filter.ObjectSizeGreaterThan > 0 || rule.Filter.ObjectSizeLessThan > 0 || (rule.Filter.andSet && (len(rule.Filter.And.Tags) > 0 || rule.Filter.And.ObjectSizeGreaterThan > 0 || rule.Filter.And.ObjectSizeLessThan > 0)) if hasTagOrSizeFilter { continue // evaluated by lifecycle worker at scan time } locationPrefix := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, rulePrefix) locConf := &filer_pb.FilerConf_PathConf{ LocationPrefix: locationPrefix, Collection: collectionName, Ttl: fmt.Sprintf("%dd", rule.Expiration.Days), Replication: defaultReplication, VolumeGrowthCount: defaultVolumeGrowthCount, // DataCenter/Rack/DataNode intentionally not set: S3 is not tied to a specific DC/rack, // requests can hit any filer; setting them would pin placement unnecessarily. } if ttl, ok := collectionTtls[locConf.LocationPrefix]; ok && ttl == locConf.Ttl { continue } if err := fc.AddLocationConf(locConf); err != nil { glog.Errorf("PutBucketLifecycleConfigurationHandler add location config: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } ttlSec := int32((time.Duration(rule.Expiration.Days) * util.LifeCycleInterval).Seconds()) glog.V(2).Infof("Start updating TTL for %s", locationPrefix) if updErr := s3a.updateEntriesTTL(locationPrefix, ttlSec); updErr != nil { glog.Errorf("PutBucketLifecycleConfigurationHandler update TTL for %s: %s", locationPrefix, updErr) } else { glog.V(2).Infof("Finished updating TTL for %s", locationPrefix) } changed = true } if changed { var buf bytes.Buffer if err := fc.ToText(&buf); err != nil { glog.Errorf("PutBucketLifecycleConfigurationHandler save config to text: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) } if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf.Bytes()) }); err != nil { glog.Errorf("PutBucketLifecycleConfigurationHandler save config inside filer: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } } if errCode := s3a.storeBucketLifecycleConfiguration(bucket, lifecycleXML, r.Header.Get(bucketLifecycleTransitionMinimumObjectSizeHeader)); errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } writeSuccessResponseEmpty(w, r) } // DeleteBucketLifecycleHandler Delete Bucket Lifecycle // https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) { // collect parameters bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("DeleteBucketLifecycleHandler %s", bucket) if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { s3err.WriteErrorResponse(w, r, err) return } fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil) if err != nil { glog.Errorf("DeleteBucketLifecycleHandler read filer config: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } collectionTtls := fc.GetCollectionTtls(s3a.getCollectionName(bucket)) changed := false for prefix, ttl := range collectionTtls { bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket) if strings.HasPrefix(prefix, bucketPrefix) && strings.HasSuffix(ttl, "d") { pathConf, found := fc.GetLocationConf(prefix) if found { pathConf.Ttl = "" fc.SetLocationConf(pathConf) } changed = true } } if changed { var buf bytes.Buffer if err := fc.ToText(&buf); err != nil { glog.Errorf("DeleteBucketLifecycleHandler save config to text: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) } if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf.Bytes()) }); err != nil { glog.Errorf("DeleteBucketLifecycleHandler save config inside filer: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } } if errCode := s3a.clearStoredBucketLifecycleConfiguration(bucket); errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } s3err.WriteEmptyResponse(w, r, http.StatusNoContent) } // GetBucketLocationHandler Get bucket location // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLocation.html func (s3a *S3ApiServer) GetBucketLocationHandler(w http.ResponseWriter, r *http.Request) { bucket, _ := s3_constants.GetBucketAndObject(r) if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { s3err.WriteErrorResponse(w, r, err) return } writeSuccessResponseXML(w, r, CreateBucketConfiguration{}) } // GetBucketRequestPaymentHandler Get bucket location // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketRequestPayment.html func (s3a *S3ApiServer) GetBucketRequestPaymentHandler(w http.ResponseWriter, r *http.Request) { writeSuccessResponseXML(w, r, RequestPaymentConfiguration{Payer: "BucketOwner"}) } // PutBucketOwnershipControls https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketOwnershipControls.html func (s3a *S3ApiServer) PutBucketOwnershipControls(w http.ResponseWriter, r *http.Request) { bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("PutBucketOwnershipControls %s", bucket) errCode := s3a.checkAccessByOwnership(r, bucket) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } if r.Body == nil || r.Body == http.NoBody { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest) return } var v s3.OwnershipControls defer util_http.CloseRequest(r) err := xmlutil.UnmarshalXML(&v, xml.NewDecoder(r.Body), "") if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest) return } if len(v.Rules) != 1 { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest) return } printOwnership := true ownership := *v.Rules[0].ObjectOwnership switch ownership { case s3_constants.OwnershipObjectWriter: case s3_constants.OwnershipBucketOwnerPreferred: case s3_constants.OwnershipBucketOwnerEnforced: printOwnership = false default: s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest) return } // Check if ownership needs to be updated currentOwnership, errCode := s3a.getBucketOwnership(bucket) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } if currentOwnership != ownership { errCode = s3a.setBucketOwnership(bucket, ownership) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } } if printOwnership { result := &s3.PutBucketOwnershipControlsInput{ OwnershipControls: &v, } s3err.WriteAwsXMLResponse(w, r, http.StatusOK, result) } else { writeSuccessResponseEmpty(w, r) } } // GetBucketOwnershipControls https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketOwnershipControls.html func (s3a *S3ApiServer) GetBucketOwnershipControls(w http.ResponseWriter, r *http.Request) { bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("GetBucketOwnershipControls %s", bucket) errCode := s3a.checkAccessByOwnership(r, bucket) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } // Get ownership using new bucket config system ownership, errCode := s3a.getBucketOwnership(bucket) if errCode == s3err.ErrNoSuchBucket { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) return } else if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, s3err.OwnershipControlsNotFoundError) return } result := &s3.PutBucketOwnershipControlsInput{ OwnershipControls: &s3.OwnershipControls{ Rules: []*s3.OwnershipControlsRule{ { ObjectOwnership: &ownership, }, }, }, } s3err.WriteAwsXMLResponse(w, r, http.StatusOK, result) } // DeleteBucketOwnershipControls https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketOwnershipControls.html func (s3a *S3ApiServer) DeleteBucketOwnershipControls(w http.ResponseWriter, r *http.Request) { bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("PutBucketOwnershipControls %s", bucket) errCode := s3a.checkAccessByOwnership(r, bucket) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } bucketEntry, err := s3a.getEntry(s3a.option.BucketsPath, bucket) if err != nil { if errors.Is(err, filer_pb.ErrNotFound) { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) return } s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } _, ok := bucketEntry.Extended[s3_constants.ExtOwnershipKey] if !ok { s3err.WriteErrorResponse(w, r, s3err.OwnershipControlsNotFoundError) return } delete(bucketEntry.Extended, s3_constants.ExtOwnershipKey) err = s3a.updateEntry(s3a.option.BucketsPath, bucketEntry) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } emptyOwnershipControls := &s3.OwnershipControls{ Rules: []*s3.OwnershipControlsRule{}, } s3err.WriteAwsXMLResponse(w, r, http.StatusOK, emptyOwnershipControls) } // GetBucketVersioningHandler Get Bucket Versioning status // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html func (s3a *S3ApiServer) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) { bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("GetBucketVersioning %s", bucket) if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { s3err.WriteErrorResponse(w, r, err) return } // Get versioning status using new bucket config system versioningStatus, errCode := s3a.getBucketVersioningStatus(bucket) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } // AWS S3 behavior: If versioning was never configured, don't return Status field var response *s3.PutBucketVersioningInput if versioningStatus == "" { // No versioning configuration - return empty response (no Status field) response = &s3.PutBucketVersioningInput{ VersioningConfiguration: &s3.VersioningConfiguration{}, } } else { // Versioning was explicitly configured - return the status response = &s3.PutBucketVersioningInput{ VersioningConfiguration: &s3.VersioningConfiguration{ Status: aws.String(versioningStatus), }, } } s3err.WriteAwsXMLResponse(w, r, http.StatusOK, response) } // PutBucketVersioningHandler Put bucket Versioning // https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html func (s3a *S3ApiServer) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) { bucket, _ := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("PutBucketVersioning %s", bucket) if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { s3err.WriteErrorResponse(w, r, err) return } if r.Body == nil || r.Body == http.NoBody { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest) return } var versioningConfig s3.VersioningConfiguration defer util_http.CloseRequest(r) err := xmlutil.UnmarshalXML(&versioningConfig, xml.NewDecoder(r.Body), "") if err != nil { glog.Warningf("PutBucketVersioningHandler xml decode: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML) return } if versioningConfig.Status == nil { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest) return } status := *versioningConfig.Status if status != s3_constants.VersioningEnabled && status != s3_constants.VersioningSuspended { glog.Errorf("PutBucketVersioningHandler: invalid status '%s' for bucket %s", status, bucket) s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest) return } // Check if trying to suspend versioning on a bucket with object lock enabled if status == s3_constants.VersioningSuspended { // Get bucket configuration to check for object lock bucketConfig, errCode := s3a.getBucketConfig(bucket) if errCode == s3err.ErrNone && bucketConfig.ObjectLockConfig != nil { // Object lock is enabled, cannot suspend versioning s3err.WriteErrorResponse(w, r, s3err.ErrInvalidBucketState) return } } // Update bucket versioning configuration using new bucket config system if errCode := s3a.setBucketVersioningStatus(bucket, status); errCode != s3err.ErrNone { glog.Errorf("PutBucketVersioningHandler save config: bucket=%s, status='%s', errCode=%d", bucket, status, errCode) s3err.WriteErrorResponse(w, r, errCode) return } writeSuccessResponseEmpty(w, r) }