Fix S3 conditional writes with versioning (Issue #8073) (#8080)

* Fix S3 conditional writes with versioning (Issue #8073)

Refactors conditional header checks to properly resolve the latest object version when versioning is enabled. This prevents incorrect validation against non-versioned root objects.

* Add integration test for S3 conditional writes with versioning (Issue #8073)

* Refactor: Propagate internal errors in conditional header checks

- Make resolveObjectEntry return errors from isVersioningConfigured
- Update checkConditionalHeaders checks to return 500 on internal resolve errors

* Refactor: Stricter error handling and test assertions

- Propagate internal errors in checkConditionalHeaders*WithGetter functions
- Enforce strict 412 PreconditionFailed check in integration test

* Perf: Add early return for conditional headers + safety improvements

- Add fast path to skip resolveObjectEntry when no conditional headers present
- Avoids expensive getLatestObjectVersion retries in common case
- Add nil checks before dereferencing pointers in integration test
- Fix grammar in test comments
- Remove duplicate comment in resolveObjectEntry

* Refactor: Use errors.Is for robust ErrNotFound checking

- Update checkConditionalHeaders* to use errors.Is(err, filer_pb.ErrNotFound)
- Update resolveObjectEntry to use errors.Is for wrapped error compatibility
- Remove duplicate comment lines in s3api handlers

* Perf: Optimize resolveObjectEntry for conditional checks

- Refactor getLatestObjectVersion to doGetLatestObjectVersion supporting variable retries
- Use 1-retry path in resolveObjectEntry to avoid exponential backoff latency

* Test: Enhance integration test with content verification

- Verify actual object content equals expected content after successful conditional write
- Add missing io and errors imports to test file

* Refactor: Final refinements based on feedback

- Optimize header validation by passing parsed headers to avoid redundant parsing
- Simplify integration test assertions using require.Error and assert.True
- Fix build errors in s3api handler and test imports

* Test: Use smithy.APIError for robust error code checking

- Replace string-based error checking with structured API error
- Add smithy-go import for AWS SDK v2 error handling

* Test: Use types.PreconditionFailed and handle io.ReadAll error

- Replace smithy.APIError with more specific types.PreconditionFailed
- Add proper error handling for io.ReadAll in content verification

* Refactor: Use combined error checking and add nil guards

- Use smithy.APIError with ErrorCode() for robust error checking
- Add nil guards for entry.Attributes before accessing Mtime
- Prevents potential panics when Attributes is uninitialized
This commit is contained in:
Chris Lu
2026-01-21 16:36:18 -08:00
committed by GitHub
parent 52882aed70
commit 51735e667c
4 changed files with 258 additions and 57 deletions

View File

@@ -398,6 +398,29 @@ func (s3a *S3ApiServer) checkDirectoryObject(bucket, object string) (*filer_pb.E
return dirEntry, true, nil
}
// resolveObjectEntry returns the entry that conditional-header checks should be
// evaluated against for bucket/object.
//
// It first asks isVersioningConfigured about the bucket. Any error from that
// call other than filer_pb.ErrNotFound is logged and returned unchanged; a
// not-found error is tolerated and the returned flag is used as-is.
//
// When versioning is configured, the entry comes from doGetLatestObjectVersion
// with maxRetries set to 1, so the conditional check does not pay the retry
// backoff latency of the default lookup. Otherwise the entry is fetched
// directly from the bucket directory with getEntry.
func (s3a *S3ApiServer) resolveObjectEntry(bucket, object string) (*filer_pb.Entry, error) {
	versioned, cfgErr := s3a.isVersioningConfigured(bucket)
	if cfgErr != nil {
		if !errors.Is(cfgErr, filer_pb.ErrNotFound) {
			glog.Errorf("resolveObjectEntry: error checking versioning config for %s: %v", bucket, cfgErr)
			return nil, cfgErr
		}
	}

	if !versioned {
		// Non-versioned bucket: the object lives directly under the bucket directory.
		return s3a.getEntry(s3a.option.BucketsPath+"/"+bucket, object)
	}

	// Versioned bucket: a plain getEntry would miss objects that were moved
	// under .versions/, so resolve the latest version (or null version) instead.
	return s3a.doGetLatestObjectVersion(bucket, object, 1)
}
// serveDirectoryContent serves the content of a directory object directly
func (s3a *S3ApiServer) serveDirectoryContent(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry) {
// Defensive nil checks - entry and attributes should never be nil, but guard against it

View File

@@ -1621,22 +1621,13 @@ func (s3a *S3ApiServer) etagMatches(headerValue, objectETag string) bool {
return false
}
// checkConditionalHeadersWithGetter is a testable method that accepts a simple EntryGetter
// Uses the production getObjectETag and etagMatches methods to ensure testing of real logic
func (s3a *S3ApiServer) checkConditionalHeadersWithGetter(getter EntryGetter, r *http.Request, bucket, object string) s3err.ErrorCode {
headers, errCode := parseConditionalHeaders(r)
if errCode != s3err.ErrNone {
glog.V(3).Infof("checkConditionalHeaders: Invalid date format")
return errCode
}
// validateConditionalHeaders checks conditional headers against the provided entry
func (s3a *S3ApiServer) validateConditionalHeaders(r *http.Request, headers conditionalHeaders, entry *filer_pb.Entry, bucket, object string) s3err.ErrorCode {
if !headers.isSet {
return s3err.ErrNone
}
// Get object entry for conditional checks.
bucketDir := "/buckets/" + bucket
entry, entryErr := getter.getEntry(bucketDir, object)
objectExists := entryErr == nil
objectExists := entry != nil
// For PUT requests, all specified conditions must be met.
// The evaluation order follows AWS S3 behavior for consistency.
@@ -1644,7 +1635,7 @@ func (s3a *S3ApiServer) checkConditionalHeadersWithGetter(getter EntryGetter, r
// 1. Check If-Match
if headers.ifMatch != "" {
if !objectExists {
glog.V(3).Infof("checkConditionalHeaders: If-Match failed - object %s/%s does not exist", bucket, object)
glog.V(3).Infof("validateConditionalHeaders: If-Match failed - object %s/%s does not exist", bucket, object)
return s3err.ErrPreconditionFailed
}
// If `ifMatch` is "*", the condition is met if the object exists.
@@ -1654,22 +1645,24 @@ func (s3a *S3ApiServer) checkConditionalHeadersWithGetter(getter EntryGetter, r
objectETag := s3a.getObjectETag(entry)
// Use production etagMatches method
if !s3a.etagMatches(headers.ifMatch, objectETag) {
glog.V(3).Infof("checkConditionalHeaders: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag)
glog.V(3).Infof("validateConditionalHeaders: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag)
return s3err.ErrPreconditionFailed
}
}
glog.V(3).Infof("checkConditionalHeaders: If-Match passed for object %s/%s", bucket, object)
glog.V(3).Infof("validateConditionalHeaders: If-Match passed for object %s/%s", bucket, object)
}
// 2. Check If-Unmodified-Since
if !headers.ifUnmodifiedSince.IsZero() {
if objectExists {
objectModTime := time.Unix(entry.Attributes.Mtime, 0)
if objectModTime.After(headers.ifUnmodifiedSince) {
glog.V(3).Infof("checkConditionalHeaders: If-Unmodified-Since failed - object modified after %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
return s3err.ErrPreconditionFailed
if entry.Attributes != nil {
objectModTime := time.Unix(entry.Attributes.Mtime, 0)
if objectModTime.After(headers.ifUnmodifiedSince) {
glog.V(3).Infof("validateConditionalHeaders: If-Unmodified-Since failed - object modified after %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
return s3err.ErrPreconditionFailed
}
glog.V(3).Infof("validateConditionalHeaders: If-Unmodified-Since passed - object not modified since %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
}
glog.V(3).Infof("checkConditionalHeaders: If-Unmodified-Since passed - object not modified since %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
}
}
@@ -1677,67 +1670,104 @@ func (s3a *S3ApiServer) checkConditionalHeadersWithGetter(getter EntryGetter, r
if headers.ifNoneMatch != "" {
if objectExists {
if headers.ifNoneMatch == "*" {
glog.V(3).Infof("checkConditionalHeaders: If-None-Match=* failed - object %s/%s exists", bucket, object)
glog.V(3).Infof("validateConditionalHeaders: If-None-Match=* failed - object %s/%s exists", bucket, object)
return s3err.ErrPreconditionFailed
}
// Use production getObjectETag method
objectETag := s3a.getObjectETag(entry)
// Use production etagMatches method
if s3a.etagMatches(headers.ifNoneMatch, objectETag) {
glog.V(3).Infof("checkConditionalHeaders: If-None-Match failed - ETag matches %s", objectETag)
glog.V(3).Infof("validateConditionalHeaders: If-None-Match failed - ETag matches %s", objectETag)
return s3err.ErrPreconditionFailed
}
glog.V(3).Infof("checkConditionalHeaders: If-None-Match passed - ETag %s doesn't match %s", objectETag, headers.ifNoneMatch)
glog.V(3).Infof("validateConditionalHeaders: If-None-Match passed - ETag %s doesn't match %s", objectETag, headers.ifNoneMatch)
} else {
glog.V(3).Infof("checkConditionalHeaders: If-None-Match passed - object %s/%s does not exist", bucket, object)
glog.V(3).Infof("validateConditionalHeaders: If-None-Match passed - object %s/%s does not exist", bucket, object)
}
}
// 4. Check If-Modified-Since
if !headers.ifModifiedSince.IsZero() {
if objectExists {
objectModTime := time.Unix(entry.Attributes.Mtime, 0)
if !objectModTime.After(headers.ifModifiedSince) {
glog.V(3).Infof("checkConditionalHeaders: If-Modified-Since failed - object not modified since %s", r.Header.Get(s3_constants.IfModifiedSince))
return s3err.ErrPreconditionFailed
if entry.Attributes != nil {
objectModTime := time.Unix(entry.Attributes.Mtime, 0)
if !objectModTime.After(headers.ifModifiedSince) {
glog.V(3).Infof("validateConditionalHeaders: If-Modified-Since failed - object not modified since %s", r.Header.Get(s3_constants.IfModifiedSince))
return s3err.ErrPreconditionFailed
}
glog.V(3).Infof("validateConditionalHeaders: If-Modified-Since passed - object modified after %s", r.Header.Get(s3_constants.IfModifiedSince))
}
glog.V(3).Infof("checkConditionalHeaders: If-Modified-Since passed - object modified after %s", r.Header.Get(s3_constants.IfModifiedSince))
}
}
return s3err.ErrNone
}
// checkConditionalHeaders is the production method that uses the S3ApiServer as EntryGetter
func (s3a *S3ApiServer) checkConditionalHeaders(r *http.Request, bucket, object string) s3err.ErrorCode {
return s3a.checkConditionalHeadersWithGetter(s3a, r, bucket, object)
}
// checkConditionalHeadersForReadsWithGetter is a testable method for read operations
// checkConditionalHeadersWithGetter is a testable method that accepts a simple EntryGetter
// Uses the production getObjectETag and etagMatches methods to ensure testing of real logic
func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGetter, r *http.Request, bucket, object string) ConditionalHeaderResult {
func (s3a *S3ApiServer) checkConditionalHeadersWithGetter(getter EntryGetter, r *http.Request, bucket, object string) s3err.ErrorCode {
headers, errCode := parseConditionalHeaders(r)
if errCode != s3err.ErrNone {
glog.V(3).Infof("checkConditionalHeadersForReads: Invalid date format")
return ConditionalHeaderResult{ErrorCode: errCode}
return errCode
}
if !headers.isSet {
return ConditionalHeaderResult{ErrorCode: s3err.ErrNone}
}
// Get object entry for conditional checks.
bucketDir := "/buckets/" + bucket
entry, entryErr := getter.getEntry(bucketDir, object)
objectExists := entryErr == nil
if entryErr != nil {
if errors.Is(entryErr, filer_pb.ErrNotFound) {
entry = nil
} else {
glog.Errorf("checkConditionalHeadersWithGetter: failed to get entry for %s/%s: %v", bucket, object, entryErr)
return s3err.ErrInternalError
}
}
return s3a.validateConditionalHeaders(r, headers, entry, bucket, object)
}
// checkConditionalHeaders is the production entry point for conditional-header
// evaluation. It parses the headers from r, resolves the object entry through
// resolveObjectEntry (so versioned buckets are checked against their latest
// version), and delegates the comparison to validateConditionalHeaders.
//
// Returns the parse error code if header parsing fails, s3err.ErrNone when no
// conditional header is set, s3err.ErrInternalError when the entry cannot be
// resolved for a reason other than filer_pb.ErrNotFound, and otherwise
// whatever validateConditionalHeaders returns.
func (s3a *S3ApiServer) checkConditionalHeaders(r *http.Request, bucket, object string) s3err.ErrorCode {
	headers, parseCode := parseConditionalHeaders(r)
	switch {
	case parseCode != s3err.ErrNone:
		return parseCode
	case !headers.isSet:
		// Fast path: with no conditions to evaluate there is no need to pay
		// for an object lookup (notably the versioned-bucket resolution).
		return s3err.ErrNone
	}

	entry, resolveErr := s3a.resolveObjectEntry(bucket, object)
	if resolveErr != nil {
		if !errors.Is(resolveErr, filer_pb.ErrNotFound) {
			glog.Errorf("checkConditionalHeaders: error resolving object entry for %s/%s: %v", bucket, object, resolveErr)
			return s3err.ErrInternalError
		}
		// A missing object is not an error here; the validator treats a nil
		// entry as "object does not exist".
		entry = nil
	}

	return s3a.validateConditionalHeaders(r, headers, entry, bucket, object)
}
// validateConditionalHeadersForReads checks conditional headers for read operations against the provided entry
func (s3a *S3ApiServer) validateConditionalHeadersForReads(r *http.Request, headers conditionalHeaders, entry *filer_pb.Entry, bucket, object string) ConditionalHeaderResult {
if !headers.isSet {
return ConditionalHeaderResult{ErrorCode: s3err.ErrNone, Entry: entry}
}
objectExists := entry != nil
// If object doesn't exist, fail for If-Match and If-Unmodified-Since
if !objectExists {
if headers.ifMatch != "" {
glog.V(3).Infof("checkConditionalHeadersForReads: If-Match failed - object %s/%s does not exist", bucket, object)
glog.V(3).Infof("validateConditionalHeadersForReads: If-Match failed - object %s/%s does not exist", bucket, object)
return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed, Entry: nil}
}
if !headers.ifUnmodifiedSince.IsZero() {
glog.V(3).Infof("checkConditionalHeadersForReads: If-Unmodified-Since failed - object %s/%s does not exist", bucket, object)
glog.V(3).Infof("validateConditionalHeadersForReads: If-Unmodified-Since failed - object %s/%s does not exist", bucket, object)
return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed, Entry: nil}
}
// If-None-Match and If-Modified-Since succeed when object doesn't exist
@@ -1757,21 +1787,21 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGe
objectETag := s3a.getObjectETag(entry)
// Use production etagMatches method
if !s3a.etagMatches(headers.ifMatch, objectETag) {
glog.V(3).Infof("checkConditionalHeadersForReads: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag)
glog.V(3).Infof("validateConditionalHeadersForReads: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag)
return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed, Entry: entry}
}
}
glog.V(3).Infof("checkConditionalHeadersForReads: If-Match passed for object %s/%s", bucket, object)
glog.V(3).Infof("validateConditionalHeadersForReads: If-Match passed for object %s/%s", bucket, object)
}
// 2. Check If-Unmodified-Since (412 Precondition Failed if fails)
if !headers.ifUnmodifiedSince.IsZero() {
objectModTime := time.Unix(entry.Attributes.Mtime, 0)
if objectModTime.After(headers.ifUnmodifiedSince) {
glog.V(3).Infof("checkConditionalHeadersForReads: If-Unmodified-Since failed - object modified after %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
glog.V(3).Infof("validateConditionalHeadersForReads: If-Unmodified-Since failed - object modified after %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed, Entry: entry}
}
glog.V(3).Infof("checkConditionalHeadersForReads: If-Unmodified-Since passed - object not modified since %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
glog.V(3).Infof("validateConditionalHeadersForReads: If-Unmodified-Since passed - object not modified since %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
}
// 3. Check If-None-Match (304 Not Modified if fails)
@@ -1780,15 +1810,15 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGe
objectETag := s3a.getObjectETag(entry)
if headers.ifNoneMatch == "*" {
glog.V(3).Infof("checkConditionalHeadersForReads: If-None-Match=* failed - object %s/%s exists", bucket, object)
glog.V(3).Infof("validateConditionalHeadersForReads: If-None-Match=* failed - object %s/%s exists", bucket, object)
return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag, Entry: entry}
}
// Use production etagMatches method
if s3a.etagMatches(headers.ifNoneMatch, objectETag) {
glog.V(3).Infof("checkConditionalHeadersForReads: If-None-Match failed - ETag matches %s", objectETag)
glog.V(3).Infof("validateConditionalHeadersForReads: If-None-Match failed - ETag matches %s", objectETag)
return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag, Entry: entry}
}
glog.V(3).Infof("checkConditionalHeadersForReads: If-None-Match passed - ETag %s doesn't match %s", objectETag, headers.ifNoneMatch)
glog.V(3).Infof("validateConditionalHeadersForReads: If-None-Match passed - ETag %s doesn't match %s", objectETag, headers.ifNoneMatch)
}
// 4. Check If-Modified-Since (304 Not Modified if fails)
@@ -1797,19 +1827,63 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGe
if !objectModTime.After(headers.ifModifiedSince) {
// Use production getObjectETag method
objectETag := s3a.getObjectETag(entry)
glog.V(3).Infof("checkConditionalHeadersForReads: If-Modified-Since failed - object not modified since %s", r.Header.Get(s3_constants.IfModifiedSince))
glog.V(3).Infof("validateConditionalHeadersForReads: If-Modified-Since failed - object not modified since %s", r.Header.Get(s3_constants.IfModifiedSince))
return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag, Entry: entry}
}
glog.V(3).Infof("checkConditionalHeadersForReads: If-Modified-Since passed - object modified after %s", r.Header.Get(s3_constants.IfModifiedSince))
glog.V(3).Infof("validateConditionalHeadersForReads: If-Modified-Since passed - object modified after %s", r.Header.Get(s3_constants.IfModifiedSince))
}
// Return success with the fetched entry for reuse
return ConditionalHeaderResult{ErrorCode: s3err.ErrNone, Entry: entry}
}
// checkConditionalHeadersForReadsWithGetter is the testable variant of the
// read-side conditional check: the entry lookup goes through the supplied
// EntryGetter, while the comparison is delegated to
// validateConditionalHeadersForReads.
//
// Returns a result carrying the parse error code if header parsing fails, or
// s3err.ErrInternalError if the getter fails with anything other than
// filer_pb.ErrNotFound. A not-found lookup is passed on as a nil entry.
func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGetter, r *http.Request, bucket, object string) ConditionalHeaderResult {
	headers, parseCode := parseConditionalHeaders(r)
	if parseCode != s3err.ErrNone {
		return ConditionalHeaderResult{ErrorCode: parseCode}
	}

	// Look the object up under the fixed "/buckets/" prefix used by this helper.
	entry, lookupErr := getter.getEntry("/buckets/"+bucket, object)
	switch {
	case lookupErr == nil:
		// Entry found; use it as returned.
	case errors.Is(lookupErr, filer_pb.ErrNotFound):
		// Missing object: represented downstream by a nil entry.
		entry = nil
	default:
		glog.Errorf("checkConditionalHeadersForReadsWithGetter: failed to get entry for %s/%s: %v", bucket, object, lookupErr)
		return ConditionalHeaderResult{ErrorCode: s3err.ErrInternalError}
	}

	return s3a.validateConditionalHeadersForReads(r, headers, entry, bucket, object)
}
// checkConditionalHeadersForReads is the production method that uses the S3ApiServer as EntryGetter
func (s3a *S3ApiServer) checkConditionalHeadersForReads(r *http.Request, bucket, object string) ConditionalHeaderResult {
return s3a.checkConditionalHeadersForReadsWithGetter(s3a, r, bucket, object)
// Fast path: if no conditional headers are present, skip object resolution entirely.
// This avoids expensive lookups (especially getLatestObjectVersion retries in versioned buckets)
// for the common case where no conditions are specified.
headers, errCode := parseConditionalHeaders(r)
if errCode != s3err.ErrNone {
return ConditionalHeaderResult{ErrorCode: errCode}
}
if !headers.isSet {
return ConditionalHeaderResult{ErrorCode: s3err.ErrNone, Entry: nil}
}
// Use resolveObjectEntry to correctly handle versioned objects.
// This ensures we check conditions against the LATEST version, not a null version.
entry, err := s3a.resolveObjectEntry(bucket, object)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
entry = nil
} else {
glog.Errorf("checkConditionalHeadersForReads: error resolving object entry for %s/%s: %v", bucket, object, err)
return ConditionalHeaderResult{ErrorCode: s3err.ErrInternalError, Entry: nil}
}
}
return s3a.validateConditionalHeadersForReads(r, headers, entry, bucket, object)
}
// deleteOrphanedChunks attempts to delete chunks that were uploaded but whose entry creation failed

View File

@@ -1099,18 +1099,21 @@ func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http
// getLatestObjectVersion finds the latest version of bucket/object by
// delegating to doGetLatestObjectVersion with the default retry budget.
// Callers that need a cheaper lookup can call doGetLatestObjectVersion
// directly with a smaller maxRetries.
func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) {
	// Default number of attempts for the .versions directory lookup.
	const maxRetries = 8
	return s3a.doGetLatestObjectVersion(bucket, object, maxRetries)
}
func (s3a *S3ApiServer) doGetLatestObjectVersion(bucket, object string, maxRetries int) (*filer_pb.Entry, error) {
// Normalize object path to ensure consistency with toFilerPath behavior
normalizedObject := s3_constants.NormalizeObjectKey(object)
bucketDir := s3a.option.BucketsPath + "/" + bucket
versionsObjectPath := normalizedObject + s3_constants.VersionsFolder
glog.V(1).Infof("getLatestObjectVersion: looking for latest version of %s/%s (normalized: %s)", bucket, object, normalizedObject)
glog.V(1).Infof("doGetLatestObjectVersion: looking for latest version of %s/%s (normalized: %s, retries: %d)", bucket, object, normalizedObject, maxRetries)
// Get the .versions directory entry to read latest version metadata with retry logic for filer consistency
var versionsEntry *filer_pb.Entry
var err error
maxRetries := 8
for attempt := 1; attempt <= maxRetries; attempt++ {
versionsEntry, err = s3a.getEntry(bucketDir, versionsObjectPath)
if err == nil {