s3: add pagination to getObjectVersionList and reduce memory (#7787)
* s3: add pagination to getObjectVersionList and reduce memory
This commit makes three changes to S3 version listing:
1. Add pagination to getObjectVersionList:
- Previously hardcoded limit of 1000 versions per object
- Now paginates through all versions using startFrom marker
- Fixes a correctness issue where objects with >1000 versions would
have some versions missing from list results
2. Reduce memory by not retaining full Entry:
- Replace Entry field with OwnerID string in ObjectVersion struct
- Extract owner ID when creating ObjectVersion
- Avoids retaining the Chunks array, which can be large for big files
- Clear seenVersionIds map after use to help GC
3. Update getObjectOwnerFromVersion:
- Use new OwnerID field instead of accessing Entry.Extended
- Maintains backward compatibility with fallback lookups
* s3: propagate errors from list operation instead of returning partial results
Address review feedback: when s3a.list fails during version listing,
the function was logging at V(2) level and returning partial results
with a nil error. This hides the error and could lead to silent data
inconsistencies.
Fix by:
1. Log at Warningf level for better visibility
2. Return a nil versions slice with the error to prevent partial results
from being processed as complete
This commit is contained in:
@@ -61,6 +61,8 @@ type ListObjectVersionsResult struct {
|
||||
}
|
||||
|
||||
// ObjectVersion represents a version of an S3 object
|
||||
// Note: We intentionally do not store the full filer_pb.Entry here to avoid
|
||||
// retaining large Chunks arrays in memory during list operations.
|
||||
type ObjectVersion struct {
|
||||
VersionId string
|
||||
IsLatest bool
|
||||
@@ -68,7 +70,7 @@ type ObjectVersion struct {
|
||||
LastModified time.Time
|
||||
ETag string
|
||||
Size int64
|
||||
Entry *filer_pb.Entry
|
||||
OwnerID string // Owner ID extracted from entry metadata
|
||||
}
|
||||
|
||||
// generateVersionId creates a unique version ID that preserves chronological order
|
||||
@@ -531,6 +533,7 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string
|
||||
}
|
||||
|
||||
// getObjectVersionList returns all versions of a specific object
|
||||
// Uses pagination to handle objects with more than 1000 versions
|
||||
func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVersion, error) {
|
||||
var versions []*ObjectVersion
|
||||
|
||||
@@ -558,72 +561,96 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe
|
||||
}
|
||||
}
|
||||
|
||||
// List all version files in the .versions directory
|
||||
entries, _, err := s3a.list(bucketDir+"/"+versionsObjectPath, "", "", false, 1000)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("getObjectVersionList: failed to list version files: %v", err)
|
||||
return versions, nil
|
||||
}
|
||||
|
||||
glog.V(2).Infof("getObjectVersionList: found %d entries in versions directory", len(entries))
|
||||
|
||||
// Use a map to detect and prevent duplicate version IDs
|
||||
seenVersionIds := make(map[string]bool)
|
||||
versionsDir := bucketDir + "/" + versionsObjectPath
|
||||
|
||||
for i, entry := range entries {
|
||||
if entry.Extended == nil {
|
||||
glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i)
|
||||
continue
|
||||
// Paginate through all version files in the .versions directory
|
||||
startFrom := ""
|
||||
const pageSize = 1000
|
||||
totalEntries := 0
|
||||
|
||||
for {
|
||||
entries, isLast, err := s3a.list(versionsDir, "", startFrom, false, pageSize)
|
||||
if err != nil {
|
||||
glog.Warningf("getObjectVersionList: failed to list version files in %s: %v", versionsDir, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
|
||||
if !hasVersionId {
|
||||
glog.V(2).Infof("getObjectVersionList: entry %d has no version ID, skipping", i)
|
||||
continue
|
||||
}
|
||||
totalEntries += len(entries)
|
||||
|
||||
versionId := string(versionIdBytes)
|
||||
for i, entry := range entries {
|
||||
// Track last entry for pagination
|
||||
startFrom = entry.Name
|
||||
|
||||
// Check for duplicate version IDs and skip if already seen
|
||||
if seenVersionIds[versionId] {
|
||||
glog.Warningf("getObjectVersionList: duplicate version ID %s detected for object %s/%s, skipping", versionId, bucket, object)
|
||||
continue
|
||||
}
|
||||
seenVersionIds[versionId] = true
|
||||
|
||||
// Check if this version is the latest by comparing with directory metadata
|
||||
isLatest := (versionId == latestVersionId)
|
||||
|
||||
isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey]
|
||||
isDeleteMarker := string(isDeleteMarkerBytes) == "true"
|
||||
|
||||
glog.V(2).Infof("getObjectVersionList: found version %s, isLatest=%v, isDeleteMarker=%v", versionId, isLatest, isDeleteMarker)
|
||||
|
||||
version := &ObjectVersion{
|
||||
VersionId: versionId,
|
||||
IsLatest: isLatest,
|
||||
IsDeleteMarker: isDeleteMarker,
|
||||
LastModified: time.Unix(entry.Attributes.Mtime, 0),
|
||||
Entry: entry,
|
||||
}
|
||||
|
||||
if !isDeleteMarker {
|
||||
// Try to get ETag from Extended attributes first
|
||||
if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag {
|
||||
version.ETag = string(etagBytes)
|
||||
} else {
|
||||
// Fallback: calculate ETag from chunks
|
||||
version.ETag = s3a.calculateETagFromChunks(entry.Chunks)
|
||||
if entry.Extended == nil {
|
||||
glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i)
|
||||
continue
|
||||
}
|
||||
version.Size = int64(entry.Attributes.FileSize)
|
||||
|
||||
versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
|
||||
if !hasVersionId {
|
||||
glog.V(2).Infof("getObjectVersionList: entry %d has no version ID, skipping", i)
|
||||
continue
|
||||
}
|
||||
|
||||
versionId := string(versionIdBytes)
|
||||
|
||||
// Check for duplicate version IDs and skip if already seen
|
||||
if seenVersionIds[versionId] {
|
||||
glog.Warningf("getObjectVersionList: duplicate version ID %s detected for object %s/%s, skipping", versionId, bucket, object)
|
||||
continue
|
||||
}
|
||||
seenVersionIds[versionId] = true
|
||||
|
||||
// Check if this version is the latest by comparing with directory metadata
|
||||
isLatest := (versionId == latestVersionId)
|
||||
|
||||
isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey]
|
||||
isDeleteMarker := string(isDeleteMarkerBytes) == "true"
|
||||
|
||||
glog.V(2).Infof("getObjectVersionList: found version %s, isLatest=%v, isDeleteMarker=%v", versionId, isLatest, isDeleteMarker)
|
||||
|
||||
// Extract owner ID from entry metadata to avoid retaining full Entry with Chunks
|
||||
var ownerID string
|
||||
if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
|
||||
ownerID = string(ownerBytes)
|
||||
}
|
||||
|
||||
version := &ObjectVersion{
|
||||
VersionId: versionId,
|
||||
IsLatest: isLatest,
|
||||
IsDeleteMarker: isDeleteMarker,
|
||||
LastModified: time.Unix(entry.Attributes.Mtime, 0),
|
||||
OwnerID: ownerID,
|
||||
}
|
||||
|
||||
if !isDeleteMarker {
|
||||
// Try to get ETag from Extended attributes first
|
||||
if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag {
|
||||
version.ETag = string(etagBytes)
|
||||
} else {
|
||||
// Fallback: calculate ETag from chunks
|
||||
version.ETag = s3a.calculateETagFromChunks(entry.Chunks)
|
||||
}
|
||||
version.Size = int64(entry.Attributes.FileSize)
|
||||
}
|
||||
|
||||
versions = append(versions, version)
|
||||
}
|
||||
|
||||
versions = append(versions, version)
|
||||
// Stop if we've reached the last page
|
||||
if isLast || len(entries) < pageSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Clear map to help GC
|
||||
clear(seenVersionIds)
|
||||
|
||||
// Don't sort here - let the main listObjectVersions function handle sorting consistently
|
||||
|
||||
glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s (after deduplication from %d entries)", len(versions), bucket, object, len(entries))
|
||||
glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s (after deduplication from %d entries)", len(versions), bucket, object, totalEntries)
|
||||
for i, version := range versions {
|
||||
glog.V(2).Infof("getObjectVersionList: version %d: %s (isLatest=%v, isDeleteMarker=%v)", i, version.VersionId, version.IsLatest, version.IsDeleteMarker)
|
||||
}
|
||||
@@ -988,15 +1015,12 @@ func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb
|
||||
return latestVersionEntry, nil
|
||||
}
|
||||
|
||||
// getObjectOwnerFromVersion extracts object owner information from version entry metadata
|
||||
// getObjectOwnerFromVersion extracts object owner information from version metadata
|
||||
func (s3a *S3ApiServer) getObjectOwnerFromVersion(version *ObjectVersion, bucket, objectKey string) CanonicalUser {
|
||||
// First try to get owner from the version entry itself
|
||||
if version.Entry != nil && version.Entry.Extended != nil {
|
||||
if ownerBytes, exists := version.Entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
|
||||
ownerId := string(ownerBytes)
|
||||
ownerDisplayName := s3a.iam.GetAccountNameById(ownerId)
|
||||
return CanonicalUser{ID: ownerId, DisplayName: ownerDisplayName}
|
||||
}
|
||||
// First try to get owner from the version's OwnerID field (extracted during listing)
|
||||
if version.OwnerID != "" {
|
||||
ownerDisplayName := s3a.iam.GetAccountNameById(version.OwnerID)
|
||||
return CanonicalUser{ID: version.OwnerID, DisplayName: ownerDisplayName}
|
||||
}
|
||||
|
||||
// Fallback: try to get owner from the current version of the object
|
||||
|
||||
Reference in New Issue
Block a user