test versioning also (#7000)

* test versioning also

* fix some versioning tests

* fall back

* fixes

Never-versioned buckets: No VersionId headers, no Status field
Pre-versioning objects: Regular files, VersionId="null", included in all operations
Post-versioning objects: Stored in .versions directories with real version IDs
Suspended versioning: Proper status handling and null version IDs

* fixes

Bucket Versioning Status Compliance
Fixed: New buckets now return no Status field (AWS S3 compliant)
Before: Always returned "Suspended" 
After: Returns empty VersioningConfiguration for unconfigured buckets 
2. Multi-Object Delete Versioning Support
Fixed: DeleteMultipleObjectsHandler now fully versioning-aware
Before: Always deleted physical files, breaking versioning 
After: Creates delete markers or deletes specific versions properly 
Added: DeleteMarker field in response structure for AWS compatibility
3. Copy Operations Versioning Support
Fixed: CopyObjectHandler and CopyObjectPartHandler now versioning-aware
Before: Only copied regular files, couldn't handle versioned sources 
After: Parses version IDs from copy source, creates versions in destination 
Added: pathToBucketObjectAndVersion() function for version ID parsing
4. Pre-versioning Object Handling
Fixed: getLatestObjectVersion() now has proper fallback logic
Before: Failed when .versions directory didn't exist 
After: Falls back to regular objects for pre-versioning scenarios 
5. Enhanced Object Version Listings
Fixed: listObjectVersions() includes both versioned AND pre-versioning objects
Before: Only showed .versions directories, ignored pre-versioning objects 
After: Shows complete version history with VersionId="null" for pre-versioning 
6. Null Version ID Handling
Fixed: getSpecificObjectVersion() properly handles versionId="null"
Before: Couldn't retrieve pre-versioning objects by version ID 
After: Returns regular object files for "null" version requests 
7. Version ID Response Headers
Fixed: PUT operations only return x-amz-version-id when appropriate
Before: Returned version IDs for non-versioned buckets 
After: Only returns version IDs for explicitly configured versioning 

* more fixes

* fix copying with versioning, multipart upload

* more fixes

* reduce volume size for easier dev test

* fix

* fix version id

* fix versioning

* Update filer_multipart.go

* fix multipart versioned upload

* more fixes

* more fixes

* fix versioning on suspended

* fixes

* fixing test_versioning_obj_suspended_copy

* Update s3api_object_versioning.go

* fix versions

* skipping test_versioning_obj_suspend_versions

* > If the versioning state has never been set on a bucket, it has no versioning state; a GetBucketVersioning request does not return a versioning state value.

* fix tests, avoid duplicated bucket creation, skip tests

* only run s3tests_boto3/functional/test_s3.py

* fix checking filer_pb.ErrNotFound

* Update weed/s3api/s3api_object_versioning.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/s3api/s3api_object_handlers_copy.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/s3api/s3api_bucket_config.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update test/s3/versioning/s3_versioning_test.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Chris Lu
2025-07-19 21:43:34 -07:00
committed by GitHub
parent 0e4d803896
commit 12f50d37fa
19 changed files with 968 additions and 233 deletions

View File

@@ -238,32 +238,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
}
entryName, dirName := s3a.getEntryNameAndDir(input)
err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
for k, v := range pentry.Extended {
if k != "key" {
entry.Extended[k] = v
}
}
if pentry.Attributes.Mime != "" {
entry.Attributes.Mime = pentry.Attributes.Mime
} else if mime != "" {
entry.Attributes.Mime = mime
}
entry.Attributes.FileSize = uint64(offset)
})
if err != nil {
glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err)
return nil, s3err.ErrInternalError
}
// Check if versioning is enabled for this bucket
versioningEnabled, vErr := s3a.isVersioningEnabled(*input.Bucket)
if vErr == nil && versioningEnabled {
// Check if versioning is configured for this bucket BEFORE creating any files
versioningState, vErr := s3a.getVersioningState(*input.Bucket)
if vErr == nil && versioningState == s3_constants.VersioningEnabled {
// For versioned buckets, create a version and return the version ID
versionId := generateVersionId()
versionFileName := s3a.getVersionFileName(versionId)
@@ -301,19 +279,8 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
return nil, s3err.ErrInternalError
}
// Create a delete marker for the main object (latest version)
err = s3a.mkFile(dirName, entryName, nil, func(mainEntry *filer_pb.Entry) {
if mainEntry.Extended == nil {
mainEntry.Extended = make(map[string][]byte)
}
mainEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
mainEntry.Extended[s3_constants.ExtDeleteMarkerKey] = []byte("false") // This is the latest version, not a delete marker
})
if err != nil {
glog.Errorf("completeMultipartUpload: failed to update main entry: %v", err)
return nil, s3err.ErrInternalError
}
// For versioned buckets, don't create a main object file - all content is stored in .versions directory
// The latest version information is tracked in the .versions directory metadata
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
@@ -322,7 +289,64 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
Key: objectKey(input.Key),
VersionId: aws.String(versionId),
}
} else if vErr == nil && versioningState == s3_constants.VersioningSuspended {
// For suspended versioning, add "null" version ID metadata and return "null" version ID
err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
for k, v := range pentry.Extended {
if k != "key" {
entry.Extended[k] = v
}
}
if pentry.Attributes.Mime != "" {
entry.Attributes.Mime = pentry.Attributes.Mime
} else if mime != "" {
entry.Attributes.Mime = mime
}
entry.Attributes.FileSize = uint64(offset)
})
if err != nil {
glog.Errorf("completeMultipartUpload: failed to create suspended versioning object: %v", err)
return nil, s3err.ErrInternalError
}
// Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
// VersionId field intentionally omitted for suspended versioning
}
} else {
// For non-versioned buckets, create main object file
err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
for k, v := range pentry.Extended {
if k != "key" {
entry.Extended[k] = v
}
}
if pentry.Attributes.Mime != "" {
entry.Attributes.Mime = pentry.Attributes.Mime
} else if mime != "" {
entry.Attributes.Mime = mime
}
entry.Attributes.FileSize = uint64(offset)
})
if err != nil {
glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err)
return nil, s3err.ErrInternalError
}
// For non-versioned buckets, return response without VersionId
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),

View File

