Updated namespace creation to use authenticated account ID for ownership and unified permission checks across all namespace operations to use the correct account principal.
416 lines
12 KiB
Go
416 lines
12 KiB
Go
package s3tables
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
)
|
|
|
|
// handleCreateNamespace creates a new namespace in a table bucket
|
|
func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
|
var req CreateNamespaceRequest
|
|
if err := h.readRequestBody(r, &req); err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
// Check permission
|
|
principal := h.getPrincipalFromRequest(r)
|
|
accountID := h.getAccountID(r)
|
|
if !CanCreateNamespace(principal, accountID) {
|
|
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create namespace")
|
|
return NewAuthError("CreateNamespace", principal, "not authorized to create namespace")
|
|
}
|
|
|
|
if req.TableBucketARN == "" {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
|
return fmt.Errorf("tableBucketARN is required")
|
|
}
|
|
|
|
if len(req.Namespace) == 0 {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "namespace is required")
|
|
return fmt.Errorf("namespace is required")
|
|
}
|
|
|
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
namespaceName, err := validateNamespace(req.Namespace)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
// Check if table bucket exists
|
|
bucketPath := getTableBucketPath(bucketName)
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
_, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata)
|
|
return err
|
|
})
|
|
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchBucket, fmt.Sprintf("table bucket %s not found", bucketName))
|
|
} else {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check table bucket: %v", err))
|
|
}
|
|
return err
|
|
}
|
|
|
|
namespacePath := getNamespacePath(bucketName, namespaceName)
|
|
|
|
// Check if namespace already exists
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
_, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata)
|
|
return err
|
|
})
|
|
|
|
if err == nil {
|
|
h.writeError(w, http.StatusConflict, ErrCodeNamespaceAlreadyExists, fmt.Sprintf("namespace %s already exists", namespaceName))
|
|
return fmt.Errorf("namespace already exists")
|
|
} else if !errors.Is(err, filer_pb.ErrNotFound) {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check namespace: %v", err))
|
|
return err
|
|
}
|
|
|
|
// Create the namespace
|
|
now := time.Now()
|
|
metadata := &namespaceMetadata{
|
|
Namespace: req.Namespace,
|
|
CreatedAt: now,
|
|
OwnerID: h.getAccountID(r),
|
|
}
|
|
|
|
metadataBytes, err := json.Marshal(metadata)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to marshal namespace metadata")
|
|
return fmt.Errorf("failed to marshal metadata: %w", err)
|
|
}
|
|
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
// Create namespace directory
|
|
if err := h.createDirectory(r.Context(), client, namespacePath); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Set metadata as extended attribute
|
|
if err := h.setExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata, metadataBytes); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to create namespace")
|
|
return err
|
|
}
|
|
|
|
resp := &CreateNamespaceResponse{
|
|
Namespace: req.Namespace,
|
|
TableBucketARN: req.TableBucketARN,
|
|
}
|
|
|
|
h.writeJSON(w, http.StatusOK, resp)
|
|
return nil
|
|
}
|
|
|
|
// handleGetNamespace gets details of a namespace
|
|
func (h *S3TablesHandler) handleGetNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
|
var req GetNamespaceRequest
|
|
if err := h.readRequestBody(r, &req); err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
// Check permission
|
|
principal := h.getPrincipalFromRequest(r)
|
|
accountID := h.getAccountID(r)
|
|
if !CanGetNamespace(principal, accountID) {
|
|
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to get namespace details")
|
|
return NewAuthError("GetNamespace", principal, "not authorized to get namespace details")
|
|
}
|
|
|
|
if req.TableBucketARN == "" {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
|
return fmt.Errorf("tableBucketARN is required")
|
|
}
|
|
|
|
namespaceName, err := validateNamespace(req.Namespace)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
namespacePath := getNamespacePath(bucketName, namespaceName)
|
|
|
|
// Get namespace
|
|
var metadata namespaceMetadata
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
data, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return json.Unmarshal(data, &metadata)
|
|
})
|
|
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", flattenNamespace(req.Namespace)))
|
|
} else {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to get namespace: %v", err))
|
|
}
|
|
return err
|
|
}
|
|
|
|
resp := &GetNamespaceResponse{
|
|
Namespace: metadata.Namespace,
|
|
CreatedAt: metadata.CreatedAt,
|
|
OwnerAccountID: metadata.OwnerID,
|
|
}
|
|
|
|
h.writeJSON(w, http.StatusOK, resp)
|
|
return nil
|
|
}
|
|
|
|
// handleListNamespaces lists all namespaces in a table bucket
|
|
func (h *S3TablesHandler) handleListNamespaces(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
|
var req ListNamespacesRequest
|
|
if err := h.readRequestBody(r, &req); err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
// Check permission
|
|
principal := h.getPrincipalFromRequest(r)
|
|
accountID := h.getAccountID(r)
|
|
if !CanListNamespaces(principal, accountID) {
|
|
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to list namespaces")
|
|
return NewAuthError("ListNamespaces", principal, "not authorized to list namespaces")
|
|
}
|
|
|
|
if req.TableBucketARN == "" {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
|
return fmt.Errorf("tableBucketARN is required")
|
|
}
|
|
|
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
maxNamespaces := req.MaxNamespaces
|
|
if maxNamespaces <= 0 {
|
|
maxNamespaces = 100
|
|
}
|
|
|
|
bucketPath := getTableBucketPath(bucketName)
|
|
var namespaces []NamespaceSummary
|
|
|
|
lastFileName := req.ContinuationToken
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
for len(namespaces) < maxNamespaces {
|
|
resp, err := client.ListEntries(r.Context(), &filer_pb.ListEntriesRequest{
|
|
Directory: bucketPath,
|
|
Limit: uint32(maxNamespaces * 2),
|
|
StartFromFileName: lastFileName,
|
|
InclusiveStartFrom: lastFileName == "" || lastFileName == req.ContinuationToken,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hasMore := false
|
|
for {
|
|
entry, respErr := resp.Recv()
|
|
if respErr != nil {
|
|
if respErr == io.EOF {
|
|
break
|
|
}
|
|
return respErr
|
|
}
|
|
if entry.Entry == nil {
|
|
continue
|
|
}
|
|
|
|
// Skip the start item if it was included in the previous page
|
|
if len(namespaces) == 0 && req.ContinuationToken != "" && entry.Entry.Name == req.ContinuationToken {
|
|
continue
|
|
}
|
|
|
|
hasMore = true
|
|
lastFileName = entry.Entry.Name
|
|
|
|
if !entry.Entry.IsDirectory {
|
|
continue
|
|
}
|
|
|
|
// Skip hidden entries
|
|
if strings.HasPrefix(entry.Entry.Name, ".") {
|
|
continue
|
|
}
|
|
|
|
// Apply prefix filter
|
|
if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) {
|
|
continue
|
|
}
|
|
|
|
// Read metadata from extended attribute
|
|
data, ok := entry.Entry.Extended[ExtendedKeyMetadata]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
var metadata namespaceMetadata
|
|
if err := json.Unmarshal(data, &metadata); err != nil {
|
|
continue
|
|
}
|
|
|
|
namespaces = append(namespaces, NamespaceSummary{
|
|
Namespace: metadata.Namespace,
|
|
CreatedAt: metadata.CreatedAt,
|
|
})
|
|
|
|
if len(namespaces) >= maxNamespaces {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
if !hasMore {
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
namespaces = []NamespaceSummary{}
|
|
} else {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to list namespaces: %v", err))
|
|
return err
|
|
}
|
|
}
|
|
|
|
paginationToken := ""
|
|
if len(namespaces) >= maxNamespaces {
|
|
paginationToken = lastFileName
|
|
}
|
|
|
|
resp := &ListNamespacesResponse{
|
|
Namespaces: namespaces,
|
|
ContinuationToken: paginationToken,
|
|
}
|
|
|
|
h.writeJSON(w, http.StatusOK, resp)
|
|
return nil
|
|
}
|
|
|
|
// handleDeleteNamespace deletes a namespace from a table bucket
|
|
func (h *S3TablesHandler) handleDeleteNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
|
var req DeleteNamespaceRequest
|
|
if err := h.readRequestBody(r, &req); err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
// Check permission
|
|
principal := h.getPrincipalFromRequest(r)
|
|
accountID := h.getAccountID(r)
|
|
if !CanDeleteNamespace(principal, accountID) {
|
|
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to delete namespace")
|
|
return NewAuthError("DeleteNamespace", principal, "not authorized to delete namespace")
|
|
}
|
|
|
|
if req.TableBucketARN == "" {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
|
return fmt.Errorf("tableBucketARN is required")
|
|
}
|
|
|
|
namespaceName, err := validateNamespace(req.Namespace)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
namespacePath := getNamespacePath(bucketName, namespaceName)
|
|
|
|
// Check if namespace exists and is empty
|
|
hasChildren := false
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
resp, err := client.ListEntries(r.Context(), &filer_pb.ListEntriesRequest{
|
|
Directory: namespacePath,
|
|
Limit: 10,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
entry, err := resp.Recv()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
return err
|
|
}
|
|
if entry.Entry != nil && !strings.HasPrefix(entry.Entry.Name, ".") {
|
|
hasChildren = true
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", flattenNamespace(req.Namespace)))
|
|
} else {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to list namespace entries: %v", err))
|
|
}
|
|
return err
|
|
}
|
|
|
|
if hasChildren {
|
|
h.writeError(w, http.StatusConflict, ErrCodeNamespaceNotEmpty, "namespace is not empty")
|
|
return fmt.Errorf("namespace not empty")
|
|
}
|
|
|
|
// Delete the namespace
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
return h.deleteDirectory(r.Context(), client, namespacePath)
|
|
})
|
|
|
|
if err != nil {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete namespace")
|
|
return err
|
|
}
|
|
|
|
h.writeJSON(w, http.StatusOK, nil)
|
|
return nil
|
|
}
|