fix: admin UI bucket deletion with filer group configured (#7735)

This commit is contained in:
Chris Lu
2025-12-13 19:04:12 -08:00
committed by GitHub
parent f70cd05404
commit 51c2ab0107
8 changed files with 767 additions and 41 deletions

View File

@@ -30,10 +30,41 @@ import (
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)
const (
// DefaultBucketsPath is the default path for S3 buckets in the filer
DefaultBucketsPath = "/buckets"
)
// FilerConfig holds filer configuration needed for bucket operations
type FilerConfig struct {
BucketsPath string
FilerGroup string
}
// getFilerConfig retrieves the filer configuration (buckets path and filer group)
func (s *AdminServer) getFilerConfig() (*FilerConfig, error) {
config := &FilerConfig{
BucketsPath: s3_constants.DefaultBucketsPath,
FilerGroup: "",
}
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer configuration: %w", err)
}
if resp.DirBuckets != "" {
config.BucketsPath = resp.DirBuckets
}
config.FilerGroup = resp.FilerGroup
return nil
})
return config, err
}
// getCollectionName returns the collection name for a bucket, prefixed with filer group if configured
func getCollectionName(filerGroup, bucketName string) string {
if filerGroup != "" {
return fmt.Sprintf("%s_%s", filerGroup, bucketName)
}
return bucketName
}
type AdminServer struct {
masterClient *wdclient.MasterClient
@@ -255,28 +286,17 @@ func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
return nil, fmt.Errorf("failed to get volume information: %w", err)
}
// Get filer configuration to determine FilerGroup
var filerGroup string
err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
configResp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
glog.Warningf("Failed to get filer configuration: %v", err)
// Continue without filer group
return nil
}
filerGroup = configResp.FilerGroup
return nil
})
// Get filer configuration (buckets path and filer group)
filerConfig, err := s.getFilerConfig()
if err != nil {
return nil, fmt.Errorf("failed to get filer configuration: %w", err)
glog.Warningf("Failed to get filer configuration, using defaults: %v", err)
}
// Now list buckets from the filer and match with collection data
err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
// List buckets by looking at the /buckets directory
// List buckets by looking at the buckets directory
stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
Directory: DefaultBucketsPath,
Directory: filerConfig.BucketsPath,
Prefix: "",
StartFromFileName: "",
InclusiveStartFrom: false,
@@ -299,12 +319,7 @@ func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
bucketName := resp.Entry.Name
// Determine collection name for this bucket
var collectionName string
if filerGroup != "" {
collectionName = fmt.Sprintf("%s_%s", filerGroup, bucketName)
} else {
collectionName = bucketName
}
collectionName := getCollectionName(filerConfig.FilerGroup, bucketName)
// Get size and object count from collection data
var size int64
@@ -373,7 +388,13 @@ func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
// GetBucketDetails retrieves detailed information about a specific bucket
func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error) {
bucketPath := fmt.Sprintf("/buckets/%s", bucketName)
// Get filer configuration (buckets path)
filerConfig, err := s.getFilerConfig()
if err != nil {
glog.Warningf("Failed to get filer configuration, using defaults: %v", err)
}
bucketPath := fmt.Sprintf("%s/%s", filerConfig.BucketsPath, bucketName)
details := &BucketDetails{
Bucket: S3Bucket{
@@ -383,10 +404,10 @@ func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error
UpdatedAt: time.Now(),
}
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
// Get bucket info
bucketResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
Directory: DefaultBucketsPath,
Directory: filerConfig.BucketsPath,
Name: bucketName,
})
if err != nil {
@@ -434,7 +455,7 @@ func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error
details.Bucket.Owner = owner
// List objects in bucket (recursively)
return s.listBucketObjects(client, bucketPath, "", details)
return s.listBucketObjects(client, bucketPath, bucketPath, "", details)
})
if err != nil {
@@ -445,7 +466,8 @@ func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error
}
// listBucketObjects recursively lists all objects in a bucket
func (s *AdminServer) listBucketObjects(client filer_pb.SeaweedFilerClient, directory, prefix string, details *BucketDetails) error {
// bucketBasePath is the full path to the bucket (e.g., /buckets/mybucket)
func (s *AdminServer) listBucketObjects(client filer_pb.SeaweedFilerClient, bucketBasePath, directory, prefix string, details *BucketDetails) error {
stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
Directory: directory,
Prefix: prefix,
@@ -470,16 +492,16 @@ func (s *AdminServer) listBucketObjects(client filer_pb.SeaweedFilerClient, dire
if entry.IsDirectory {
// Recursively list subdirectories
subDir := fmt.Sprintf("%s/%s", directory, entry.Name)
err := s.listBucketObjects(client, subDir, "", details)
err := s.listBucketObjects(client, bucketBasePath, subDir, "", details)
if err != nil {
return err
}
} else {
// Add file object
objectKey := entry.Name
if directory != fmt.Sprintf("/buckets/%s", details.Bucket.Name) {
if directory != bucketBasePath {
// Remove bucket prefix to get relative path
relativePath := directory[len(fmt.Sprintf("/buckets/%s", details.Bucket.Name))+1:]
relativePath := directory[len(bucketBasePath)+1:]
objectKey = fmt.Sprintf("%s/%s", relativePath, entry.Name)
}
@@ -511,10 +533,17 @@ func (s *AdminServer) CreateS3Bucket(bucketName string) error {
// DeleteS3Bucket deletes an S3 bucket and all its contents
func (s *AdminServer) DeleteS3Bucket(bucketName string) error {
// First, check if bucket has Object Lock enabled and if there are locked objects
ctx := context.Background()
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
return s3api.CheckBucketForLockedObjects(ctx, client, DefaultBucketsPath, bucketName)
// Get filer configuration (buckets path and filer group)
filerConfig, err := s.getFilerConfig()
if err != nil {
return fmt.Errorf("failed to get filer configuration: %w", err)
}
// Check if bucket has Object Lock enabled and if there are locked objects
err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
return s3api.CheckBucketForLockedObjects(ctx, client, filerConfig.BucketsPath, bucketName)
})
if err != nil {
return err
@@ -522,21 +551,23 @@ func (s *AdminServer) DeleteS3Bucket(bucketName string) error {
// Delete the collection first (same as s3.bucket.delete shell command)
// This ensures volume data is cleaned up properly
// Collection name must be prefixed with filer group if configured
collectionName := getCollectionName(filerConfig.FilerGroup, bucketName)
err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
_, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
Name: bucketName,
Name: collectionName,
})
return err
})
if err != nil {
return fmt.Errorf("failed to delete collection: %w", err)
return fmt.Errorf("failed to delete collection %s: %w", collectionName, err)
}
// Then delete bucket directory recursively from filer
// Use same parameters as s3.bucket.delete shell command and S3 API
return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
_, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
Directory: DefaultBucketsPath,
Directory: filerConfig.BucketsPath,
Name: bucketName,
IsDeleteData: false, // Collection already deleted, just remove metadata
IsRecursive: true,