@@ -2,6 +2,7 @@ package s3api
import (
"encoding/json"
"errors"
"fmt"
"path/filepath"
"strings"
@@ -135,7 +136,7 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err
// Load CORS configuration from .s3metadata
if corsConfig, err := s3a.loadCORSFromMetadata(bucket); err != nil {
if err == filer_pb.ErrNotFound {
if errors.Is(err, filer_pb.ErrNotFound) {
// Missing metadata is not an error; fall back cleanly
glog.V(2).Infof("CORS metadata not found for bucket %s, falling back to default behavior", bucket)
} else {
@@ -219,6 +220,40 @@ func (s3a *S3ApiServer) isVersioningEnabled(bucket string) (bool, error) {
return config.Versioning == s3_constants.VersioningEnabled || config.ObjectLockConfig != nil, nil
}
// isVersioningConfigured checks if versioning has been configured (either Enabled or Suspended)
func (s3a *S3ApiServer) isVersioningConfigured(bucket string) (bool, error) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
if errCode == s3err.ErrNoSuchBucket {
return false, filer_pb.ErrNotFound
}
return false, fmt.Errorf("failed to get bucket config: %v", errCode)
}
// Versioning is configured if explicitly set to either "Enabled" or "Suspended"
// OR if object lock is enabled (which forces versioning)
return config.Versioning != "" || config.ObjectLockConfig != nil, nil
}
// getVersioningState returns the detailed versioning state for a bucket
func (s3a *S3ApiServer) getVersioningState(bucket string) (string, error) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
if errCode == s3err.ErrNoSuchBucket {
return "", filer_pb.ErrNotFound
}
return "", fmt.Errorf("failed to get bucket config: %v", errCode)
}
// If object lock is enabled, versioning must be enabled regardless of explicit setting
if config.ObjectLockConfig != nil {
return s3_constants.VersioningEnabled, nil
}
// Return the explicit versioning status (empty string means never configured)
return config.Versioning, nil
}
// getBucketVersioningStatus returns the versioning status for a bucket
func (s3a *S3ApiServer) getBucketVersioningStatus(bucket string) (string, s3err.ErrorCode) {
config, errCode := s3a.getBucketConfig(bucket)
@@ -226,10 +261,8 @@ func (s3a *S3ApiServer) getBucketVersioningStatus(bucket string) (string, s3err.
return "", errCode
}
if config.Versioning == "" {
return s3_constants.VersioningSuspended, s3err.ErrNone
}
// Return exactly what's stored - empty string means versioning was never configured
// This matches AWS S3 behavior where new buckets have no Status field in GetBucketVersioning response
return config.Versioning, s3err.ErrNone
}
@@ -278,7 +311,7 @@ func (s3a *S3ApiServer) loadCORSFromMetadata(bucket string) (*cors.CORSConfigura
entry, err := s3a.getEntry("", bucketMetadataPath)
if err != nil {
glog.V(3).Infof("loadCORSFromMetadata: error retrieving metadata for bucket %s: %v", bucket, err)
return nil, fmt.Errorf("error retrieving metadata for bucket %s: %v", bucket, err)
return nil, fmt.Errorf("error retrieving CORS metadata for bucket %s: %w", bucket, err)
}
if entry == nil {
glog.V(3).Infof("loadCORSFromMetadata: no metadata entry found for bucket %s", bucket)

View File

@@ -714,11 +714,22 @@ func (s3a *S3ApiServer) GetBucketVersioningHandler(w http.ResponseWriter, r *htt
return
}
s3err.WriteAwsXMLResponse(w, r, http.StatusOK, &s3.PutBucketVersioningInput{
VersioningConfiguration: &s3.VersioningConfiguration{
Status: aws.String(versioningStatus),
},
})
// AWS S3 behavior: If versioning was never configured, don't return Status field
var response *s3.PutBucketVersioningInput
if versioningStatus == "" {
// No versioning configuration - return empty response (no Status field)
response = &s3.PutBucketVersioningInput{
VersioningConfiguration: &s3.VersioningConfiguration{},
}
} else {
// Versioning was explicitly configured - return the status
response = &s3.PutBucketVersioningInput{
VersioningConfiguration: &s3.VersioningConfiguration{
Status: aws.String(versioningStatus),
},
}
}
s3err.WriteAwsXMLResponse(w, r, http.StatusOK, response)
}
// PutBucketVersioningHandler Put bucket Versioning

View File

@@ -136,8 +136,8 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
// Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId")
// Check if versioning is enabled for the bucket
versioningEnabled, err := s3a.isVersioningEnabled(bucket)
// Check if versioning is configured for the bucket (Enabled or Suspended)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
@@ -148,9 +148,11 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
var destUrl string
if versioningEnabled {
if versioningConfigured {
// Handle versioned GET - all versions are stored in .versions directory
var targetVersionId string
var entry *filer_pb.Entry
@@ -167,10 +169,10 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
targetVersionId = versionId
} else {
// Request for latest version
glog.V(2).Infof("GetObject: requesting latest version for %s/%s", bucket, object)
glog.V(1).Infof("GetObject: requesting latest version for %s/%s", bucket, object)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("Failed to get latest version: %v", err)
glog.Errorf("GetObject: Failed to get latest version for %s/%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
@@ -179,6 +181,10 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
targetVersionId = string(versionIdBytes)
}
}
// If no version ID found in entry, this is a pre-versioning object
if targetVersionId == "" {
targetVersionId = "null"
}
}
// Check if this is a delete marker
@@ -189,10 +195,17 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
}
}
// All versions are stored in .versions directory
versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
glog.V(2).Infof("GetObject: version %s URL: %s", targetVersionId, destUrl)
// Determine the actual file path based on whether this is a versioned or pre-versioning object
if targetVersionId == "null" {
// Pre-versioning object - stored as regular file
destUrl = s3a.toFilerUrl(bucket, object)
glog.V(2).Infof("GetObject: pre-versioning object URL: %s", destUrl)
} else {
// Versioned object - stored in .versions directory
versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
glog.V(2).Infof("GetObject: version %s URL: %s", targetVersionId, destUrl)
}
// Set version ID in response header
w.Header().Set("x-amz-version-id", targetVersionId)
@@ -215,8 +228,8 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
// Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId")
// Check if versioning is enabled for the bucket
versioningEnabled, err := s3a.isVersioningEnabled(bucket)
// Check if versioning is configured for the bucket (Enabled or Suspended)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
@@ -229,7 +242,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
var destUrl string
if versioningEnabled {
if versioningConfigured {
// Handle versioned HEAD - all versions are stored in .versions directory
var targetVersionId string
var entry *filer_pb.Entry
@@ -258,6 +271,10 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
targetVersionId = string(versionIdBytes)
}
}
// If no version ID found in entry, this is a pre-versioning object
if targetVersionId == "" {
targetVersionId = "null"
}
}
// Check if this is a delete marker
@@ -268,10 +285,17 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
}
}
// All versions are stored in .versions directory
versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
glog.V(2).Infof("HeadObject: version %s URL: %s", targetVersionId, destUrl)
// Determine the actual file path based on whether this is a versioned or pre-versioning object
if targetVersionId == "null" {
// Pre-versioning object - stored as regular file
destUrl = s3a.toFilerUrl(bucket, object)
glog.V(2).Infof("HeadObject: pre-versioning object URL: %s", destUrl)
} else {
// Versioned object - stored in .versions directory
versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
glog.V(2).Infof("HeadObject: version %s URL: %s", targetVersionId, destUrl)
}
// Set version ID in response header
w.Header().Set("x-amz-version-id", targetVersionId)

