fix listing object versions (#7006)

* fix listing object versions

* Update s3api_object_versioning.go

* Update s3_directory_versioning_test.go

* check previous skipped tests

* fix test_versioning_stack_delete_merkers

* address test_bucket_list_return_data_versioning

* Update s3_directory_versioning_test.go

* fix test_versioning_concurrent_multi_object_delete

* fix test_versioning_obj_suspend_versions test

* fix empty owner

* fix listing versioned objects

* default owner

* fix path
This commit is contained in:
Chris Lu
2025-07-21 00:23:22 -07:00
committed by GitHub
parent bfe68984d5
commit c196d03951
8 changed files with 1190 additions and 79 deletions

View File

@@ -198,9 +198,33 @@ func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bu
StorageClass: StorageClass(storageClass),
}
if fetchOwner {
// Extract owner from S3 metadata (Extended attributes) instead of file system attributes
var ownerID, displayName string
if entry.Extended != nil {
if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
ownerID = string(ownerBytes)
}
}
// Fallback to anonymous if no S3 owner found
if ownerID == "" {
ownerID = s3_constants.AccountAnonymousId
displayName = "anonymous"
} else {
// Try to resolve display name from IAM system
displayName = "unknown"
// Note: IAM resolution would require access to the S3ApiServer instance
// For now, use a simple fallback or could be enhanced later
}
// Additional fallback to file system username if available and no display name resolved
if displayName == "unknown" && entry.Attributes.UserName != "" {
displayName = entry.Attributes.UserName
}
listEntry.Owner = CanonicalUser{
ID: fmt.Sprintf("%x", entry.Attributes.Uid),
DisplayName: entry.Attributes.UserName,
ID: ownerID,
DisplayName: displayName,
}
}
return listEntry

View File

