Files
seaweedFS/weed/s3api/s3tables/handler_table.go
Chris Lu b7bba7e7dc s3tables: Generate ARNs using resource owner account ID
Change ARN generation to use resource OwnerAccountID instead of caller
identity (h.getAccountID(r)). This ensures ARNs are stable and consistent
regardless of which principal accesses the resource.

Updated generateTableBucketARN and generateTableARN function signatures
to accept ownerAccountID parameter. All call sites updated to pass the
resource owner's account ID from metadata.

This prevents ARN inconsistency issues when multiple principals have
access to the same resource via policies.
2026-01-28 17:38:22 -08:00

706 lines
22 KiB
Go

package s3tables
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
// handleCreateTable creates a new table in a namespace
func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
var req CreateTableRequest
if err := h.readRequestBody(r, &req); err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
if req.TableBucketARN == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
return fmt.Errorf("tableBucketARN is required")
}
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
if req.Name == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "name is required")
return fmt.Errorf("name is required")
}
if req.Format == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "format is required")
return fmt.Errorf("format is required")
}
// Validate format
if req.Format != "ICEBERG" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "only ICEBERG format is supported")
return fmt.Errorf("invalid format")
}
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
// Validate table name
tableName, err := validateTableName(req.Name)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
// Check if namespace exists
namespacePath := getNamespacePath(bucketName, namespaceName)
var namespaceMetadata namespaceMetadata
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
data, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata)
if err != nil {
return err
}
if err := json.Unmarshal(data, &namespaceMetadata); err != nil {
return fmt.Errorf("failed to unmarshal namespace metadata: %w", err)
}
return nil
})
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", namespaceName))
} else {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check namespace: %v", err))
}
return err
}
// Authorize table creation using policy framework (namespace + bucket policies)
accountID := h.getAccountID(r)
bucketPath := getTableBucketPath(bucketName)
namespacePolicy := ""
bucketPolicy := ""
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Fetch namespace policy if it exists
policyData, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyPolicy)
if err == nil {
namespacePolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch namespace policy: %v", err)
}
// Fetch bucket policy if it exists
policyData, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
if err == nil {
bucketPolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket policy: %v", err)
}
return nil
})
if err != nil {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to fetch policies: %v", err))
return err
}
// Check authorization: namespace policy OR bucket policy OR ownership
nsAllowed := CanCreateTable(accountID, namespaceMetadata.OwnerAccountID, namespacePolicy)
bucketAllowed := CanCreateTable(accountID, namespaceMetadata.OwnerAccountID, bucketPolicy)
if !nsAllowed && !bucketAllowed {
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table in this namespace")
return ErrAccessDenied
}
tablePath := getTablePath(bucketName, namespaceName, tableName)
// Check if table already exists
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
return err
})
if err == nil {
h.writeError(w, http.StatusConflict, ErrCodeTableAlreadyExists, fmt.Sprintf("table %s already exists", tableName))
return fmt.Errorf("table already exists")
} else if !errors.Is(err, filer_pb.ErrNotFound) {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check table: %v", err))
return err
}
// Create the table
now := time.Now()
versionToken := generateVersionToken()
metadata := &tableMetadataInternal{
Name: tableName,
Namespace: namespaceName,
Format: req.Format,
CreatedAt: now,
ModifiedAt: now,
OwnerAccountID: h.getAccountID(r),
VersionToken: versionToken,
Metadata: req.Metadata,
}
metadataBytes, err := json.Marshal(metadata)
if err != nil {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to marshal table metadata")
return fmt.Errorf("failed to marshal metadata: %w", err)
}
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Create table directory
if err := h.createDirectory(r.Context(), client, tablePath); err != nil {
return err
}
// Create data subdirectory for Iceberg files
dataPath := tablePath + "/data"
if err := h.createDirectory(r.Context(), client, dataPath); err != nil {
return err
}
// Set metadata as extended attribute
if err := h.setExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata, metadataBytes); err != nil {
return err
}
// Set tags if provided
if len(req.Tags) > 0 {
tagsBytes, err := json.Marshal(req.Tags)
if err != nil {
return fmt.Errorf("failed to marshal tags: %w", err)
}
if err := h.setExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyTags, tagsBytes); err != nil {
return err
}
}
return nil
})
if err != nil {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to create table")
return err
}
tableARN := h.generateTableARN(metadata.OwnerAccountID, bucketName, namespaceName+"/"+tableName)
resp := &CreateTableResponse{
TableARN: tableARN,
VersionToken: versionToken,
}
h.writeJSON(w, http.StatusOK, resp)
return nil
}
// handleGetTable gets details of a table
func (h *S3TablesHandler) handleGetTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
var req GetTableRequest
if err := h.readRequestBody(r, &req); err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
var bucketName, namespace, tableName string
var err error
// Support getting by ARN or by bucket/namespace/name
if req.TableARN != "" {
bucketName, namespace, tableName, err = parseTableFromARN(req.TableARN)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
} else if req.TableBucketARN != "" && len(req.Namespace) > 0 && req.Name != "" {
bucketName, err = parseBucketNameFromARN(req.TableBucketARN)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
namespace, err = validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
tableName, err = validateTableName(req.Name)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
} else {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "either tableARN or (tableBucketARN, namespace, name) is required")
return fmt.Errorf("missing required parameters")
}
tablePath := getTablePath(bucketName, namespace, tableName)
var metadata tableMetadataInternal
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
data, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
if err != nil {
return err
}
if err := json.Unmarshal(data, &metadata); err != nil {
return fmt.Errorf("failed to unmarshal table metadata: %w", err)
}
return nil
})
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
} else {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to get table: %v", err))
}
return err
}
// Authorize access to the table using policy framework
accountID := h.getAccountID(r)
bucketPath := getTableBucketPath(bucketName)
tablePolicy := ""
bucketPolicy := ""
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Fetch table policy if it exists
policyData, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyPolicy)
if err == nil {
tablePolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch table policy: %v", err)
}
// Fetch bucket policy if it exists
policyData, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
if err == nil {
bucketPolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket policy: %v", err)
}
return nil
})
if err != nil {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to fetch policies: %v", err))
return err
}
// Check authorization: table policy OR bucket policy OR ownership
tableAllowed := CanGetTable(accountID, metadata.OwnerAccountID, tablePolicy)
bucketAllowed := CanGetTable(accountID, metadata.OwnerAccountID, bucketPolicy)
if !tableAllowed && !bucketAllowed {
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
return ErrAccessDenied
}
tableARN := h.generateTableARN(metadata.OwnerAccountID, bucketName, namespace+"/"+tableName)
resp := &GetTableResponse{
Name: metadata.Name,
TableARN: tableARN,
Namespace: []string{metadata.Namespace},
Format: metadata.Format,
CreatedAt: metadata.CreatedAt,
ModifiedAt: metadata.ModifiedAt,
OwnerAccountID: metadata.OwnerAccountID,
MetadataLocation: metadata.MetadataLocation,
VersionToken: metadata.VersionToken,
}
h.writeJSON(w, http.StatusOK, resp)
return nil
}
// handleListTables lists all tables in a namespace or bucket
func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
var req ListTablesRequest
if err := h.readRequestBody(r, &req); err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
if req.TableBucketARN == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
return fmt.Errorf("tableBucketARN is required")
}
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
maxTables := req.MaxTables
if maxTables <= 0 {
maxTables = 100
}
// Cap to prevent uint32 overflow when used in uint32(maxTables*2)
const maxTablesLimit = 1000
if maxTables > maxTablesLimit {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "MaxTables exceeds maximum allowed value")
return fmt.Errorf("invalid maxTables value: %d", maxTables)
}
// Pre-validate namespace before calling WithFilerClient to return 400 on validation errors
var namespaceName string
if len(req.Namespace) > 0 {
var err error
namespaceName, err = validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
}
var tables []TableSummary
var paginationToken string
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
var err error
if len(req.Namespace) > 0 {
// Namespace has already been validated above
namespacePath := getNamespacePath(bucketName, namespaceName)
var nsMeta namespaceMetadata
data, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata)
if err != nil {
return err // Not Found handled by caller
}
if err := json.Unmarshal(data, &nsMeta); err != nil {
return err
}
if accountID := h.getAccountID(r); accountID != nsMeta.OwnerAccountID {
return ErrAccessDenied
}
tables, paginationToken, err = h.listTablesInNamespaceWithClient(r, client, bucketName, namespaceName, req.Prefix, req.ContinuationToken, maxTables)
} else {
// Check permission (check bucket ownership)
bucketPath := getTableBucketPath(bucketName)
var bucketMeta tableBucketMetadata
data, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata)
if err != nil {
return err
}
if err := json.Unmarshal(data, &bucketMeta); err != nil {
return err
}
if accountID := h.getAccountID(r); accountID != bucketMeta.OwnerAccountID {
return ErrAccessDenied
}
tables, paginationToken, err = h.listTablesInAllNamespaces(r, client, bucketName, req.Prefix, req.ContinuationToken, maxTables)
}
return err
})
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
// If the bucket or namespace directory is not found, return an empty result
tables = []TableSummary{}
paginationToken = ""
} else if isAuthError(err) {
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "Access Denied")
return err
} else {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to list tables: %v", err))
return err
}
}
resp := &ListTablesResponse{
Tables: tables,
ContinuationToken: paginationToken,
}
h.writeJSON(w, http.StatusOK, resp)
return nil
}
// listTablesInNamespaceWithClient lists tables in a specific namespace
func (h *S3TablesHandler) listTablesInNamespaceWithClient(r *http.Request, client filer_pb.SeaweedFilerClient, bucketName, namespaceName, prefix, continuationToken string, maxTables int) ([]TableSummary, string, error) {
namespacePath := getNamespacePath(bucketName, namespaceName)
return h.listTablesWithClient(r, client, namespacePath, bucketName, namespaceName, prefix, continuationToken, maxTables)
}
func (h *S3TablesHandler) listTablesWithClient(r *http.Request, client filer_pb.SeaweedFilerClient, dirPath, bucketName, namespaceName, prefix, continuationToken string, maxTables int) ([]TableSummary, string, error) {
var tables []TableSummary
lastFileName := continuationToken
ctx := r.Context()
for len(tables) < maxTables {
resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: dirPath,
Limit: uint32(maxTables * 2),
StartFromFileName: lastFileName,
InclusiveStartFrom: lastFileName == "" || lastFileName == continuationToken,
})
if err != nil {
return nil, "", err
}
hasMore := false
for {
entry, respErr := resp.Recv()
if respErr != nil {
if respErr == io.EOF {
break
}
return nil, "", respErr
}
if entry.Entry == nil {
continue
}
// Skip the start item if it was included in the previous page
if len(tables) == 0 && continuationToken != "" && entry.Entry.Name == continuationToken {
continue
}
hasMore = true
lastFileName = entry.Entry.Name
if !entry.Entry.IsDirectory {
continue
}
// Skip hidden entries
if strings.HasPrefix(entry.Entry.Name, ".") {
continue
}
// Apply prefix filter
if prefix != "" && !strings.HasPrefix(entry.Entry.Name, prefix) {
continue
}
// Read table metadata from extended attribute
data, ok := entry.Entry.Extended[ExtendedKeyMetadata]
if !ok {
continue
}
var metadata tableMetadataInternal
if err := json.Unmarshal(data, &metadata); err != nil {
continue
}
if metadata.OwnerAccountID != h.getAccountID(r) {
continue
}
tableARN := h.generateTableARN(metadata.OwnerAccountID, bucketName, namespaceName+"/"+entry.Entry.Name)
tables = append(tables, TableSummary{
Name: entry.Entry.Name,
TableARN: tableARN,
Namespace: []string{namespaceName},
CreatedAt: metadata.CreatedAt,
ModifiedAt: metadata.ModifiedAt,
})
if len(tables) >= maxTables {
return tables, lastFileName, nil
}
}
if !hasMore {
break
}
}
if len(tables) < maxTables {
lastFileName = ""
}
return tables, lastFileName, nil
}
func (h *S3TablesHandler) listTablesInAllNamespaces(r *http.Request, client filer_pb.SeaweedFilerClient, bucketName, prefix, continuationToken string, maxTables int) ([]TableSummary, string, error) {
bucketPath := getTableBucketPath(bucketName)
ctx := r.Context()
var continuationNamespace string
var startTableName string
if continuationToken != "" {
if parts := strings.SplitN(continuationToken, "/", 2); len(parts) == 2 {
continuationNamespace = parts[0]
startTableName = parts[1]
} else {
continuationNamespace = continuationToken
}
}
var tables []TableSummary
lastNamespace := continuationNamespace
for {
// List namespaces in batches
resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: bucketPath,
Limit: 100,
StartFromFileName: lastNamespace,
InclusiveStartFrom: (lastNamespace == continuationNamespace && startTableName != "") || (lastNamespace == "" && continuationNamespace == ""),
})
if err != nil {
return nil, "", err
}
hasMore := false
for {
entry, respErr := resp.Recv()
if respErr != nil {
if respErr == io.EOF {
break
}
return nil, "", respErr
}
if entry.Entry == nil {
continue
}
hasMore = true
lastNamespace = entry.Entry.Name
if !entry.Entry.IsDirectory || strings.HasPrefix(entry.Entry.Name, ".") {
continue
}
namespace := entry.Entry.Name
tableNameFilter := ""
if namespace == continuationNamespace {
tableNameFilter = startTableName
}
nsTables, nsToken, err := h.listTablesInNamespaceWithClient(r, client, bucketName, namespace, prefix, tableNameFilter, maxTables-len(tables))
if err != nil {
glog.Warningf("S3Tables: failed to list tables in namespace %s/%s: %v", bucketName, namespace, err)
continue
}
tables = append(tables, nsTables...)
if namespace == continuationNamespace {
startTableName = ""
}
if len(tables) >= maxTables {
paginationToken := namespace + "/" + nsToken
if nsToken == "" {
// If we hit the limit exactly at the end of a namespace, the next token should be the next namespace
paginationToken = namespace // This will start from the NEXT namespace in the outer loop
}
return tables, paginationToken, nil
}
}
if !hasMore {
break
}
}
return tables, "", nil
}
// handleDeleteTable deletes a table from a namespace
func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
var req DeleteTableRequest
if err := h.readRequestBody(r, &req); err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
return fmt.Errorf("missing required parameters")
}
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
tableName, err := validateTableName(req.Name)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
tablePath := getTablePath(bucketName, namespaceName, tableName)
// Check if table exists and enforce VersionToken if provided
var metadata tableMetadataInternal
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
data, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
if err != nil {
return err
}
if err := json.Unmarshal(data, &metadata); err != nil {
return fmt.Errorf("failed to unmarshal table metadata: %w", err)
}
if req.VersionToken != "" {
if metadata.VersionToken != req.VersionToken {
return ErrVersionTokenMismatch
}
}
return nil
})
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
} else if errors.Is(err, ErrVersionTokenMismatch) {
h.writeError(w, http.StatusConflict, ErrCodeConflict, "version token mismatch")
} else {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check table: %v", err))
}
return err
}
// Check ownership
if accountID := h.getAccountID(r); accountID != metadata.OwnerAccountID {
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
return ErrAccessDenied
}
// Delete the table
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return h.deleteDirectory(r.Context(), client, tablePath)
})
if err != nil {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table")
return err
}
h.writeJSON(w, http.StatusOK, nil)
return nil
}