Fix error on deleting non-empty bucket (#8376)
* Move check for non-empty bucket deletion out of `WithFilerClient` call * Added proper checking if a bucket has "user" objects
This commit is contained in:
committed by
GitHub
parent
36c469e34e
commit
2f837c4780
@@ -371,20 +371,18 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
|
||||
}
|
||||
}
|
||||
|
||||
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
if !s3a.option.AllowDeleteBucketNotEmpty {
|
||||
entries, _, err := s3a.list(s3a.option.BucketsPath+"/"+bucket, "", "", false, 2)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to list bucket %s: %v", bucket, err)
|
||||
}
|
||||
for _, entry := range entries {
|
||||
// Allow bucket deletion if only special directories remain
|
||||
if entry.Name != s3_constants.MultipartUploadsFolder &&
|
||||
!strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
|
||||
return errors.New(s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code)
|
||||
}
|
||||
}
|
||||
if !s3a.option.AllowDeleteBucketNotEmpty {
|
||||
if hasUserObjects, err := s3a.bucketHasUserObjects(bucket); err != nil {
|
||||
glog.Errorf("failed to list bucket %s: %v", bucket, err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
} else if hasUserObjects {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrBucketNotEmpty)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
// delete collection
|
||||
deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{
|
||||
@@ -400,11 +398,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
s3ErrorCode := s3err.ErrInternalError
|
||||
if err.Error() == s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code {
|
||||
s3ErrorCode = s3err.ErrBucketNotEmpty
|
||||
}
|
||||
s3err.WriteErrorResponse(w, r, s3ErrorCode)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -421,6 +415,33 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
|
||||
s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
|
||||
}
|
||||
|
||||
// bucketHasUserObjects checks whether a bucket contains any non-special entries.
|
||||
// Special entries (.uploads, *.versions) are internal to S3 and don't count as user objects.
|
||||
func (s3a *S3ApiServer) bucketHasUserObjects(bucket string) (bool, error) {
|
||||
bucketPath := s3a.option.BucketsPath + "/" + bucket
|
||||
startFrom := ""
|
||||
// Start with a small batch — most non-empty buckets have a real object early.
|
||||
// If we only find special entries, switch to larger batches to page through quickly.
|
||||
limit := uint32(10)
|
||||
for {
|
||||
entries, isLast, err := s3a.list(bucketPath, "", startFrom, false, limit)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
for _, entry := range entries {
|
||||
if entry.Name != s3_constants.MultipartUploadsFolder &&
|
||||
!strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
|
||||
return true, nil
|
||||
}
|
||||
startFrom = entry.Name
|
||||
}
|
||||
if isLast {
|
||||
return false, nil
|
||||
}
|
||||
limit = 1000
|
||||
}
|
||||
}
|
||||
|
||||
// hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
|
||||
// Delegates to the shared HasObjectsWithActiveLocks function in object_lock_utils.go
|
||||
func (s3a *S3ApiServer) hasObjectsWithActiveLocks(ctx context.Context, bucket string) (bool, error) {
|
||||
|
||||
Reference in New Issue
Block a user