lifecycle worker: scan-time rule evaluation for object expiration (#8809)

* s3api: extend lifecycle XML types with NoncurrentVersionExpiration, AbortIncompleteMultipartUpload

Add missing S3 lifecycle rule types to the XML data model:
- NoncurrentVersionExpiration with NoncurrentDays and NewerNoncurrentVersions
- NoncurrentVersionTransition with NoncurrentDays and StorageClass
- AbortIncompleteMultipartUpload with DaysAfterInitiation
- Filter.ObjectSizeGreaterThan and ObjectSizeLessThan
- And.ObjectSizeGreaterThan and ObjectSizeLessThan
- Filter.UnmarshalXML to properly parse Tag, And, and size filter elements

Each new type follows the existing set-field pattern for conditional
XML marshaling. No behavior changes; these types are not yet wired
into handlers or the lifecycle worker.
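A rough sketch of that set-field pattern (the concrete struct shape here is
an assumption for illustration, not the actual s3api code):

    import "encoding/xml"

    // Hypothetical shape of the set-field pattern; the real s3api struct differs.
    type AbortIncompleteMultipartUpload struct {
        DaysAfterInitiation int
        set                 bool
    }

    // SetDaysAfterInitiation records the value and flips the set flag.
    func (a *AbortIncompleteMultipartUpload) SetDaysAfterInitiation(d int) {
        a.DaysAfterInitiation = d
        a.set = true
    }

    // MarshalXML writes the element only when it was explicitly set, so unset
    // optional elements are omitted from the serialized configuration.
    func (a AbortIncompleteMultipartUpload) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
        if !a.set {
            return nil
        }
        type alias struct {
            DaysAfterInitiation int `xml:"DaysAfterInitiation"`
        }
        return e.EncodeElement(alias{a.DaysAfterInitiation}, start)
    }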

* s3lifecycle: add lifecycle rule evaluator package

New package weed/s3api/s3lifecycle/ provides a pure-function lifecycle
rule evaluation engine. The evaluator accepts flattened Rule structs and
ObjectInfo metadata, and returns the appropriate Action.

Components:
- evaluator.go: Evaluate() for per-object actions with S3 priority
  ordering (delete marker > noncurrent version > current expiration),
  ShouldExpireNoncurrentVersion() with NewerNoncurrentVersions support,
  EvaluateMPUAbort() for multipart upload rules
- filter.go: prefix, tag, and size-based filter matching
- tags.go: ExtractTags() extracts S3 tags from filer Extended metadata,
  HasTagRules() for scan-time optimization
- version_time.go: GetVersionTimestamp() extracts timestamps from
  SeaweedFS version IDs (both old and new format)

Comprehensive test coverage: 54 tests covering all action types,
filter combinations, edge cases, and version ID formats.
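A hypothetical call into the evaluator. Evaluate, ObjectInfo, Action, and
ActionDeleteObject are names used by this PR; the Rule fields shown are
illustrative assumptions:

    import (
        "time"

        "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
    )

    func exampleEvaluate() {
        // Rule fields shown here are illustrative assumptions.
        rules := []s3lifecycle.Rule{{
            Prefix:         "logs/",
            ExpirationDays: 30,
        }}
        obj := s3lifecycle.ObjectInfo{
            Key:      "logs/2026/01/app.log",
            IsLatest: true,
            Size:     4096,
            ModTime:  time.Now().AddDate(0, 0, -31), // last modified 31 days ago
        }
        res := s3lifecycle.Evaluate(rules, obj, time.Now())
        if res.Action == s3lifecycle.ActionDeleteObject {
            // the worker queues this object for deletion
        }
    }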

* s3api: add UnmarshalXML for Expiration, Transition, ExpireDeleteMarker

Add UnmarshalXML methods that set the internal 'set' flag during XML
parsing. Previously these flags were only set programmatically, causing
XML round-trip to drop elements. This ensures lifecycle configurations
stored as XML survive unmarshal/marshal cycles correctly.
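A minimal sketch of the fix, assuming a struct with an integer Days field and
a private set flag (the real s3api types have more fields):

    import "encoding/xml"

    // Minimal stand-in for the s3api type; the real struct has more fields.
    type Expiration struct {
        Days int
        set  bool
    }

    // UnmarshalXML decodes the element and flips the set flag so a subsequent
    // MarshalXML re-emits it; previously only programmatic setters did this.
    func (ex *Expiration) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
        type alias struct {
            Days int `xml:"Days"`
        }
        var v alias
        if err := d.DecodeElement(&v, &start); err != nil {
            return err
        }
        ex.Days = v.Days
        ex.set = true
        return nil
    }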

Add comprehensive XML round-trip tests for all lifecycle rule types
including NoncurrentVersionExpiration, AbortIncompleteMultipartUpload,
Filter with Tag/And/size constraints, and a complete Terraform-style
lifecycle configuration.

* s3lifecycle: address review feedback

- Fix version_time.go overflow: guard timestampPart > MaxInt64 before
  the inversion subtraction to prevent uint64 wrap (sketched after this list)
- Make all expiry checks inclusive (!now.Before instead of now.After)
  so actions trigger at the exact scheduled instant
- Add NoncurrentIndex to ObjectInfo so Evaluate() can properly handle
  NewerNoncurrentVersions via ShouldExpireNoncurrentVersion()
- Add test for high-bit overflow version ID
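A minimal sketch of the overflow guard, assuming new-format version IDs embed
an inverted timestamp so that newer versions sort first (the encoding details
are an assumption):

    import "math"

    // timestampFromInvertedPart recovers a timestamp from the inverted value
    // embedded in a version ID. Values above MaxInt64 cannot be valid and
    // would wrap around during the subtraction, so they are rejected up front.
    func timestampFromInvertedPart(timestampPart uint64) (int64, bool) {
        if timestampPart > uint64(math.MaxInt64) {
            return 0, false
        }
        return math.MaxInt64 - int64(timestampPart), true
    }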

* s3lifecycle: guard ShouldExpireNoncurrentVersion against zero SuccessorModTime

Add an early return when obj.IsLatest or obj.SuccessorModTime.IsZero()
to prevent premature expiration of versions whose successor timestamp
was never initialized (a zero value would be treated as the Unix epoch,
so the version would always appear expired).
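A sketch of the guard in context, using the field names from this commit; the
exact signature is an assumption, and the NewerNoncurrentVersions count is
omitted for brevity:

    import "time"

    // objectInfo is a minimal stand-in for s3lifecycle.ObjectInfo.
    type objectInfo struct {
        IsLatest         bool
        SuccessorModTime time.Time
    }

    func shouldExpireNoncurrent(noncurrentDays int, obj objectInfo, now time.Time) bool {
        // Latest versions are never noncurrent; a zero SuccessorModTime means
        // the successor timestamp was never populated and would otherwise read
        // as the Unix epoch, making the version look expired immediately.
        if obj.IsLatest || obj.SuccessorModTime.IsZero() {
            return false
        }
        cutoff := obj.SuccessorModTime.AddDate(0, 0, noncurrentDays)
        return !now.Before(cutoff) // inclusive, per the earlier review fix
    }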

* lifecycle worker: detect buckets with lifecycle XML, not just filer.conf TTLs

Update the detection phase to check for stored lifecycle XML in bucket
metadata (key: s3-bucket-lifecycle-configuration-xml) in addition to
filer.conf TTL entries. A bucket is proposed for lifecycle processing if
it has lifecycle XML OR filer.conf TTLs (backward compatible).

New proposal parameters:
- has_lifecycle_xml: whether the bucket has stored lifecycle XML
- versioning_status: the bucket's versioning state (Enabled/Suspended/"")

These parameters will be used by the execution phase (subsequent PR)
to determine which evaluation path to use.

* lifecycle worker: update detection function comment to reflect XML support

* lifecycle worker: add lifecycle XML parsing and rule conversion

Add rules.go with:
- parseLifecycleXML() converts stored lifecycle XML to evaluator-friendly
  s3lifecycle.Rule structs, handling Filter.Prefix, Filter.Tag, Filter.And,
  size constraints, NoncurrentVersionExpiration, AbortIncompleteMultipartUpload,
  Expiration.Date, and ExpiredObjectDeleteMarker
- loadLifecycleRulesFromBucket() reads lifecycle XML from bucket metadata
- parseExpirationDate() supports RFC3339 and ISO 8601 date-only formats
  (sketched below)

Comprehensive tests for all XML variants, filter types, and date formats.
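A sketch of how parseExpirationDate() can support both layouts; the internals
are assumed, only the function name and the two formats come from this commit:

    import "time"

    // parseExpirationDate accepts a full RFC3339 timestamp first, then falls
    // back to an ISO 8601 date-only layout such as "2026-04-01".
    func parseExpirationDate(s string) (time.Time, error) {
        if t, err := time.Parse(time.RFC3339, s); err == nil {
            return t, nil
        }
        return time.Parse("2006-01-02", s)
    }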

* lifecycle worker: add scan-time rule evaluation for object expiration

Update executeLifecycleForBucket to try lifecycle XML evaluation first,
falling back to TTL-only evaluation when no lifecycle XML exists.

New listExpiredObjectsByRules() function:
- Walks the bucket directory tree
- Builds s3lifecycle.ObjectInfo from each filer entry
- Calls s3lifecycle.Evaluate() to check lifecycle rules
- Skips objects already handled by TTL fast path (TtlSec set)
- Extracts tags only when rules use tag-based filters (optimization)
- Skips .uploads and .versions directories (handled by other phases)

Supports Expiration.Days, Expiration.Date, Filter.Prefix, Filter.Tag,
Filter.And, and Filter.ObjectSize* in the scan-time evaluation path.
Existing TTL-based path remains for backward compatibility.

* lifecycle worker: address review feedback

- Use sentinel error (errLimitReached) instead of string matching
  for scan limit detection
- Fix loadLifecycleRulesFromBucket path: pass bucketsPath directly as the
  LookupEntry directory instead of path.Dir, which produced the wrong parent
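A hedged sketch of the corrected lookup; the filer_pb call shape is an
assumption based on its use elsewhere in SeaweedFS:

    import (
        "context"

        "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
    )

    // lookupBucketEntry fetches the bucket's filer entry, whose Extended
    // metadata holds the lifecycle XML. The bucket entry sits directly under
    // bucketsPath, so bucketsPath itself is the lookup directory.
    func lookupBucketEntry(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketsPath, bucket string) (*filer_pb.Entry, error) {
        resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{
            Directory: bucketsPath,
            Name:      bucket,
        })
        if err != nil {
            return nil, err
        }
        return resp.Entry, nil
    }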

* lifecycle worker: fix And filter detection for size-only constraints

The And branch condition triggered only when Prefix or Tags were present,
missing the case where And contains only ObjectSizeGreaterThan or
ObjectSizeLessThan with no prefix or tags.
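Illustrated with a hypothetical And shape; the field names mirror the XML
model, but the concrete Go struct here is an assumption:

    // andFilter is a hypothetical stand-in for the XML And element.
    type andFilter struct {
        Prefix                string
        Tags                  map[string]string
        ObjectSizeGreaterThan int64
        ObjectSizeLessThan    int64
    }

    // usesAndFilter returns true when any And field is populated, including
    // the size-only case that the original condition missed.
    func usesAndFilter(a *andFilter) bool {
        if a == nil {
            return false
        }
        return a.Prefix != "" || len(a.Tags) > 0 ||
            a.ObjectSizeGreaterThan > 0 || a.ObjectSizeLessThan > 0
    }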

* lifecycle worker: address review feedback round 3

- rules.go: pass through Filter-level size constraints when Tag is
  present without And (Tag+size combination was dropping sizes)
- execution.go: add doc comment to listExpiredObjectsByRules noting
  that it handles non-versioned objects only; versioned objects are
  handled by processVersionsDirectory
- rules_test.go: add bounds checks before indexing rules[0]

---------

Co-authored-by: Copilot <copilot@github.com>
Chris Lu authored on 2026-03-28 11:39:50 -07:00, committed by GitHub
parent 98f545c7fa, commit 9c3bc138a0
3 changed files with 555 additions and 12 deletions


@@ -2,6 +2,7 @@ package lifecycle
 
 import (
     "context"
+    "errors"
     "fmt"
     "math"
     "path"
@@ -13,8 +14,11 @@ import (
     "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
     pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
     "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+    "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle"
 )
 
+var errLimitReached = errors.New("limit reached")
+
 type executionResult struct {
     objectsExpired int64
     objectsScanned int64
@@ -37,17 +41,26 @@ func (h *Handler) executeLifecycleForBucket(
 ) (*executionResult, error) {
     result := &executionResult{}
 
-    // Load filer.conf to verify TTL rules still exist.
-    fc, err := loadFilerConf(ctx, filerClient)
-    if err != nil {
-        return result, fmt.Errorf("load filer conf: %w", err)
+    // Try to load lifecycle rules from stored XML first (full rule evaluation).
+    // Fall back to filer.conf TTL-only evaluation if no XML exists.
+    lifecycleRules, xmlErr := loadLifecycleRulesFromBucket(ctx, filerClient, bucketsPath, bucket)
+    if xmlErr != nil {
+        glog.V(1).Infof("s3_lifecycle: bucket %s: failed to load lifecycle XML: %v, falling back to TTL", bucket, xmlErr)
     }
+    useRuleEval := xmlErr == nil && len(lifecycleRules) > 0
 
-    collection := bucket
-    ttlRules := fc.GetCollectionTtls(collection)
-    if len(ttlRules) == 0 {
-        glog.V(1).Infof("s3_lifecycle: bucket %s has no lifecycle rules, skipping", bucket)
-        return result, nil
+    if !useRuleEval {
+        // Fall back: check filer.conf TTL rules.
+        fc, err := loadFilerConf(ctx, filerClient)
+        if err != nil {
+            return result, fmt.Errorf("load filer conf: %w", err)
+        }
+        collection := bucket
+        ttlRules := fc.GetCollectionTtls(collection)
+        if len(ttlRules) == 0 {
+            glog.V(1).Infof("s3_lifecycle: bucket %s has no lifecycle rules, skipping", bucket)
+            return result, nil
+        }
     }
 
     _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
@@ -56,14 +69,21 @@
         State:           plugin_pb.JobState_JOB_STATE_RUNNING,
         ProgressPercent: 10,
         Stage:           "scanning",
-        Message:         fmt.Sprintf("scanning bucket %s for expired objects (%d rules)", bucket, len(ttlRules)),
+        Message:         fmt.Sprintf("scanning bucket %s for expired objects", bucket),
     })
 
     // Shared budget across all phases so we don't exceed MaxDeletesPerBucket.
     remaining := config.MaxDeletesPerBucket
 
-    // Find expired objects.
-    expired, scanned, err := listExpiredObjects(ctx, filerClient, bucketsPath, bucket, remaining)
+    // Find expired objects using rule-based evaluation or TTL fallback.
+    var expired []expiredObject
+    var scanned int64
+    var err error
+    if useRuleEval {
+        expired, scanned, err = listExpiredObjectsByRules(ctx, filerClient, bucketsPath, bucket, lifecycleRules, remaining)
+    } else {
+        expired, scanned, err = listExpiredObjects(ctx, filerClient, bucketsPath, bucket, remaining)
+    }
     result.objectsScanned = scanned
     if err != nil {
         return result, fmt.Errorf("list expired objects: %w", err)
@@ -326,3 +346,93 @@ func deleteExpiredObjects(
 func nowUnix() int64 {
     return time.Now().Unix()
 }
+
+// listExpiredObjectsByRules scans a bucket directory tree and evaluates
+// lifecycle rules against each object using the s3lifecycle evaluator.
+// This function handles non-versioned objects (IsLatest=true). Versioned
+// objects in .versions directories are handled by processVersionsDirectory
+// (added in a separate change for NoncurrentVersionExpiration support).
+func listExpiredObjectsByRules(
+    ctx context.Context,
+    client filer_pb.SeaweedFilerClient,
+    bucketsPath, bucket string,
+    rules []s3lifecycle.Rule,
+    limit int64,
+) ([]expiredObject, int64, error) {
+    var expired []expiredObject
+    var scanned int64
+    bucketPath := path.Join(bucketsPath, bucket)
+    now := time.Now()
+    needTags := s3lifecycle.HasTagRules(rules)
+
+    dirsToProcess := []string{bucketPath}
+    for len(dirsToProcess) > 0 {
+        select {
+        case <-ctx.Done():
+            return expired, scanned, ctx.Err()
+        default:
+        }
+        dir := dirsToProcess[0]
+        dirsToProcess = dirsToProcess[1:]
+
+        limitReached := false
+        err := filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error {
+            if entry.IsDirectory {
+                // Skip .uploads and .versions directories.
+                if entry.Name != s3_constants.MultipartUploadsFolder &&
+                    !strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
+                    dirsToProcess = append(dirsToProcess, path.Join(dir, entry.Name))
+                }
+                return nil
+            }
+            scanned++
+
+            // Skip objects already handled by TTL fast path.
+            if entry.Attributes != nil && entry.Attributes.TtlSec > 0 {
+                expirationUnix := entry.Attributes.Crtime + int64(entry.Attributes.TtlSec)
+                if expirationUnix > nowUnix() {
+                    return nil // will be expired by RocksDB compaction
+                }
+            }
+
+            // Build ObjectInfo for the evaluator.
+            relKey := strings.TrimPrefix(path.Join(dir, entry.Name), bucketPath+"/")
+            objInfo := s3lifecycle.ObjectInfo{
+                Key:      relKey,
+                IsLatest: true, // non-versioned objects are always "latest"
+                Size:     int64(entry.Attributes.GetFileSize()),
+            }
+            if entry.Attributes != nil && entry.Attributes.Mtime > 0 {
+                objInfo.ModTime = time.Unix(entry.Attributes.Mtime, 0)
+            } else if entry.Attributes != nil && entry.Attributes.Crtime > 0 {
+                objInfo.ModTime = time.Unix(entry.Attributes.Crtime, 0)
+            }
+            if needTags {
+                objInfo.Tags = s3lifecycle.ExtractTags(entry.Extended)
+            }
+
+            result := s3lifecycle.Evaluate(rules, objInfo, now)
+            if result.Action == s3lifecycle.ActionDeleteObject {
+                expired = append(expired, expiredObject{dir: dir, name: entry.Name})
+            }
+            if limit > 0 && int64(len(expired)) >= limit {
+                limitReached = true
+                return errLimitReached
+            }
+            return nil
+        }, "", false, 10000)
+        if err != nil && !errors.Is(err, errLimitReached) {
+            return expired, scanned, fmt.Errorf("list %s: %w", dir, err)
+        }
+        if limitReached || (limit > 0 && int64(len(expired)) >= limit) {
+            break
+        }
+    }
+    return expired, scanned, nil
+}