fix listing objects (#7008)
* fix listing objects * add more list testing * address comments * fix next marker * fix isTruncated in listing * fix tests * address tests * Update s3api_object_handlers_multipart.go * fixes * store json into bucket content, for tagging and cors * switch bucket metadata from json to proto * fix * Update s3api_bucket_config.go * fix test issue * fix test_bucket_listv2_delimiter_prefix * Update cors.go * skip special characters * passing listing * fix test_bucket_list_delimiter_prefix * ok. fix the xsd generated go code now * fix cors tests * fix test * fix test_bucket_list_unordered and test_bucket_listv2_unordered do not accept the allow-unordered and delimiter parameter combination * fix test_bucket_list_objects_anonymous and test_bucket_listv2_objects_anonymous The tests test_bucket_list_objects_anonymous and test_bucket_listv2_objects_anonymous were failing because they try to set bucket ACL to public-read, but SeaweedFS only supported private ACL. Updated PutBucketAclHandler to use the existing ExtractAcl function which already supports all standard S3 canned ACLs Replaced the hardcoded check for only private ACL with proper ACL parsing that handles public-read, public-read-write, authenticated-read, bucket-owner-read, bucket-owner-full-control, etc. Added unit tests to verify all standard canned ACLs are accepted * fix list unordered The test is expecting the error code to be InvalidArgument instead of InvalidRequest * allow anonymous listing( and head, get) * fix test_bucket_list_maxkeys_invalid Invalid values: max-keys=blah → Returns ErrInvalidMaxKeys (HTTP 400) * updating IsPublicRead when parsing acl * more logs * CORS Test Fix * fix test_bucket_list_return_data * default to private * fix test_bucket_list_delimiter_not_skip_special * default no acl * add debug logging * more logs * use basic http client remove logs also * fixes * debug * Update stats.go * debugging * fix anonymous test expectation anonymous user can read, as configured in s3 json.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -9,8 +10,12 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/cors"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
@@ -23,6 +28,7 @@ type BucketConfig struct {
|
||||
Ownership string
|
||||
ACL []byte
|
||||
Owner string
|
||||
IsPublicRead bool // Cached flag to avoid JSON parsing on every request
|
||||
CORS *cors.CORSConfiguration
|
||||
ObjectLockConfig *ObjectLockConfiguration // Cached parsed Object Lock configuration
|
||||
LastModified time.Time
|
||||
@@ -110,8 +116,9 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err
|
||||
}
|
||||
|
||||
config := &BucketConfig{
|
||||
Name: bucket,
|
||||
Entry: entry,
|
||||
Name: bucket,
|
||||
Entry: entry,
|
||||
IsPublicRead: false, // Explicitly default to false for private buckets
|
||||
}
|
||||
|
||||
// Extract configuration from extended attributes
|
||||
@@ -124,6 +131,11 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err
|
||||
}
|
||||
if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists {
|
||||
config.ACL = acl
|
||||
// Parse ACL once and cache public-read status
|
||||
config.IsPublicRead = parseAndCachePublicReadStatus(acl)
|
||||
} else {
|
||||
// No ACL means private bucket
|
||||
config.IsPublicRead = false
|
||||
}
|
||||
if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
|
||||
config.Owner = string(owner)
|
||||
@@ -135,8 +147,8 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err
|
||||
}
|
||||
}
|
||||
|
||||
// Load CORS configuration from .s3metadata
|
||||
if corsConfig, err := s3a.loadCORSFromMetadata(bucket); err != nil {
|
||||
// Load CORS configuration from bucket directory content
|
||||
if corsConfig, err := s3a.loadCORSFromBucketContent(bucket); err != nil {
|
||||
if errors.Is(err, filer_pb.ErrNotFound) {
|
||||
// Missing metadata is not an error; fall back cleanly
|
||||
glog.V(2).Infof("CORS metadata not found for bucket %s, falling back to default behavior", bucket)
|
||||
@@ -293,57 +305,15 @@ func (s3a *S3ApiServer) setBucketOwnership(bucket, ownership string) s3err.Error
|
||||
})
|
||||
}
|
||||
|
||||
// loadCORSFromMetadata loads CORS configuration from bucket metadata
|
||||
func (s3a *S3ApiServer) loadCORSFromMetadata(bucket string) (*cors.CORSConfiguration, error) {
|
||||
// Validate bucket name to prevent path traversal attacks
|
||||
if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
|
||||
strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
|
||||
return nil, fmt.Errorf("invalid bucket name: %s", bucket)
|
||||
}
|
||||
|
||||
// Clean the bucket name further to prevent any potential path traversal
|
||||
bucket = filepath.Clean(bucket)
|
||||
if bucket == "." || bucket == ".." {
|
||||
return nil, fmt.Errorf("invalid bucket name: %s", bucket)
|
||||
}
|
||||
|
||||
bucketMetadataPath := filepath.Join(s3a.option.BucketsPath, bucket, cors.S3MetadataFileName)
|
||||
|
||||
entry, err := s3a.getEntry("", bucketMetadataPath)
|
||||
// loadCORSFromBucketContent loads CORS configuration from bucket directory content
|
||||
func (s3a *S3ApiServer) loadCORSFromBucketContent(bucket string) (*cors.CORSConfiguration, error) {
|
||||
_, corsConfig, err := s3a.getBucketMetadata(bucket)
|
||||
if err != nil {
|
||||
glog.V(3).Infof("loadCORSFromMetadata: error retrieving metadata for bucket %s: %v", bucket, err)
|
||||
return nil, fmt.Errorf("error retrieving CORS metadata for bucket %s: %w", bucket, err)
|
||||
}
|
||||
if entry == nil {
|
||||
glog.V(3).Infof("loadCORSFromMetadata: no metadata entry found for bucket %s", bucket)
|
||||
return nil, fmt.Errorf("no metadata entry found for bucket %s", bucket)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(entry.Content) == 0 {
|
||||
glog.V(3).Infof("loadCORSFromMetadata: empty metadata content for bucket %s", bucket)
|
||||
return nil, fmt.Errorf("no metadata content for bucket %s", bucket)
|
||||
}
|
||||
|
||||
var metadata map[string]json.RawMessage
|
||||
if err := json.Unmarshal(entry.Content, &metadata); err != nil {
|
||||
glog.Errorf("loadCORSFromMetadata: failed to unmarshal metadata for bucket %s: %v", bucket, err)
|
||||
return nil, fmt.Errorf("failed to unmarshal metadata: %w", err)
|
||||
}
|
||||
|
||||
corsData, exists := metadata["cors"]
|
||||
if !exists {
|
||||
glog.V(3).Infof("loadCORSFromMetadata: no CORS configuration found for bucket %s", bucket)
|
||||
return nil, fmt.Errorf("no CORS configuration found")
|
||||
}
|
||||
|
||||
// Directly unmarshal the raw JSON to CORSConfiguration to avoid round-trip allocations
|
||||
var config cors.CORSConfiguration
|
||||
if err := json.Unmarshal(corsData, &config); err != nil {
|
||||
glog.Errorf("loadCORSFromMetadata: failed to unmarshal CORS configuration for bucket %s: %v", bucket, err)
|
||||
return nil, fmt.Errorf("failed to unmarshal CORS configuration: %w", err)
|
||||
}
|
||||
|
||||
return &config, nil
|
||||
// Note: corsConfig can be nil if no CORS configuration is set, which is valid
|
||||
return corsConfig, nil
|
||||
}
|
||||
|
||||
// getCORSConfiguration retrieves CORS configuration with caching
|
||||
@@ -356,50 +326,275 @@ func (s3a *S3ApiServer) getCORSConfiguration(bucket string) (*cors.CORSConfigura
|
||||
return config.CORS, s3err.ErrNone
|
||||
}
|
||||
|
||||
// getCORSStorage returns a CORS storage instance for persistent operations
|
||||
func (s3a *S3ApiServer) getCORSStorage() *cors.Storage {
|
||||
entryGetter := &S3EntryGetter{server: s3a}
|
||||
return cors.NewStorage(s3a, entryGetter, s3a.option.BucketsPath)
|
||||
}
|
||||
|
||||
// updateCORSConfiguration updates CORS configuration and invalidates cache
|
||||
// updateCORSConfiguration updates the CORS configuration for a bucket
|
||||
func (s3a *S3ApiServer) updateCORSConfiguration(bucket string, corsConfig *cors.CORSConfiguration) s3err.ErrorCode {
|
||||
// Update in-memory cache
|
||||
errCode := s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
|
||||
config.CORS = corsConfig
|
||||
return nil
|
||||
})
|
||||
if errCode != s3err.ErrNone {
|
||||
return errCode
|
||||
}
|
||||
|
||||
// Persist to .s3metadata file
|
||||
storage := s3a.getCORSStorage()
|
||||
if err := storage.Store(bucket, corsConfig); err != nil {
|
||||
glog.Errorf("updateCORSConfiguration: failed to persist CORS config to metadata for bucket %s: %v", bucket, err)
|
||||
// Get existing metadata
|
||||
existingTags, _, err := s3a.getBucketMetadata(bucket)
|
||||
if err != nil {
|
||||
glog.Errorf("updateCORSConfiguration: failed to get bucket metadata for bucket %s: %v", bucket, err)
|
||||
return s3err.ErrInternalError
|
||||
}
|
||||
|
||||
// Update CORS configuration
|
||||
updatedCorsConfig := corsConfig
|
||||
|
||||
// Store updated metadata
|
||||
if err := s3a.setBucketMetadata(bucket, existingTags, updatedCorsConfig); err != nil {
|
||||
glog.Errorf("updateCORSConfiguration: failed to persist CORS config to bucket content for bucket %s: %v", bucket, err)
|
||||
return s3err.ErrInternalError
|
||||
}
|
||||
|
||||
// Cache will be updated automatically via metadata subscription
|
||||
return s3err.ErrNone
|
||||
}
|
||||
|
||||
// removeCORSConfiguration removes CORS configuration and invalidates cache
|
||||
// removeCORSConfiguration removes the CORS configuration for a bucket
|
||||
func (s3a *S3ApiServer) removeCORSConfiguration(bucket string) s3err.ErrorCode {
|
||||
// Remove from in-memory cache
|
||||
errCode := s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
|
||||
config.CORS = nil
|
||||
return nil
|
||||
})
|
||||
if errCode != s3err.ErrNone {
|
||||
return errCode
|
||||
}
|
||||
|
||||
// Remove from .s3metadata file
|
||||
storage := s3a.getCORSStorage()
|
||||
if err := storage.Delete(bucket); err != nil {
|
||||
glog.Errorf("removeCORSConfiguration: failed to remove CORS config from metadata for bucket %s: %v", bucket, err)
|
||||
// Get existing metadata
|
||||
existingTags, _, err := s3a.getBucketMetadata(bucket)
|
||||
if err != nil {
|
||||
glog.Errorf("removeCORSConfiguration: failed to get bucket metadata for bucket %s: %v", bucket, err)
|
||||
return s3err.ErrInternalError
|
||||
}
|
||||
|
||||
// Remove CORS configuration
|
||||
var nilCorsConfig *cors.CORSConfiguration = nil
|
||||
|
||||
// Store updated metadata
|
||||
if err := s3a.setBucketMetadata(bucket, existingTags, nilCorsConfig); err != nil {
|
||||
glog.Errorf("removeCORSConfiguration: failed to remove CORS config from bucket content for bucket %s: %v", bucket, err)
|
||||
return s3err.ErrInternalError
|
||||
}
|
||||
|
||||
// Cache will be updated automatically via metadata subscription
|
||||
return s3err.ErrNone
|
||||
}
|
||||
|
||||
// Conversion functions between CORS types and protobuf types
|
||||
|
||||
// corsRuleToProto converts a CORS rule to protobuf format
|
||||
func corsRuleToProto(rule cors.CORSRule) *s3_pb.CORSRule {
|
||||
return &s3_pb.CORSRule{
|
||||
AllowedHeaders: rule.AllowedHeaders,
|
||||
AllowedMethods: rule.AllowedMethods,
|
||||
AllowedOrigins: rule.AllowedOrigins,
|
||||
ExposeHeaders: rule.ExposeHeaders,
|
||||
MaxAgeSeconds: int32(getMaxAgeSecondsValue(rule.MaxAgeSeconds)),
|
||||
Id: rule.ID,
|
||||
}
|
||||
}
|
||||
|
||||
// corsRuleFromProto converts a protobuf CORS rule to standard format
|
||||
func corsRuleFromProto(protoRule *s3_pb.CORSRule) cors.CORSRule {
|
||||
var maxAge *int
|
||||
// Always create the pointer if MaxAgeSeconds is >= 0
|
||||
// This prevents nil pointer dereferences in tests and matches AWS behavior
|
||||
if protoRule.MaxAgeSeconds >= 0 {
|
||||
age := int(protoRule.MaxAgeSeconds)
|
||||
maxAge = &age
|
||||
}
|
||||
// Only leave maxAge as nil if MaxAgeSeconds was explicitly set to a negative value
|
||||
|
||||
return cors.CORSRule{
|
||||
AllowedHeaders: protoRule.AllowedHeaders,
|
||||
AllowedMethods: protoRule.AllowedMethods,
|
||||
AllowedOrigins: protoRule.AllowedOrigins,
|
||||
ExposeHeaders: protoRule.ExposeHeaders,
|
||||
MaxAgeSeconds: maxAge,
|
||||
ID: protoRule.Id,
|
||||
}
|
||||
}
|
||||
|
||||
// corsConfigToProto converts CORS configuration to protobuf format
|
||||
func corsConfigToProto(config *cors.CORSConfiguration) *s3_pb.CORSConfiguration {
|
||||
if config == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
protoRules := make([]*s3_pb.CORSRule, len(config.CORSRules))
|
||||
for i, rule := range config.CORSRules {
|
||||
protoRules[i] = corsRuleToProto(rule)
|
||||
}
|
||||
|
||||
return &s3_pb.CORSConfiguration{
|
||||
CorsRules: protoRules,
|
||||
}
|
||||
}
|
||||
|
||||
// corsConfigFromProto converts protobuf CORS configuration to standard format
|
||||
func corsConfigFromProto(protoConfig *s3_pb.CORSConfiguration) *cors.CORSConfiguration {
|
||||
if protoConfig == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
rules := make([]cors.CORSRule, len(protoConfig.CorsRules))
|
||||
for i, protoRule := range protoConfig.CorsRules {
|
||||
rules[i] = corsRuleFromProto(protoRule)
|
||||
}
|
||||
|
||||
return &cors.CORSConfiguration{
|
||||
CORSRules: rules,
|
||||
}
|
||||
}
|
||||
|
||||
// getMaxAgeSecondsValue safely extracts max age seconds value
|
||||
func getMaxAgeSecondsValue(maxAge *int) int {
|
||||
if maxAge == nil {
|
||||
return 0
|
||||
}
|
||||
return *maxAge
|
||||
}
|
||||
|
||||
// parseAndCachePublicReadStatus parses the ACL and caches the public-read status
|
||||
func parseAndCachePublicReadStatus(acl []byte) bool {
|
||||
var grants []*s3.Grant
|
||||
if err := json.Unmarshal(acl, &grants); err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if any grant gives read permission to "AllUsers" group
|
||||
for _, grant := range grants {
|
||||
if grant.Grantee != nil && grant.Grantee.URI != nil && grant.Permission != nil {
|
||||
// Check for AllUsers group with Read permission
|
||||
if *grant.Grantee.URI == s3_constants.GranteeGroupAllUsers &&
|
||||
(*grant.Permission == s3_constants.PermissionRead || *grant.Permission == s3_constants.PermissionFullControl) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// getBucketMetadata retrieves bucket metadata from bucket directory content using protobuf
|
||||
func (s3a *S3ApiServer) getBucketMetadata(bucket string) (map[string]string, *cors.CORSConfiguration, error) {
|
||||
// Validate bucket name to prevent path traversal attacks
|
||||
if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
|
||||
strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
|
||||
return nil, nil, fmt.Errorf("invalid bucket name: %s", bucket)
|
||||
}
|
||||
|
||||
// Clean the bucket name further to prevent any potential path traversal
|
||||
bucket = filepath.Clean(bucket)
|
||||
if bucket == "." || bucket == ".." {
|
||||
return nil, nil, fmt.Errorf("invalid bucket name: %s", bucket)
|
||||
}
|
||||
|
||||
// Get bucket directory entry to access its content
|
||||
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
|
||||
}
|
||||
if entry == nil {
|
||||
return nil, nil, fmt.Errorf("bucket directory not found %s", bucket)
|
||||
}
|
||||
|
||||
// If no content, return empty metadata
|
||||
if len(entry.Content) == 0 {
|
||||
return make(map[string]string), nil, nil
|
||||
}
|
||||
|
||||
// Unmarshal metadata from protobuf
|
||||
var protoMetadata s3_pb.BucketMetadata
|
||||
if err := proto.Unmarshal(entry.Content, &protoMetadata); err != nil {
|
||||
glog.Errorf("getBucketMetadata: failed to unmarshal protobuf metadata for bucket %s: %v", bucket, err)
|
||||
return make(map[string]string), nil, nil // Return empty metadata on error, don't fail
|
||||
}
|
||||
|
||||
// Convert protobuf CORS to standard CORS
|
||||
corsConfig := corsConfigFromProto(protoMetadata.Cors)
|
||||
|
||||
return protoMetadata.Tags, corsConfig, nil
|
||||
}
|
||||
|
||||
// setBucketMetadata stores bucket metadata in bucket directory content using protobuf
|
||||
func (s3a *S3ApiServer) setBucketMetadata(bucket string, tags map[string]string, corsConfig *cors.CORSConfiguration) error {
|
||||
// Validate bucket name to prevent path traversal attacks
|
||||
if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
|
||||
strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
|
||||
return fmt.Errorf("invalid bucket name: %s", bucket)
|
||||
}
|
||||
|
||||
// Clean the bucket name further to prevent any potential path traversal
|
||||
bucket = filepath.Clean(bucket)
|
||||
if bucket == "." || bucket == ".." {
|
||||
return fmt.Errorf("invalid bucket name: %s", bucket)
|
||||
}
|
||||
|
||||
// Create protobuf metadata
|
||||
protoMetadata := &s3_pb.BucketMetadata{
|
||||
Tags: tags,
|
||||
Cors: corsConfigToProto(corsConfig),
|
||||
}
|
||||
|
||||
// Marshal metadata to protobuf
|
||||
metadataBytes, err := proto.Marshal(protoMetadata)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal bucket metadata to protobuf: %w", err)
|
||||
}
|
||||
|
||||
// Update the bucket entry with new content
|
||||
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
// Get current bucket entry
|
||||
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
|
||||
}
|
||||
if entry == nil {
|
||||
return fmt.Errorf("bucket directory not found %s", bucket)
|
||||
}
|
||||
|
||||
// Update content with metadata
|
||||
entry.Content = metadataBytes
|
||||
|
||||
request := &filer_pb.UpdateEntryRequest{
|
||||
Directory: s3a.option.BucketsPath,
|
||||
Entry: entry,
|
||||
}
|
||||
|
||||
_, err = client.UpdateEntry(context.Background(), request)
|
||||
return err
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// getBucketTags retrieves bucket tags from bucket directory content
|
||||
func (s3a *S3ApiServer) getBucketTags(bucket string) (map[string]string, error) {
|
||||
tags, _, err := s3a.getBucketMetadata(bucket)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(tags) == 0 {
|
||||
return nil, fmt.Errorf("no tags configuration found")
|
||||
}
|
||||
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
// setBucketTags stores bucket tags in bucket directory content
|
||||
func (s3a *S3ApiServer) setBucketTags(bucket string, tags map[string]string) error {
|
||||
// Get existing metadata
|
||||
_, existingCorsConfig, err := s3a.getBucketMetadata(bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store updated metadata with new tags
|
||||
err = s3a.setBucketMetadata(bucket, tags, existingCorsConfig)
|
||||
return err
|
||||
}
|
||||
|
||||
// deleteBucketTags removes bucket tags from bucket directory content
|
||||
func (s3a *S3ApiServer) deleteBucketTags(bucket string) error {
|
||||
// Get existing metadata
|
||||
_, existingCorsConfig, err := s3a.getBucketMetadata(bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store updated metadata with empty tags
|
||||
emptyTags := make(map[string]string)
|
||||
err = s3a.setBucketMetadata(bucket, emptyTags, existingCorsConfig)
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user