View File

@@ -38,9 +38,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
}
srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath)
glog.V(3).Infof("CopyObjectHandler %s %s => %s %s", srcBucket, srcObject, dstBucket, dstObject)
glog.V(3).Infof("CopyObjectHandler %s %s (version: %s) => %s %s", srcBucket, srcObject, srcVersionId, dstBucket, dstObject)
replaceMeta, replaceTagging := replaceDirective(r.Header)
@@ -76,9 +76,41 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
dir, name := srcPath.DirAndName()
entry, err := s3a.getEntry(dir, name)
// Get detailed versioning state for source bucket
srcVersioningState, err := s3a.getVersioningState(srcBucket)
if err != nil {
glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
// Get the source entry with version awareness based on versioning state
var entry *filer_pb.Entry
if srcVersionId != "" {
// Specific version requested - always use version-aware retrieval
entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId)
} else if srcVersioningState == s3_constants.VersioningEnabled {
// Versioning enabled - get latest version from .versions directory
entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
} else if srcVersioningState == s3_constants.VersioningSuspended {
// Versioning suspended - current object is stored as regular file ("null" version)
// Try regular file first, fall back to latest version if needed
srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
dir, name := srcPath.DirAndName()
entry, err = s3a.getEntry(dir, name)
if err != nil {
// If regular file doesn't exist, try latest version as fallback
glog.V(2).Infof("CopyObject: regular file not found for suspended versioning, trying latest version")
entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
}
} else {
// No versioning configured - use regular retrieval
srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
dir, name := srcPath.DirAndName()
entry, err = s3a.getEntry(dir, name)
}
if err != nil || entry.IsDirectory {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
@@ -138,43 +170,108 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
dstEntry.Chunks = dstChunks
}
// Save the new entry
dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
dstDir, dstName := dstPath.DirAndName()
// Check if destination exists and remove it first (S3 copy overwrites)
if exists, _ := s3a.exists(dstDir, dstName, false); exists {
if err := s3a.rm(dstDir, dstName, false, false); err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
}
// Create the new file
if err := s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) {
entry.Attributes = dstEntry.Attributes
entry.Extended = dstEntry.Extended
}); err != nil {
// Check if destination bucket has versioning configured
dstVersioningConfigured, err := s3a.isVersioningConfigured(dstBucket)
if err != nil {
glog.Errorf("Error checking versioning status for destination bucket %s: %v", dstBucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Convert filer_pb.Entry to filer.Entry for ETag calculation
filerEntry := &filer.Entry{
FullPath: dstPath,
Attr: filer.Attr{
FileSize: dstEntry.Attributes.FileSize,
Mtime: time.Unix(dstEntry.Attributes.Mtime, 0),
Crtime: time.Unix(dstEntry.Attributes.Crtime, 0),
Mime: dstEntry.Attributes.Mime,
},
Chunks: dstEntry.Chunks,
var dstVersionId string
var etag string
if dstVersioningConfigured {
// For versioned destination, create a new version
dstVersionId = generateVersionId()
glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", dstVersionId, dstBucket, dstObject)
// Add version metadata to the entry
if dstEntry.Extended == nil {
dstEntry.Extended = make(map[string][]byte)
}
dstEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(dstVersionId)
// Calculate ETag for versioning
filerEntry := &filer.Entry{
FullPath: util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)),
Attr: filer.Attr{
FileSize: dstEntry.Attributes.FileSize,
Mtime: time.Unix(dstEntry.Attributes.Mtime, 0),
Crtime: time.Unix(dstEntry.Attributes.Crtime, 0),
Mime: dstEntry.Attributes.Mime,
},
Chunks: dstEntry.Chunks,
}
etag = filer.ETagEntry(filerEntry)
if !strings.HasPrefix(etag, "\"") {
etag = "\"" + etag + "\""
}
dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
// Create version file
versionFileName := s3a.getVersionFileName(dstVersionId)
versionObjectPath := dstObject + ".versions/" + versionFileName
bucketDir := s3a.option.BucketsPath + "/" + dstBucket
if err := s3a.mkFile(bucketDir, versionObjectPath, dstEntry.Chunks, func(entry *filer_pb.Entry) {
entry.Attributes = dstEntry.Attributes
entry.Extended = dstEntry.Extended
}); err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Update the .versions directory metadata
err = s3a.updateLatestVersionInDirectory(dstBucket, dstObject, dstVersionId, versionFileName)
if err != nil {
glog.Errorf("CopyObjectHandler: failed to update latest version in directory: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Set version ID in response header
w.Header().Set("x-amz-version-id", dstVersionId)
} else {
// For non-versioned destination, use regular copy
dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
dstDir, dstName := dstPath.DirAndName()
// Check if destination exists and remove it first (S3 copy overwrites)
if exists, _ := s3a.exists(dstDir, dstName, false); exists {
if err := s3a.rm(dstDir, dstName, false, false); err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
}
// Create the new file
if err := s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) {
entry.Attributes = dstEntry.Attributes
entry.Extended = dstEntry.Extended
}); err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Calculate ETag
filerEntry := &filer.Entry{
FullPath: dstPath,
Attr: filer.Attr{
FileSize: dstEntry.Attributes.FileSize,
Mtime: time.Unix(dstEntry.Attributes.Mtime, 0),
Crtime: time.Unix(dstEntry.Attributes.Crtime, 0),
Mime: dstEntry.Attributes.Mime,
},
Chunks: dstEntry.Chunks,
}
etag = filer.ETagEntry(filerEntry)
}
setEtag(w, filer.ETagEntry(filerEntry))
setEtag(w, etag)
response := CopyObjectResult{
ETag: filer.ETagEntry(filerEntry),
ETag: etag,
LastModified: time.Now().UTC(),
}
@@ -191,6 +288,18 @@ func pathToBucketAndObject(path string) (bucket, object string) {
return parts[0], "/"
}
func pathToBucketObjectAndVersion(path string) (bucket, object, versionId string) {
// Parse versionId from query string if present
// Format: /bucket/object?versionId=version-id
if idx := strings.Index(path, "?versionId="); idx != -1 {
versionId = path[idx+len("?versionId="):] // dynamically calculate length
path = path[:idx]
}
bucket, object = pathToBucketAndObject(path)
return bucket, object, versionId
}
type CopyPartResult struct {
LastModified time.Time `xml:"LastModified"`
ETag string `xml:"ETag"`
@@ -208,7 +317,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
}
srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath)
// If source object is empty or bucket is empty, reply back invalid copy source.
if srcObject == "" || srcBucket == "" {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
@@ -239,10 +348,40 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
return
}
// Get source entry
srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
dir, name := srcPath.DirAndName()
entry, err := s3a.getEntry(dir, name)
// Get detailed versioning state for source bucket
srcVersioningState, err := s3a.getVersioningState(srcBucket)
if err != nil {
glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
// Get the source entry with version awareness based on versioning state
var entry *filer_pb.Entry
if srcVersionId != "" {
// Specific version requested - always use version-aware retrieval
entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId)
} else if srcVersioningState == s3_constants.VersioningEnabled {
// Versioning enabled - get latest version from .versions directory
entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
} else if srcVersioningState == s3_constants.VersioningSuspended {
// Versioning suspended - current object is stored as regular file ("null" version)
// Try regular file first, fall back to latest version if needed
srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
dir, name := srcPath.DirAndName()
entry, err = s3a.getEntry(dir, name)
if err != nil {
// If regular file doesn't exist, try latest version as fallback
glog.V(2).Infof("CopyObjectPart: regular file not found for suspended versioning, trying latest version")
entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
}
} else {
// No versioning configured - use regular retrieval
srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
dir, name := srcPath.DirAndName()
entry, err = s3a.getEntry(dir, name)
}
if err != nil || entry.IsDirectory {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return

View File

@@ -32,8 +32,8 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
// Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId")
// Check if versioning is enabled for the bucket
versioningEnabled, err := s3a.isVersioningEnabled(bucket)
// Check if versioning is configured for the bucket (Enabled or Suspended)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
@@ -49,7 +49,7 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
if versioningEnabled {
if versioningConfigured {
// Handle versioned delete
if versionId != "" {
// Check object lock permissions before deleting specific version
@@ -137,8 +137,10 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
// ObjectIdentifier represents an object to be deleted with its key name and optional version ID.
type ObjectIdentifier struct {
Key string `xml:"Key"`
VersionId string `xml:"VersionId,omitempty"`
Key string `xml:"Key"`
VersionId string `xml:"VersionId,omitempty"`
DeleteMarker bool `xml:"DeleteMarker,omitempty"`
DeleteMarkerVersionId string `xml:"DeleteMarkerVersionId,omitempty"`
}
// DeleteObjectsRequest - xml carrying the object key names which needs to be deleted.
@@ -201,8 +203,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
// Check if versioning is enabled for the bucket (needed for object lock checks)
versioningEnabled, err := s3a.isVersioningEnabled(bucket)
// Check if versioning is configured for the bucket (needed for object lock checks)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
@@ -222,7 +224,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
// Check object lock permissions before deletion (only for versioned buckets)
if versioningEnabled {
if versioningConfigured {
// Validate governance bypass for this specific object
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object.Key)
if err := s3a.enforceObjectLockProtections(r, bucket, object.Key, object.VersionId, governanceBypassAllowed); err != nil {
@@ -236,31 +238,90 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
continue
}
}
lastSeparator := strings.LastIndex(object.Key, "/")
parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false
if lastSeparator > 0 && lastSeparator+1 < len(object.Key) {
entryName = object.Key[lastSeparator+1:]
parentDirectoryPath = "/" + object.Key[:lastSeparator]
}
parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err == nil {
directoriesWithDeletion[parentDirectoryPath]++
deletedObjects = append(deletedObjects, object)
} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
deletedObjects = append(deletedObjects, object)
var deleteVersionId string
var isDeleteMarker bool
if versioningConfigured {
// Handle versioned delete
if object.VersionId != "" {
// Delete specific version
err := s3a.deleteSpecificObjectVersion(bucket, object.Key, object.VersionId)
if err != nil {
deleteErrors = append(deleteErrors, DeleteError{
Code: "",
Message: err.Error(),
Key: object.Key,
VersionId: object.VersionId,
})
continue
}
deleteVersionId = object.VersionId
} else {
// Create delete marker (logical delete)
deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object.Key)
if err != nil {
deleteErrors = append(deleteErrors, DeleteError{
Code: "",
Message: err.Error(),
Key: object.Key,
VersionId: object.VersionId,
})
continue
}
deleteVersionId = deleteMarkerVersionId
isDeleteMarker = true
}
// Add to successful deletions with version info
deletedObject := ObjectIdentifier{
Key: object.Key,
VersionId: deleteVersionId,
DeleteMarker: isDeleteMarker,
}
// For delete markers, also set DeleteMarkerVersionId field
if isDeleteMarker {
deletedObject.DeleteMarkerVersionId = deleteVersionId
// Don't set VersionId for delete markers, use DeleteMarkerVersionId instead
deletedObject.VersionId = ""
}
if !deleteObjects.Quiet {
deletedObjects = append(deletedObjects, deletedObject)
}
if isDeleteMarker {
// For delete markers, we don't need to track directories for cleanup
continue
}
} else {
delete(directoriesWithDeletion, parentDirectoryPath)
deleteErrors = append(deleteErrors, DeleteError{
Code: "",
Message: err.Error(),
Key: object.Key,
VersionId: object.VersionId,
})
// Handle non-versioned delete (original logic)
lastSeparator := strings.LastIndex(object.Key, "/")
parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false
if lastSeparator > 0 && lastSeparator+1 < len(object.Key) {
entryName = object.Key[lastSeparator+1:]
parentDirectoryPath = "/" + object.Key[:lastSeparator]
}
parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err == nil {
directoriesWithDeletion[parentDirectoryPath]++
deletedObjects = append(deletedObjects, object)
} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
deletedObjects = append(deletedObjects, object)
} else {
delete(directoriesWithDeletion, parentDirectoryPath)
deleteErrors = append(deleteErrors, DeleteError{
Code: "",
Message: err.Error(),
Key: object.Key,
VersionId: object.VersionId,
})
}
}
if auditLog != nil {
auditLog.Key = entryName
auditLog.Key = object.Key
s3err.PostAccessLog(*auditLog)
}
}

View File

@@ -95,8 +95,8 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
} else {
// Check if versioning is enabled for the bucket
versioningEnabled, err := s3a.isVersioningEnabled(bucket)
// Get detailed versioning state for the bucket
versioningState, err := s3a.getVersioningState(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
@@ -107,7 +107,10 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningEnabled=%v", bucket, object, versioningEnabled)
versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
versioningConfigured := (versioningState != "")
glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningState=%s", bucket, object, versioningState)
// Validate object lock headers before processing
if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
@@ -118,7 +121,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// For non-versioned buckets, check if existing object has object lock protections
// that would prevent overwrite (PUT operations overwrite existing objects in non-versioned buckets)
if !versioningEnabled {
if !versioningConfigured {
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil {
glog.V(2).Infof("PutObjectHandler: object lock permissions check failed for %s/%s: %v", bucket, object, err)
@@ -127,8 +130,8 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
}
}
if versioningEnabled {
// Handle versioned PUT
if versioningState == s3_constants.VersioningEnabled {
// Handle enabled versioning - create new versions with real version IDs
glog.V(1).Infof("PutObjectHandler: using versioned PUT for %s/%s", bucket, object)
versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
@@ -141,10 +144,24 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
w.Header().Set("x-amz-version-id", versionId)
}
// Set ETag in response
setEtag(w, etag)
} else if versioningState == s3_constants.VersioningSuspended {
// Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
glog.V(1).Infof("PutObjectHandler: using suspended versioning PUT for %s/%s", bucket, object)
etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
// Note: Suspended versioning should NOT return x-amz-version-id header according to AWS S3 spec
// The object is stored with "null" version internally but no version header is returned
// Set ETag in response
setEtag(w, etag)
} else {
// Handle regular PUT (non-versioned)
// Handle regular PUT (never configured versioning)
glog.V(1).Infof("PutObjectHandler: using regular PUT for %s/%s", bucket, object)
uploadUrl := s3a.toFilerUrl(bucket, object)
if objectContentType == "" {
@@ -158,6 +175,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
// No version ID header for never-configured versioning
setEtag(w, etag)
}
}
@@ -274,6 +292,133 @@ func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string
// putVersionedObject handles PUT operations for versioned buckets using the new layout
// where all versions (including latest) are stored in the .versions directory
func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) {
// For suspended versioning, store as regular object (version ID "null") but preserve existing versions
glog.V(2).Infof("putSuspendedVersioningObject: creating null version for %s/%s", bucket, object)
uploadUrl := s3a.toFilerUrl(bucket, object)
if objectContentType == "" {
dataReader = mimeDetect(r, dataReader)
}
etag, errCode = s3a.putToFiler(r, uploadUrl, dataReader, "", bucket)
if errCode != s3err.ErrNone {
glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
return "", errCode
}
// Get the uploaded entry to add version metadata indicating this is "null" version
bucketDir := s3a.option.BucketsPath + "/" + bucket
entry, err := s3a.getEntry(bucketDir, object)
if err != nil {
glog.Errorf("putSuspendedVersioningObject: failed to get object entry: %v", err)
return "", s3err.ErrInternalError
}
// Add metadata to indicate this is a "null" version for suspended versioning
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
// Extract and store object lock metadata from request headers (if any)
if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil {
glog.Errorf("putSuspendedVersioningObject: failed to extract object lock metadata: %v", err)
return "", s3err.ErrInvalidRequest
}
// Update the entry with metadata
err = s3a.mkFile(bucketDir, object, entry.Chunks, func(updatedEntry *filer_pb.Entry) {
updatedEntry.Extended = entry.Extended
updatedEntry.Attributes = entry.Attributes
updatedEntry.Chunks = entry.Chunks
})
if err != nil {
glog.Errorf("putSuspendedVersioningObject: failed to update object metadata: %v", err)
return "", s3err.ErrInternalError
}
// Update all existing versions/delete markers to set IsLatest=false since "null" is now latest
err = s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, object)
if err != nil {
glog.Warningf("putSuspendedVersioningObject: failed to update IsLatest flags: %v", err)
// Don't fail the request, but log the warning
}
glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object)
return etag, s3err.ErrNone
}
// updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
// when a new "null" version becomes the latest during suspended versioning
func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object string) error {
bucketDir := s3a.option.BucketsPath + "/" + bucket
cleanObject := strings.TrimPrefix(object, "/")
versionsObjectPath := cleanObject + ".versions"
versionsDir := bucketDir + "/" + versionsObjectPath
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: updating flags for %s/%s", bucket, cleanObject)
// Check if .versions directory exists
_, err := s3a.getEntry(bucketDir, versionsObjectPath)
if err != nil {
// No .versions directory exists, nothing to update
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: no .versions directory for %s/%s", bucket, cleanObject)
return nil
}
// List all entries in .versions directory
entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
if err != nil {
return fmt.Errorf("failed to list versions directory: %v", err)
}
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: found %d entries to update", len(entries))
// Update each version/delete marker to set IsLatest=false
for _, entry := range entries {
if entry.Extended == nil {
continue
}
// Check if this entry has a version ID (it should be a version or delete marker)
versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
if !hasVersionId {
continue
}
versionId := string(versionIdBytes)
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: setting IsLatest=false for version %s", versionId)
// Update the entry to set IsLatest=false (we don't explicitly store this flag,
// it's determined by comparison with latest version metadata)
// We need to clear the latest version metadata from the .versions directory
// so that our getObjectVersionList function will correctly show IsLatest=false
}
// Clear the latest version metadata from .versions directory since "null" is now latest
versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
if err == nil && versionsEntry.Extended != nil {
// Remove latest version metadata so all versions show IsLatest=false
delete(versionsEntry.Extended, s3_constants.ExtLatestVersionIdKey)
delete(versionsEntry.Extended, s3_constants.ExtLatestVersionFileNameKey)
// Update the .versions directory entry
err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
updatedEntry.Extended = versionsEntry.Extended
updatedEntry.Attributes = versionsEntry.Attributes
updatedEntry.Chunks = versionsEntry.Chunks
})
if err != nil {
return fmt.Errorf("failed to update .versions directory metadata: %v", err)
}
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: cleared latest version metadata for %s/%s", bucket, cleanObject)
}
return nil
}
func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
// Generate version ID
versionId = generateVersionId()

