fix: S3 versioning memory leak in ListObjectVersions pagination (#7813)
* fix: S3 versioning memory leak in ListObjectVersions pagination This commit fixes a memory leak issue in S3 versioning buckets where ListObjectVersions with pagination (key-marker set) would collect ALL versions in the bucket before filtering, causing O(N) memory usage. Root cause: - When keyMarker was set, maxCollect was set to 0 (unlimited) - This caused findVersionsRecursively to traverse the entire bucket - All versions were collected into memory, sorted, then filtered Fix: - Updated findVersionsRecursively to accept keyMarker and versionIdMarker - Skips objects/versions before the marker during recursion (not after) - Always respects maxCollect limit (never unlimited) - Memory usage is now O(maxKeys) instead of O(total versions) Refactoring: - Introduced versionCollector struct to encapsulate collection state - Extracted helper methods for cleaner, more testable code: - matchesPrefixFilter: prefix matching logic - shouldSkipObjectForMarker: keyMarker filtering - shouldSkipVersionForMarker: versionIdMarker filtering - processVersionsDirectory: .versions directory handling - processExplicitDirectory: S3 directory object handling - processRegularFile: pre-versioning file handling - collectVersions: main recursive collection loop - processDirectory: directory entry dispatch This reduces the high QPS on 'find' and 'prefixList' operations by skipping irrelevant objects during traversal. Fixes customer-reported memory leak with high find/prefixList QPS in Grafana for S3 versioning buckets. * s3: infer version ID format from ExtLatestVersionIdKey metadata Simplified version format detection: - Removed ExtVersionIdFormatKey - no longer needed - getVersionIdFormat() now infers format from ExtLatestVersionIdKey - Uses isNewFormatVersionId() to check if latest version uses inverted format This approach is simpler because: - ExtLatestVersionIdKey is already stored in .versions directory metadata - No need for separate format metadata field - Format is naturally determined by the existing version IDs
This commit is contained in:
@@ -1,8 +1,9 @@
|
||||
package s3api
|
||||
|
||||
// This file contains the core S3 versioning operations.
|
||||
// Version ID format handling is in s3api_version_id.go
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"net/http"
|
||||
@@ -73,50 +74,21 @@ type ObjectVersion struct {
|
||||
OwnerID string // Owner ID extracted from entry metadata
|
||||
}
|
||||
|
||||
// generateVersionId creates a unique version ID that preserves chronological order
|
||||
func generateVersionId() string {
|
||||
// Use nanosecond timestamp to ensure chronological ordering
|
||||
// Format as 16-digit hex (first 16 chars of version ID)
|
||||
now := time.Now().UnixNano()
|
||||
timestampHex := fmt.Sprintf("%016x", now)
|
||||
|
||||
// Generate random 8 bytes for uniqueness (last 16 chars of version ID)
|
||||
randBytes := make([]byte, 8)
|
||||
if _, err := rand.Read(randBytes); err != nil {
|
||||
glog.Errorf("Failed to generate random bytes for version ID: %v", err)
|
||||
// Fallback to timestamp-only if random generation fails
|
||||
return timestampHex + "0000000000000000"
|
||||
}
|
||||
|
||||
// Combine timestamp (16 chars) + random (16 chars) = 32 chars total
|
||||
randomHex := hex.EncodeToString(randBytes)
|
||||
versionId := timestampHex + randomHex
|
||||
|
||||
return versionId
|
||||
}
|
||||
|
||||
// getVersionedObjectDir returns the directory path for storing object versions
|
||||
func (s3a *S3ApiServer) getVersionedObjectDir(bucket, object string) string {
|
||||
return path.Join(s3a.option.BucketsPath, bucket, object+s3_constants.VersionsFolder)
|
||||
}
|
||||
|
||||
// getVersionFileName returns the filename for a specific version
|
||||
func (s3a *S3ApiServer) getVersionFileName(versionId string) string {
|
||||
return fmt.Sprintf("v_%s", versionId)
|
||||
}
|
||||
|
||||
// createDeleteMarker creates a delete marker for versioned delete operations
|
||||
func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error) {
|
||||
versionId := generateVersionId()
|
||||
// Clean up the object path first
|
||||
cleanObject := strings.TrimPrefix(object, "/")
|
||||
|
||||
glog.V(2).Infof("createDeleteMarker: creating delete marker %s for %s/%s", versionId, bucket, object)
|
||||
// Check if .versions directory exists to determine format
|
||||
useInvertedFormat := s3a.getVersionIdFormat(bucket, cleanObject)
|
||||
versionId := generateVersionId(useInvertedFormat)
|
||||
|
||||
glog.V(2).Infof("createDeleteMarker: creating delete marker %s for %s/%s (inverted=%v)", versionId, bucket, object, useInvertedFormat)
|
||||
|
||||
// Create the version file name for the delete marker
|
||||
versionFileName := s3a.getVersionFileName(versionId)
|
||||
|
||||
// Store delete marker in the .versions directory
|
||||
// Make sure to clean up the object path to remove leading slashes
|
||||
cleanObject := strings.TrimPrefix(object, "/")
|
||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||
versionsDir := bucketDir + "/" + cleanObject + s3_constants.VersionsFolder
|
||||
|
||||
@@ -159,7 +131,7 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
|
||||
// The extra 1 is for truncation detection
|
||||
allVersions := make([]interface{}, 0, maxKeys+1)
|
||||
|
||||
glog.V(1).Infof("listObjectVersions: listing versions for bucket %s, prefix '%s'", bucket, prefix)
|
||||
glog.V(1).Infof("listObjectVersions: listing versions for bucket %s, prefix '%s', keyMarker '%s', versionIdMarker '%s'", bucket, prefix, keyMarker, versionIdMarker)
|
||||
|
||||
// Track objects that have been processed to avoid duplicates
|
||||
processedObjects := make(map[string]bool)
|
||||
@@ -168,14 +140,19 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
|
||||
seenVersionIds := make(map[string]bool)
|
||||
|
||||
// Recursively find all .versions directories in the bucket
|
||||
// When keyMarker is set, we need to collect all versions since filtering happens after sorting
|
||||
// Pass 0 (unlimited) when keyMarker is set, otherwise maxKeys+1 for truncation detection
|
||||
// Pass keyMarker and versionIdMarker to enable efficient pagination (skip entries before marker)
|
||||
bucketPath := path.Join(s3a.option.BucketsPath, bucket)
|
||||
maxCollect := maxKeys + 1
|
||||
if keyMarker != "" {
|
||||
maxCollect = 0 // Collect all versions when paginating, filter after sort
|
||||
}
|
||||
err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, maxCollect)
|
||||
|
||||
// Memory optimization: limit collection to maxKeys+1 versions.
|
||||
// This works correctly for objects using the NEW inverted-timestamp format, where
|
||||
// filesystem order (lexicographic) matches sorted order (newest-first).
|
||||
// For OLD format objects (raw timestamps), filesystem order is oldest-first, so
|
||||
// limiting collection may return older versions instead of newest. However:
|
||||
// - New objects going forward use the new format
|
||||
// - The alternative (collecting all) causes memory issues for buckets with many versions
|
||||
// - Pagination continues correctly; users can page through to see all versions
|
||||
maxCollect := maxKeys + 1 // +1 to detect truncation
|
||||
err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, keyMarker, versionIdMarker, maxCollect)
|
||||
if err != nil {
|
||||
glog.Errorf("listObjectVersions: findVersionsRecursively failed: %v", err)
|
||||
return nil, err
|
||||
@@ -187,31 +164,27 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
|
||||
|
||||
glog.V(1).Infof("listObjectVersions: found %d total versions", len(allVersions))
|
||||
|
||||
// Sort by key, then by LastModified (newest first), then by VersionId for deterministic ordering
|
||||
// Sort by key, then by version (newest first)
|
||||
// Uses compareVersionIds to handle both old and new format version IDs
|
||||
sort.Slice(allVersions, func(i, j int) bool {
|
||||
var keyI, keyJ string
|
||||
var lastModifiedI, lastModifiedJ time.Time
|
||||
var versionIdI, versionIdJ string
|
||||
|
||||
switch v := allVersions[i].(type) {
|
||||
case *VersionEntry:
|
||||
keyI = v.Key
|
||||
lastModifiedI = v.LastModified
|
||||
versionIdI = v.VersionId
|
||||
case *DeleteMarkerEntry:
|
||||
keyI = v.Key
|
||||
lastModifiedI = v.LastModified
|
||||
versionIdI = v.VersionId
|
||||
}
|
||||
|
||||
switch v := allVersions[j].(type) {
|
||||
case *VersionEntry:
|
||||
keyJ = v.Key
|
||||
lastModifiedJ = v.LastModified
|
||||
versionIdJ = v.VersionId
|
||||
case *DeleteMarkerEntry:
|
||||
keyJ = v.Key
|
||||
lastModifiedJ = v.LastModified
|
||||
versionIdJ = v.VersionId
|
||||
}
|
||||
|
||||
@@ -220,53 +193,11 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
|
||||
return keyI < keyJ
|
||||
}
|
||||
|
||||
// Then by modification time (newest first) - but use nanosecond precision for ties
|
||||
timeDiff := lastModifiedI.Sub(lastModifiedJ)
|
||||
if timeDiff.Abs() > time.Millisecond {
|
||||
return lastModifiedI.After(lastModifiedJ)
|
||||
}
|
||||
|
||||
// For very close timestamps (within 1ms), use version ID for deterministic ordering
|
||||
// Sort version IDs in reverse lexicographic order to maintain newest-first semantics
|
||||
return versionIdI > versionIdJ
|
||||
// Then by version ID (newest first)
|
||||
// compareVersionIds handles both old (raw timestamp) and new (inverted timestamp) formats
|
||||
return compareVersionIds(versionIdI, versionIdJ) < 0
|
||||
})
|
||||
|
||||
// Apply key-marker and version-id-marker filtering
|
||||
// S3 pagination: skip versions at or before the marker, return versions AFTER the marker
|
||||
// Versions are sorted: key ascending, then versionId descending (newest first for same key)
|
||||
//
|
||||
// S3 behavior:
|
||||
// - If key-marker is specified without version-id-marker: start after ALL versions of key-marker
|
||||
// - If both are specified: start after the specific version of key-marker
|
||||
if keyMarker != "" {
|
||||
filteredVersions := make([]interface{}, 0, len(allVersions))
|
||||
for _, version := range allVersions {
|
||||
var key, versionId string
|
||||
switch v := version.(type) {
|
||||
case *VersionEntry:
|
||||
key = v.Key
|
||||
versionId = v.VersionId
|
||||
case *DeleteMarkerEntry:
|
||||
key = v.Key
|
||||
versionId = v.VersionId
|
||||
}
|
||||
|
||||
// Include this version if it's AFTER the marker
|
||||
if key > keyMarker {
|
||||
// Key is after marker key: always include
|
||||
filteredVersions = append(filteredVersions, version)
|
||||
} else if key == keyMarker && versionIdMarker != "" && versionId < versionIdMarker {
|
||||
// Same key, but version is after the marker version (versionIds sorted descending)
|
||||
filteredVersions = append(filteredVersions, version)
|
||||
}
|
||||
// else: key < keyMarker OR (key == keyMarker with no versionIdMarker or version already seen)
|
||||
// skip this version (it was in a previous page)
|
||||
}
|
||||
glog.V(1).Infof("listObjectVersions: after applying markers (key=%s, versionId=%s), %d -> %d versions",
|
||||
keyMarker, versionIdMarker, len(allVersions), len(filteredVersions))
|
||||
allVersions = filteredVersions
|
||||
}
|
||||
|
||||
// Build result using S3ListObjectVersionsResult to avoid conflicts with XSD structs
|
||||
result := &S3ListObjectVersionsResult{
|
||||
Name: bucket,
|
||||
@@ -320,255 +251,310 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// findVersionsRecursively searches for all .versions directories and regular files recursively
|
||||
// maxCollect limits the number of versions to collect for memory efficiency (0 = unlimited)
|
||||
func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix string, maxCollect int) error {
|
||||
// List entries in current directory with pagination
|
||||
startFrom := ""
|
||||
for {
|
||||
// Early termination: stop if we've collected enough versions
|
||||
if maxCollect > 0 && len(*allVersions) >= maxCollect {
|
||||
// versionCollector holds state for collecting object versions during recursive traversal
|
||||
type versionCollector struct {
|
||||
s3a *S3ApiServer
|
||||
bucket string
|
||||
prefix string
|
||||
keyMarker string
|
||||
versionIdMarker string
|
||||
maxCollect int
|
||||
allVersions *[]interface{}
|
||||
processedObjects map[string]bool
|
||||
seenVersionIds map[string]bool
|
||||
}
|
||||
|
||||
// isFull returns true if we've collected enough versions
|
||||
func (vc *versionCollector) isFull() bool {
|
||||
return vc.maxCollect > 0 && len(*vc.allVersions) >= vc.maxCollect
|
||||
}
|
||||
|
||||
// matchesPrefixFilter checks if an entry path matches the prefix filter
|
||||
func (vc *versionCollector) matchesPrefixFilter(entryPath string, isDirectory bool) bool {
|
||||
normalizedPrefix := strings.TrimPrefix(vc.prefix, "/")
|
||||
if normalizedPrefix == "" {
|
||||
return true
|
||||
}
|
||||
|
||||
// Entry matches if its path starts with the prefix
|
||||
isMatch := strings.HasPrefix(entryPath, normalizedPrefix)
|
||||
if !isMatch && isDirectory {
|
||||
// Directory might match with trailing slash
|
||||
isMatch = strings.HasPrefix(entryPath+"/", normalizedPrefix)
|
||||
}
|
||||
|
||||
// For directories, also check if we need to descend (prefix is deeper)
|
||||
canDescend := isDirectory && strings.HasPrefix(normalizedPrefix, entryPath)
|
||||
|
||||
return isMatch || canDescend
|
||||
}
|
||||
|
||||
// shouldSkipObjectForMarker returns true if the object should be skipped based on keyMarker
|
||||
func (vc *versionCollector) shouldSkipObjectForMarker(objectKey string) bool {
|
||||
if vc.keyMarker == "" {
|
||||
return false
|
||||
}
|
||||
return objectKey < vc.keyMarker
|
||||
}
|
||||
|
||||
// shouldSkipVersionForMarker returns true if a version should be skipped based on markers
|
||||
// For the keyMarker object, skip versions that are newer than or equal to versionIdMarker
|
||||
// (these were already returned in previous pages).
|
||||
// Handles both old (raw timestamp) and new (inverted timestamp) version ID formats.
|
||||
func (vc *versionCollector) shouldSkipVersionForMarker(objectKey, versionId string) bool {
|
||||
if vc.keyMarker == "" || objectKey != vc.keyMarker {
|
||||
return false
|
||||
}
|
||||
// Object matches keyMarker - apply version filtering
|
||||
if vc.versionIdMarker == "" {
|
||||
// No versionIdMarker means skip ALL versions of this key (they were all returned in previous pages)
|
||||
return true
|
||||
}
|
||||
// Skip versions that are newer than or equal to versionIdMarker
|
||||
// compareVersionIds returns negative if versionId is newer than marker
|
||||
// We skip if versionId is newer (negative) or equal (zero) to the marker
|
||||
cmp := compareVersionIds(versionId, vc.versionIdMarker)
|
||||
return cmp <= 0
|
||||
}
|
||||
|
||||
// addVersion adds a version or delete marker to results
|
||||
func (vc *versionCollector) addVersion(version *ObjectVersion, objectKey string) {
|
||||
if version.IsDeleteMarker {
|
||||
deleteMarker := &DeleteMarkerEntry{
|
||||
Key: objectKey,
|
||||
VersionId: version.VersionId,
|
||||
IsLatest: version.IsLatest,
|
||||
LastModified: version.LastModified,
|
||||
Owner: vc.s3a.getObjectOwnerFromVersion(version, vc.bucket, objectKey),
|
||||
}
|
||||
*vc.allVersions = append(*vc.allVersions, deleteMarker)
|
||||
} else {
|
||||
versionEntry := &VersionEntry{
|
||||
Key: objectKey,
|
||||
VersionId: version.VersionId,
|
||||
IsLatest: version.IsLatest,
|
||||
LastModified: version.LastModified,
|
||||
ETag: version.ETag,
|
||||
Size: version.Size,
|
||||
Owner: vc.s3a.getObjectOwnerFromVersion(version, vc.bucket, objectKey),
|
||||
StorageClass: "STANDARD",
|
||||
}
|
||||
*vc.allVersions = append(*vc.allVersions, versionEntry)
|
||||
}
|
||||
}
|
||||
|
||||
// processVersionsDirectory handles a .versions directory entry
|
||||
func (vc *versionCollector) processVersionsDirectory(entryPath string) error {
|
||||
objectKey := strings.TrimSuffix(entryPath, s3_constants.VersionsFolder)
|
||||
normalizedObjectKey := removeDuplicateSlashes(objectKey)
|
||||
|
||||
// Mark as processed
|
||||
vc.processedObjects[objectKey] = true
|
||||
vc.processedObjects[normalizedObjectKey] = true
|
||||
|
||||
// Skip objects before keyMarker
|
||||
if vc.shouldSkipObjectForMarker(normalizedObjectKey) {
|
||||
glog.V(4).Infof("processVersionsDirectory: skipping object %s (before keyMarker %s)", normalizedObjectKey, vc.keyMarker)
|
||||
return nil
|
||||
}
|
||||
|
||||
glog.V(2).Infof("processVersionsDirectory: found object %s", normalizedObjectKey)
|
||||
|
||||
versions, err := vc.s3a.getObjectVersionList(vc.bucket, normalizedObjectKey)
|
||||
if err != nil {
|
||||
glog.Warningf("processVersionsDirectory: failed to get versions for %s: %v", normalizedObjectKey, err)
|
||||
return nil // Continue with other entries
|
||||
}
|
||||
|
||||
for _, version := range versions {
|
||||
if vc.isFull() {
|
||||
return nil
|
||||
}
|
||||
|
||||
entries, isLast, err := s3a.list(currentPath, "", startFrom, false, filer.PaginationSize)
|
||||
versionKey := normalizedObjectKey + ":" + version.VersionId
|
||||
if vc.seenVersionIds[versionKey] {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip versions that were already returned in previous pages
|
||||
if vc.shouldSkipVersionForMarker(normalizedObjectKey, version.VersionId) {
|
||||
continue
|
||||
}
|
||||
|
||||
vc.seenVersionIds[versionKey] = true
|
||||
vc.addVersion(version, normalizedObjectKey)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// processExplicitDirectory handles an explicit S3 directory object
|
||||
func (vc *versionCollector) processExplicitDirectory(entryPath string, entry *filer_pb.Entry) {
|
||||
directoryKey := entryPath
|
||||
if !strings.HasSuffix(directoryKey, "/") {
|
||||
directoryKey += "/"
|
||||
}
|
||||
|
||||
// Skip directories at or before keyMarker
|
||||
if vc.keyMarker != "" && directoryKey <= vc.keyMarker {
|
||||
return
|
||||
}
|
||||
|
||||
versionEntry := &VersionEntry{
|
||||
Key: directoryKey,
|
||||
VersionId: "null",
|
||||
IsLatest: true,
|
||||
LastModified: time.Unix(entry.Attributes.Mtime, 0),
|
||||
ETag: "\"d41d8cd98f00b204e9800998ecf8427e\"", // Empty content ETag
|
||||
Size: 0,
|
||||
Owner: vc.s3a.getObjectOwnerFromEntry(entry),
|
||||
StorageClass: "STANDARD",
|
||||
}
|
||||
*vc.allVersions = append(*vc.allVersions, versionEntry)
|
||||
}
|
||||
|
||||
// processRegularFile handles a regular file entry (pre-versioning or suspended-versioning object)
|
||||
func (vc *versionCollector) processRegularFile(currentPath, entryPath string, entry *filer_pb.Entry) {
|
||||
objectKey := entryPath
|
||||
normalizedObjectKey := removeDuplicateSlashes(objectKey)
|
||||
|
||||
// Skip files before keyMarker
|
||||
if vc.shouldSkipObjectForMarker(normalizedObjectKey) {
|
||||
return
|
||||
}
|
||||
|
||||
// For keyMarker match, skip if this null version was already returned
|
||||
if vc.shouldSkipVersionForMarker(normalizedObjectKey, "null") {
|
||||
return
|
||||
}
|
||||
|
||||
// Skip if already processed via .versions directory
|
||||
if vc.processedObjects[objectKey] || vc.processedObjects[normalizedObjectKey] {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if this file has version metadata
|
||||
hasVersionMeta := entry.Extended != nil && entry.Extended[s3_constants.ExtVersionIdKey] != nil
|
||||
|
||||
// Check if a .versions directory exists for this object
|
||||
versionsEntryName := entry.Name + s3_constants.VersionsFolder
|
||||
_, versionsErr := vc.s3a.getEntry(currentPath, versionsEntryName)
|
||||
if versionsErr == nil && !hasVersionMeta {
|
||||
// .versions exists but file has no version metadata - check for null version in .versions
|
||||
versions, err := vc.s3a.getObjectVersionList(vc.bucket, normalizedObjectKey)
|
||||
if err == nil {
|
||||
for _, v := range versions {
|
||||
if v.VersionId == "null" {
|
||||
// Null version exists in .versions, skip this file
|
||||
vc.processedObjects[objectKey] = true
|
||||
vc.processedObjects[normalizedObjectKey] = true
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for duplicate
|
||||
versionKey := normalizedObjectKey + ":null"
|
||||
if vc.seenVersionIds[versionKey] {
|
||||
return
|
||||
}
|
||||
vc.seenVersionIds[versionKey] = true
|
||||
|
||||
versionEntry := &VersionEntry{
|
||||
Key: normalizedObjectKey,
|
||||
VersionId: "null",
|
||||
IsLatest: true,
|
||||
LastModified: time.Unix(entry.Attributes.Mtime, 0),
|
||||
ETag: vc.s3a.calculateETagFromChunks(entry.Chunks),
|
||||
Size: int64(entry.Attributes.FileSize),
|
||||
Owner: vc.s3a.getObjectOwnerFromEntry(entry),
|
||||
StorageClass: "STANDARD",
|
||||
}
|
||||
*vc.allVersions = append(*vc.allVersions, versionEntry)
|
||||
}
|
||||
|
||||
// findVersionsRecursively searches for .versions directories and regular files recursively
|
||||
// with efficient pagination support. It skips objects before keyMarker and applies versionIdMarker filtering.
|
||||
// maxCollect limits the number of versions to collect for memory efficiency (must be > 0)
|
||||
func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix, keyMarker, versionIdMarker string, maxCollect int) error {
|
||||
vc := &versionCollector{
|
||||
s3a: s3a,
|
||||
bucket: bucket,
|
||||
prefix: prefix,
|
||||
keyMarker: keyMarker,
|
||||
versionIdMarker: versionIdMarker,
|
||||
maxCollect: maxCollect,
|
||||
allVersions: allVersions,
|
||||
processedObjects: processedObjects,
|
||||
seenVersionIds: seenVersionIds,
|
||||
}
|
||||
|
||||
return vc.collectVersions(currentPath, relativePath)
|
||||
}
|
||||
|
||||
// collectVersions recursively collects versions from the given path
|
||||
func (vc *versionCollector) collectVersions(currentPath, relativePath string) error {
|
||||
startFrom := ""
|
||||
for {
|
||||
if vc.isFull() {
|
||||
return nil
|
||||
}
|
||||
|
||||
entries, isLast, err := vc.s3a.list(currentPath, "", startFrom, false, filer.PaginationSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
// Early termination check inside loop
|
||||
if maxCollect > 0 && len(*allVersions) >= maxCollect {
|
||||
if vc.isFull() {
|
||||
return nil
|
||||
}
|
||||
// Track last entry name for pagination
|
||||
startFrom = entry.Name
|
||||
|
||||
entryPath := path.Join(relativePath, entry.Name)
|
||||
|
||||
// Skip if this doesn't match the prefix filter
|
||||
if normalizedPrefix := strings.TrimPrefix(prefix, "/"); normalizedPrefix != "" {
|
||||
// An entry is a candidate if:
|
||||
// 1. Its path is a match for the prefix.
|
||||
// 2. It is a directory that is an ancestor of the prefix path, so we must descend into it.
|
||||
|
||||
// Condition 1: The entry's path starts with the prefix.
|
||||
isMatch := strings.HasPrefix(entryPath, normalizedPrefix)
|
||||
if !isMatch && entry.IsDirectory {
|
||||
// Also check if a directory entry matches a directory-style prefix (e.g., prefix "a/", entry "a").
|
||||
isMatch = strings.HasPrefix(entryPath+"/", normalizedPrefix)
|
||||
}
|
||||
|
||||
// Condition 2: The prefix path starts with the entry's path (and it's a directory).
|
||||
canDescend := entry.IsDirectory && strings.HasPrefix(normalizedPrefix, entryPath)
|
||||
|
||||
if !isMatch && !canDescend {
|
||||
continue
|
||||
}
|
||||
if !vc.matchesPrefixFilter(entryPath, entry.IsDirectory) {
|
||||
continue
|
||||
}
|
||||
|
||||
if entry.IsDirectory {
|
||||
// Skip .uploads directory (multipart upload temporary files)
|
||||
if strings.HasPrefix(entry.Name, ".uploads") {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if this is a .versions directory
|
||||
if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
|
||||
// Extract object name from .versions directory name
|
||||
objectKey := strings.TrimSuffix(entryPath, s3_constants.VersionsFolder)
|
||||
normalizedObjectKey := removeDuplicateSlashes(objectKey)
|
||||
// Mark both keys as processed for backward compatibility
|
||||
processedObjects[objectKey] = true
|
||||
processedObjects[normalizedObjectKey] = true
|
||||
|
||||
glog.V(2).Infof("Found .versions directory for object %s (normalized: %s)", objectKey, normalizedObjectKey)
|
||||
|
||||
versions, err := s3a.getObjectVersionList(bucket, normalizedObjectKey)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to get versions for object %s (normalized: %s): %v", objectKey, normalizedObjectKey, err)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, version := range versions {
|
||||
// Check for duplicate version IDs and skip if already seen
|
||||
// Use normalized key for deduplication
|
||||
versionKey := normalizedObjectKey + ":" + version.VersionId
|
||||
if seenVersionIds[versionKey] {
|
||||
glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, normalizedObjectKey)
|
||||
continue
|
||||
}
|
||||
seenVersionIds[versionKey] = true
|
||||
|
||||
if version.IsDeleteMarker {
|
||||
glog.V(4).Infof("Adding delete marker from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s",
|
||||
normalizedObjectKey, version.VersionId, version.IsLatest, versionKey)
|
||||
deleteMarker := &DeleteMarkerEntry{
|
||||
Key: normalizedObjectKey, // Use normalized key for consistency
|
||||
VersionId: version.VersionId,
|
||||
IsLatest: version.IsLatest,
|
||||
LastModified: version.LastModified,
|
||||
Owner: s3a.getObjectOwnerFromVersion(version, bucket, normalizedObjectKey),
|
||||
}
|
||||
*allVersions = append(*allVersions, deleteMarker)
|
||||
} else {
|
||||
glog.V(4).Infof("Adding version from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s",
|
||||
normalizedObjectKey, version.VersionId, version.IsLatest, versionKey)
|
||||
versionEntry := &VersionEntry{
|
||||
Key: normalizedObjectKey, // Use normalized key for consistency
|
||||
VersionId: version.VersionId,
|
||||
IsLatest: version.IsLatest,
|
||||
LastModified: version.LastModified,
|
||||
ETag: version.ETag,
|
||||
Size: version.Size,
|
||||
Owner: s3a.getObjectOwnerFromVersion(version, bucket, normalizedObjectKey),
|
||||
StorageClass: "STANDARD",
|
||||
}
|
||||
*allVersions = append(*allVersions, versionEntry)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// This is a regular directory - check if it's an explicit S3 directory object
|
||||
// Only include directories that were explicitly created via S3 API (have FolderMimeType)
|
||||
// This excludes implicit directories created when uploading files like "test1/a"
|
||||
if entry.Attributes.Mime == s3_constants.FolderMimeType {
|
||||
directoryKey := entryPath
|
||||
if !strings.HasSuffix(directoryKey, "/") {
|
||||
directoryKey += "/"
|
||||
}
|
||||
|
||||
// Add directory as a version entry with VersionId "null" (following S3/Minio behavior)
|
||||
glog.V(2).Infof("findVersionsRecursively: found explicit S3 directory %s", directoryKey)
|
||||
|
||||
// Calculate ETag for empty directory
|
||||
directoryETag := "\"d41d8cd98f00b204e9800998ecf8427e\""
|
||||
|
||||
versionEntry := &VersionEntry{
|
||||
Key: directoryKey,
|
||||
VersionId: "null",
|
||||
IsLatest: true,
|
||||
LastModified: time.Unix(entry.Attributes.Mtime, 0),
|
||||
ETag: directoryETag,
|
||||
Size: 0, // Directories have size 0
|
||||
Owner: s3a.getObjectOwnerFromEntry(entry),
|
||||
StorageClass: "STANDARD",
|
||||
}
|
||||
*allVersions = append(*allVersions, versionEntry)
|
||||
}
|
||||
|
||||
// Recursively search subdirectories (regardless of whether they're explicit or implicit)
|
||||
fullPath := path.Join(currentPath, entry.Name)
|
||||
err := s3a.findVersionsRecursively(fullPath, entryPath, allVersions, processedObjects, seenVersionIds, bucket, prefix, maxCollect)
|
||||
if err != nil {
|
||||
glog.Warningf("Error searching subdirectory %s: %v", entryPath, err)
|
||||
continue
|
||||
}
|
||||
// Check if we've collected enough after recursion
|
||||
if maxCollect > 0 && len(*allVersions) >= maxCollect {
|
||||
return nil
|
||||
}
|
||||
if err := vc.processDirectory(currentPath, entryPath, entry); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// This is a regular file - check if it's a pre-versioning object
|
||||
objectKey := entryPath
|
||||
|
||||
// Normalize object key to ensure consistency with other version operations
|
||||
normalizedObjectKey := removeDuplicateSlashes(objectKey)
|
||||
|
||||
// Skip if this object already has a .versions directory (already processed)
|
||||
// Check both normalized and original keys for backward compatibility
|
||||
if processedObjects[objectKey] || processedObjects[normalizedObjectKey] {
|
||||
glog.V(4).Infof("Skipping already processed object: objectKey=%s, normalizedObjectKey=%s, processedObjects[objectKey]=%v, processedObjects[normalizedObjectKey]=%v",
|
||||
objectKey, normalizedObjectKey, processedObjects[objectKey], processedObjects[normalizedObjectKey])
|
||||
continue
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Processing regular file: objectKey=%s, normalizedObjectKey=%s, NOT in processedObjects", objectKey, normalizedObjectKey)
|
||||
|
||||
// This is a pre-versioning or suspended-versioning object
|
||||
// Check if this file has version metadata (ExtVersionIdKey)
|
||||
hasVersionMeta := false
|
||||
if entry.Extended != nil {
|
||||
if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok {
|
||||
hasVersionMeta = true
|
||||
glog.V(4).Infof("Regular file %s has version metadata: %s", normalizedObjectKey, string(versionIdBytes))
|
||||
}
|
||||
}
|
||||
|
||||
// Check if a .versions directory exists for this object
|
||||
// Use entry.Name (relative to currentPath) to avoid duplicating subdirectory segments
|
||||
versionsEntryName := entry.Name + s3_constants.VersionsFolder
|
||||
_, versionsErr := s3a.getEntry(currentPath, versionsEntryName)
|
||||
if versionsErr == nil {
|
||||
// .versions directory exists
|
||||
glog.V(4).Infof("Found .versions directory for regular file %s, hasVersionMeta=%v", normalizedObjectKey, hasVersionMeta)
|
||||
|
||||
// If this file has version metadata, it's a suspended versioning null version
|
||||
// Include it and it will be the latest
|
||||
if hasVersionMeta {
|
||||
glog.V(4).Infof("Including suspended versioning file %s (has version metadata)", normalizedObjectKey)
|
||||
// Continue to add it below
|
||||
} else {
|
||||
// No version metadata - this is a pre-versioning file
|
||||
// Skip it if there's already a null version in .versions
|
||||
versions, err := s3a.getObjectVersionList(bucket, normalizedObjectKey)
|
||||
if err == nil {
|
||||
hasNullVersion := false
|
||||
for _, v := range versions {
|
||||
if v.VersionId == "null" {
|
||||
hasNullVersion = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if hasNullVersion {
|
||||
glog.V(4).Infof("Skipping pre-versioning file %s, null version exists in .versions", normalizedObjectKey)
|
||||
processedObjects[objectKey] = true
|
||||
processedObjects[normalizedObjectKey] = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
glog.V(4).Infof("Including pre-versioning file %s (no null version in .versions)", normalizedObjectKey)
|
||||
}
|
||||
} else {
|
||||
glog.V(4).Infof("No .versions directory for regular file %s, hasVersionMeta=%v", normalizedObjectKey, hasVersionMeta)
|
||||
}
|
||||
|
||||
// Add this file as a null version with IsLatest=true
|
||||
isLatest := true
|
||||
|
||||
// Check for duplicate version IDs and skip if already seen
|
||||
// Use normalized key for deduplication to match how other version operations work
|
||||
versionKey := normalizedObjectKey + ":null"
|
||||
if seenVersionIds[versionKey] {
|
||||
glog.Warningf("findVersionsRecursively: duplicate null version for object %s detected (versionKey=%s), skipping", normalizedObjectKey, versionKey)
|
||||
continue
|
||||
}
|
||||
seenVersionIds[versionKey] = true
|
||||
|
||||
etag := s3a.calculateETagFromChunks(entry.Chunks)
|
||||
|
||||
glog.V(4).Infof("Adding null version from regular file: objectKey=%s, normalizedObjectKey=%s, versionKey=%s, isLatest=%v, hasVersionMeta=%v",
|
||||
objectKey, normalizedObjectKey, versionKey, isLatest, hasVersionMeta)
|
||||
|
||||
versionEntry := &VersionEntry{
|
||||
Key: normalizedObjectKey, // Use normalized key for consistency
|
||||
VersionId: "null",
|
||||
IsLatest: isLatest,
|
||||
LastModified: time.Unix(entry.Attributes.Mtime, 0),
|
||||
ETag: etag,
|
||||
Size: int64(entry.Attributes.FileSize),
|
||||
Owner: s3a.getObjectOwnerFromEntry(entry),
|
||||
StorageClass: "STANDARD",
|
||||
}
|
||||
*allVersions = append(*allVersions, versionEntry)
|
||||
vc.processRegularFile(currentPath, entryPath, entry)
|
||||
}
|
||||
}
|
||||
|
||||
// If we've reached the last page, stop pagination
|
||||
if isLast {
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// processDirectory handles directory entries
|
||||
func (vc *versionCollector) processDirectory(currentPath, entryPath string, entry *filer_pb.Entry) error {
|
||||
// Skip .uploads directory
|
||||
if strings.HasPrefix(entry.Name, ".uploads") {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Handle .versions directory
|
||||
if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
|
||||
return vc.processVersionsDirectory(entryPath)
|
||||
}
|
||||
|
||||
// Handle explicit S3 directory object
|
||||
if entry.Attributes.Mime == s3_constants.FolderMimeType {
|
||||
vc.processExplicitDirectory(entryPath, entry)
|
||||
}
|
||||
|
||||
// Recursively search subdirectory
|
||||
fullPath := path.Join(currentPath, entry.Name)
|
||||
if err := vc.collectVersions(fullPath, entryPath); err != nil {
|
||||
glog.Warningf("Error searching subdirectory %s: %v", entryPath, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -860,13 +846,14 @@ func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string)
|
||||
continue
|
||||
}
|
||||
|
||||
// Compare version IDs chronologically (our version IDs start with timestamp)
|
||||
if latestVersionId == "" || versionId > latestVersionId {
|
||||
// Compare version IDs chronologically using unified comparator (handles both old and new formats)
|
||||
// compareVersionIds returns negative if first arg is newer
|
||||
if latestVersionId == "" || compareVersionIds(versionId, latestVersionId) < 0 {
|
||||
glog.V(1).Infof("updateLatestVersionAfterDeletion: found newer version %s (file: %s)", versionId, entry.Name)
|
||||
latestVersionId = versionId
|
||||
latestVersionFileName = entry.Name
|
||||
} else {
|
||||
glog.V(1).Infof("updateLatestVersionAfterDeletion: skipping older version %s", versionId)
|
||||
glog.V(1).Infof("updateLatestVersionAfterDeletion: skipping older or equal version %s", versionId)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1091,3 +1078,4 @@ func (s3a *S3ApiServer) getObjectOwnerFromEntry(entry *filer_pb.Entry) Canonical
|
||||
// Fallback: return anonymous if no owner found
|
||||
return CanonicalUser{ID: s3_constants.AccountAnonymousId, DisplayName: "anonymous"}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user