Files
seaweedFS/weed/plugin/worker/lifecycle/detection.go
Chris Lu 98f545c7fa lifecycle worker: detect buckets via lifecycle XML metadata (#8808)
* s3api: extend lifecycle XML types with NoncurrentVersionExpiration, AbortIncompleteMultipartUpload

Add missing S3 lifecycle rule types to the XML data model:
- NoncurrentVersionExpiration with NoncurrentDays and NewerNoncurrentVersions
- NoncurrentVersionTransition with NoncurrentDays and StorageClass
- AbortIncompleteMultipartUpload with DaysAfterInitiation
- Filter.ObjectSizeGreaterThan and ObjectSizeLessThan
- And.ObjectSizeGreaterThan and ObjectSizeLessThan
- Filter.UnmarshalXML to properly parse Tag, And, and size filter elements

Each new type follows the existing set-field pattern for conditional
XML marshaling. No behavior changes - these types are not yet wired
into handlers or the lifecycle worker.

* s3lifecycle: add lifecycle rule evaluator package

New package weed/s3api/s3lifecycle/ provides a pure-function lifecycle
rule evaluation engine. The evaluator accepts flattened Rule structs and
ObjectInfo metadata, and returns the appropriate Action.

Components:
- evaluator.go: Evaluate() for per-object actions with S3 priority
  ordering (delete marker > noncurrent version > current expiration),
  ShouldExpireNoncurrentVersion() with NewerNoncurrentVersions support,
  EvaluateMPUAbort() for multipart upload rules
- filter.go: prefix, tag, and size-based filter matching
- tags.go: ExtractTags() extracts S3 tags from filer Extended metadata,
  HasTagRules() for scan-time optimization
- version_time.go: GetVersionTimestamp() extracts timestamps from
  SeaweedFS version IDs (both old and new format)

Comprehensive test coverage: 54 tests covering all action types,
filter combinations, edge cases, and version ID formats.

* s3api: add UnmarshalXML for Expiration, Transition, ExpireDeleteMarker

Add UnmarshalXML methods that set the internal 'set' flag during XML
parsing. Previously these flags were only set programmatically, causing
XML round-trip to drop elements. This ensures lifecycle configurations
stored as XML survive unmarshal/marshal cycles correctly.

Add comprehensive XML round-trip tests for all lifecycle rule types
including NoncurrentVersionExpiration, AbortIncompleteMultipartUpload,
Filter with Tag/And/size constraints, and a complete Terraform-style
lifecycle configuration.

* s3lifecycle: address review feedback

- Fix version_time.go overflow: guard timestampPart > MaxInt64 before
  the inversion subtraction to prevent uint64 wrap
- Make all expiry checks inclusive (!now.Before instead of now.After)
  so actions trigger at the exact scheduled instant
- Add NoncurrentIndex to ObjectInfo so Evaluate() can properly handle
  NewerNoncurrentVersions via ShouldExpireNoncurrentVersion()
- Add test for high-bit overflow version ID

* s3lifecycle: guard ShouldExpireNoncurrentVersion against zero SuccessorModTime

Add early return when obj.IsLatest or obj.SuccessorModTime.IsZero()
to prevent premature expiration of versions with uninitialized
successor timestamps (zero value would compute to epoch, always expired).

* lifecycle worker: detect buckets with lifecycle XML, not just filer.conf TTLs

Update the detection phase to check for stored lifecycle XML in bucket
metadata (key: s3-bucket-lifecycle-configuration-xml) in addition to
filer.conf TTL entries. A bucket is proposed for lifecycle processing if
it has lifecycle XML OR filer.conf TTLs (backward compatible).

New proposal parameters:
- has_lifecycle_xml: whether the bucket has stored lifecycle XML
- versioning_status: the bucket's versioning state (Enabled/Suspended/"")

These parameters will be used by the execution phase (subsequent PR)
to determine which evaluation path to use.

* lifecycle worker: update detection function comment to reflect XML support

---------

Co-authored-by: Copilot <copilot@github.com>
2026-03-28 11:16:58 -07:00

222 lines
6.4 KiB
Go

package lifecycle
import (
"context"
"fmt"
"path"
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
)
// lifecycleXMLKey is the extended-attribute key on a bucket directory entry
// under which the stored lifecycle configuration XML is kept.
const lifecycleXMLKey = "s3-bucket-lifecycle-configuration-xml"
// detectBucketsWithLifecycleRules scans all S3 buckets and proposes a
// lifecycle job for every bucket that has lifecycle rules, sourced either
// from TTL entries in filer.conf or from lifecycle XML stored in the
// bucket's extended metadata.
func (h *Handler) detectBucketsWithLifecycleRules(
	ctx context.Context,
	filerClient filer_pb.SeaweedFilerClient,
	config Config,
	bucketFilter string,
	maxResults int,
) ([]*plugin_pb.JobProposal, error) {
	// filer.conf supplies the legacy per-collection TTL rules.
	fc, err := loadFilerConf(ctx, filerClient)
	if err != nil {
		return nil, fmt.Errorf("load filer conf: %w", err)
	}

	bucketsPath := defaultBucketsPath
	matchers := wildcard.CompileWildcardMatchers(bucketFilter)

	// Enumerate every bucket directory under the buckets root.
	entries, err := listFilerEntries(ctx, filerClient, bucketsPath, "")
	if err != nil {
		return nil, fmt.Errorf("list buckets at %s: %w", bucketsPath, err)
	}

	var proposals []*plugin_pb.JobProposal
	for _, bucketEntry := range entries {
		// Stop promptly when the caller cancels.
		select {
		case <-ctx.Done():
			return proposals, ctx.Err()
		default:
		}

		if !bucketEntry.IsDirectory {
			continue
		}
		name := bucketEntry.Name
		if !wildcard.MatchesAnyWildcard(matchers, name) {
			continue
		}

		// Lifecycle rules come from two sources:
		//   1. filer.conf TTLs (legacy Expiration.Days fast path)
		//   2. lifecycle XML stored in bucket metadata (full rule support)
		// The collection name for a bucket is the bucket name itself.
		ruleCount := int64(len(fc.GetCollectionTtls(name)))

		hasLifecycleXML := false
		versioningStatus := ""
		if ext := bucketEntry.Extended; ext != nil {
			hasLifecycleXML = len(ext[lifecycleXMLKey]) > 0
			versioningStatus = string(ext[s3_constants.ExtVersioningKey])
		}

		if ruleCount == 0 && !hasLifecycleXML {
			continue
		}

		glog.V(2).Infof("s3_lifecycle: bucket %s has %d TTL rule(s), lifecycle_xml=%v, versioning=%s",
			name, ruleCount, hasLifecycleXML, versioningStatus)

		id := fmt.Sprintf("s3_lifecycle:%s", name)
		proposals = append(proposals, &plugin_pb.JobProposal{
			ProposalId: id,
			JobType:    jobType,
			Summary:    fmt.Sprintf("Lifecycle management for bucket %s", name),
			DedupeKey:  id,
			Parameters: map[string]*plugin_pb.ConfigValue{
				"bucket":            {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: name}},
				"buckets_path":      {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: bucketsPath}},
				"collection":        {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: name}},
				"rule_count":        {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: ruleCount}},
				"has_lifecycle_xml": {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: hasLifecycleXML}},
				"versioning_status": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: versioningStatus}},
			},
			Labels: map[string]string{
				"bucket": name,
			},
		})
		if maxResults > 0 && len(proposals) >= maxResults {
			break
		}
	}
	return proposals, nil
}
// defaultBucketsPath is the filer directory under which S3 buckets are stored.
const defaultBucketsPath = "/buckets"
// loadFilerConf fetches and parses filer.conf from the filer. A missing or
// unreadable filer.conf is not treated as an error: an empty configuration
// is returned so detection can still proceed (e.g. XML-only buckets).
func loadFilerConf(ctx context.Context, client filer_pb.SeaweedFilerClient) (*filer.FilerConf, error) {
	fc := filer.NewFilerConf()
	content, readErr := filer.ReadInsideFiler(ctx, client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName)
	if readErr != nil {
		// filer.conf may not exist yet - return empty config.
		glog.V(1).Infof("s3_lifecycle: filer.conf not found or unreadable: %v (using empty config)", readErr)
		return fc, nil
	}
	if parseErr := fc.LoadFromBytes(content); parseErr != nil {
		return nil, fmt.Errorf("parse filer.conf: %w", parseErr)
	}
	return fc, nil
}
// listFilerEntries collects the entries of a filer directory into a slice,
// resuming after startFrom and passing a 10000-entry limit to SeaweedList.
func listFilerEntries(ctx context.Context, client filer_pb.SeaweedFilerClient, dir, startFrom string) ([]*filer_pb.Entry, error) {
	var collected []*filer_pb.Entry
	appendEntry := func(entry *filer_pb.Entry, isLast bool) error {
		collected = append(collected, entry)
		return nil
	}
	err := filer_pb.SeaweedList(ctx, client, dir, "", appendEntry, startFrom, false, 10000)
	return collected, err
}
// expiredObject identifies a single expired object by its parent directory
// and entry name within the bucket tree.
type expiredObject struct {
	dir  string // parent directory path containing the object
	name string // the object's entry name within dir
}
// listExpiredObjects walks a bucket's directory tree breadth-first and
// collects objects whose TTL has elapsed, based on the TtlSec attribute set
// by PutBucketLifecycle. It returns the expired objects, the count of file
// entries scanned, and any listing error. A positive limit caps how many
// expired objects are collected.
func listExpiredObjects(
	ctx context.Context,
	client filer_pb.SeaweedFilerClient,
	bucketsPath, bucket string,
	limit int64,
) ([]expiredObject, int64, error) {
	var (
		expired []expiredObject
		scanned int64
	)
	// Breadth-first traversal starting at the bucket root.
	queue := []string{path.Join(bucketsPath, bucket)}
	for len(queue) > 0 {
		select {
		case <-ctx.Done():
			return expired, scanned, ctx.Err()
		default:
		}
		current := queue[0]
		queue = queue[1:]

		hitLimit := false
		err := filer_pb.SeaweedList(ctx, client, current, "", func(entry *filer_pb.Entry, isLast bool) error {
			if entry.IsDirectory {
				// Defer subdirectories to later iterations of the walk.
				queue = append(queue, path.Join(current, entry.Name))
				return nil
			}
			scanned++
			if isExpiredByTTL(entry) {
				expired = append(expired, expiredObject{dir: current, name: entry.Name})
			}
			if limit > 0 && int64(len(expired)) >= limit {
				// Sentinel error aborts the listing stream early.
				hitLimit = true
				return fmt.Errorf("limit reached")
			}
			return nil
		}, "", false, 10000)
		// The sentinel is expected when the limit fires; anything else is real.
		if err != nil && !strings.Contains(err.Error(), "limit reached") {
			return expired, scanned, fmt.Errorf("list %s: %w", current, err)
		}
		if hitLimit || (limit > 0 && int64(len(expired)) >= limit) {
			break
		}
	}
	return expired, scanned, nil
}
// isExpiredByTTL reports whether an entry has outlived its TTL attribute.
// SeaweedFS sets TtlSec on entries when lifecycle rules are applied via
// PutBucketLifecycleConfiguration. An entry is expired once
// creation_time + TTL <= now. The comparison is inclusive so expiration
// triggers at the exact scheduled instant, matching the inclusive expiry
// checks used by the s3lifecycle evaluator.
func isExpiredByTTL(entry *filer_pb.Entry) bool {
	// Entries without attributes carry no TTL and never expire here.
	if entry == nil || entry.Attributes == nil {
		return false
	}
	ttlSec := entry.Attributes.TtlSec
	if ttlSec <= 0 {
		return false
	}
	crTime := entry.Attributes.Crtime
	if crTime <= 0 {
		// Missing or invalid creation time - expiration cannot be computed.
		return false
	}
	expirationUnix := crTime + int64(ttlSec)
	return expirationUnix <= nowUnix()
}