fix: resolve CORS cache race condition causing stale 404 responses (#8748)
The metadata subscription handler (updateBucketConfigCacheFromEntry) was making a separate RPC call via loadCORSFromBucketContent to load CORS configuration. This created a race window where a slow CreateBucket subscription event could re-cache stale data after PutBucketCors had already cleared the cache, causing subsequent GetBucketCors to return 404 NoSuchCORSConfiguration. Parse CORS directly from the subscription entry's Content field instead of making a separate RPC. Also fix getBucketConfig to parse CORS from the already-fetched entry, eliminating a redundant RPC call. Fix TestCORSCaching to use require.NoError to prevent nil pointer dereference panics when GetBucketCors fails.
This commit is contained in:
@@ -590,17 +590,14 @@ func TestCORSCaching(t *testing.T) {
|
|||||||
Bucket: aws.String(bucketName),
|
Bucket: aws.String(bucketName),
|
||||||
CORSConfiguration: corsConfig1,
|
CORSConfiguration: corsConfig1,
|
||||||
})
|
})
|
||||||
assert.NoError(t, err, "Should be able to put initial CORS configuration")
|
require.NoError(t, err, "Should be able to put initial CORS configuration")
|
||||||
|
|
||||||
// Wait for metadata subscription to update cache
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
// Get the configuration
|
// Get the configuration
|
||||||
getResp1, err := client.GetBucketCors(context.TODO(), &s3.GetBucketCorsInput{
|
getResp1, err := client.GetBucketCors(context.TODO(), &s3.GetBucketCorsInput{
|
||||||
Bucket: aws.String(bucketName),
|
Bucket: aws.String(bucketName),
|
||||||
})
|
})
|
||||||
assert.NoError(t, err, "Should be able to get initial CORS configuration")
|
require.NoError(t, err, "Should be able to get initial CORS configuration")
|
||||||
assert.Len(t, getResp1.CORSRules, 1, "Should have one CORS rule")
|
require.Len(t, getResp1.CORSRules, 1, "Should have one CORS rule")
|
||||||
|
|
||||||
// Update the configuration
|
// Update the configuration
|
||||||
corsConfig2 := &types.CORSConfiguration{
|
corsConfig2 := &types.CORSConfiguration{
|
||||||
@@ -618,17 +615,14 @@ func TestCORSCaching(t *testing.T) {
|
|||||||
Bucket: aws.String(bucketName),
|
Bucket: aws.String(bucketName),
|
||||||
CORSConfiguration: corsConfig2,
|
CORSConfiguration: corsConfig2,
|
||||||
})
|
})
|
||||||
assert.NoError(t, err, "Should be able to update CORS configuration")
|
require.NoError(t, err, "Should be able to update CORS configuration")
|
||||||
|
|
||||||
// Wait for metadata subscription to update cache
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
// Get the updated configuration (should reflect the changes)
|
// Get the updated configuration (should reflect the changes)
|
||||||
getResp2, err := client.GetBucketCors(context.TODO(), &s3.GetBucketCorsInput{
|
getResp2, err := client.GetBucketCors(context.TODO(), &s3.GetBucketCorsInput{
|
||||||
Bucket: aws.String(bucketName),
|
Bucket: aws.String(bucketName),
|
||||||
})
|
})
|
||||||
assert.NoError(t, err, "Should be able to get updated CORS configuration")
|
require.NoError(t, err, "Should be able to get updated CORS configuration")
|
||||||
assert.Len(t, getResp2.CORSRules, 1, "Should have one CORS rule")
|
require.Len(t, getResp2.CORSRules, 1, "Should have one CORS rule")
|
||||||
|
|
||||||
rule := getResp2.CORSRules[0]
|
rule := getResp2.CORSRules[0]
|
||||||
assert.Equal(t, []string{"Content-Type"}, rule.AllowedHeaders, "Should have updated headers")
|
assert.Equal(t, []string{"Content-Type"}, rule.AllowedHeaders, "Should have updated headers")
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package s3api
|
package s3api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -224,14 +223,13 @@ func (s3a *S3ApiServer) updateBucketConfigCacheFromEntry(entry *filer_pb.Entry)
|
|||||||
// Sync bucket policy to the policy engine for evaluation
|
// Sync bucket policy to the policy engine for evaluation
|
||||||
s3a.syncBucketPolicyToEngine(bucket, config.BucketPolicy)
|
s3a.syncBucketPolicyToEngine(bucket, config.BucketPolicy)
|
||||||
|
|
||||||
// Load CORS configuration from bucket directory content
|
// Parse CORS configuration directly from the subscription entry's Content field.
|
||||||
if corsConfig, err := s3a.loadCORSFromBucketContent(bucket); err != nil {
|
// This avoids a separate RPC call that could return stale data when racing with
|
||||||
if !errors.Is(err, filer_pb.ErrNotFound) {
|
// concurrent metadata updates (e.g., PutBucketCors clearing the cache while this
|
||||||
glog.Errorf("updateBucketConfigCacheFromEntry: failed to load CORS configuration for bucket %s: %v", bucket, err)
|
// handler is still processing an older event).
|
||||||
}
|
config.CORS = parseCORSFromEntryContent(entry.Content)
|
||||||
} else {
|
if config.CORS != nil {
|
||||||
config.CORS = corsConfig
|
glog.V(2).Infof("updateBucketConfigCacheFromEntry: parsed CORS config for bucket %s from entry content", bucket)
|
||||||
glog.V(2).Infof("updateBucketConfigCacheFromEntry: loaded CORS config for bucket %s", bucket)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update timestamp
|
// Update timestamp
|
||||||
|
|||||||
@@ -408,18 +408,9 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err
|
|||||||
// Sync bucket policy to the policy engine for evaluation
|
// Sync bucket policy to the policy engine for evaluation
|
||||||
s3a.syncBucketPolicyToEngine(bucket, config.BucketPolicy)
|
s3a.syncBucketPolicyToEngine(bucket, config.BucketPolicy)
|
||||||
|
|
||||||
// Load CORS configuration from bucket directory content
|
// Parse CORS configuration directly from the entry's Content field.
|
||||||
if corsConfig, err := s3a.loadCORSFromBucketContent(bucket); err != nil {
|
// This avoids a redundant RPC call since we already have the entry.
|
||||||
if errors.Is(err, filer_pb.ErrNotFound) {
|
config.CORS = parseCORSFromEntryContent(entry.Content)
|
||||||
// Missing metadata is not an error; fall back cleanly
|
|
||||||
glog.V(2).Infof("CORS metadata not found for bucket %s, falling back to default behavior", bucket)
|
|
||||||
} else {
|
|
||||||
// Log parsing or validation errors
|
|
||||||
glog.Errorf("Failed to load CORS configuration for bucket %s: %v", bucket, err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
config.CORS = corsConfig
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cache the result
|
// Cache the result
|
||||||
s3a.bucketConfigCache.Set(bucket, config)
|
s3a.bucketConfigCache.Set(bucket, config)
|
||||||
@@ -588,15 +579,19 @@ func (s3a *S3ApiServer) setBucketOwnership(bucket, ownership string) s3err.Error
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadCORSFromBucketContent loads CORS configuration from bucket directory content
|
// parseCORSFromEntryContent parses CORS configuration directly from an entry's Content field.
|
||||||
func (s3a *S3ApiServer) loadCORSFromBucketContent(bucket string) (*cors.CORSConfiguration, error) {
|
// This avoids a separate RPC call when the entry is already available (e.g., from a
|
||||||
metadata, err := s3a.GetBucketMetadata(bucket)
|
// subscription event or a prior getBucketEntry call).
|
||||||
if err != nil {
|
func parseCORSFromEntryContent(content []byte) *cors.CORSConfiguration {
|
||||||
return nil, err
|
if len(content) == 0 {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
var protoMetadata s3_pb.BucketMetadata
|
||||||
// Note: corsConfig can be nil if no CORS configuration is set, which is valid
|
if err := proto.Unmarshal(content, &protoMetadata); err != nil {
|
||||||
return metadata.CORS, nil
|
glog.Errorf("parseCORSFromEntryContent: failed to unmarshal protobuf metadata: %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return corsConfigFromProto(protoMetadata.Cors)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getCORSConfiguration retrieves CORS configuration with caching
|
// getCORSConfiguration retrieves CORS configuration with caching
|
||||||
|
|||||||
Reference in New Issue
Block a user