S3: prevent deleting buckets with object locking (#7434)
* prevent deleting buckets with object locking * addressing comments * Update s3api_bucket_handlers.go * address comments * early return * refactor * simplify * constant * go fmt
This commit is contained in:
239
test/s3/retention/s3_bucket_delete_with_lock_test.go
Normal file
239
test/s3/retention/s3_bucket_delete_with_lock_test.go
Normal file
@@ -0,0 +1,239 @@
|
|||||||
|
package retention
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go-v2/aws"
|
||||||
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||||
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestBucketDeletionWithObjectLock tests that buckets with object lock enabled
|
||||||
|
// cannot be deleted if they contain objects with active retention or legal hold
|
||||||
|
func TestBucketDeletionWithObjectLock(t *testing.T) {
|
||||||
|
client := getS3Client(t)
|
||||||
|
bucketName := getNewBucketName()
|
||||||
|
|
||||||
|
// Create bucket with object lock enabled
|
||||||
|
createBucketWithObjectLock(t, client, bucketName)
|
||||||
|
|
||||||
|
// Table-driven test for retention modes
|
||||||
|
retentionTestCases := []struct {
|
||||||
|
name string
|
||||||
|
lockMode types.ObjectLockMode
|
||||||
|
}{
|
||||||
|
{name: "ComplianceRetention", lockMode: types.ObjectLockModeCompliance},
|
||||||
|
{name: "GovernanceRetention", lockMode: types.ObjectLockModeGovernance},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range retentionTestCases {
|
||||||
|
t.Run(fmt.Sprintf("CannotDeleteBucketWith%s", tc.name), func(t *testing.T) {
|
||||||
|
key := fmt.Sprintf("test-%s", strings.ToLower(strings.ReplaceAll(tc.name, "Retention", "-retention")))
|
||||||
|
content := fmt.Sprintf("test content for %s", strings.ToLower(tc.name))
|
||||||
|
retainUntilDate := time.Now().Add(10 * time.Second) // 10 seconds in future
|
||||||
|
|
||||||
|
// Upload object with retention
|
||||||
|
_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
Key: aws.String(key),
|
||||||
|
Body: strings.NewReader(content),
|
||||||
|
ObjectLockMode: tc.lockMode,
|
||||||
|
ObjectLockRetainUntilDate: aws.Time(retainUntilDate),
|
||||||
|
})
|
||||||
|
require.NoError(t, err, "PutObject with %s should succeed", tc.name)
|
||||||
|
|
||||||
|
// Try to delete bucket - should fail because object has active retention
|
||||||
|
_, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
})
|
||||||
|
require.Error(t, err, "DeleteBucket should fail when objects have active retention")
|
||||||
|
assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty")
|
||||||
|
t.Logf("Expected error: %v", err)
|
||||||
|
|
||||||
|
// Wait for retention to expire with dynamic sleep based on actual retention time
|
||||||
|
t.Logf("Waiting for %s to expire...", tc.name)
|
||||||
|
time.Sleep(time.Until(retainUntilDate) + time.Second)
|
||||||
|
|
||||||
|
// Delete the object
|
||||||
|
_, err = client.DeleteObject(context.Background(), &s3.DeleteObjectInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
Key: aws.String(key),
|
||||||
|
})
|
||||||
|
require.NoError(t, err, "DeleteObject should succeed after retention expires")
|
||||||
|
|
||||||
|
// Clean up versions
|
||||||
|
deleteAllObjectVersions(t, client, bucketName)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 3: Bucket deletion with legal hold should fail
|
||||||
|
t.Run("CannotDeleteBucketWithLegalHold", func(t *testing.T) {
|
||||||
|
key := "test-legal-hold"
|
||||||
|
content := "test content for legal hold"
|
||||||
|
|
||||||
|
// Upload object first
|
||||||
|
_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
Key: aws.String(key),
|
||||||
|
Body: strings.NewReader(content),
|
||||||
|
})
|
||||||
|
require.NoError(t, err, "PutObject should succeed")
|
||||||
|
|
||||||
|
// Set legal hold on the object
|
||||||
|
_, err = client.PutObjectLegalHold(context.Background(), &s3.PutObjectLegalHoldInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
Key: aws.String(key),
|
||||||
|
LegalHold: &types.ObjectLockLegalHold{Status: types.ObjectLockLegalHoldStatusOn},
|
||||||
|
})
|
||||||
|
require.NoError(t, err, "PutObjectLegalHold should succeed")
|
||||||
|
|
||||||
|
// Try to delete bucket - should fail because object has active legal hold
|
||||||
|
_, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
})
|
||||||
|
require.Error(t, err, "DeleteBucket should fail when objects have active legal hold")
|
||||||
|
assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty")
|
||||||
|
t.Logf("Expected error: %v", err)
|
||||||
|
|
||||||
|
// Remove legal hold
|
||||||
|
_, err = client.PutObjectLegalHold(context.Background(), &s3.PutObjectLegalHoldInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
Key: aws.String(key),
|
||||||
|
LegalHold: &types.ObjectLockLegalHold{Status: types.ObjectLockLegalHoldStatusOff},
|
||||||
|
})
|
||||||
|
require.NoError(t, err, "Removing legal hold should succeed")
|
||||||
|
|
||||||
|
// Delete the object
|
||||||
|
_, err = client.DeleteObject(context.Background(), &s3.DeleteObjectInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
Key: aws.String(key),
|
||||||
|
})
|
||||||
|
require.NoError(t, err, "DeleteObject should succeed after legal hold is removed")
|
||||||
|
|
||||||
|
// Clean up versions
|
||||||
|
deleteAllObjectVersions(t, client, bucketName)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test 4: Bucket deletion should succeed when no objects have active locks
|
||||||
|
t.Run("CanDeleteBucketWithoutActiveLocks", func(t *testing.T) {
|
||||||
|
// Make sure all objects are deleted
|
||||||
|
deleteAllObjectVersions(t, client, bucketName)
|
||||||
|
|
||||||
|
// Use retry mechanism for eventual consistency instead of fixed sleep
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
_, err := client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("Retrying DeleteBucket due to: %v", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}, 5*time.Second, 500*time.Millisecond, "DeleteBucket should succeed when no objects have active locks")
|
||||||
|
|
||||||
|
t.Logf("Successfully deleted bucket without active locks")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBucketDeletionWithVersionedLocks tests deletion with versioned objects under lock
|
||||||
|
func TestBucketDeletionWithVersionedLocks(t *testing.T) {
|
||||||
|
client := getS3Client(t)
|
||||||
|
bucketName := getNewBucketName()
|
||||||
|
|
||||||
|
// Create bucket with object lock enabled
|
||||||
|
createBucketWithObjectLock(t, client, bucketName)
|
||||||
|
defer deleteBucket(t, client, bucketName) // Best effort cleanup
|
||||||
|
|
||||||
|
key := "test-versioned-locks"
|
||||||
|
content1 := "version 1 content"
|
||||||
|
content2 := "version 2 content"
|
||||||
|
retainUntilDate := time.Now().Add(10 * time.Second)
|
||||||
|
|
||||||
|
// Upload first version with retention
|
||||||
|
putResp1, err := client.PutObject(context.Background(), &s3.PutObjectInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
Key: aws.String(key),
|
||||||
|
Body: strings.NewReader(content1),
|
||||||
|
ObjectLockMode: types.ObjectLockModeGovernance,
|
||||||
|
ObjectLockRetainUntilDate: aws.Time(retainUntilDate),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
version1 := *putResp1.VersionId
|
||||||
|
|
||||||
|
// Upload second version with retention
|
||||||
|
putResp2, err := client.PutObject(context.Background(), &s3.PutObjectInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
Key: aws.String(key),
|
||||||
|
Body: strings.NewReader(content2),
|
||||||
|
ObjectLockMode: types.ObjectLockModeGovernance,
|
||||||
|
ObjectLockRetainUntilDate: aws.Time(retainUntilDate),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
version2 := *putResp2.VersionId
|
||||||
|
|
||||||
|
t.Logf("Created two versions: %s, %s", version1, version2)
|
||||||
|
|
||||||
|
// Try to delete bucket - should fail because versions have active retention
|
||||||
|
_, err = client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
})
|
||||||
|
require.Error(t, err, "DeleteBucket should fail when object versions have active retention")
|
||||||
|
assert.Contains(t, err.Error(), "BucketNotEmpty", "Error should be BucketNotEmpty")
|
||||||
|
t.Logf("Expected error: %v", err)
|
||||||
|
|
||||||
|
// Wait for retention to expire with dynamic sleep based on actual retention time
|
||||||
|
t.Logf("Waiting for retention to expire on all versions...")
|
||||||
|
time.Sleep(time.Until(retainUntilDate) + time.Second)
|
||||||
|
|
||||||
|
// Clean up all versions
|
||||||
|
deleteAllObjectVersions(t, client, bucketName)
|
||||||
|
|
||||||
|
// Wait for eventual consistency and attempt to delete the bucket with retry
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
_, err := client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("Retrying DeleteBucket due to: %v", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}, 5*time.Second, 500*time.Millisecond, "DeleteBucket should succeed after all locks expire")
|
||||||
|
|
||||||
|
t.Logf("Successfully deleted bucket after locks expired")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBucketDeletionWithoutObjectLock tests that buckets without object lock can be deleted normally
|
||||||
|
func TestBucketDeletionWithoutObjectLock(t *testing.T) {
|
||||||
|
client := getS3Client(t)
|
||||||
|
bucketName := getNewBucketName()
|
||||||
|
|
||||||
|
// Create regular bucket without object lock
|
||||||
|
createBucket(t, client, bucketName)
|
||||||
|
|
||||||
|
// Upload some objects
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
_, err := client.PutObject(context.Background(), &s3.PutObjectInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
Key: aws.String(fmt.Sprintf("test-object-%d", i)),
|
||||||
|
Body: strings.NewReader("test content"),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete all objects
|
||||||
|
deleteAllObjectVersions(t, client, bucketName)
|
||||||
|
|
||||||
|
// Delete bucket should succeed
|
||||||
|
_, err := client.DeleteBucket(context.Background(), &s3.DeleteBucketInput{
|
||||||
|
Bucket: aws.String(bucketName),
|
||||||
|
})
|
||||||
|
require.NoError(t, err, "DeleteBucket should succeed for regular bucket")
|
||||||
|
t.Logf("Successfully deleted regular bucket without object lock")
|
||||||
|
}
|
||||||
@@ -141,7 +141,7 @@ func runBenchmark(cmd *Command, args []string) bool {
|
|||||||
fmt.Fprintln(os.Stderr, "Error: -readOnly and -writeOnly are mutually exclusive.")
|
fmt.Fprintln(os.Stderr, "Error: -readOnly and -writeOnly are mutually exclusive.")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
doWrite := true
|
doWrite := true
|
||||||
doRead := true
|
doRead := true
|
||||||
if *b.readOnly {
|
if *b.readOnly {
|
||||||
|
|||||||
@@ -23,9 +23,9 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type DownloadOptions struct {
|
type DownloadOptions struct {
|
||||||
master *string
|
master *string
|
||||||
server *string // deprecated, for backward compatibility
|
server *string // deprecated, for backward compatibility
|
||||||
dir *string
|
dir *string
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|||||||
@@ -109,7 +109,7 @@ func (s3a *S3ApiServer) updateBucketConfigCacheFromEntry(entry *filer_pb.Entry)
|
|||||||
|
|
||||||
bucket := entry.Name
|
bucket := entry.Name
|
||||||
|
|
||||||
glog.V(3).Infof("updateBucketConfigCacheFromEntry: called for bucket %s, ExtObjectLockEnabledKey=%s",
|
glog.V(3).Infof("updateBucketConfigCacheFromEntry: called for bucket %s, ExtObjectLockEnabledKey=%s",
|
||||||
bucket, string(entry.Extended[s3_constants.ExtObjectLockEnabledKey]))
|
bucket, string(entry.Extended[s3_constants.ExtObjectLockEnabledKey]))
|
||||||
|
|
||||||
// Create new bucket config from the entry
|
// Create new bucket config from the entry
|
||||||
|
|||||||
@@ -491,7 +491,7 @@ func TestSignatureV4WithoutProxy(t *testing.T) {
|
|||||||
|
|
||||||
// Set forwarded headers
|
// Set forwarded headers
|
||||||
r.Header.Set("Host", tt.host)
|
r.Header.Set("Host", tt.host)
|
||||||
|
|
||||||
// First, verify that extractHostHeader returns the expected value
|
// First, verify that extractHostHeader returns the expected value
|
||||||
extractedHost := extractHostHeader(r)
|
extractedHost := extractHostHeader(r)
|
||||||
if extractedHost != tt.expectedHost {
|
if extractedHost != tt.expectedHost {
|
||||||
|
|||||||
@@ -313,7 +313,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
|
|||||||
// For versioned buckets, create a version and return the version ID
|
// For versioned buckets, create a version and return the version ID
|
||||||
versionId := generateVersionId()
|
versionId := generateVersionId()
|
||||||
versionFileName := s3a.getVersionFileName(versionId)
|
versionFileName := s3a.getVersionFileName(versionId)
|
||||||
versionDir := dirName + "/" + entryName + ".versions"
|
versionDir := dirName + "/" + entryName + s3_constants.VersionsFolder
|
||||||
|
|
||||||
// Move the completed object to the versions directory
|
// Move the completed object to the versions directory
|
||||||
err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) {
|
err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) {
|
||||||
|
|||||||
@@ -27,5 +27,6 @@ const (
|
|||||||
|
|
||||||
SeaweedStorageDestinationHeader = "x-seaweedfs-destination"
|
SeaweedStorageDestinationHeader = "x-seaweedfs-destination"
|
||||||
MultipartUploadsFolder = ".uploads"
|
MultipartUploadsFolder = ".uploads"
|
||||||
|
VersionsFolder = ".versions"
|
||||||
FolderMimeType = "httpd/unix-directory"
|
FolderMimeType = "httpd/unix-directory"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -350,7 +350,7 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err
|
|||||||
|
|
||||||
// Extract configuration from extended attributes
|
// Extract configuration from extended attributes
|
||||||
if entry.Extended != nil {
|
if entry.Extended != nil {
|
||||||
glog.V(3).Infof("getBucketConfig: checking extended attributes for bucket %s, ExtObjectLockEnabledKey value=%s",
|
glog.V(3).Infof("getBucketConfig: checking extended attributes for bucket %s, ExtObjectLockEnabledKey value=%s",
|
||||||
bucket, string(entry.Extended[s3_constants.ExtObjectLockEnabledKey]))
|
bucket, string(entry.Extended[s3_constants.ExtObjectLockEnabledKey]))
|
||||||
if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists {
|
if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists {
|
||||||
config.Versioning = string(versioning)
|
config.Versioning = string(versioning)
|
||||||
@@ -435,7 +435,7 @@ func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketC
|
|||||||
glog.Errorf("updateBucketConfig: failed to store Object Lock configuration for bucket %s: %v", bucket, err)
|
glog.Errorf("updateBucketConfig: failed to store Object Lock configuration for bucket %s: %v", bucket, err)
|
||||||
return s3err.ErrInternalError
|
return s3err.ErrInternalError
|
||||||
}
|
}
|
||||||
glog.V(3).Infof("updateBucketConfig: stored Object Lock config in extended attributes for bucket %s, key=%s, value=%s",
|
glog.V(3).Infof("updateBucketConfig: stored Object Lock config in extended attributes for bucket %s, key=%s, value=%s",
|
||||||
bucket, s3_constants.ExtObjectLockEnabledKey, string(config.Entry.Extended[s3_constants.ExtObjectLockEnabledKey]))
|
bucket, s3_constants.ExtObjectLockEnabledKey, string(config.Entry.Extended[s3_constants.ExtObjectLockEnabledKey]))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,9 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"path"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -251,6 +253,28 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if bucket has object lock enabled
|
||||||
|
bucketConfig, errCode := s3a.getBucketConfig(bucket)
|
||||||
|
if errCode != s3err.ErrNone {
|
||||||
|
s3err.WriteErrorResponse(w, r, errCode)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If object lock is enabled, check for objects with active locks
|
||||||
|
if bucketConfig.ObjectLockConfig != nil {
|
||||||
|
hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(bucket)
|
||||||
|
if checkErr != nil {
|
||||||
|
glog.Errorf("DeleteBucketHandler: failed to check for locked objects in bucket %s: %v", bucket, checkErr)
|
||||||
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if hasLockedObjects {
|
||||||
|
glog.V(3).Infof("DeleteBucketHandler: bucket %s has objects with active object locks, cannot delete", bucket)
|
||||||
|
s3err.WriteErrorResponse(w, r, s3err.ErrBucketNotEmpty)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
if !s3a.option.AllowDeleteBucketNotEmpty {
|
if !s3a.option.AllowDeleteBucketNotEmpty {
|
||||||
entries, _, err := s3a.list(s3a.option.BucketsPath+"/"+bucket, "", "", false, 2)
|
entries, _, err := s3a.list(s3a.option.BucketsPath+"/"+bucket, "", "", false, 2)
|
||||||
@@ -258,7 +282,9 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
|
|||||||
return fmt.Errorf("failed to list bucket %s: %v", bucket, err)
|
return fmt.Errorf("failed to list bucket %s: %v", bucket, err)
|
||||||
}
|
}
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
if entry.Name != s3_constants.MultipartUploadsFolder {
|
// Allow bucket deletion if only special directories remain
|
||||||
|
if entry.Name != s3_constants.MultipartUploadsFolder &&
|
||||||
|
!strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
|
||||||
return errors.New(s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code)
|
return errors.New(s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -299,6 +325,159 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
|
|||||||
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
|
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
|
||||||
|
func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) {
|
||||||
|
bucketPath := s3a.option.BucketsPath + "/" + bucket
|
||||||
|
|
||||||
|
// Check all objects including versions for active locks
|
||||||
|
// Establish current time once at the start for consistency across the entire scan
|
||||||
|
hasLocks := false
|
||||||
|
currentTime := time.Now()
|
||||||
|
err := s3a.recursivelyCheckLocks(bucketPath, "", &hasLocks, currentTime)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("error checking for locked objects: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return hasLocks, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// lockCheckPaginationSize is the page size for listing directories during lock checks
|
||||||
|
lockCheckPaginationSize = 10000
|
||||||
|
)
|
||||||
|
|
||||||
|
// errStopPagination is a sentinel error to signal early termination of pagination
|
||||||
|
var errStopPagination = errors.New("stop pagination")
|
||||||
|
|
||||||
|
// paginateEntries iterates through directory entries with pagination
|
||||||
|
// Calls fn for each page of entries. If fn returns errStopPagination, iteration stops successfully.
|
||||||
|
func (s3a *S3ApiServer) paginateEntries(dir string, fn func(entries []*filer_pb.Entry) error) error {
|
||||||
|
startFrom := ""
|
||||||
|
for {
|
||||||
|
entries, isLast, err := s3a.list(dir, "", startFrom, false, lockCheckPaginationSize)
|
||||||
|
if err != nil {
|
||||||
|
// Fail-safe: propagate error to prevent incorrect bucket deletion
|
||||||
|
return fmt.Errorf("failed to list directory %s: %w", dir, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fn(entries); err != nil {
|
||||||
|
if errors.Is(err, errStopPagination) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if isLast || len(entries) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Use the last entry name as the start point for next page
|
||||||
|
startFrom = entries[len(entries)-1].Name
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// recursivelyCheckLocks recursively checks all objects and versions for active locks
|
||||||
|
// Uses pagination to handle directories with more than 10,000 entries
|
||||||
|
func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool, currentTime time.Time) error {
|
||||||
|
if *hasLocks {
|
||||||
|
// Early exit if we've already found a locked object
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process entries in the current directory with pagination
|
||||||
|
err := s3a.paginateEntries(dir, func(entries []*filer_pb.Entry) error {
|
||||||
|
for _, entry := range entries {
|
||||||
|
if *hasLocks {
|
||||||
|
// Early exit if we've already found a locked object
|
||||||
|
return errStopPagination
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip special directories (multipart uploads, etc)
|
||||||
|
if entry.Name == s3_constants.MultipartUploadsFolder {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry.IsDirectory {
|
||||||
|
subDir := path.Join(dir, entry.Name)
|
||||||
|
if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
|
||||||
|
// If it's a .versions directory, check all version files with pagination
|
||||||
|
err := s3a.paginateEntries(subDir, func(versionEntries []*filer_pb.Entry) error {
|
||||||
|
for _, versionEntry := range versionEntries {
|
||||||
|
if s3a.entryHasActiveLock(versionEntry, currentTime) {
|
||||||
|
*hasLocks = true
|
||||||
|
glog.V(2).Infof("Found object with active lock in versions: %s/%s", subDir, versionEntry.Name)
|
||||||
|
return errStopPagination
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Recursively check other subdirectories
|
||||||
|
subRelativePath := path.Join(relativePath, entry.Name)
|
||||||
|
if err := s3a.recursivelyCheckLocks(subDir, subRelativePath, hasLocks, currentTime); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Early exit if a locked object was found in the subdirectory
|
||||||
|
if *hasLocks {
|
||||||
|
return errStopPagination
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Check regular files for locks
|
||||||
|
if s3a.entryHasActiveLock(entry, currentTime) {
|
||||||
|
*hasLocks = true
|
||||||
|
objectPath := path.Join(relativePath, entry.Name)
|
||||||
|
glog.V(2).Infof("Found object with active lock: %s", objectPath)
|
||||||
|
return errStopPagination
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// entryHasActiveLock checks if an entry has an active retention or legal hold
|
||||||
|
func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool {
|
||||||
|
if entry.Extended == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for active legal hold
|
||||||
|
if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
|
||||||
|
if string(legalHoldBytes) == s3_constants.LegalHoldOn {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for active retention
|
||||||
|
if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
|
||||||
|
mode := string(modeBytes)
|
||||||
|
if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance {
|
||||||
|
// Check if retention is still active
|
||||||
|
if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
|
||||||
|
timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
// Fail-safe: if we can't parse the retention date, assume the object is locked
|
||||||
|
// to prevent accidental data loss
|
||||||
|
glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", string(dateBytes), err)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
retainUntil := time.Unix(timestamp, 0)
|
||||||
|
if retainUntil.After(currentTime) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
|
func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
bucket, _ := s3_constants.GetBucketAndObject(r)
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
||||||
|
|||||||
@@ -308,7 +308,7 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque
|
|||||||
if versioningConfigured {
|
if versioningConfigured {
|
||||||
if versionId != "" && versionId != "null" {
|
if versionId != "" && versionId != "null" {
|
||||||
// Versioned object - update the specific version file in .versions directory
|
// Versioned object - update the specific version file in .versions directory
|
||||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + ".versions"
|
updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder
|
||||||
} else {
|
} else {
|
||||||
// Latest version in versioned bucket - could be null version or versioned object
|
// Latest version in versioned bucket - could be null version or versioned object
|
||||||
// Extract version ID from the entry to determine where it's stored
|
// Extract version ID from the entry to determine where it's stored
|
||||||
@@ -324,7 +324,7 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque
|
|||||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket
|
updateDirectory = s3a.option.BucketsPath + "/" + bucket
|
||||||
} else {
|
} else {
|
||||||
// Versioned object - stored in .versions directory
|
// Versioned object - stored in .versions directory
|
||||||
updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + ".versions"
|
updateDirectory = s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -511,7 +511,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Skip .versions directories in regular list operations but track them for logical object creation
|
// Skip .versions directories in regular list operations but track them for logical object creation
|
||||||
if strings.HasSuffix(entry.Name, ".versions") {
|
if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
|
||||||
glog.V(4).Infof("Found .versions directory: %s", entry.Name)
|
glog.V(4).Infof("Found .versions directory: %s", entry.Name)
|
||||||
versionsDirs = append(versionsDirs, entry.Name)
|
versionsDirs = append(versionsDirs, entry.Name)
|
||||||
continue
|
continue
|
||||||
@@ -566,7 +566,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Extract object name from .versions directory name (remove .versions suffix)
|
// Extract object name from .versions directory name (remove .versions suffix)
|
||||||
baseObjectName := strings.TrimSuffix(versionsDir, ".versions")
|
baseObjectName := strings.TrimSuffix(versionsDir, s3_constants.VersionsFolder)
|
||||||
|
|
||||||
// Construct full object path relative to bucket
|
// Construct full object path relative to bucket
|
||||||
// dir is something like "/buckets/sea-test-1/Veeam/Backup/vbr/Config"
|
// dir is something like "/buckets/sea-test-1/Veeam/Backup/vbr/Config"
|
||||||
|
|||||||
@@ -463,7 +463,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
|
|||||||
// Check if there's an existing null version in .versions directory and delete it
|
// Check if there's an existing null version in .versions directory and delete it
|
||||||
// This ensures suspended versioning properly overwrites the null version as per S3 spec
|
// This ensures suspended versioning properly overwrites the null version as per S3 spec
|
||||||
// Note: We only delete null versions, NOT regular versions (those should be preserved)
|
// Note: We only delete null versions, NOT regular versions (those should be preserved)
|
||||||
versionsObjectPath := normalizedObject + ".versions"
|
versionsObjectPath := normalizedObject + s3_constants.VersionsFolder
|
||||||
versionsDir := bucketDir + "/" + versionsObjectPath
|
versionsDir := bucketDir + "/" + versionsObjectPath
|
||||||
entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
|
entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@@ -617,7 +617,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
|
|||||||
// when a new "null" version becomes the latest during suspended versioning
|
// when a new "null" version becomes the latest during suspended versioning
|
||||||
func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object string) error {
|
func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object string) error {
|
||||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||||
versionsObjectPath := object + ".versions"
|
versionsObjectPath := object + s3_constants.VersionsFolder
|
||||||
versionsDir := bucketDir + "/" + versionsObjectPath
|
versionsDir := bucketDir + "/" + versionsObjectPath
|
||||||
|
|
||||||
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: updating flags for %s%s", bucket, object)
|
glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: updating flags for %s%s", bucket, object)
|
||||||
@@ -696,12 +696,12 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
|
|||||||
|
|
||||||
// Upload directly to the versions directory
|
// Upload directly to the versions directory
|
||||||
// We need to construct the object path relative to the bucket
|
// We need to construct the object path relative to the bucket
|
||||||
versionObjectPath := normalizedObject + ".versions/" + versionFileName
|
versionObjectPath := normalizedObject + s3_constants.VersionsFolder + "/" + versionFileName
|
||||||
versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath)
|
versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath)
|
||||||
|
|
||||||
// Ensure the .versions directory exists before uploading
|
// Ensure the .versions directory exists before uploading
|
||||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||||
versionsDir := normalizedObject + ".versions"
|
versionsDir := normalizedObject + s3_constants.VersionsFolder
|
||||||
err := s3a.mkdir(bucketDir, versionsDir, func(entry *filer_pb.Entry) {
|
err := s3a.mkdir(bucketDir, versionsDir, func(entry *filer_pb.Entry) {
|
||||||
entry.Attributes.Mime = s3_constants.FolderMimeType
|
entry.Attributes.Mime = s3_constants.FolderMimeType
|
||||||
})
|
})
|
||||||
@@ -791,7 +791,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
|
|||||||
// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
|
// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
|
||||||
func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId, versionFileName string) error {
|
func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId, versionFileName string) error {
|
||||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||||
versionsObjectPath := object + ".versions"
|
versionsObjectPath := object + s3_constants.VersionsFolder
|
||||||
|
|
||||||
// Get the current .versions directory entry with retry logic for filer consistency
|
// Get the current .versions directory entry with retry logic for filer consistency
|
||||||
var versionsEntry *filer_pb.Entry
|
var versionsEntry *filer_pb.Entry
|
||||||
|
|||||||
@@ -95,7 +95,7 @@ func generateVersionId() string {
|
|||||||
|
|
||||||
// getVersionedObjectDir returns the directory path for storing object versions
|
// getVersionedObjectDir returns the directory path for storing object versions
|
||||||
func (s3a *S3ApiServer) getVersionedObjectDir(bucket, object string) string {
|
func (s3a *S3ApiServer) getVersionedObjectDir(bucket, object string) string {
|
||||||
return path.Join(s3a.option.BucketsPath, bucket, object+".versions")
|
return path.Join(s3a.option.BucketsPath, bucket, object+s3_constants.VersionsFolder)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getVersionFileName returns the filename for a specific version
|
// getVersionFileName returns the filename for a specific version
|
||||||
@@ -116,7 +116,7 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error
|
|||||||
// Make sure to clean up the object path to remove leading slashes
|
// Make sure to clean up the object path to remove leading slashes
|
||||||
cleanObject := strings.TrimPrefix(object, "/")
|
cleanObject := strings.TrimPrefix(object, "/")
|
||||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||||
versionsDir := bucketDir + "/" + cleanObject + ".versions"
|
versionsDir := bucketDir + "/" + cleanObject + s3_constants.VersionsFolder
|
||||||
|
|
||||||
// Create the delete marker entry in the .versions directory
|
// Create the delete marker entry in the .versions directory
|
||||||
err := s3a.mkFile(versionsDir, versionFileName, nil, func(entry *filer_pb.Entry) {
|
err := s3a.mkFile(versionsDir, versionFileName, nil, func(entry *filer_pb.Entry) {
|
||||||
@@ -301,9 +301,9 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check if this is a .versions directory
|
// Check if this is a .versions directory
|
||||||
if strings.HasSuffix(entry.Name, ".versions") {
|
if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
|
||||||
// Extract object name from .versions directory name
|
// Extract object name from .versions directory name
|
||||||
objectKey := strings.TrimSuffix(entryPath, ".versions")
|
objectKey := strings.TrimSuffix(entryPath, s3_constants.VersionsFolder)
|
||||||
normalizedObjectKey := removeDuplicateSlashes(objectKey)
|
normalizedObjectKey := removeDuplicateSlashes(objectKey)
|
||||||
// Mark both keys as processed for backward compatibility
|
// Mark both keys as processed for backward compatibility
|
||||||
processedObjects[objectKey] = true
|
processedObjects[objectKey] = true
|
||||||
@@ -419,7 +419,7 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check if a .versions directory exists for this object
|
// Check if a .versions directory exists for this object
|
||||||
versionsObjectPath := normalizedObjectKey + ".versions"
|
versionsObjectPath := normalizedObjectKey + s3_constants.VersionsFolder
|
||||||
_, versionsErr := s3a.getEntry(currentPath, versionsObjectPath)
|
_, versionsErr := s3a.getEntry(currentPath, versionsObjectPath)
|
||||||
if versionsErr == nil {
|
if versionsErr == nil {
|
||||||
// .versions directory exists
|
// .versions directory exists
|
||||||
@@ -497,7 +497,7 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe
|
|||||||
|
|
||||||
// All versions are now stored in the .versions directory only
|
// All versions are now stored in the .versions directory only
|
||||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||||
versionsObjectPath := object + ".versions"
|
versionsObjectPath := object + s3_constants.VersionsFolder
|
||||||
glog.V(2).Infof("getObjectVersionList: checking versions directory %s", versionsObjectPath)
|
glog.V(2).Infof("getObjectVersionList: checking versions directory %s", versionsObjectPath)
|
||||||
|
|
||||||
// Get the .versions directory entry to read latest version metadata
|
// Get the .versions directory entry to read latest version metadata
|
||||||
@@ -676,7 +676,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
|
|||||||
versionFile := s3a.getVersionFileName(versionId)
|
versionFile := s3a.getVersionFileName(versionId)
|
||||||
|
|
||||||
// Check if this is the latest version before attempting deletion (for potential metadata update)
|
// Check if this is the latest version before attempting deletion (for potential metadata update)
|
||||||
versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), normalizedObject+".versions")
|
versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), normalizedObject+s3_constants.VersionsFolder)
|
||||||
isLatestVersion := false
|
isLatestVersion := false
|
||||||
if dirErr == nil && versionsEntry.Extended != nil {
|
if dirErr == nil && versionsEntry.Extended != nil {
|
||||||
if latestVersionIdBytes, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
|
if latestVersionIdBytes, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
|
||||||
@@ -715,7 +715,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
|
|||||||
func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string) error {
|
func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string) error {
|
||||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||||
cleanObject := strings.TrimPrefix(object, "/")
|
cleanObject := strings.TrimPrefix(object, "/")
|
||||||
versionsObjectPath := cleanObject + ".versions"
|
versionsObjectPath := cleanObject + s3_constants.VersionsFolder
|
||||||
versionsDir := bucketDir + "/" + versionsObjectPath
|
versionsDir := bucketDir + "/" + versionsObjectPath
|
||||||
|
|
||||||
glog.V(1).Infof("updateLatestVersionAfterDeletion: updating latest version for %s/%s, listing %s", bucket, object, versionsDir)
|
glog.V(1).Infof("updateLatestVersionAfterDeletion: updating latest version for %s/%s, listing %s", bucket, object, versionsDir)
|
||||||
@@ -847,7 +847,7 @@ func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb
|
|||||||
normalizedObject := removeDuplicateSlashes(object)
|
normalizedObject := removeDuplicateSlashes(object)
|
||||||
|
|
||||||
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
||||||
versionsObjectPath := normalizedObject + ".versions"
|
versionsObjectPath := normalizedObject + s3_constants.VersionsFolder
|
||||||
|
|
||||||
glog.V(1).Infof("getLatestObjectVersion: looking for latest version of %s/%s (normalized: %s)", bucket, object, normalizedObject)
|
glog.V(1).Infof("getLatestObjectVersion: looking for latest version of %s/%s (normalized: %s)", bucket, object, normalizedObject)
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ const (
|
|||||||
// minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s)
|
// minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s)
|
||||||
// Used to calculate timeout scaling based on data transferred
|
// Used to calculate timeout scaling based on data transferred
|
||||||
minThroughputBytesPerSecond = 4000
|
minThroughputBytesPerSecond = 4000
|
||||||
|
|
||||||
// graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout
|
// graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout
|
||||||
// This prevents indefinite connections while allowing time for server-side chunk fetches
|
// This prevents indefinite connections while allowing time for server-side chunk fetches
|
||||||
graceTimeCapMultiplier = 3
|
graceTimeCapMultiplier = 3
|
||||||
@@ -90,17 +90,17 @@ func (c *Conn) Write(b []byte) (count int, e error) {
|
|||||||
// Calculate timeout with two components:
|
// Calculate timeout with two components:
|
||||||
// 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s)
|
// 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s)
|
||||||
// 2. Additional grace period if there was a gap since last write (for chunk fetch delays)
|
// 2. Additional grace period if there was a gap since last write (for chunk fetch delays)
|
||||||
|
|
||||||
// Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
|
// Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
|
||||||
// Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
|
// Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
|
||||||
// After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s
|
// After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s
|
||||||
bytesPerTimeout := calculateBytesPerTimeout(c.WriteTimeout)
|
bytesPerTimeout := calculateBytesPerTimeout(c.WriteTimeout)
|
||||||
timeoutMultiplier := time.Duration(c.bytesWritten/bytesPerTimeout + 1)
|
timeoutMultiplier := time.Duration(c.bytesWritten/bytesPerTimeout + 1)
|
||||||
baseTimeout := c.WriteTimeout * timeoutMultiplier
|
baseTimeout := c.WriteTimeout * timeoutMultiplier
|
||||||
|
|
||||||
// If it's been a while since last write, add grace time for server-side chunk fetches
|
// If it's been a while since last write, add grace time for server-side chunk fetches
|
||||||
// But cap it to avoid keeping slow clients connected indefinitely
|
// But cap it to avoid keeping slow clients connected indefinitely
|
||||||
//
|
//
|
||||||
// The comparison uses unscaled WriteTimeout intentionally: triggers grace when idle time
|
// The comparison uses unscaled WriteTimeout intentionally: triggers grace when idle time
|
||||||
// exceeds base timeout, independent of throughput scaling.
|
// exceeds base timeout, independent of throughput scaling.
|
||||||
if !c.lastWrite.IsZero() {
|
if !c.lastWrite.IsZero() {
|
||||||
@@ -120,7 +120,7 @@ func (c *Conn) Write(b []byte) (count int, e error) {
|
|||||||
baseTimeout += graceTime
|
baseTimeout += graceTime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.Conn.SetWriteDeadline(now.Add(baseTimeout))
|
err := c.Conn.SetWriteDeadline(now.Add(baseTimeout))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
|||||||
Reference in New Issue
Block a user