Files
seaweedFS/weed/s3api/s3api_bucket_handlers.go
Chris Lu 297cdef1a3 s3api: accept all supported lifecycle rule types (#8813)
* s3api: accept NoncurrentVersionExpiration, AbortIncompleteMultipartUpload, Expiration.Date

Update PutBucketLifecycleConfigurationHandler to accept all newly-supported
lifecycle rule types. Only Transition and NoncurrentVersionTransition are
still rejected (require storage class tier infrastructure).

Changes:
- Remove ErrNotImplemented for Expiration.Date (handled by worker at scan time)
- Only reject rules with Transition.set or NoncurrentVersionTransition.set
- Extract prefix from Filter.And when present
- Add comment explaining that non-Expiration.Days rules are evaluated by
  the lifecycle worker from stored lifecycle XML, not via filer.conf TTL

The lifecycle XML is already stored verbatim in bucket metadata, so new
rule types are preserved on Get even without explicit handler support.
Filer.conf TTL entries are only created for Expiration.Days (fast path).

* s3api: skip TTL fast path for rules with tag or size filters

Rules with tag or size constraints (Filter.Tag, Filter.And with tags
or size bounds, Filter.ObjectSizeGreaterThan/LessThan) must not be
lowered to filer.conf TTL entries, because TTL applies unconditionally
to all objects under the prefix. These rules are evaluated at scan
time by the lifecycle worker which checks each object's tags and size.

Only simple Expiration.Days rules with prefix-only filters use the
TTL fast path (RocksDB compaction filter).

---------

Co-authored-by: Copilot <copilot@github.com>
2026-03-28 19:39:21 -07:00

1351 lines
49 KiB
Go

package s3api
import (
"bytes"
"context"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io"
"math"
"net/http"
"sort"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
glog.V(3).Infof("ListBucketsHandler")
// Get authenticated identity from context (set by Auth middleware)
// For unauthenticated requests, this returns empty string
identityId := s3_constants.GetIdentityNameFromContext(r)
// Get the full identity object for permission and ownership checks
// This is especially important for JWT users whose identity is not in the identities list
// Note: We store the full Identity object in context for simplicity. Future optimization
// could use a lightweight, credential-free view (name, account, actions, principal ARN)
// for better data minimization.
var identity *Identity
if s3a.iam.isEnabled() {
// Try to get the full identity from context first (works for all auth types including JWT)
if identityObj := s3_constants.GetIdentityFromContext(r); identityObj != nil {
if id, ok := identityObj.(*Identity); ok {
identity = id
} else {
glog.Warningf("ListBucketsHandler: identity object in context has unexpected type: %T", identityObj)
}
}
// Fallback to looking up by name if not in context (backward compatibility)
if identity == nil && identityId != "" {
identity = s3a.iam.lookupByIdentityName(identityId)
}
}
var response ListAllMyBucketsResult
entries, _, err := s3a.list(s3a.option.BucketsPath, "", "", false, math.MaxInt32)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
var listBuckets ListAllMyBucketsList
for _, entry := range entries {
if entry.IsDirectory {
if strings.HasPrefix(entry.Name, ".") {
continue
}
// Unauthenticated users should not see any buckets
if identity == nil {
continue
}
// Check if bucket should be visible to this identity
// A bucket is visible if the user owns it OR has explicit permission to list it
isOwner := isBucketOwnedByIdentity(entry, identity)
// Skip permission check if user is already the owner (optimization)
if !isOwner {
if errCode := s3a.iam.VerifyActionPermission(r, identity, s3_constants.ACTION_LIST, entry.Name, ""); errCode != s3err.ErrNone {
continue
}
}
listBuckets.Bucket = append(listBuckets.Bucket, ListAllMyBucketsEntry{
Name: entry.Name,
CreationDate: time.Unix(entry.Attributes.Crtime, 0).UTC(),
})
}
}
response = ListAllMyBucketsResult{
Owner: CanonicalUser{
ID: identityId,
DisplayName: identityId,
},
Buckets: listBuckets,
}
glog.V(3).Infof("ListBucketsHandler response: %+v", response)
writeSuccessResponseXML(w, r, response)
}
// isBucketOwnedByIdentity checks if a bucket entry is owned by the given identity.
// Returns true if the identity owns the bucket, false otherwise.
//
// Ownership rules:
// - Admin users: considered owners of all buckets
// - Non-admin users: own buckets where AmzIdentityId matches identity.Name
// - Buckets without owner metadata are not owned by anyone (except admins)
func isBucketOwnedByIdentity(entry *filer_pb.Entry, identity *Identity) bool {
if !entry.IsDirectory {
return false
}
if identity == nil {
return false
}
// Admin users are considered owners of all buckets
if identity.isAdmin() {
return true
}
// Non-admin users with no name cannot own buckets.
// This prevents misconfigured identities from matching buckets with empty owner IDs.
if identity.Name == "" {
return false
}
// Check ownership via AmzIdentityId metadata
id, ok := entry.Extended[s3_constants.AmzIdentityId]
if !ok || string(id) != identity.Name {
return false
}
return true
}
// isBucketVisibleToIdentity is kept for backward compatibility with tests.
// It checks if a bucket should be visible based on ownership only.
// Deprecated: Use isBucketOwnedByIdentity instead. The ListBucketsHandler
// now uses OR logic: a bucket is visible if user owns it OR has List permission.
func isBucketVisibleToIdentity(entry *filer_pb.Entry, identity *Identity) bool {
return isBucketOwnedByIdentity(entry, identity)
}
func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) {
// collect parameters
bucket, _ := s3_constants.GetBucketAndObject(r)
// validate the bucket name
err := s3bucket.VerifyS3BucketName(bucket)
if err != nil {
glog.Errorf("put invalid bucket name: %v %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidBucketName)
return
}
// Check if bucket already exists and handle ownership/settings
// Get authenticated identity from context (secure, cannot be spoofed)
currentIdentityId := s3_constants.GetIdentityNameFromContext(r)
// Check collection existence first
collectionExists := false
if s3a.isTableBucket(bucket) {
s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists)
return
}
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
if resp, err := client.CollectionList(context.Background(), &filer_pb.CollectionListRequest{
IncludeEcVolumes: true,
IncludeNormalVolumes: true,
}); err != nil {
glog.Errorf("list collection: %v", err)
return fmt.Errorf("list collections: %w", err)
} else {
for _, c := range resp.Collections {
if s3a.getCollectionName(bucket) == c.Name {
collectionExists = true
break
}
}
}
return nil
}); err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Check bucket directory existence and get metadata
if exist, err := s3a.exists(s3a.option.BucketsPath, bucket, true); err == nil && exist {
// Bucket exists, check ownership and settings
if entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket); err == nil {
// Get existing bucket owner
var existingOwnerId string
if entry.Extended != nil {
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
existingOwnerId = string(id)
}
}
// Check ownership
if existingOwnerId != "" && existingOwnerId != currentIdentityId {
// Different owner - always fail with BucketAlreadyExists
glog.V(3).Infof("PutBucketHandler: bucket %s owned by %s, requested by %s", bucket, existingOwnerId, currentIdentityId)
s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists)
return
}
// Same owner or no owner set - check for conflicting settings
objectLockRequested := strings.EqualFold(r.Header.Get(s3_constants.AmzBucketObjectLockEnabled), "true")
// Get current bucket configuration
bucketConfig, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
glog.Errorf("PutBucketHandler: failed to get bucket config for %s: %v", bucket, errCode)
// If we can't get config, assume no conflict and allow recreation
} else {
// Check for Object Lock conflict
currentObjectLockEnabled := bucketConfig.ObjectLockConfig != nil &&
bucketConfig.ObjectLockConfig.ObjectLockEnabled == s3_constants.ObjectLockEnabled
if objectLockRequested != currentObjectLockEnabled {
// Conflicting Object Lock settings - fail with BucketAlreadyExists
glog.V(3).Infof("PutBucketHandler: bucket %s has conflicting Object Lock settings (requested: %v, current: %v)",
bucket, objectLockRequested, currentObjectLockEnabled)
s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists)
return
}
}
// Bucket already exists - always return BucketAlreadyExists per S3 specification
// The S3 tests expect BucketAlreadyExists in all cases, not BucketAlreadyOwnedByYou
glog.V(3).Infof("PutBucketHandler: bucket %s already exists", bucket)
s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists)
return
}
}
// If collection exists but bucket directory doesn't, this is an inconsistent state
// that can occur when a previous bucket deletion partially completed (collection
// deletion failed but directory deletion succeeded, or volumes were recreated).
// Recover by proceeding to create the missing bucket directory.
if collectionExists {
glog.Warningf("PutBucketHandler: collection exists but bucket directory missing for %s, recovering by creating bucket directory", bucket)
}
// Check for x-amz-bucket-object-lock-enabled header BEFORE creating bucket
// This allows us to create the bucket with Object Lock configuration atomically
objectLockEnabled := strings.EqualFold(r.Header.Get(s3_constants.AmzBucketObjectLockEnabled), "true")
// Capture any Object Lock configuration error from within the callback
// The mkdir callback doesn't support returning errors, so we capture it here
var objectLockSetupError error
// Create the folder for bucket with all settings atomically
// This ensures Object Lock configuration is set in the same CreateEntry call,
// preventing race conditions where the bucket exists without Object Lock enabled
if err := s3a.mkdir(s3a.option.BucketsPath, bucket, func(entry *filer_pb.Entry) {
// Set bucket owner
setBucketOwner(r)(entry)
// Set Object Lock configuration atomically during bucket creation
if objectLockEnabled {
glog.V(3).Infof("PutBucketHandler: enabling Object Lock and Versioning for bucket %s atomically", bucket)
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
// Enable versioning (required for Object Lock)
entry.Extended[s3_constants.ExtVersioningKey] = []byte(s3_constants.VersioningEnabled)
// Create and store Object Lock configuration
objectLockConfig := &ObjectLockConfiguration{
ObjectLockEnabled: s3_constants.ObjectLockEnabled,
}
if err := StoreObjectLockConfigurationInExtended(entry, objectLockConfig); err != nil {
glog.Errorf("PutBucketHandler: failed to store Object Lock config for bucket %s: %v", bucket, err)
objectLockSetupError = err
// Note: The entry will still be created, but we'll roll it back below
} else {
glog.V(3).Infof("PutBucketHandler: set ObjectLockConfig for bucket %s: %+v", bucket, objectLockConfig)
}
}
}); err != nil {
// If mkdir failed because another request created the bucket concurrently,
// return BucketAlreadyExists instead of InternalError.
if exist, checkErr := s3a.exists(s3a.option.BucketsPath, bucket, true); checkErr == nil && exist {
glog.V(3).Infof("PutBucketHandler: bucket %s was created concurrently", bucket)
s3err.WriteErrorResponse(w, r, s3err.ErrBucketAlreadyExists)
return
}
glog.Errorf("PutBucketHandler mkdir: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// If Object Lock setup failed, roll back the bucket creation
// This ensures we don't leave a bucket without the requested Object Lock configuration
if objectLockSetupError != nil {
glog.Errorf("PutBucketHandler: rolling back bucket %s creation due to Object Lock setup failure: %v", bucket, objectLockSetupError)
if deleteErr := s3a.rm(s3a.option.BucketsPath, bucket, true, true); deleteErr != nil {
glog.Errorf("PutBucketHandler: failed to rollback bucket %s after Object Lock setup failure: %v", bucket, deleteErr)
}
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Remove bucket from negative cache after successful creation
if s3a.bucketConfigCache != nil {
s3a.bucketConfigCache.RemoveNegativeCache(bucket)
}
w.Header().Set("Location", "/"+bucket)
writeSuccessResponseEmpty(w, r)
}
func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("DeleteBucketHandler %s", bucket)
if s3a.isTableBucket(bucket) {
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return
}
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
// Check if bucket has object lock enabled
bucketConfig, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
// If object lock is enabled, check for objects with active locks
if bucketConfig.ObjectLockConfig != nil {
hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(r.Context(), bucket)
if checkErr != nil {
glog.Errorf("DeleteBucketHandler: failed to check for locked objects in bucket %s: %v", bucket, checkErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if hasLockedObjects {
glog.V(3).Infof("DeleteBucketHandler: bucket %s has objects with active object locks, cannot delete", bucket)
s3err.WriteErrorResponse(w, r, s3err.ErrBucketNotEmpty)
return
}
}
if !s3a.option.AllowDeleteBucketNotEmpty {
if hasUserObjects, err := s3a.bucketHasUserObjects(bucket); err != nil {
glog.Errorf("failed to list bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
} else if hasUserObjects {
s3err.WriteErrorResponse(w, r, s3err.ErrBucketNotEmpty)
return
}
}
// Delete bucket directory first, then collection. This order ensures that if
// collection deletion fails, the bucket directory is already gone, preventing
// the "collection exists but bucket directory missing" inconsistency that blocks
// bucket recreation. An orphaned collection is harmless and will be cleaned up
// or reused when the bucket is recreated.
err := s3a.rm(s3a.option.BucketsPath, bucket, false, true)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{
Collection: s3a.getCollectionName(bucket),
}
glog.V(1).Infof("delete collection: %v", deleteCollectionRequest)
if _, err := client.DeleteCollection(context.Background(), deleteCollectionRequest); err != nil {
return fmt.Errorf("delete collection %s: %v", bucket, err)
}
return nil
})
if err != nil {
// Log but don't fail — the bucket directory is already removed, so the bucket
// is effectively deleted. The orphaned collection will be cleaned up or reused.
glog.Errorf("DeleteBucketHandler: failed to delete collection for bucket %s: %v", bucket, err)
}
// Clean up bucket-related caches, locks, and metrics after successful deletion
s3a.invalidateBucketConfigCache(bucket)
stats_collect.DeleteBucketMetrics(bucket)
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
}
// bucketHasUserObjects checks whether a bucket contains any non-special entries.
// Special entries (.uploads, *.versions) are internal to S3 and don't count as user objects.
func (s3a *S3ApiServer) bucketHasUserObjects(bucket string) (bool, error) {
bucketPath := s3a.option.BucketsPath + "/" + bucket
startFrom := ""
// Start with a small batch — most non-empty buckets have a real object early.
// If we only find special entries, switch to larger batches to page through quickly.
limit := uint32(10)
for {
entries, isLast, err := s3a.list(bucketPath, "", startFrom, false, limit)
if err != nil {
return false, err
}
for _, entry := range entries {
if entry.Name != s3_constants.MultipartUploadsFolder &&
!strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
return true, nil
}
startFrom = entry.Name
}
if isLast {
return false, nil
}
limit = 1000
}
}
// hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
// Delegates to the shared HasObjectsWithActiveLocks function in object_lock_utils.go
func (s3a *S3ApiServer) hasObjectsWithActiveLocks(ctx context.Context, bucket string) (bool, error) {
bucketPath := s3a.option.BucketsPath + "/" + bucket
var hasLocks bool
var checkErr error
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
hasLocks, checkErr = HasObjectsWithActiveLocks(ctx, client, bucketPath)
return checkErr
})
if err != nil {
return false, err
}
return hasLocks, nil
}
func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("HeadBucketHandler %s", bucket)
if entry, err := s3a.getBucketEntry(bucket); entry == nil || errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
writeSuccessResponseEmpty(w, r)
}
func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorCode {
// Use cached bucket config instead of direct getEntry call (optimization)
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
return errCode
}
//if iam is enabled, the access was already checked before
if s3a.iam.isEnabled() {
return s3err.ErrNone
}
if !s3a.hasAccess(r, config.Entry) {
return s3err.ErrAccessDenied
}
return s3err.ErrNone
}
// ErrAutoCreatePermissionDenied is returned when a user lacks permission to auto-create buckets
var ErrAutoCreatePermissionDenied = errors.New("permission denied - requires Admin permission")
// ErrInvalidBucketName is returned when a bucket name doesn't meet S3 naming requirements
var ErrInvalidBucketName = errors.New("invalid bucket name")
// setBucketOwner creates a function that sets the bucket owner from the request context
func setBucketOwner(r *http.Request) func(entry *filer_pb.Entry) {
currentIdentityId := s3_constants.GetIdentityNameFromContext(r)
return func(entry *filer_pb.Entry) {
if currentIdentityId != "" {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.AmzIdentityId] = []byte(currentIdentityId)
}
}
}
// autoCreateBucket creates a bucket if it doesn't exist, setting the owner from the request context
// Only users with admin permissions are allowed to auto-create buckets
func (s3a *S3ApiServer) autoCreateBucket(r *http.Request, bucket string) error {
// Validate the bucket name before auto-creating
if err := s3bucket.VerifyS3BucketName(bucket); err != nil {
return fmt.Errorf("auto-create bucket %s: %w", bucket, errors.Join(ErrInvalidBucketName, err))
}
// Check if user has admin permissions
if !s3a.isUserAdmin(r) {
return fmt.Errorf("auto-create bucket %s: %w", bucket, ErrAutoCreatePermissionDenied)
}
if err := s3a.mkdir(s3a.option.BucketsPath, bucket, setBucketOwner(r)); err != nil {
// In case of a race condition where another request created the bucket
// in the meantime, check for existence before returning an error.
if exist, err2 := s3a.exists(s3a.option.BucketsPath, bucket, true); err2 != nil {
glog.Warningf("autoCreateBucket: failed to check existence for bucket %s: %v", bucket, err2)
return fmt.Errorf("failed to auto-create bucket %s: %w", bucket, errors.Join(err, err2))
} else if exist {
// The bucket exists, which is fine. However, we should ensure it has an owner.
// If it was created by a concurrent request that didn't set an owner,
// we'll set it here to ensure consistency.
if entry, getErr := s3a.getEntry(s3a.option.BucketsPath, bucket); getErr == nil {
if entry.Extended == nil || len(entry.Extended[s3_constants.AmzIdentityId]) == 0 {
// No owner set, assign current admin as owner
setBucketOwner(r)(entry)
if updateErr := s3a.updateEntry(s3a.option.BucketsPath, entry); updateErr != nil {
glog.Warningf("autoCreateBucket: failed to set owner for existing bucket %s: %v", bucket, updateErr)
} else {
glog.V(1).Infof("Set owner for existing bucket %s (created by concurrent request)", bucket)
}
}
} else {
glog.Warningf("autoCreateBucket: failed to get entry for existing bucket %s: %v", bucket, getErr)
}
// Remove bucket from negative cache — it exists now
if s3a.bucketConfigCache != nil {
s3a.bucketConfigCache.RemoveNegativeCache(bucket)
}
return nil
}
return fmt.Errorf("failed to auto-create bucket %s: %w", bucket, err)
}
// Remove bucket from negative cache after successful creation
if s3a.bucketConfigCache != nil {
s3a.bucketConfigCache.RemoveNegativeCache(bucket)
}
glog.V(1).Infof("Auto-created bucket %s", bucket)
return nil
}
// handleAutoCreateBucket attempts to auto-create a bucket and writes appropriate error responses
// Returns true if the bucket was created successfully or already exists, false if an error was written
func (s3a *S3ApiServer) handleAutoCreateBucket(w http.ResponseWriter, r *http.Request, bucket, handlerName string) bool {
if err := s3a.autoCreateBucket(r, bucket); err != nil {
glog.Warningf("%s: %v", handlerName, err)
// Check for specific errors to return appropriate S3 error codes
if errors.Is(err, ErrInvalidBucketName) {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidBucketName)
} else if errors.Is(err, ErrAutoCreatePermissionDenied) {
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
} else {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
}
return false
}
return true
}
func (s3a *S3ApiServer) hasAccess(r *http.Request, entry *filer_pb.Entry) bool {
// Check if user is properly authenticated as admin through IAM system
if s3a.isUserAdmin(r) {
return true
}
if entry.Extended == nil {
return true
}
// Get authenticated identity from context (secure, cannot be spoofed)
identityId := s3_constants.GetIdentityNameFromContext(r)
if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
if identityId != string(id) {
glog.V(3).Infof("hasAccess: %s != %s (entry.Extended = %v)", identityId, id, entry.Extended)
return false
}
}
return true
}
// isUserAdmin securely checks if the authenticated user is an admin
// This validates admin status through proper IAM authentication, not spoofable headers
func (s3a *S3ApiServer) isUserAdmin(r *http.Request) bool {
// Use a minimal admin action to authenticate and check admin status
adminAction := Action("Admin")
identity, errCode := s3a.iam.authRequest(r, adminAction)
if errCode != s3err.ErrNone {
return false
}
// Check if the authenticated identity has admin privileges
return identity != nil && identity.isAdmin()
}
// isBucketPublicRead checks if a bucket allows anonymous read access based on its cached ACL status
func (s3a *S3ApiServer) isBucketPublicRead(bucket string) bool {
// Get bucket configuration which contains cached public-read status
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
glog.V(4).Infof("isBucketPublicRead: failed to get bucket config for %s: %v", bucket, errCode)
return false
}
glog.V(4).Infof("isBucketPublicRead: bucket=%s, IsPublicRead=%v", bucket, config.IsPublicRead)
// Return the cached public-read status (no JSON parsing needed)
return config.IsPublicRead
}
// isPublicReadGrants checks if the grants allow public read access
func isPublicReadGrants(grants []*s3.Grant) bool {
for _, grant := range grants {
if grant.Grantee != nil && grant.Grantee.URI != nil && grant.Permission != nil {
// Check for AllUsers group with Read permission
if *grant.Grantee.URI == s3_constants.GranteeGroupAllUsers &&
(*grant.Permission == s3_constants.PermissionRead || *grant.Permission == s3_constants.PermissionFullControl) {
return true
}
}
}
return false
}
// buildResourceARN builds a resource ARN from bucket and object
// Used by the policy engine wrapper
func buildResourceARN(bucket, object string) string {
if object == "" || object == "/" {
return fmt.Sprintf("arn:aws:s3:::%s", bucket)
}
// Remove leading slash if present
object = strings.TrimPrefix(object, "/")
return fmt.Sprintf("arn:aws:s3:::%s/%s", bucket, object)
}
// AuthWithPublicRead creates an auth wrapper that allows anonymous access for public-read buckets
func (s3a *S3ApiServer) AuthWithPublicRead(handler http.HandlerFunc, action Action) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
bucket, object := s3_constants.GetBucketAndObject(r)
authType := getRequestAuthType(r)
isAnonymous := authType == authTypeAnonymous
glog.V(4).Infof("AuthWithPublicRead: bucket=%s, object=%s, authType=%v, isAnonymous=%v", bucket, object, authType, isAnonymous)
// For anonymous requests, check if bucket allows public read via ACLs or bucket policies
if isAnonymous {
// First check ACL-based public access
isPublic := s3a.isBucketPublicRead(bucket)
glog.V(4).Infof("AuthWithPublicRead: bucket=%s, isPublicACL=%v", bucket, isPublic)
if isPublic {
glog.V(3).Infof("AuthWithPublicRead: allowing anonymous access to public-read bucket %s (ACL)", bucket)
handler(w, r)
return
}
// Check bucket policy for anonymous access using the policy engine
principal := "*" // Anonymous principal
// Evaluate bucket policy (objectEntry nil - not yet fetched)
allowed, evaluated, err := s3a.policyEngine.EvaluatePolicy(bucket, object, string(action), principal, r, nil, nil)
if err != nil {
// SECURITY: Fail-close on policy evaluation errors
// If we can't evaluate the policy, deny access rather than falling through to IAM
glog.Errorf("AuthWithPublicRead: error evaluating bucket policy for %s/%s: %v - denying access", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return
} else if evaluated {
// A bucket policy exists and was evaluated with a matching statement
if allowed {
// Policy explicitly allows anonymous access
glog.V(3).Infof("AuthWithPublicRead: allowing anonymous access to bucket %s (bucket policy)", bucket)
handler(w, r)
return
} else {
// Policy explicitly denies anonymous access
glog.V(3).Infof("AuthWithPublicRead: bucket policy explicitly denies anonymous access to %s/%s", bucket, object)
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return
}
}
// No matching policy statement - fall through to check ACLs and then IAM auth
glog.V(3).Infof("AuthWithPublicRead: no bucket policy match for %s, checking ACLs", bucket)
}
// For all authenticated requests and anonymous requests to non-public buckets,
// use normal IAM auth to enforce policies
s3a.iam.Auth(handler, action)(w, r)
}
}
// GetBucketAclHandler Get Bucket ACL
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketAcl.html
func (s3a *S3ApiServer) GetBucketAclHandler(w http.ResponseWriter, r *http.Request) {
// collect parameters
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("GetBucketAclHandler %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
amzDisplayName := s3a.iam.GetAccountNameById(amzAccountId)
response := AccessControlPolicy{
Owner: CanonicalUser{
ID: amzAccountId,
DisplayName: amzDisplayName,
},
}
response.AccessControlList.Grant = append(response.AccessControlList.Grant, Grant{
Grantee: Grantee{
ID: amzAccountId,
DisplayName: amzDisplayName,
Type: "CanonicalUser",
XMLXSI: "CanonicalUser",
XMLNS: "http://www.w3.org/2001/XMLSchema-instance"},
Permission: s3.PermissionFullControl,
})
writeSuccessResponseXML(w, r, response)
}
// PutBucketAclHandler Put bucket ACL
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketAcl.html //
func (s3a *S3ApiServer) PutBucketAclHandler(w http.ResponseWriter, r *http.Request) {
// collect parameters
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("PutBucketAclHandler %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
// Get account information for ACL processing
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
// Get bucket ownership settings (these would be used for ownership validation in a full implementation)
bucketOwnership := "" // Default/simplified for now - in a full implementation this would be retrieved from bucket config
bucketOwnerId := amzAccountId // Simplified - bucket owner is current account
// Use the existing ACL parsing logic to handle both canned ACLs and XML body
grants, errCode := ExtractAcl(r, s3a.iam, bucketOwnership, bucketOwnerId, amzAccountId, amzAccountId)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
glog.V(3).Infof("PutBucketAclHandler: bucket=%s, extracted %d grants", bucket, len(grants))
isPublic := isPublicReadGrants(grants)
glog.V(3).Infof("PutBucketAclHandler: bucket=%s, isPublicReadGrants=%v", bucket, isPublic)
// Store the bucket ACL in bucket metadata
errCode = s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
if len(grants) > 0 {
grantsBytes, err := json.Marshal(grants)
if err != nil {
glog.Errorf("PutBucketAclHandler: failed to marshal grants: %v", err)
return err
}
config.ACL = grantsBytes
// Cache the public-read status to avoid JSON parsing on every request
config.IsPublicRead = isPublicReadGrants(grants)
glog.V(4).Infof("PutBucketAclHandler: bucket=%s, setting IsPublicRead=%v", bucket, config.IsPublicRead)
} else {
config.ACL = nil
config.IsPublicRead = false
}
config.Owner = amzAccountId
return nil
})
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
glog.V(3).Infof("PutBucketAclHandler: Successfully stored ACL for bucket %s with %d grants", bucket, len(grants))
// Small delay to ensure ACL propagation across distributed caches
// This prevents race conditions in tests where anonymous access is attempted immediately after ACL change
time.Sleep(50 * time.Millisecond)
writeSuccessResponseEmpty(w, r)
}
// GetBucketLifecycleConfigurationHandler Get Bucket Lifecycle configuration
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLifecycleConfiguration.html
func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) {
// collect parameters
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("GetBucketLifecycleConfigurationHandler %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
if lifecycleXML, transitionMinimumObjectSize, found, errCode := s3a.getStoredBucketLifecycleConfiguration(bucket); errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
} else if found {
w.Header().Set(bucketLifecycleTransitionMinimumObjectSizeHeader, transitionMinimumObjectSize)
writeSuccessResponseXMLBytes(w, r, lifecycleXML)
return
}
// ReadFilerConfFromFilers provides multi-filer failover
fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil)
if err != nil {
glog.Errorf("GetBucketLifecycleConfigurationHandler: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
ttls := fc.GetCollectionTtls(s3a.getCollectionName(bucket))
if len(ttls) == 0 {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchLifecycleConfiguration)
return
}
response := Lifecycle{}
// Sort locationPrefixes to ensure consistent ordering of lifecycle rules
var locationPrefixes []string
for locationPrefix := range ttls {
locationPrefixes = append(locationPrefixes, locationPrefix)
}
sort.Strings(locationPrefixes)
for _, locationPrefix := range locationPrefixes {
internalTtl := ttls[locationPrefix]
ttl, _ := needle.ReadTTL(internalTtl)
days := int(ttl.Minutes() / 60 / 24)
if days == 0 {
continue
}
prefix, found := strings.CutPrefix(locationPrefix, fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket))
if !found {
continue
}
response.Rules = append(response.Rules, Rule{
ID: prefix,
Status: Enabled,
Prefix: Prefix{val: prefix, set: true},
Expiration: Expiration{Days: days, set: true},
})
}
if len(response.Rules) > 0 {
w.Header().Set(bucketLifecycleTransitionMinimumObjectSizeHeader, defaultLifecycleTransitionMinimumObjectSize)
}
writeSuccessResponseXML(w, r, response)
}
// resolveLifecycleDefaultsFromFilerConf returns replication and volumeGrowthCount for use when adding a lifecycle TTL rule.
// S3 does not set DataCenter/Rack/DataNode so placement is not pinned to a specific DC/rack.
// Precedence: parent path rule first, then filer global. If volumeGrowthCount is 0 but replication is set,
// use replication's copy count so the rule is valid (volumeGrowthCount must be divisible by copy count).
func resolveLifecycleDefaultsFromFilerConf(fc *filer.FilerConf, filerConfigReplication, bucketsPath, bucket string) (replication string, volumeGrowthCount uint32, err error) {
bucketPath := fmt.Sprintf("%s/%s/", bucketsPath, bucket)
parentRule := fc.MatchStorageRule(bucketPath)
replication = parentRule.Replication
if replication == "" {
replication = filerConfigReplication
}
volumeGrowthCount = parentRule.VolumeGrowthCount
if volumeGrowthCount == 0 && replication != "" {
var rp *super_block.ReplicaPlacement
rp, err = super_block.NewReplicaPlacementFromString(replication)
if err == nil {
volumeGrowthCount = uint32(rp.GetCopyCount())
}
}
return
}
// PutBucketLifecycleConfigurationHandler Put Bucket Lifecycle configuration
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html
func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) {
// collect parameters
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("PutBucketLifecycleConfigurationHandler %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
r.Body = http.MaxBytesReader(w, r.Body, maxBucketLifecycleConfigurationSize)
lifecycleXML, err := io.ReadAll(r.Body)
if err != nil {
glog.Warningf("PutBucketLifecycleConfigurationHandler read body: %s", err)
var maxBytesErr *http.MaxBytesError
if errors.As(err, &maxBytesErr) {
s3err.WriteErrorResponse(w, r, s3err.ErrEntityTooLarge)
return
}
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
return
}
lifeCycleConfig := Lifecycle{}
if err := xmlDecoder(bytes.NewReader(lifecycleXML), &lifeCycleConfig, int64(len(lifecycleXML))); err != nil {
glog.Warningf("PutBucketLifecycleConfigurationHandler xml decode: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
return
}
fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil)
if err != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler read filer config: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Resolve replication so lifecycle rules do not create filer.conf entries with empty replication.
var filerConfigReplication string
if filerErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(r.Context(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
}
filerConfigReplication = resp.GetReplication()
return nil
}); filerErr != nil {
glog.V(2).Infof("PutBucketLifecycleConfigurationHandler: could not get filer config: %v", filerErr)
}
defaultReplication, defaultVolumeGrowthCount, err := resolveLifecycleDefaultsFromFilerConf(fc, filerConfigReplication, s3a.option.BucketsPath, bucket)
if err != nil {
glog.Warningf("PutBucketLifecycleConfigurationHandler bucket %s: invalid replication %q: %v", bucket, defaultReplication, err)
}
collectionName := s3a.getCollectionName(bucket)
collectionTtls := fc.GetCollectionTtls(collectionName)
changed := false
for _, rule := range lifeCycleConfig.Rules {
if rule.Status != Enabled {
continue
}
// Reject Transition rules — they require storage class migration
// infrastructure that does not exist yet.
if rule.Transition.set || rule.NoncurrentVersionTransition.set {
s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
return
}
var rulePrefix string
switch {
case rule.Filter.andSet:
rulePrefix = rule.Filter.And.Prefix.val
case rule.Filter.Prefix.set:
rulePrefix = rule.Filter.Prefix.val
case rule.Prefix.set:
rulePrefix = rule.Prefix.val
}
// Only create filer.conf TTL entries for simple Expiration.Days rules
// with prefix-only filters (the fast path handled by RocksDB compaction
// filter). Rules with tag or size filters must be evaluated at scan time
// by the lifecycle worker, because TTL applies to all objects under the
// prefix regardless of tags or size.
if rule.Expiration.Days == 0 {
continue
}
hasTagOrSizeFilter := rule.Filter.tagSet ||
rule.Filter.ObjectSizeGreaterThan > 0 || rule.Filter.ObjectSizeLessThan > 0 ||
(rule.Filter.andSet && (len(rule.Filter.And.Tags) > 0 ||
rule.Filter.And.ObjectSizeGreaterThan > 0 || rule.Filter.And.ObjectSizeLessThan > 0))
if hasTagOrSizeFilter {
continue // evaluated by lifecycle worker at scan time
}
locationPrefix := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, rulePrefix)
locConf := &filer_pb.FilerConf_PathConf{
LocationPrefix: locationPrefix,
Collection: collectionName,
Ttl: fmt.Sprintf("%dd", rule.Expiration.Days),
Replication: defaultReplication,
VolumeGrowthCount: defaultVolumeGrowthCount,
// DataCenter/Rack/DataNode intentionally not set: S3 is not tied to a specific DC/rack,
// requests can hit any filer; setting them would pin placement unnecessarily.
}
if ttl, ok := collectionTtls[locConf.LocationPrefix]; ok && ttl == locConf.Ttl {
continue
}
if err := fc.AddLocationConf(locConf); err != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler add location config: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
ttlSec := int32((time.Duration(rule.Expiration.Days) * util.LifeCycleInterval).Seconds())
glog.V(2).Infof("Start updating TTL for %s", locationPrefix)
if updErr := s3a.updateEntriesTTL(locationPrefix, ttlSec); updErr != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler update TTL for %s: %s", locationPrefix, updErr)
} else {
glog.V(2).Infof("Finished updating TTL for %s", locationPrefix)
}
changed = true
}
if changed {
var buf bytes.Buffer
if err := fc.ToText(&buf); err != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler save config to text: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
}
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf.Bytes())
}); err != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler save config inside filer: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
}
if errCode := s3a.storeBucketLifecycleConfiguration(bucket, lifecycleXML, r.Header.Get(bucketLifecycleTransitionMinimumObjectSizeHeader)); errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
writeSuccessResponseEmpty(w, r)
}
// DeleteBucketLifecycleHandler Delete Bucket Lifecycle
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html
func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) {
// collect parameters
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("DeleteBucketLifecycleHandler %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil)
if err != nil {
glog.Errorf("DeleteBucketLifecycleHandler read filer config: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
collectionTtls := fc.GetCollectionTtls(s3a.getCollectionName(bucket))
changed := false
for prefix, ttl := range collectionTtls {
bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
if strings.HasPrefix(prefix, bucketPrefix) && strings.HasSuffix(ttl, "d") {
pathConf, found := fc.GetLocationConf(prefix)
if found {
pathConf.Ttl = ""
fc.SetLocationConf(pathConf)
}
changed = true
}
}
if changed {
var buf bytes.Buffer
if err := fc.ToText(&buf); err != nil {
glog.Errorf("DeleteBucketLifecycleHandler save config to text: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
}
if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf.Bytes())
}); err != nil {
glog.Errorf("DeleteBucketLifecycleHandler save config inside filer: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
}
if errCode := s3a.clearStoredBucketLifecycleConfiguration(bucket); errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
}
// GetBucketLocationHandler Get bucket location
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLocation.html
func (s3a *S3ApiServer) GetBucketLocationHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := s3_constants.GetBucketAndObject(r)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
writeSuccessResponseXML(w, r, CreateBucketConfiguration{})
}
// GetBucketRequestPaymentHandler Get bucket location
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketRequestPayment.html
func (s3a *S3ApiServer) GetBucketRequestPaymentHandler(w http.ResponseWriter, r *http.Request) {
writeSuccessResponseXML(w, r, RequestPaymentConfiguration{Payer: "BucketOwner"})
}
// PutBucketOwnershipControls https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketOwnershipControls.html
func (s3a *S3ApiServer) PutBucketOwnershipControls(w http.ResponseWriter, r *http.Request) {
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("PutBucketOwnershipControls %s", bucket)
errCode := s3a.checkAccessByOwnership(r, bucket)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
if r.Body == nil || r.Body == http.NoBody {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
return
}
var v s3.OwnershipControls
defer util_http.CloseRequest(r)
err := xmlutil.UnmarshalXML(&v, xml.NewDecoder(r.Body), "")
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
return
}
if len(v.Rules) != 1 {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
return
}
printOwnership := true
ownership := *v.Rules[0].ObjectOwnership
switch ownership {
case s3_constants.OwnershipObjectWriter:
case s3_constants.OwnershipBucketOwnerPreferred:
case s3_constants.OwnershipBucketOwnerEnforced:
printOwnership = false
default:
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
return
}
// Check if ownership needs to be updated
currentOwnership, errCode := s3a.getBucketOwnership(bucket)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
if currentOwnership != ownership {
errCode = s3a.setBucketOwnership(bucket, ownership)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
}
if printOwnership {
result := &s3.PutBucketOwnershipControlsInput{
OwnershipControls: &v,
}
s3err.WriteAwsXMLResponse(w, r, http.StatusOK, result)
} else {
writeSuccessResponseEmpty(w, r)
}
}
// GetBucketOwnershipControls https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketOwnershipControls.html
func (s3a *S3ApiServer) GetBucketOwnershipControls(w http.ResponseWriter, r *http.Request) {
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("GetBucketOwnershipControls %s", bucket)
errCode := s3a.checkAccessByOwnership(r, bucket)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
// Get ownership using new bucket config system
ownership, errCode := s3a.getBucketOwnership(bucket)
if errCode == s3err.ErrNoSuchBucket {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
} else if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, s3err.OwnershipControlsNotFoundError)
return
}
result := &s3.PutBucketOwnershipControlsInput{
OwnershipControls: &s3.OwnershipControls{
Rules: []*s3.OwnershipControlsRule{
{
ObjectOwnership: &ownership,
},
},
},
}
s3err.WriteAwsXMLResponse(w, r, http.StatusOK, result)
}
// DeleteBucketOwnershipControls https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketOwnershipControls.html
func (s3a *S3ApiServer) DeleteBucketOwnershipControls(w http.ResponseWriter, r *http.Request) {
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("PutBucketOwnershipControls %s", bucket)
errCode := s3a.checkAccessByOwnership(r, bucket)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
bucketEntry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
_, ok := bucketEntry.Extended[s3_constants.ExtOwnershipKey]
if !ok {
s3err.WriteErrorResponse(w, r, s3err.OwnershipControlsNotFoundError)
return
}
delete(bucketEntry.Extended, s3_constants.ExtOwnershipKey)
err = s3a.updateEntry(s3a.option.BucketsPath, bucketEntry)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
emptyOwnershipControls := &s3.OwnershipControls{
Rules: []*s3.OwnershipControlsRule{},
}
s3err.WriteAwsXMLResponse(w, r, http.StatusOK, emptyOwnershipControls)
}
// GetBucketVersioningHandler Get Bucket Versioning status
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html
func (s3a *S3ApiServer) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("GetBucketVersioning %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
// Get versioning status using new bucket config system
versioningStatus, errCode := s3a.getBucketVersioningStatus(bucket)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
// AWS S3 behavior: If versioning was never configured, don't return Status field
var response *s3.PutBucketVersioningInput
if versioningStatus == "" {
// No versioning configuration - return empty response (no Status field)
response = &s3.PutBucketVersioningInput{
VersioningConfiguration: &s3.VersioningConfiguration{},
}
} else {
// Versioning was explicitly configured - return the status
response = &s3.PutBucketVersioningInput{
VersioningConfiguration: &s3.VersioningConfiguration{
Status: aws.String(versioningStatus),
},
}
}
s3err.WriteAwsXMLResponse(w, r, http.StatusOK, response)
}
// PutBucketVersioningHandler Put bucket Versioning
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html
func (s3a *S3ApiServer) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("PutBucketVersioning %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
if r.Body == nil || r.Body == http.NoBody {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
return
}
var versioningConfig s3.VersioningConfiguration
defer util_http.CloseRequest(r)
err := xmlutil.UnmarshalXML(&versioningConfig, xml.NewDecoder(r.Body), "")
if err != nil {
glog.Warningf("PutBucketVersioningHandler xml decode: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
return
}
if versioningConfig.Status == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
return
}
status := *versioningConfig.Status
if status != s3_constants.VersioningEnabled && status != s3_constants.VersioningSuspended {
glog.Errorf("PutBucketVersioningHandler: invalid status '%s' for bucket %s", status, bucket)
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
return
}
// Check if trying to suspend versioning on a bucket with object lock enabled
if status == s3_constants.VersioningSuspended {
// Get bucket configuration to check for object lock
bucketConfig, errCode := s3a.getBucketConfig(bucket)
if errCode == s3err.ErrNone && bucketConfig.ObjectLockConfig != nil {
// Object lock is enabled, cannot suspend versioning
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidBucketState)
return
}
}
// Update bucket versioning configuration using new bucket config system
if errCode := s3a.setBucketVersioningStatus(bucket, status); errCode != s3err.ErrNone {
glog.Errorf("PutBucketVersioningHandler save config: bucket=%s, status='%s', errCode=%d", bucket, status, errCode)
s3err.WriteErrorResponse(w, r, errCode)
return
}
writeSuccessResponseEmpty(w, r)
}