@@ -24,18 +24,63 @@ func (s3a *S3ApiServer) GetObjectAclHandler(w http.ResponseWriter, r *http.Reque
return
}
// Check if object exists and get its metadata
bucketDir := s3a.option.BucketsPath + "/" + bucket
entry, err := s3a.getEntry(bucketDir, object)
// Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId")
// Check if versioning is configured for the bucket (Enabled or Suspended)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
glog.Errorf("GetObjectAclHandler: error checking object %s/%s: %v", bucket, object, err)
glog.Errorf("GetObjectAclHandler: Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
var entry *filer_pb.Entry
if versioningConfigured {
// Handle versioned object ACL retrieval - use same logic as GetObjectHandler
if versionId != "" {
// Request for specific version
glog.V(2).Infof("GetObjectAclHandler: requesting ACL for specific version %s of %s%s", versionId, bucket, object)
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
} else {
// Request for latest version
glog.V(2).Infof("GetObjectAclHandler: requesting ACL for latest version of %s%s", bucket, object)
entry, err = s3a.getLatestObjectVersion(bucket, object)
}
if err != nil {
glog.Errorf("GetObjectAclHandler: Failed to get object version %s for %s%s: %v", versionId, bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
// Check if this is a delete marker
if entry.Extended != nil {
if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
} else {
// Handle regular (non-versioned) object ACL retrieval
bucketDir := s3a.option.BucketsPath + "/" + bucket
entry, err = s3a.getEntry(bucketDir, object)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
glog.Errorf("GetObjectAclHandler: error checking object %s/%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
}
if entry == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
@@ -123,18 +168,63 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque
return
}
// Check if object exists and get its metadata
bucketDir := s3a.option.BucketsPath + "/" + bucket
entry, err := s3a.getEntry(bucketDir, object)
// Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId")
// Check if versioning is configured for the bucket (Enabled or Suspended)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
glog.Errorf("PutObjectAclHandler: error checking object %s/%s: %v", bucket, object, err)
glog.Errorf("PutObjectAclHandler: Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
var entry *filer_pb.Entry
if versioningConfigured {
// Handle versioned object ACL modification - use same logic as GetObjectHandler
if versionId != "" {
// Request for specific version
glog.V(2).Infof("PutObjectAclHandler: modifying ACL for specific version %s of %s%s", versionId, bucket, object)
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
} else {
// Request for latest version
glog.V(2).Infof("PutObjectAclHandler: modifying ACL for latest version of %s%s", bucket, object)
entry, err = s3a.getLatestObjectVersion(bucket, object)
}
if err != nil {
glog.Errorf("PutObjectAclHandler: Failed to get object version %s for %s%s: %v", versionId, bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
// Check if this is a delete marker
if entry.Extended != nil {
if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
} else {
// Handle regular (non-versioned) object ACL modification
bucketDir := s3a.option.BucketsPath + "/" + bucket
entry, err = s3a.getEntry(bucketDir, object)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
glog.Errorf("PutObjectAclHandler: error checking object %s/%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
}
if entry == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
@@ -208,14 +298,44 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque
// Store ACL in object metadata
if errCode := AssembleEntryWithAcp(entry, objectOwner, grants); errCode != s3err.ErrNone {
glog.Errorf("PutObjectAclHandler: failed to assemble entry with ACP: %v", errCode)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
s3err.WriteErrorResponse(w, r, errCode)
return
}
// Calculate the correct directory for ACL update
var updateDirectory string
if versioningConfigured {
if versionId != "" && versionId != "null" {
// Versioned object - update the specific version file in .versions directory
updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + ".versions"
} else {
// Latest version in versioned bucket - could be null version or versioned object
// Extract version ID from the entry to determine where it's stored
var actualVersionId string
if entry.Extended != nil {
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
actualVersionId = string(versionIdBytes)
}
}
if actualVersionId == "null" || actualVersionId == "" {
// Null version (pre-versioning object) - stored as regular file
updateDirectory = s3a.option.BucketsPath + "/" + bucket
} else {
// Versioned object - stored in .versions directory
updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + ".versions"
}
}
} else {
// Non-versioned object - stored as regular file
updateDirectory = s3a.option.BucketsPath + "/" + bucket
}
// Update the object with new ACL metadata
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: bucketDir,
Directory: updateDirectory,
Entry: entry,
}

View File

@@ -32,8 +32,8 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
// Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId")
// Check if versioning is configured for the bucket (Enabled or Suspended)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
// Get detailed versioning state for proper handling of suspended vs enabled versioning
versioningState, err := s3a.getVersioningState(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
@@ -44,14 +44,19 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
return
}
versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
versioningSuspended := (versioningState == s3_constants.VersioningSuspended)
versioningConfigured := (versioningState != "")
var auditLog *s3err.AccessLog
if s3err.Logger != nil {
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
if versioningConfigured {
// Handle versioned delete
// Handle versioned delete based on specific versioning state
if versionId != "" {
// Delete specific version (same for both enabled and suspended)
// Check object lock permissions before deleting specific version
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
if err := s3a.enforceObjectLockProtections(r, bucket, object, versionId, governanceBypassAllowed); err != nil {
@@ -71,19 +76,44 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
// Set version ID in response header
w.Header().Set("x-amz-version-id", versionId)
} else {
// Create delete marker (logical delete)
// AWS S3 behavior: Delete marker creation is NOT blocked by object retention
// because it's a logical delete that doesn't actually remove the retained version
deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object)
if err != nil {
glog.Errorf("Failed to create delete marker: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Delete without version ID - behavior depends on versioning state
if versioningEnabled {
// Enabled versioning: Create delete marker (logical delete)
// AWS S3 behavior: Delete marker creation is NOT blocked by object retention
// because it's a logical delete that doesn't actually remove the retained version
deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object)
if err != nil {
glog.Errorf("Failed to create delete marker: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Set delete marker version ID in response header
w.Header().Set("x-amz-version-id", deleteMarkerVersionId)
w.Header().Set("x-amz-delete-marker", "true")
// Set delete marker version ID in response header
w.Header().Set("x-amz-version-id", deleteMarkerVersionId)
w.Header().Set("x-amz-delete-marker", "true")
} else if versioningSuspended {
// Suspended versioning: Actually delete the "null" version object
glog.V(2).Infof("DeleteObjectHandler: deleting null version for suspended versioning %s/%s", bucket, object)
// Check object lock permissions before deleting "null" version
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
if err := s3a.enforceObjectLockProtections(r, bucket, object, "null", governanceBypassAllowed); err != nil {
glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return
}
// Delete the "null" version (the regular file)
err := s3a.deleteSpecificObjectVersion(bucket, object, "null")
if err != nil {
glog.Errorf("Failed to delete null version: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Note: According to AWS S3 spec, suspended versioning should NOT return version ID headers
// The object is deleted but no version information is returned
}
}
} else {
// Handle regular delete (non-versioned)
@@ -203,8 +233,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
// Check if versioning is configured for the bucket (needed for object lock checks)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
// Get detailed versioning state for proper handling of suspended vs enabled versioning
versioningState, err := s3a.getVersioningState(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
@@ -215,6 +245,10 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
return
}
versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
versioningSuspended := (versioningState == s3_constants.VersioningSuspended)
versioningConfigured := (versioningState != "")
s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// delete file entries
@@ -243,9 +277,9 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
var isDeleteMarker bool
if versioningConfigured {
// Handle versioned delete
// Handle versioned delete based on specific versioning state
if object.VersionId != "" {
// Delete specific version
// Delete specific version (same for both enabled and suspended)
err := s3a.deleteSpecificObjectVersion(bucket, object.Key, object.VersionId)
if err != nil {
deleteErrors = append(deleteErrors, DeleteError{
@@ -258,19 +292,39 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
deleteVersionId = object.VersionId
} else {
// Create delete marker (logical delete)
deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object.Key)
if err != nil {
deleteErrors = append(deleteErrors, DeleteError{
Code: "",
Message: err.Error(),
Key: object.Key,
VersionId: object.VersionId,
})
continue
// Delete without version ID - behavior depends on versioning state
if versioningEnabled {
// Enabled versioning: Create delete marker (logical delete)
deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object.Key)
if err != nil {
deleteErrors = append(deleteErrors, DeleteError{
Code: "",
Message: err.Error(),
Key: object.Key,
VersionId: object.VersionId,
})
continue
}
deleteVersionId = deleteMarkerVersionId
isDeleteMarker = true
} else if versioningSuspended {
// Suspended versioning: Actually delete the "null" version object
glog.V(2).Infof("DeleteMultipleObjectsHandler: deleting null version for suspended versioning %s/%s", bucket, object.Key)
err := s3a.deleteSpecificObjectVersion(bucket, object.Key, "null")
if err != nil {
deleteErrors = append(deleteErrors, DeleteError{
Code: "",
Message: err.Error(),
Key: object.Key,
VersionId: "null",
})
continue
}
deleteVersionId = "null"
// Note: For suspended versioning, we don't set isDeleteMarker=true
// because we actually deleted the object, not created a delete marker
}
deleteVersionId = deleteMarkerVersionId
isDeleteMarker = true
}
// Add to successful deletions with version info

View File

@@ -4,16 +4,17 @@ import (
"context"
"encoding/xml"
"fmt"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
type OptionalString struct {
@@ -356,6 +357,9 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
return
}
// Track .versions directories found in this directory for later processing
var versionsDirs []string
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
@@ -386,6 +390,14 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
if entry.Name == s3_constants.MultipartUploadsFolder { // FIXME no need to apply to all directories. this extra also affects maxKeys
continue
}
// Skip .versions directories in regular list operations but track them for logical object creation
if strings.HasSuffix(entry.Name, ".versions") {
glog.V(4).Infof("Found .versions directory: %s", entry.Name)
versionsDirs = append(versionsDirs, entry.Name)
continue
}
if delimiter != "/" || cursor.prefixEndsOnDelimiter {
if cursor.prefixEndsOnDelimiter {
cursor.prefixEndsOnDelimiter = false
@@ -425,6 +437,48 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
cursor.prefixEndsOnDelimiter = false
}
}
// After processing all regular entries, handle versioned objects
// Create logical entries for objects that have .versions directories
for _, versionsDir := range versionsDirs {
if cursor.maxKeys <= 0 {
cursor.isTruncated = true
break
}
// Extract object name from .versions directory name (remove .versions suffix)
baseObjectName := strings.TrimSuffix(versionsDir, ".versions")
// Construct full object path relative to bucket
// dir is something like "/buckets/sea-test-1/Veeam/Backup/vbr/Config"
// we need to get the path relative to bucket: "Veeam/Backup/vbr/Config/Owner"
bucketPath := strings.TrimPrefix(dir, s3a.option.BucketsPath+"/")
bucketName := strings.Split(bucketPath, "/")[0]
// Remove bucket name from path to get directory within bucket
bucketRelativePath := strings.Join(strings.Split(bucketPath, "/")[1:], "/")
var fullObjectPath string
if bucketRelativePath == "" {
// Object is at bucket root
fullObjectPath = baseObjectName
} else {
// Object is in subdirectory
fullObjectPath = bucketRelativePath + "/" + baseObjectName
}
glog.V(4).Infof("Processing versioned object: baseObjectName=%s, bucketRelativePath=%s, fullObjectPath=%s",
baseObjectName, bucketRelativePath, fullObjectPath)
// Get the latest version information for this object
if latestVersionEntry, latestVersionErr := s3a.getLatestVersionEntryForListOperation(bucketName, fullObjectPath); latestVersionErr == nil {
glog.V(4).Infof("Creating logical entry for versioned object: %s", fullObjectPath)
eachEntryFn(dir, latestVersionEntry)
} else {
glog.V(4).Infof("Failed to get latest version for %s: %v", fullObjectPath, latestVersionErr)
}
}
return
}
@@ -513,3 +567,32 @@ func (s3a *S3ApiServer) ensureDirectoryAllEmpty(filerClient filer_pb.SeaweedFile
return true, nil
}
// getLatestVersionEntryForListOperation gets the latest version of an object and creates a logical entry for list operations
// This is used to show versioned objects as logical object names in regular list operations
func (s3a *S3ApiServer) getLatestVersionEntryForListOperation(bucket, object string) (*filer_pb.Entry, error) {
// Get the latest version entry
latestVersionEntry, err := s3a.getLatestObjectVersion(bucket, object)
if err != nil {
return nil, fmt.Errorf("failed to get latest version: %w", err)
}
// Check if this is a delete marker (should not be shown in regular list)
if latestVersionEntry.Extended != nil {
if deleteMarker, exists := latestVersionEntry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
return nil, fmt.Errorf("latest version is a delete marker")
}
}
// Create a logical entry that appears to be stored at the object path (not the versioned path)
// This allows the list operation to show the logical object name while preserving all metadata
logicalEntry := &filer_pb.Entry{
Name: strings.TrimPrefix(object, "/"),
IsDirectory: false,
Attributes: latestVersionEntry.Attributes,
Extended: latestVersionEntry.Extended,
Chunks: latestVersionEntry.Chunks,
}
return logicalEntry, nil
}

View File

@@ -90,6 +90,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
entry.Content, _ = io.ReadAll(r.Body)
}
entry.Attributes.Mime = objectContentType
// Set object owner for directory objects (same as regular objects)
s3a.setObjectOwnerFromRequest(r, entry)
}); err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return

View File

@@ -19,18 +19,31 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
// ObjectVersion represents a version of an S3 object
type ObjectVersion struct {
VersionId string
IsLatest bool
IsDeleteMarker bool
LastModified time.Time
ETag string
Size int64
Entry *filer_pb.Entry
// S3ListObjectVersionsResult - Custom struct for S3 list-object-versions response
// This avoids conflicts with the XSD generated ListVersionsResult struct
// and ensures proper separation of versions and delete markers into arrays
type S3ListObjectVersionsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListVersionsResult"`
Name string `xml:"Name"`
Prefix string `xml:"Prefix,omitempty"`
KeyMarker string `xml:"KeyMarker,omitempty"`
VersionIdMarker string `xml:"VersionIdMarker,omitempty"`
NextKeyMarker string `xml:"NextKeyMarker,omitempty"`
NextVersionIdMarker string `xml:"NextVersionIdMarker,omitempty"`
MaxKeys int `xml:"MaxKeys"`
Delimiter string `xml:"Delimiter,omitempty"`
IsTruncated bool `xml:"IsTruncated"`
// These are the critical fields - arrays instead of single elements
Versions []VersionEntry `xml:"Version,omitempty"` // Array for versions
DeleteMarkers []DeleteMarkerEntry `xml:"DeleteMarker,omitempty"` // Array for delete markers
CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
EncodingType string `xml:"EncodingType,omitempty"`
}
// ListObjectVersionsResult represents the response for ListObjectVersions
// Original struct - keeping for compatibility but will use S3ListObjectVersionsResult for XML response
type ListObjectVersionsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListVersionsResult"`
Name string `xml:"Name"`
@@ -47,6 +60,17 @@ type ListObjectVersionsResult struct {
CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
}
// ObjectVersion represents a version of an S3 object
type ObjectVersion struct {
VersionId string
IsLatest bool
IsDeleteMarker bool
LastModified time.Time
ETag string
Size int64
Entry *filer_pb.Entry
}
// generateVersionId creates a unique version ID that preserves chronological order
func generateVersionId() string {
// Use nanosecond timestamp to ensure chronological ordering
@@ -124,7 +148,7 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error
}
// listObjectVersions lists all versions of an object
func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*ListObjectVersionsResult, error) {
func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*S3ListObjectVersionsResult, error) {
var allVersions []interface{} // Can contain VersionEntry or DeleteMarkerEntry
// Track objects that have been processed to avoid duplicates
@@ -184,8 +208,8 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
return versionIdI > versionIdJ
})
// Build result
result := &ListObjectVersionsResult{
// Build result using S3ListObjectVersionsResult to avoid conflicts with XSD structs
result := &S3ListObjectVersionsResult{
Name: bucket,
Prefix: prefix,
KeyMarker: keyMarker,
@@ -296,7 +320,35 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string
}
}
} else {
// Recursively search subdirectories
// This is a regular directory - check if it's an explicit S3 directory object
// Only include directories that were explicitly created via S3 API (have FolderMimeType)
// This excludes implicit directories created when uploading files like "test1/a"
if entry.Attributes.Mime == s3_constants.FolderMimeType {
directoryKey := entryPath
if !strings.HasSuffix(directoryKey, "/") {
directoryKey += "/"
}
// Add directory as a version entry with VersionId "null" (following S3/Minio behavior)
glog.V(2).Infof("findVersionsRecursively: found explicit S3 directory %s", directoryKey)
// Calculate ETag for empty directory
directoryETag := "\"d41d8cd98f00b204e9800998ecf8427e\""
versionEntry := &VersionEntry{
Key: directoryKey,
VersionId: "null",
IsLatest: true,
LastModified: time.Unix(entry.Attributes.Mtime, 0),
ETag: directoryETag,
Size: 0, // Directories have size 0
Owner: s3a.getObjectOwnerFromEntry(entry),
StorageClass: "STANDARD",
}
*allVersions = append(*allVersions, versionEntry)
}
// Recursively search subdirectories (regardless of whether they're explicit or implicit)
fullPath := path.Join(currentPath, entry.Name)
err := s3a.findVersionsRecursively(fullPath, entryPath, allVersions, processedObjects, seenVersionIds, bucket, prefix)
if err != nil {
@@ -529,13 +581,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
versionsDir := s3a.getVersionedObjectDir(bucket, object)
versionFile := s3a.getVersionFileName(versionId)
// Delete the specific version from .versions directory
_, err := s3a.getEntry(versionsDir, versionFile)
if err != nil {
return fmt.Errorf("version %s not found: %v", versionId, err)
}
// Check if this is the latest version before deleting
// Check if this is the latest version before attempting deletion (for potential metadata update)
versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), object+".versions")
isLatestVersion := false
if dirErr == nil && versionsEntry.Extended != nil {
@@ -544,15 +590,19 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
}
}
// Delete the version file
// Attempt to delete the version file
// Note: We don't check if the file exists first to avoid race conditions
// The deletion operation should be idempotent
deleteErr := s3a.rm(versionsDir, versionFile, true, false)
if deleteErr != nil {
// Check if file was already deleted by another process
// Check if file was already deleted by another process (race condition handling)
if _, checkErr := s3a.getEntry(versionsDir, versionFile); checkErr != nil {
// File doesn't exist anymore, deletion was successful
} else {
return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
// File doesn't exist anymore, deletion was successful (another thread deleted it)
glog.V(2).Infof("deleteSpecificObjectVersion: version %s for %s%s already deleted by another process", versionId, bucket, object)
return nil
}
// File still exists but deletion failed for another reason
return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
}
// If we deleted the latest version, update the .versions directory metadata to point to the new latest