What problem are we solving? Fix: #6379 How are we solving the problem? We check for the AllowEmptyFolder option prior to cascade-deleting parent folders in the S3 DeleteMultipleObjectsHandler. How is the PR tested? We ran SeaweedFS in a Kubernetes Cluster with a joint Filer and S3 server in one container, with leveldb2 as the filer storage, and AllowEmptyFolder set to true. When using the Distribution Registry as the S3 client, it calls the DeleteMultipleObjectsHandler as part of the artifact upload process (uploads to a temp location, then performs a copy and delete). Without this fix, the deletion cascade deleted parent folders until the entire contents of the bucket were gone. With this fix, the existing content of the bucket remained, and the newly uploaded content was added. Checks [ ] I have added unit tests if possible. [ ] I will add related wiki document changes and link to this PR after merging. Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
205 lines
5.8 KiB
Go
205 lines
5.8 KiB
Go
package s3api
|
|
|
|
import (
|
|
"encoding/xml"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
"golang.org/x/exp/slices"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
)
|
|
|
|
const (
	// deleteMultipleObjectsLimit is the maximum number of object keys accepted
	// in a single multi-object delete request; DeleteMultipleObjectsHandler
	// rejects larger requests with ErrInvalidMaxDeleteObjects.
	deleteMultipleObjectsLimit = 1000
)
|
|
|
|
func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
bucket, object := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("DeleteObjectHandler %s %s", bucket, object)
|
|
|
|
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
|
|
dir, name := target.DirAndName()
|
|
|
|
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
|
|
if err := doDeleteEntry(client, dir, name, true, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
if s3a.option.AllowEmptyFolder {
|
|
return nil
|
|
}
|
|
|
|
directoriesWithDeletion := make(map[string]int)
|
|
if strings.LastIndex(object, "/") > 0 {
|
|
directoriesWithDeletion[dir]++
|
|
// purge empty folders, only checking folders with deletions
|
|
for len(directoriesWithDeletion) > 0 {
|
|
directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
// ObjectIdentifier carries the key name of an object to delete. It is used
// both for the requested objects and for the objects reported as deleted.
type ObjectIdentifier struct {
	// ObjectName is the object key, carried in the XML "Key" element.
	ObjectName string `xml:"Key"`
}
|
|
|
|
// DeleteObjectsRequest - xml carrying the object key names which needs to be deleted.
|
|
type DeleteObjectsRequest struct {
|
|
// Element to enable quiet mode for the request
|
|
Quiet bool
|
|
// List of objects to be deleted
|
|
Objects []ObjectIdentifier `xml:"Object"`
|
|
}
|
|
|
|
// DeleteError describes the failure to delete one object in a multi-object
// delete request.
type DeleteError struct {
	// Code is the error code reported for the failure.
	Code string
	// Message is the human-readable error description.
	Message string
	// Key is the key of the object that could not be deleted.
	Key string
}
|
|
|
|
// DeleteObjectsResponse container for multiple object deletes.
|
|
type DeleteObjectsResponse struct {
|
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DeleteResult" json:"-"`
|
|
|
|
// Collection of all deleted objects
|
|
DeletedObjects []ObjectIdentifier `xml:"Deleted,omitempty"`
|
|
|
|
// Collection of errors deleting certain objects.
|
|
Errors []DeleteError `xml:"Error,omitempty"`
|
|
}
|
|
|
|
// DeleteMultipleObjectsHandler - Delete multiple objects
|
|
func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
bucket, _ := s3_constants.GetBucketAndObject(r)
|
|
glog.V(3).Infof("DeleteMultipleObjectsHandler %s", bucket)
|
|
|
|
deleteXMLBytes, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
deleteObjects := &DeleteObjectsRequest{}
|
|
if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
|
|
return
|
|
}
|
|
|
|
if len(deleteObjects.Objects) > deleteMultipleObjectsLimit {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxDeleteObjects)
|
|
return
|
|
}
|
|
|
|
var deletedObjects []ObjectIdentifier
|
|
var deleteErrors []DeleteError
|
|
var auditLog *s3err.AccessLog
|
|
|
|
directoriesWithDeletion := make(map[string]int)
|
|
|
|
if s3err.Logger != nil {
|
|
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
|
|
}
|
|
s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
|
|
// delete file entries
|
|
for _, object := range deleteObjects.Objects {
|
|
if object.ObjectName == "" {
|
|
continue
|
|
}
|
|
lastSeparator := strings.LastIndex(object.ObjectName, "/")
|
|
parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.ObjectName, true, false
|
|
if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) {
|
|
entryName = object.ObjectName[lastSeparator+1:]
|
|
parentDirectoryPath = "/" + object.ObjectName[:lastSeparator]
|
|
}
|
|
parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
|
|
|
|
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
|
|
if err == nil {
|
|
directoriesWithDeletion[parentDirectoryPath]++
|
|
deletedObjects = append(deletedObjects, object)
|
|
} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
|
|
deletedObjects = append(deletedObjects, object)
|
|
} else {
|
|
delete(directoriesWithDeletion, parentDirectoryPath)
|
|
deleteErrors = append(deleteErrors, DeleteError{
|
|
Code: "",
|
|
Message: err.Error(),
|
|
Key: object.ObjectName,
|
|
})
|
|
}
|
|
if auditLog != nil {
|
|
auditLog.Key = entryName
|
|
s3err.PostAccessLog(*auditLog)
|
|
}
|
|
}
|
|
|
|
if s3a.option.AllowEmptyFolder {
|
|
return nil
|
|
}
|
|
|
|
// purge empty folders, only checking folders with deletions
|
|
for len(directoriesWithDeletion) > 0 {
|
|
directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
deleteResp := DeleteObjectsResponse{}
|
|
if !deleteObjects.Quiet {
|
|
deleteResp.DeletedObjects = deletedObjects
|
|
}
|
|
deleteResp.Errors = deleteErrors
|
|
|
|
writeSuccessResponseXML(w, r, deleteResp)
|
|
|
|
}
|
|
|
|
func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int) {
|
|
var allDirs []string
|
|
for dir := range directoriesWithDeletion {
|
|
allDirs = append(allDirs, dir)
|
|
}
|
|
slices.SortFunc(allDirs, func(a, b string) int {
|
|
return len(b) - len(a)
|
|
})
|
|
newDirectoriesWithDeletion = make(map[string]int)
|
|
for _, dir := range allDirs {
|
|
parentDir, dirName := util.FullPath(dir).DirAndName()
|
|
if parentDir == s3a.option.BucketsPath {
|
|
continue
|
|
}
|
|
if err := doDeleteEntry(client, parentDir, dirName, false, false); err != nil {
|
|
glog.V(4).Infof("directory %s has %d deletion but still not empty: %v", dir, directoriesWithDeletion[dir], err)
|
|
} else {
|
|
newDirectoriesWithDeletion[parentDir]++
|
|
}
|
|
}
|
|
return
|
|
}
|