fix(s3): lifecycle TTL rules inherit replication and volumeGrowthCount from filer config (#8321)
* fix(s3): lifecycle TTL rules inherit replication from parent path and filer config PutBucketLifecycleConfiguration wrote filer.conf entries with empty replication, so effective replication could differ from operator default. Now we resolve replication from parent path rule (MatchStorageRule) then filer global config; only Replication is set on the rule (no DataCenter/Rack/DataNode for S3). * add volumeGrowthCount * review --------- Co-authored-by: Dmitiy Gushchin <dag@fivegen.ru>
This commit is contained in:
@@ -21,6 +21,7 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
||||
|
||||
@@ -832,6 +833,28 @@ func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWr
|
||||
writeSuccessResponseXML(w, r, response)
|
||||
}
|
||||
|
||||
// resolveLifecycleDefaultsFromFilerConf returns replication and volumeGrowthCount for use when adding a lifecycle TTL rule.
|
||||
// S3 does not set DataCenter/Rack/DataNode so placement is not pinned to a specific DC/rack.
|
||||
// Precedence: parent path rule first, then filer global. If volumeGrowthCount is 0 but replication is set,
|
||||
// use replication's copy count so the rule is valid (volumeGrowthCount must be divisible by copy count).
|
||||
func resolveLifecycleDefaultsFromFilerConf(fc *filer.FilerConf, filerConfigReplication, bucketsPath, bucket string) (replication string, volumeGrowthCount uint32, err error) {
|
||||
bucketPath := fmt.Sprintf("%s/%s/", bucketsPath, bucket)
|
||||
parentRule := fc.MatchStorageRule(bucketPath)
|
||||
replication = parentRule.Replication
|
||||
if replication == "" {
|
||||
replication = filerConfigReplication
|
||||
}
|
||||
volumeGrowthCount = parentRule.VolumeGrowthCount
|
||||
if volumeGrowthCount == 0 && replication != "" {
|
||||
var rp *super_block.ReplicaPlacement
|
||||
rp, err = super_block.NewReplicaPlacementFromString(replication)
|
||||
if err == nil {
|
||||
volumeGrowthCount = uint32(rp.GetCopyCount())
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// PutBucketLifecycleConfigurationHandler Put Bucket Lifecycle configuration
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html
|
||||
func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -857,6 +880,24 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
// Resolve replication so lifecycle rules do not create filer.conf entries with empty replication.
|
||||
var filerConfigReplication string
|
||||
if filerErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
resp, err := client.GetFilerConfiguration(r.Context(), &filer_pb.GetFilerConfigurationRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
filerConfigReplication = resp.GetReplication()
|
||||
return nil
|
||||
}); filerErr != nil {
|
||||
glog.V(2).Infof("PutBucketLifecycleConfigurationHandler: could not get filer config: %v", filerErr)
|
||||
}
|
||||
defaultReplication, defaultVolumeGrowthCount, err := resolveLifecycleDefaultsFromFilerConf(fc, filerConfigReplication, s3a.option.BucketsPath, bucket)
|
||||
if err != nil {
|
||||
glog.Warningf("PutBucketLifecycleConfigurationHandler bucket %s: invalid replication %q: %v", bucket, defaultReplication, err)
|
||||
}
|
||||
|
||||
collectionName := s3a.getCollectionName(bucket)
|
||||
collectionTtls := fc.GetCollectionTtls(collectionName)
|
||||
changed := false
|
||||
@@ -884,6 +925,10 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
|
||||
LocationPrefix: locationPrefix,
|
||||
Collection: collectionName,
|
||||
Ttl: fmt.Sprintf("%dd", rule.Expiration.Days),
|
||||
Replication: defaultReplication,
|
||||
VolumeGrowthCount: defaultVolumeGrowthCount,
|
||||
// DataCenter/Rack/DataNode intentionally not set: S3 is not tied to a specific DC/rack,
|
||||
// requests can hit any filer; setting them would pin placement unnecessarily.
|
||||
}
|
||||
if ttl, ok := collectionTtls[locConf.LocationPrefix]; ok && ttl == locConf.Ttl {
|
||||
continue
|
||||
|
||||
80
weed/s3api/s3api_bucket_handlers_lifecycle_test.go
Normal file
80
weed/s3api/s3api_bucket_handlers_lifecycle_test.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/filer"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestResolveLifecycleDefaultsFromFilerConf(t *testing.T) {
|
||||
// Precedence: global (lowest), then path rules top-down (parent overrides global), then query (highest).
|
||||
// So parent path rule has priority over filer global config.
|
||||
|
||||
t.Run("parent_rule_replication_takes_precedence_over_filer_config", func(t *testing.T) {
|
||||
fc := filer.NewFilerConf()
|
||||
fc.SetLocationConf(&filer_pb.FilerConf_PathConf{
|
||||
LocationPrefix: "/buckets/",
|
||||
Replication: "001",
|
||||
})
|
||||
repl, vgc, err := resolveLifecycleDefaultsFromFilerConf(fc, "010", "/buckets", "mybucket")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "001", repl, "parent path rule must override filer global config")
|
||||
assert.Equal(t, uint32(2), vgc, "volumeGrowthCount derived from replication 001 copy count (SameRackCount=1 -> 2 copies)")
|
||||
})
|
||||
|
||||
t.Run("falls_back_to_filer_config_when_parent_rule_replication_empty", func(t *testing.T) {
|
||||
fc := filer.NewFilerConf()
|
||||
fc.SetLocationConf(&filer_pb.FilerConf_PathConf{
|
||||
LocationPrefix: "/buckets/",
|
||||
Replication: "", // no replication on parent
|
||||
})
|
||||
repl, vgc, err := resolveLifecycleDefaultsFromFilerConf(fc, "010", "/buckets", "mybucket")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "010", repl, "replication should come from filer config when parent rule has none")
|
||||
assert.Equal(t, uint32(2), vgc, "volumeGrowthCount derived from replication 010 copy count")
|
||||
})
|
||||
|
||||
t.Run("parent_rule_empty_when_no_matching_prefix_uses_filer_config", func(t *testing.T) {
|
||||
fc := filer.NewFilerConf()
|
||||
// no rules; parent path /buckets/mybucket/ matches nothing
|
||||
repl, vgc, err := resolveLifecycleDefaultsFromFilerConf(fc, "010", "/buckets", "mybucket")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "010", repl, "when no path rule, use filer config replication")
|
||||
assert.Equal(t, uint32(2), vgc, "volumeGrowthCount derived from replication 010")
|
||||
})
|
||||
|
||||
t.Run("all_empty_when_no_parent_rule_and_no_filer_config", func(t *testing.T) {
|
||||
fc := filer.NewFilerConf()
|
||||
repl, vgc, err := resolveLifecycleDefaultsFromFilerConf(fc, "", "/buckets", "mybucket")
|
||||
assert.NoError(t, err)
|
||||
assert.Empty(t, repl)
|
||||
assert.Equal(t, uint32(0), vgc)
|
||||
})
|
||||
|
||||
t.Run("parent_rule_volume_growth_count_used_when_set", func(t *testing.T) {
|
||||
fc := filer.NewFilerConf()
|
||||
fc.SetLocationConf(&filer_pb.FilerConf_PathConf{
|
||||
LocationPrefix: "/buckets/",
|
||||
Replication: "010",
|
||||
VolumeGrowthCount: 4,
|
||||
})
|
||||
repl, vgc, err := resolveLifecycleDefaultsFromFilerConf(fc, "010", "/buckets", "mybucket")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "010", repl)
|
||||
assert.Equal(t, uint32(4), vgc, "parent VolumeGrowthCount must be used when set")
|
||||
})
|
||||
|
||||
t.Run("invalid_replication_returns_error", func(t *testing.T) {
|
||||
fc := filer.NewFilerConf()
|
||||
fc.SetLocationConf(&filer_pb.FilerConf_PathConf{
|
||||
LocationPrefix: "/buckets/",
|
||||
Replication: "0x1", // invalid: non-digit
|
||||
})
|
||||
repl, vgc, err := resolveLifecycleDefaultsFromFilerConf(fc, "", "/buckets", "mybucket")
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, "0x1", repl, "replication string is still returned")
|
||||
assert.Equal(t, uint32(0), vgc, "volumeGrowthCount remains 0 when parse fails")
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user