seaweedFS/weed/s3api/s3api_object_handlers_acl.go
Chris Lu 2f6aa98221 Refactor: Replace removeDuplicateSlashes with NormalizeObjectKey (#7873)
* Replace removeDuplicateSlashes with NormalizeObjectKey

Use s3_constants.NormalizeObjectKey instead of removeDuplicateSlashes in most places
for consistency. NormalizeObjectKey handles both duplicate slash removal and ensures
the path starts with '/', providing more complete normalization.

* Fix double slash issues after NormalizeObjectKey

After using NormalizeObjectKey, object keys have a leading '/'. This commit ensures:
- getVersionedObjectDir strips leading slash before concatenation
- getEntry calls receive names without leading slash
- String concatenation with '/' doesn't create '//' paths

This prevents path construction errors like:
  /buckets/bucket//object  (wrong)
  /buckets/bucket/object   (correct)

* ensure object key leading "/"

* fix compilation

* fix: Strip leading slash from object keys in S3 API responses

After introducing NormalizeObjectKey, all internal object keys have a
leading slash. However, S3 API responses must return keys without
leading slashes to match AWS S3 behavior.

Fixed in three functions:
- addVersion: Strip slash for version list entries
- processRegularFile: Strip slash for regular file entries
- processExplicitDirectory: Strip slash for directory entries

This ensures ListObjectVersions and similar APIs return keys like
'bar' instead of '/bar', matching S3 API specifications.

* fix: Normalize keyMarker for consistent pagination comparison

The S3 API provides keyMarker without a leading slash (e.g., 'object-001'),
but after introducing NormalizeObjectKey, all internal object keys have
leading slashes (e.g., '/object-001').

When comparing keyMarker < normalizedObjectKey in shouldSkipObjectForMarker,
the ASCII value of '/' (47) is less than 'o' (111), causing all objects
to be incorrectly skipped during pagination. This resulted in page 2 and
beyond returning 0 results.

Fix: Normalize the keyMarker when creating versionCollector so comparisons
work correctly with normalized object keys.
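
A minimal sketch of the comparison problem described above, assuming a skip check of the form `key <= marker`. The helper `skipForMarker` is hypothetical and is not the actual `shouldSkipObjectForMarker` code:

```go
package main

import "fmt"

// skipForMarker is a hypothetical stand-in for a marker check: it skips
// every key that sorts at or before the pagination marker.
func skipForMarker(key, marker string) bool {
	return marker != "" && key <= marker
}

func main() {
	marker := "object-001"       // as sent by the S3 client, no leading slash
	internalKey := "/object-500" // internal key carrying a leading slash

	// '/' (47) sorts before 'o' (111), so the later key is wrongly skipped.
	fmt.Println(skipForMarker(internalKey, marker)) // true

	// Giving the marker the same leading slash restores the expected order.
	fmt.Println(skipForMarker(internalKey, "/"+marker)) // false
}
```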

Fixes pagination tests:
- TestVersioningPaginationOver1000Versions
- TestVersioningPaginationMultipleObjectsManyVersions

* refactor: Change NormalizeObjectKey to return keys without leading slash

BREAKING STRATEGY CHANGE:
Previously, NormalizeObjectKey added a leading slash to all object keys,
which required stripping it when returning keys to S3 API clients and
caused complexity in marker normalization for pagination.

NEW STRATEGY:
- NormalizeObjectKey now returns keys WITHOUT leading slash (e.g., 'foo/bar' not '/foo/bar')
- This matches the S3 API format directly
- All path concatenations now explicitly add '/' between bucket and object
- No need to strip slashes in responses or normalize markers

Changes:
1. Modified NormalizeObjectKey to strip leading slash instead of adding it
2. Fixed all path concatenations to use:
   - BucketsPath + '/' + bucket + '/' + object
   instead of:
   - BucketsPath + '/' + bucket + object
3. Reverted response key stripping in:
   - addVersion()
   - processRegularFile()
   - processExplicitDirectory()
4. Reverted keyMarker normalization in findVersionsRecursively()
5. Updated matchesPrefixFilter() to work with keys without leading slash
6. Fixed paths in handlers:
   - s3api_object_handlers.go (GetObject, HeadObject, cacheRemoteObjectForStreaming)
   - s3api_object_handlers_postpolicy.go
   - s3api_object_handlers_tagging.go
   - s3api_object_handlers_acl.go
   - s3api_version_id.go (getVersionedObjectDir, getVersionIdFormat)
   - s3api_object_versioning.go (getObjectVersionList, updateLatestVersionAfterDeletion)

All versioning tests pass including pagination stress tests.
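
The concatenation pattern from item 2 can be sketched as follows. The helper name `objectPath` and the `/buckets` root are illustrative assumptions, not identifiers taken from the codebase:

```go
package main

import "fmt"

// objectPath joins the buckets root, the bucket name, and an object key that
// has no leading slash, adding the separators explicitly.
func objectPath(bucketsPath, bucket, object string) string {
	return bucketsPath + "/" + bucket + "/" + object
}

func main() {
	// New pattern: the separator between bucket and object is explicit.
	fmt.Println(objectPath("/buckets", "bucket", "foo/bar")) // /buckets/bucket/foo/bar

	// Old pattern: relied on the key bringing its own leading slash, so a
	// key without one is glued onto the bucket name.
	fmt.Println("/buckets" + "/" + "bucket" + "foo/bar") // /buckets/bucketfoo/bar
}
```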

* adjust format

* Update post policy tests to match new NormalizeObjectKey behavior

- Update TestPostPolicyKeyNormalization to expect keys without leading slashes
- Update TestNormalizeObjectKey to expect keys without leading slashes
- Update TestPostPolicyFilenameSubstitution to expect keys without leading slashes
- Update path construction in tests to use new pattern: BucketsPath + '/' + bucket + '/' + object

* Fix ListObjectVersions prefix filtering

Remove leading slash addition to prefix parameter to allow correct filtering
of .versions directories when listing object versions with a specific prefix.

The prefix parameter should match entry paths relative to bucket root.
Adding a leading slash was breaking the prefix filter for paginated requests.

Fixes pagination issue where second page returned 0 versions instead of
continuing with remaining versions.
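
As a rough illustration of why the extra slash breaks filtering, assume a simplified prefix check over entry paths relative to the bucket root. `hasKeyPrefix` is a hypothetical helper, far simpler than the real `matchesPrefixFilter`:

```go
package main

import (
	"fmt"
	"strings"
)

// hasKeyPrefix is a hypothetical, simplified prefix check against entry
// paths expressed relative to the bucket root (no leading slash).
func hasKeyPrefix(entryPath, prefix string) bool {
	return strings.HasPrefix(entryPath, prefix)
}

func main() {
	entry := "photos/cat.jpg.versions"

	fmt.Println(hasKeyPrefix(entry, "photos/"))  // true
	fmt.Println(hasKeyPrefix(entry, "/photos/")) // false: the added slash never matches
}
```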

* no leading slash

* Fix urlEscapeObject to add leading slash for filer paths

NormalizeObjectKey now returns keys without leading slashes to match S3 API format.
However, urlEscapeObject is used for filer paths which require leading slashes.
Add leading slash back after normalization to ensure filer paths are correct.

Fixes TestS3ApiServer_toFilerPath test failures.

* adjust tests

* normalize

* Fix: Normalize prefixes and markers in LIST operations using NormalizeObjectKey

Ensure consistent key normalization across all S3 operations (GET, PUT, LIST).
Previously, LIST operations were not applying the same normalization rules
(handling backslashes, duplicate slashes, leading slashes) as GET/PUT operations.

Changes:
- Updated normalizePrefixMarker() to call NormalizeObjectKey for both prefix and marker
- This ensures prefixes with leading slashes, backslashes, or duplicate slashes are
  handled consistently with how object keys are normalized
- Fixes Parquet test failures where pads.write_dataset creates implicit directory
  structures that couldn't be discovered by subsequent LIST operations
- Added TestPrefixNormalizationInList and TestListPrefixConsistency tests

All existing LIST tests continue to pass with the normalization improvements.

* Add debugging logging to LIST operations to track prefix normalization

* Fix: Remove leading slash addition from GetPrefix to work with NormalizeObjectKey

The NormalizeObjectKey function removes leading slashes to match S3 API format
(e.g., 'foo/bar' not '/foo/bar'). However, GetPrefix was adding a leading slash
back, which caused LIST operations to fail with incorrect path handling.

Now GetPrefix only normalizes duplicate slashes without adding a leading slash,
which allows NormalizeObjectKey changes to work correctly for S3 LIST operations.

All Parquet integration tests now pass (20/20).

* Fix: Handle object paths without leading slash in checkDirectoryObject

NormalizeObjectKey() removes the leading slash to match S3 API format.
However, checkDirectoryObject() was assuming the object path has a leading
slash when processing directory markers (paths ending with '/').

Now we ensure the object has a leading slash before processing it for
filer operations.

Fixes implicit directory marker test (explicit_dir/) while keeping
Parquet integration tests passing (20/20).

All tests pass:
- Implicit directory tests: 6/6
- Parquet integration tests: 20/20

* Fix: Handle explicit directory markers with trailing slashes

Explicit directory markers created with put_object(Key='dir/', ...) are stored
in the filer with the trailing slash as part of the name. The checkDirectoryObject()
function now checks for both:
1. Explicit directories: lookup with trailing slash preserved (e.g., 'explicit_dir/')
2. Implicit directories: lookup without trailing slash (e.g., 'implicit_dir')

This ensures both types of directory markers are properly recognized.

All tests pass:
- Implicit directory tests: 6/6 (including explicit directory marker test)
- Parquet integration tests: 20/20

* Fix: Preserve trailing slash in NormalizeObjectKey

NormalizeObjectKey now preserves trailing slashes when normalizing object keys.
This is important for explicit directory markers like 'explicit_dir/' which rely
on the trailing slash to be recognized as directory objects.

The normalization process:
1. Notes if trailing slash was present
2. Removes duplicate slashes and converts backslashes
3. Removes leading slash for S3 API format
4. Restores trailing slash if it was in the original

This ensures explicit directory markers created with put_object(Key='dir/', ...)
are properly normalized and can be looked up by their exact name.

All tests pass:
- Implicit directory tests: 6/6
- Parquet integration tests: 20/20

* clean object

* Fix: Don't restore trailing slash if result is empty

When normalizing paths that are only slashes (e.g., '///', '/'), the function
should return an empty string, not a single slash. The fix ensures we only
restore the trailing slash if the result is non-empty.

This fixes the 'just_slashes' test case:
- Input: '///'
- Expected: ''
- Previous: '/'
- Fixed: ''
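
The normalization steps and the empty-result guard can be sketched like this. `normalizeKey` is an illustrative re-implementation written from the description in this commit message, not the `s3_constants.NormalizeObjectKey` source, and it assumes the trailing slash is noted before backslashes are converted:

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeKey follows the steps listed in the commit message. It is a
// sketch under stated assumptions, not the real implementation.
func normalizeKey(key string) string {
	// 1. Note whether the original key ended with a slash.
	hadTrailingSlash := strings.HasSuffix(key, "/")

	// 2. Convert backslashes, then drop empty segments. Dropping empty
	//    segments removes duplicate slashes and, as a side effect,
	// 3. the leading slash as well.
	key = strings.ReplaceAll(key, "\\", "/")
	var parts []string
	for _, p := range strings.Split(key, "/") {
		if p != "" {
			parts = append(parts, p)
		}
	}
	result := strings.Join(parts, "/")

	// 4. Restore the trailing slash, but only if something is left.
	if hadTrailingSlash && result != "" {
		result += "/"
	}
	return result
}

func main() {
	fmt.Printf("%q\n", normalizeKey("/foo//bar"))     // "foo/bar"
	fmt.Printf("%q\n", normalizeKey("explicit_dir/")) // "explicit_dir/"
	fmt.Printf("%q\n", normalizeKey("///"))           // ""
}
```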

All tests now pass:
- Unit tests: TestNormalizeObjectKey (13/13)
- Implicit directory tests: 6/6
- Parquet integration tests: 20/20

* prefixEndsOnDelimiter

* Update s3api_object_handlers_list.go

* Update s3api_object_handlers_list.go

* handle create directory
2025-12-24 19:07:08 -08:00


package s3api

import (
	"context"
	"errors"
	"fmt"
	"net/http"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)

// GetObjectAclHandler Get object ACL
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectAcl.html
func (s3a *S3ApiServer) GetObjectAclHandler(w http.ResponseWriter, r *http.Request) {
	// collect parameters
	bucket, object := s3_constants.GetBucketAndObject(r)
	glog.V(3).Infof("GetObjectAclHandler %s %s", bucket, object)

	if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, err)
		return
	}

	// Check for specific version ID in query parameters
	versionId := r.URL.Query().Get("versionId")

	// Check if versioning is configured for the bucket (Enabled or Suspended)
	versioningConfigured, err := s3a.isVersioningConfigured(bucket)
	if err != nil {
		// Use errors.Is so a wrapped not-found error is still recognized,
		// consistent with the non-versioned lookup further down.
		if errors.Is(err, filer_pb.ErrNotFound) {
			s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
			return
		}
		glog.Errorf("GetObjectAclHandler: Error checking versioning status for bucket %s: %v", bucket, err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	var entry *filer_pb.Entry

	if versioningConfigured {
		// Handle versioned object ACL retrieval - use same logic as GetObjectHandler
		if versionId != "" {
			// Request for specific version
			glog.V(2).Infof("GetObjectAclHandler: requesting ACL for specific version %s of %s/%s", versionId, bucket, object)
			entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
		} else {
			// Request for latest version
			glog.V(2).Infof("GetObjectAclHandler: requesting ACL for latest version of %s/%s", bucket, object)
			entry, err = s3a.getLatestObjectVersion(bucket, object)
		}

		if err != nil {
			glog.Errorf("GetObjectAclHandler: Failed to get object version %s for %s/%s: %v", versionId, bucket, object, err)
			s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
			return
		}

		// Check if this is a delete marker
		if entry.Extended != nil {
			if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
				return
			}
		}
	} else {
		// Handle regular (non-versioned) object ACL retrieval
		entry, err = s3a.fetchObjectEntryRequired(bucket, object)
		if err != nil {
			if errors.Is(err, filer_pb.ErrNotFound) {
				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
				return
			}
			glog.Errorf("GetObjectAclHandler: error checking object %s/%s: %v", bucket, object, err)
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return
		}
	}

	if entry == nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
		return
	}

	// Get object owner from metadata, fallback to request account
	var objectOwner string
	var objectOwnerDisplayName string
	amzAccountId := r.Header.Get(s3_constants.AmzAccountId)

	if entry.Extended != nil {
		if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
			objectOwner = string(ownerBytes)
		}
	}

	// Fallback to current account if no owner stored
	if objectOwner == "" {
		objectOwner = amzAccountId
	}

	objectOwnerDisplayName = s3a.iam.GetAccountNameById(objectOwner)

	// Build ACL response
	response := AccessControlPolicy{
		Owner: CanonicalUser{
			ID:          objectOwner,
			DisplayName: objectOwnerDisplayName,
		},
	}

	// Get grants from stored ACL metadata
	grants := GetAcpGrants(entry.Extended)
	if len(grants) > 0 {
		// Convert AWS SDK grants to local Grant format
		for _, grant := range grants {
			localGrant := Grant{
				Permission: Permission(*grant.Permission),
			}

			if grant.Grantee != nil {
				localGrant.Grantee = Grantee{
					Type:   *grant.Grantee.Type,
					XMLXSI: "CanonicalUser",
					XMLNS:  "http://www.w3.org/2001/XMLSchema-instance",
				}

				if grant.Grantee.ID != nil {
					localGrant.Grantee.ID = *grant.Grantee.ID
					localGrant.Grantee.DisplayName = s3a.iam.GetAccountNameById(*grant.Grantee.ID)
				}

				if grant.Grantee.URI != nil {
					localGrant.Grantee.URI = *grant.Grantee.URI
				}
			}

			response.AccessControlList.Grant = append(response.AccessControlList.Grant, localGrant)
		}
	} else {
		// Fallback to default full control for object owner
		response.AccessControlList.Grant = append(response.AccessControlList.Grant, Grant{
			Grantee: Grantee{
				ID:          objectOwner,
				DisplayName: objectOwnerDisplayName,
				Type:        "CanonicalUser",
				XMLXSI:      "CanonicalUser",
				XMLNS:       "http://www.w3.org/2001/XMLSchema-instance",
			},
			Permission: Permission(s3_constants.PermissionFullControl),
		})
	}

	writeSuccessResponseXML(w, r, response)
}

// PutObjectAclHandler Put object ACL
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectAcl.html
func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Request) {
	// collect parameters
	bucket, object := s3_constants.GetBucketAndObject(r)
	glog.V(3).Infof("PutObjectAclHandler %s %s", bucket, object)

	if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, err)
		return
	}

	// Check for specific version ID in query parameters
	versionId := r.URL.Query().Get("versionId")

	// Check if versioning is configured for the bucket (Enabled or Suspended)
	versioningConfigured, err := s3a.isVersioningConfigured(bucket)
	if err != nil {
		// Use errors.Is so a wrapped not-found error is still recognized,
		// consistent with the non-versioned lookup further down.
		if errors.Is(err, filer_pb.ErrNotFound) {
			s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
			return
		}
		glog.Errorf("PutObjectAclHandler: Error checking versioning status for bucket %s: %v", bucket, err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	var entry *filer_pb.Entry

	if versioningConfigured {
		// Handle versioned object ACL modification - use same logic as GetObjectHandler
		if versionId != "" {
			// Request for specific version
			glog.V(2).Infof("PutObjectAclHandler: modifying ACL for specific version %s of %s/%s", versionId, bucket, object)
			entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
		} else {
			// Request for latest version
			glog.V(2).Infof("PutObjectAclHandler: modifying ACL for latest version of %s/%s", bucket, object)
			entry, err = s3a.getLatestObjectVersion(bucket, object)
		}

		if err != nil {
			glog.Errorf("PutObjectAclHandler: Failed to get object version %s for %s/%s: %v", versionId, bucket, object, err)
			s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
			return
		}

		// Check if this is a delete marker
		if entry.Extended != nil {
			if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
				return
			}
		}
	} else {
		// Handle regular (non-versioned) object ACL modification
		entry, err = s3a.fetchObjectEntryRequired(bucket, object)
		if err != nil {
			if errors.Is(err, filer_pb.ErrNotFound) {
				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
				return
			}
			glog.Errorf("PutObjectAclHandler: error checking object %s/%s: %v", bucket, object, err)
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return
		}
	}

	if entry == nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
		return
	}

	// Get current object owner from metadata
	var objectOwner string
	amzAccountId := r.Header.Get(s3_constants.AmzAccountId)

	if entry.Extended != nil {
		if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
			objectOwner = string(ownerBytes)
		}
	}

	// Fallback to current account if no owner stored
	if objectOwner == "" {
		objectOwner = amzAccountId
	}

	// **PERMISSION CHECKS**

	// 1. Check if user is admin (admins can modify any ACL)
	if !s3a.isUserAdmin(r) {
		// 2. Check object ownership - only object owner can modify ACL (unless admin)
		if objectOwner != amzAccountId {
			glog.V(3).Infof("PutObjectAclHandler: Access denied - user %s is not owner of object %s/%s (owner: %s)",
				amzAccountId, bucket, object, objectOwner)
			s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
			return
		}

		// 3. Check object-level WRITE_ACP permission
		// Create the specific action for this object
		writeAcpAction := Action(fmt.Sprintf("WriteAcp:%s/%s", bucket, object))
		identity, errCode := s3a.iam.authRequest(r, writeAcpAction)
		if errCode != s3err.ErrNone {
			glog.V(3).Infof("PutObjectAclHandler: Auth failed for WriteAcp action on %s/%s: %v", bucket, object, errCode)
			s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
			return
		}

		// 4. Verify the authenticated identity can perform WriteAcp on this specific object
		if identity == nil || !identity.canDo(writeAcpAction, bucket, object) {
			glog.V(3).Infof("PutObjectAclHandler: Identity %v cannot perform WriteAcp on %s/%s", identity, bucket, object)
			s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
			return
		}
	} else {
		glog.V(3).Infof("PutObjectAclHandler: Admin user %s granted ACL modification permission for %s/%s", amzAccountId, bucket, object)
	}

	// Get bucket config for ownership settings
	bucketConfig, errCode := s3a.getBucketConfig(bucket)
	if errCode != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}

	bucketOwnership := bucketConfig.Ownership
	bucketOwnerId := bucketConfig.Owner

	// Extract ACL from request (either canned ACL or XML body)
	// This function also validates that the owner in the request matches the object owner
	grants, errCode := ExtractAcl(r, s3a.iam, bucketOwnership, bucketOwnerId, objectOwner, amzAccountId)
	if errCode != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}

	// Store ACL in object metadata
	if errCode := AssembleEntryWithAcp(entry, objectOwner, grants); errCode != s3err.ErrNone {
		glog.Errorf("PutObjectAclHandler: failed to assemble entry with ACP: %v", errCode)
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}

	// Calculate the correct directory for ACL update
	var updateDirectory string

	if versioningConfigured {
		if versionId != "" && versionId != "null" {
			// Versioned object - update the specific version file in .versions directory
			updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder
		} else {
			// Latest version in versioned bucket - could be null version or versioned object
			// Extract version ID from the entry to determine where it's stored
			var actualVersionId string
			if entry.Extended != nil {
				if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
					actualVersionId = string(versionIdBytes)
				}
			}

			if actualVersionId == "null" || actualVersionId == "" {
				// Null version (pre-versioning object) - stored as regular file
				updateDirectory = s3a.option.BucketsPath + "/" + bucket
			} else {
				// Versioned object - stored in .versions directory
				updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder
			}
		}
	} else {
		// Non-versioned object - stored as regular file
		updateDirectory = s3a.option.BucketsPath + "/" + bucket
	}

	// Update the object with new ACL metadata
	err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		request := &filer_pb.UpdateEntryRequest{
			Directory: updateDirectory,
			Entry:     entry,
		}

		_, err := client.UpdateEntry(context.Background(), request)
		return err
	})

	if err != nil {
		glog.Errorf("PutObjectAclHandler: failed to update entry: %v", err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	glog.V(3).Infof("PutObjectAclHandler: Successfully updated ACL for %s/%s by user %s", bucket, object, amzAccountId)

	writeSuccessResponseEmpty(w, r)
}