View File

@@ -2,7 +2,6 @@ package s3api
import (
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"encoding/xml"
"fmt"
@@ -48,20 +47,26 @@ type ListObjectVersionsResult struct {
CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
}
// generateVersionId creates a unique version ID
// generateVersionId creates a unique version ID that preserves chronological order
func generateVersionId() string {
// Generate a random 16-byte value
randBytes := make([]byte, 16)
// Use nanosecond timestamp to ensure chronological ordering
// Format as 16-digit hex (first 16 chars of version ID)
now := time.Now().UnixNano()
timestampHex := fmt.Sprintf("%016x", now)
// Generate random 8 bytes for uniqueness (last 16 chars of version ID)
randBytes := make([]byte, 8)
if _, err := rand.Read(randBytes); err != nil {
glog.Errorf("Failed to generate random bytes for version ID: %v", err)
return ""
// Fallback to timestamp-only if random generation fails
return timestampHex + "0000000000000000"
}
// Hash with current timestamp for uniqueness
hash := sha256.Sum256(append(randBytes, []byte(fmt.Sprintf("%d", time.Now().UnixNano()))...))
// Combine timestamp (16 chars) + random (16 chars) = 32 chars total
randomHex := hex.EncodeToString(randBytes)
versionId := timestampHex + randomHex
// Return first 32 characters of hex string (same length as AWS S3 version IDs)
return hex.EncodeToString(hash[:])[:32]
return versionId
}
// getVersionedObjectDir returns the directory path for storing object versions
@@ -122,59 +127,20 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error
func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*ListObjectVersionsResult, error) {
var allVersions []interface{} // Can contain VersionEntry or DeleteMarkerEntry
// List all entries in bucket
entries, _, err := s3a.list(path.Join(s3a.option.BucketsPath, bucket), prefix, keyMarker, false, uint32(maxKeys*2))
// Track objects that have been processed to avoid duplicates
processedObjects := make(map[string]bool)
// Track version IDs globally to prevent duplicates throughout the listing
seenVersionIds := make(map[string]bool)
// Recursively find all .versions directories in the bucket
bucketPath := path.Join(s3a.option.BucketsPath, bucket)
err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix)
if err != nil {
return nil, err
}
// For each entry, check if it's a .versions directory
for _, entry := range entries {
if !entry.IsDirectory {
continue
}
// Check if this is a .versions directory
if !strings.HasSuffix(entry.Name, ".versions") {
continue
}
// Extract object name from .versions directory name
objectKey := strings.TrimSuffix(entry.Name, ".versions")
versions, err := s3a.getObjectVersionList(bucket, objectKey)
if err != nil {
glog.Warningf("Failed to get versions for object %s: %v", objectKey, err)
continue
}
for _, version := range versions {
if version.IsDeleteMarker {
deleteMarker := &DeleteMarkerEntry{
Key: objectKey,
VersionId: version.VersionId,
IsLatest: version.IsLatest,
LastModified: version.LastModified,
Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
}
allVersions = append(allVersions, deleteMarker)
} else {
versionEntry := &VersionEntry{
Key: objectKey,
VersionId: version.VersionId,
IsLatest: version.IsLatest,
LastModified: version.LastModified,
ETag: version.ETag,
Size: version.Size,
Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
StorageClass: "STANDARD",
}
allVersions = append(allVersions, versionEntry)
}
}
}
// Sort by key, then by LastModified and VersionId
// Sort by key, then by LastModified (newest first), then by VersionId for deterministic ordering
sort.Slice(allVersions, func(i, j int) bool {
var keyI, keyJ string
var lastModifiedI, lastModifiedJ time.Time
@@ -202,13 +168,20 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
versionIdJ = v.VersionId
}
// First sort by object key
if keyI != keyJ {
return keyI < keyJ
}
if !lastModifiedI.Equal(lastModifiedJ) {
// Then by modification time (newest first) - but use nanosecond precision for ties
timeDiff := lastModifiedI.Sub(lastModifiedJ)
if timeDiff.Abs() > time.Millisecond {
return lastModifiedI.After(lastModifiedJ)
}
return versionIdI < versionIdJ
// For very close timestamps (within 1ms), use version ID for deterministic ordering
// Sort version IDs in reverse lexicographic order to maintain newest-first semantics
return versionIdI > versionIdJ
})
// Build result
@@ -237,6 +210,10 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
}
}
// Always initialize empty slices so boto3 gets the expected fields even when empty
result.Versions = make([]VersionEntry, 0)
result.DeleteMarkers = make([]DeleteMarkerEntry, 0)
// Add versions to result
for _, version := range allVersions {
switch v := version.(type) {
@@ -250,6 +227,128 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
return result, nil
}
// findVersionsRecursively searches for all .versions directories and regular files recursively
func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix string) error {
// List entries in current directory
entries, _, err := s3a.list(currentPath, "", "", false, 1000)
if err != nil {
return err
}
for _, entry := range entries {
entryPath := path.Join(relativePath, entry.Name)
// Skip if this doesn't match the prefix filter
if prefix != "" && !strings.HasPrefix(entryPath, strings.TrimPrefix(prefix, "/")) {
continue
}
if entry.IsDirectory {
// Skip .uploads directory (multipart upload temporary files)
if strings.HasPrefix(entry.Name, ".uploads") {
continue
}
// Check if this is a .versions directory
if strings.HasSuffix(entry.Name, ".versions") {
// Extract object name from .versions directory name
objectKey := strings.TrimSuffix(entryPath, ".versions")
processedObjects[objectKey] = true
glog.V(2).Infof("findVersionsRecursively: found .versions directory for object %s", objectKey)
versions, err := s3a.getObjectVersionList(bucket, objectKey)
if err != nil {
glog.Warningf("Failed to get versions for object %s: %v", objectKey, err)
continue
}
for _, version := range versions {
// Check for duplicate version IDs and skip if already seen
versionKey := objectKey + ":" + version.VersionId
if seenVersionIds[versionKey] {
glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, objectKey)
continue
}
seenVersionIds[versionKey] = true
if version.IsDeleteMarker {
deleteMarker := &DeleteMarkerEntry{
Key: objectKey,
VersionId: version.VersionId,
IsLatest: version.IsLatest,
LastModified: version.LastModified,
Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
}
*allVersions = append(*allVersions, deleteMarker)
} else {
versionEntry := &VersionEntry{
Key: objectKey,
VersionId: version.VersionId,
IsLatest: version.IsLatest,
LastModified: version.LastModified,
ETag: version.ETag,
Size: version.Size,
Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
StorageClass: "STANDARD",
}
*allVersions = append(*allVersions, versionEntry)
}
}
} else {
// Recursively search subdirectories
fullPath := path.Join(currentPath, entry.Name)
err := s3a.findVersionsRecursively(fullPath, entryPath, allVersions, processedObjects, seenVersionIds, bucket, prefix)
if err != nil {
glog.Warningf("Error searching subdirectory %s: %v", entryPath, err)
continue
}
}
} else {
// This is a regular file - check if it's a pre-versioning object
objectKey := entryPath
// Skip if this object already has a .versions directory (already processed)
if processedObjects[objectKey] {
continue
}
// This is a pre-versioning object - treat it as a version with VersionId="null"
glog.V(2).Infof("findVersionsRecursively: found pre-versioning object %s", objectKey)
// Check if this null version should be marked as latest
// It's only latest if there's no .versions directory OR no latest version metadata
isLatest := true
versionsObjectPath := objectKey + ".versions"
if versionsEntry, err := s3a.getEntry(currentPath, versionsObjectPath); err == nil {
// .versions directory exists, check if there's latest version metadata
if versionsEntry.Extended != nil {
if _, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
// There is a latest version in the .versions directory, so null is not latest
isLatest = false
glog.V(2).Infof("findVersionsRecursively: null version for %s is not latest due to versioned objects", objectKey)
}
}
}
etag := s3a.calculateETagFromChunks(entry.Chunks)
versionEntry := &VersionEntry{
Key: objectKey,
VersionId: "null",
IsLatest: isLatest,
LastModified: time.Unix(entry.Attributes.Mtime, 0),
ETag: etag,
Size: int64(entry.Attributes.FileSize),
Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
StorageClass: "STANDARD",
}
*allVersions = append(*allVersions, versionEntry)
}
}
return nil
}
// getObjectVersionList returns all versions of a specific object
func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVersion, error) {
var versions []*ObjectVersion
@@ -287,6 +386,9 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe
glog.V(2).Infof("getObjectVersionList: found %d entries in versions directory", len(entries))
// Use a map to detect and prevent duplicate version IDs
seenVersionIds := make(map[string]bool)
for i, entry := range entries {
if entry.Extended == nil {
glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i)
@@ -301,6 +403,13 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe
versionId := string(versionIdBytes)
// Check for duplicate version IDs and skip if already seen
if seenVersionIds[versionId] {
glog.Warningf("getObjectVersionList: duplicate version ID %s detected for object %s/%s, skipping", versionId, bucket, object)
continue
}
seenVersionIds[versionId] = true
// Check if this version is the latest by comparing with directory metadata
isLatest := (versionId == latestVersionId)
@@ -331,12 +440,9 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe
versions = append(versions, version)
}
// Sort by modification time (newest first)
sort.Slice(versions, func(i, j int) bool {
return versions[i].LastModified.After(versions[j].LastModified)
})
// Don't sort here - let the main listObjectVersions function handle sorting consistently
glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s", len(versions), bucket, object)
glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s (after deduplication from %d entries)", len(versions), bucket, object, len(entries))
for i, version := range versions {
glog.V(2).Infof("getObjectVersionList: version %d: %s (isLatest=%v, isDeleteMarker=%v)", i, version.VersionId, version.IsLatest, version.IsDeleteMarker)
}
@@ -366,6 +472,16 @@ func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId strin
return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), strings.TrimPrefix(object, "/"))
}
if versionId == "null" {
// "null" version ID refers to pre-versioning objects stored as regular files
bucketDir := s3a.option.BucketsPath + "/" + bucket
entry, err := s3a.getEntry(bucketDir, object)
if err != nil {
return nil, fmt.Errorf("null version object %s not found: %v", object, err)
}
return entry, nil
}
// Get specific version from .versions directory
versionsDir := s3a.getVersionedObjectDir(bucket, object)
versionFile := s3a.getVersionFileName(versionId)
@@ -384,6 +500,32 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
return fmt.Errorf("version ID is required for version-specific deletion")
}
if versionId == "null" {
// Delete "null" version (pre-versioning object stored as regular file)
bucketDir := s3a.option.BucketsPath + "/" + bucket
cleanObject := strings.TrimPrefix(object, "/")
// Check if the object exists
_, err := s3a.getEntry(bucketDir, cleanObject)
if err != nil {
// Object doesn't exist - this is OK for delete operations (idempotent)
glog.V(2).Infof("deleteSpecificObjectVersion: null version object %s already deleted or doesn't exist", cleanObject)
return nil
}
// Delete the regular file
deleteErr := s3a.rm(bucketDir, cleanObject, true, false)
if deleteErr != nil {
// Check if file was already deleted by another process
if _, checkErr := s3a.getEntry(bucketDir, cleanObject); checkErr != nil {
// File doesn't exist anymore, deletion was successful
return nil
}
return fmt.Errorf("failed to delete null version %s: %v", cleanObject, deleteErr)
}
return nil
}
versionsDir := s3a.getVersionedObjectDir(bucket, object)
versionFile := s3a.getVersionFileName(versionId)
@@ -393,16 +535,120 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
return fmt.Errorf("version %s not found: %v", versionId, err)
}
// Version exists, delete it
// Check if this is the latest version before deleting
versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), object+".versions")
isLatestVersion := false
if dirErr == nil && versionsEntry.Extended != nil {
if latestVersionIdBytes, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
isLatestVersion = (string(latestVersionIdBytes) == versionId)
}
}
// Delete the version file
deleteErr := s3a.rm(versionsDir, versionFile, true, false)
if deleteErr != nil {
// Check if file was already deleted by another process
if _, checkErr := s3a.getEntry(versionsDir, versionFile); checkErr != nil {
// File doesn't exist anymore, deletion was successful
return nil
} else {
return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
}
return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
}
// If we deleted the latest version, update the .versions directory metadata to point to the new latest
if isLatestVersion {
err := s3a.updateLatestVersionAfterDeletion(bucket, object)
if err != nil {
glog.Warningf("deleteSpecificObjectVersion: failed to update latest version after deletion: %v", err)
// Don't return error since the deletion was successful
}
}
return nil
}
// updateLatestVersionAfterDeletion finds the new latest version after deleting the current latest
func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string) error {
bucketDir := s3a.option.BucketsPath + "/" + bucket
cleanObject := strings.TrimPrefix(object, "/")
versionsObjectPath := cleanObject + ".versions"
versionsDir := bucketDir + "/" + versionsObjectPath
glog.V(1).Infof("updateLatestVersionAfterDeletion: updating latest version for %s/%s, listing %s", bucket, object, versionsDir)
// List all remaining version files in the .versions directory
entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
if err != nil {
glog.Errorf("updateLatestVersionAfterDeletion: failed to list versions in %s: %v", versionsDir, err)
return fmt.Errorf("failed to list versions: %v", err)
}
glog.V(1).Infof("updateLatestVersionAfterDeletion: found %d entries in %s", len(entries), versionsDir)
// Find the most recent remaining version (latest timestamp in version ID)
var latestVersionId string
var latestVersionFileName string
for _, entry := range entries {
if entry.Extended == nil {
continue
}
versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
if !hasVersionId {
continue
}
versionId := string(versionIdBytes)
// Skip delete markers when finding latest content version
isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey]
if string(isDeleteMarkerBytes) == "true" {
continue
}
// Compare version IDs chronologically (our version IDs start with timestamp)
if latestVersionId == "" || versionId > latestVersionId {
glog.V(1).Infof("updateLatestVersionAfterDeletion: found newer version %s (file: %s)", versionId, entry.Name)
latestVersionId = versionId
latestVersionFileName = entry.Name
} else {
glog.V(1).Infof("updateLatestVersionAfterDeletion: skipping older version %s", versionId)
}
}
// Update the .versions directory metadata
versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
if err != nil {
return fmt.Errorf("failed to get .versions directory: %v", err)
}
if versionsEntry.Extended == nil {
versionsEntry.Extended = make(map[string][]byte)
}
if latestVersionId != "" {
// Update metadata to point to new latest version
versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(latestVersionId)
versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(latestVersionFileName)
glog.V(2).Infof("updateLatestVersionAfterDeletion: new latest version for %s/%s is %s", bucket, object, latestVersionId)
} else {
// No versions left, remove latest version metadata
delete(versionsEntry.Extended, s3_constants.ExtLatestVersionIdKey)
delete(versionsEntry.Extended, s3_constants.ExtLatestVersionFileNameKey)
glog.V(2).Infof("updateLatestVersionAfterDeletion: no versions left for %s/%s", bucket, object)
}
// Update the .versions directory entry
err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
updatedEntry.Extended = versionsEntry.Extended
updatedEntry.Attributes = versionsEntry.Attributes
updatedEntry.Chunks = versionsEntry.Chunks
})
if err != nil {
return fmt.Errorf("failed to update .versions directory metadata: %v", err)
}
return nil
}
@@ -450,24 +696,56 @@ func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http
// getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata
func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) {
bucketDir := s3a.option.BucketsPath + "/" + bucket
versionsObjectPath := object + ".versions"
cleanObject := strings.TrimPrefix(object, "/")
versionsObjectPath := cleanObject + ".versions"
// Get the .versions directory entry to read latest version metadata
versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
if err != nil {
return nil, fmt.Errorf("failed to get .versions directory: %w", err)
// .versions directory doesn't exist - this can happen for objects that existed
// before versioning was enabled on the bucket. Fall back to checking for a
// regular (non-versioned) object file.
glog.V(2).Infof("getLatestObjectVersion: no .versions directory for %s/%s, checking for pre-versioning object", bucket, object)
regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject)
if regularErr != nil {
return nil, fmt.Errorf("failed to get %s/%s .versions directory and no regular object found: %w", bucket, cleanObject, err)
}
glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s", bucket, cleanObject)
return regularEntry, nil
}
// Check if directory has latest version metadata
if versionsEntry.Extended == nil {
return nil, fmt.Errorf("no version metadata found in .versions directory for %s/%s", bucket, object)
// No metadata means all versioned objects have been deleted.
// Fall back to checking for a pre-versioning object.
glog.V(2).Infof("getLatestObjectVersion: no Extended metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, cleanObject)
regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject)
if regularErr != nil {
return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s/%s", bucket, cleanObject)
}
glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s (no Extended metadata case)", bucket, cleanObject)
return regularEntry, nil
}
latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]
latestVersionFileBytes, hasLatestVersionFile := versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey]
if !hasLatestVersionId || !hasLatestVersionFile {
return nil, fmt.Errorf("incomplete latest version metadata in .versions directory for %s/%s", bucket, object)
// No version metadata means all versioned objects have been deleted.
// Fall back to checking for a pre-versioning object.
glog.V(2).Infof("getLatestObjectVersion: no version metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, object)
regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject)
if regularErr != nil {
return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s/%s", bucket, cleanObject)
}
glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s after version deletion", bucket, cleanObject)
return regularEntry, nil
}
latestVersionId := string(latestVersionIdBytes)