seaweedFS/weed/s3api/s3tables/handler_table.go
Chris Lu 403592bb9f Add Spark Iceberg catalog integration tests and CI support (#8242)
* Add Spark Iceberg catalog integration tests and CI support

Implement comprehensive integration tests for Spark with SeaweedFS Iceberg REST catalog:
- Basic CRUD operations (Create, Read, Update, Delete) on Iceberg tables
- Namespace (database) management
- Data insertion, querying, and deletion
- Time travel capabilities via snapshot versioning
- Compatible with SeaweedFS S3 and Iceberg REST endpoints

Tests mirror the structure of existing Trino integration tests but use Spark's
Python SQL API and PySpark for testing.

Add GitHub Actions CI job for spark-iceberg-catalog-tests in s3-tables-tests.yml
to automatically run Spark integration tests on pull requests.

* fmt

* Fix Spark integration tests - code review feedback

* go mod tidy

* Add go mod tidy step to integration test jobs

Add 'go mod tidy' step before test runs for all integration test jobs:
- s3-tables-tests
- iceberg-catalog-tests
- trino-iceberg-catalog-tests
- spark-iceberg-catalog-tests

This ensures dependencies are clean before running tests.

* Fix remaining Spark operations test issues

Address final code review comments:

Setup & Initialization:
- Add waitForSparkReady() helper function that polls Spark readiness
  with backoff instead of a hardcoded 10-second sleep
- Extract setupSparkTestEnv() helper to reduce boilerplate duplication
  between TestSparkCatalogBasicOperations and TestSparkTimeTravel
- Both tests now use helpers for consistent, reliable setup

Assertions & Validation:
- Make setup-critical operations (namespace, table creation, initial
  insert) use t.Fatalf instead of t.Errorf to fail fast
- Validate setupSQL output in TestSparkTimeTravel and fail if not
  'Setup complete'
- Add validation after second INSERT in TestSparkTimeTravel:
  verify row count increased to 2 before time travel test
- Add context to error messages with namespace and tableName params

Code Quality:
- Remove code duplication between test functions
- All critical paths now properly validated
- Consistent error handling throughout

* Fix go vet errors in S3 Tables tests

Fixes:
1. setup_test.go (Spark):
   - Add missing import: github.com/testcontainers/testcontainers-go/wait
   - Use wait.ForLog instead of undefined testcontainers.NewLogStrategy
   - Remove unused strings import

2. trino_catalog_test.go:
   - Use net.JoinHostPort instead of fmt.Sprintf for address formatting
   - Properly handles IPv6 addresses by wrapping them in brackets

* Use weed mini for simpler SeaweedFS startup

Replace complex multi-process startup (master, volume, filer, s3)
with single 'weed mini' command that starts all services together.

Benefits:
- Simpler, more reliable startup
- Single weed mini process vs 4 separate processes
- Automatic coordination between components
- Better port management with no manual coordination

Changes:
- Remove separate master, volume, filer process startup
- Use weed mini with -master.port, -filer.port, -s3.port flags
- Keep Iceberg REST as separate service (still needed)
- Increase timeout to 15s for port readiness (weed mini startup)
- Remove volumePort and filerProcess fields from TestEnvironment
- Simplify cleanup to only handle two processes (mini, iceberg rest)

* Clean up dead code and temp directory leaks

Fixes:

1. Remove dead s3Process field and cleanup:
   - weed mini bundles S3 gateway, no separate process needed
   - Removed s3Process field from TestEnvironment
   - Removed unnecessary s3Process cleanup code

2. Fix temp config directory leak:
   - Add sparkConfigDir field to TestEnvironment
   - Store returned configDir in writeSparkConfig
   - Clean up sparkConfigDir in Cleanup() with os.RemoveAll
   - Prevents accumulation of temp directories in test runs

3. Simplify Cleanup:
   - Now handles only necessary processes (weed mini, iceberg rest)
   - Removes both seaweedfsDataDir and sparkConfigDir
   - Cleaner shutdown sequence

* Use weed mini's built-in Iceberg REST and fix python binary

Changes:
- Add -s3.port.iceberg flag to weed mini for built-in Iceberg REST Catalog
- Remove separate 'weed server' process for Iceberg REST
- Remove icebergRestProcess field from TestEnvironment
- Simplify Cleanup() to only manage weed mini + Spark
- Add port readiness check for iceberg REST from weed mini
- Set Spark container Cmd to '/bin/sh -c sleep 3600' to keep it running
- Change python to python3 in container.Exec calls

This simplifies to truly one all-in-one weed mini process (master, filer, s3,
iceberg-rest) plus just the Spark container.

* go fmt

* clean up

* Bind on a non-loopback IP for container access, align Iceberg metadata saves/locations with table locations, and rework Spark time travel to use TIMESTAMP AS OF with safe timestamp extraction.

* shared mini start

* Fixed internal directory creation under /buckets so .objects paths can auto-create without failing bucket-name validation, which restores table bucket object writes

* fix path

  Updated table bucket objects to write under `/buckets/<bucket>` and saved Iceberg metadata there, adjusting the Spark time-travel timestamp to committed_at + 1s.
  Rebuilt the weed binary (`go install ./weed`) and confirmed that the Spark and Trino tests pass with focused test commands.
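
The committed_at + 1s adjustment can be illustrated with a small helper. The reason for the extra second is presumably that dropping sub-second precision could otherwise produce a timestamp earlier than the commit. The function name, the input layout, and the table name in the query are assumptions for this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// timeTravelTimestamp parses a snapshot commit time and returns a value one
// second later, formatted to whole seconds, for use in TIMESTAMP AS OF.
// The input layout is an assumption about how committed_at is printed.
func timeTravelTimestamp(committedAt string) (string, error) {
	const layout = "2006-01-02 15:04:05"
	// time.Parse accepts a fractional-second field after the seconds even
	// though the layout does not include one.
	t, err := time.Parse(layout, committedAt)
	if err != nil {
		return "", fmt.Errorf("parse committed_at %q: %w", committedAt, err)
	}
	// Formatting with a whole-second layout drops the fractional part.
	return t.Add(time.Second).Format(layout), nil
}

func main() {
	ts, err := timeTravelTimestamp("2026-02-08 10:03:53.123")
	if err != nil {
		fmt.Println(err)
		return
	}
	// ns.tbl is a placeholder table name.
	fmt.Printf("SELECT count(*) FROM ns.tbl TIMESTAMP AS OF '%s'\n", ts)
}
```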

* Updated table bucket creation to stop creating /buckets/.objects and switched Trino REST warehouse to s3://<bucket> to match Iceberg layout.

* Stabilize S3Tables integration tests

* Fix timestamp extraction and remove dead code in bucketDir

* Use table bucket as warehouse in s3tables tests

* Update trino_blog_operations_test.go

* Add the CASCADE option to handle any remaining table metadata/files in the schema directory

* skip namespace not empty
2026-02-08 10:03:53 -08:00


package s3tables

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// handleCreateTable creates a new table in a namespace
func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
var req CreateTableRequest
if err := h.readRequestBody(r, &req); err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
if req.TableBucketARN == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
return fmt.Errorf("tableBucketARN is required")
}
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
if req.Name == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "name is required")
return fmt.Errorf("name is required")
}
if req.Format == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "format is required")
return fmt.Errorf("format is required")
}
// Validate format
if req.Format != "ICEBERG" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "only ICEBERG format is supported")
return fmt.Errorf("invalid format")
}
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
// Validate table name
tableName, err := validateTableName(req.Name)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
// Check if namespace exists
namespacePath := GetNamespacePath(bucketName, namespaceName)
var namespaceMetadata namespaceMetadata
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
data, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata)
if err != nil {
return err
}
if err := json.Unmarshal(data, &namespaceMetadata); err != nil {
return fmt.Errorf("failed to unmarshal namespace metadata: %w", err)
}
return nil
})
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", namespaceName))
} else {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check namespace: %v", err))
}
return err
}
// Authorize table creation using policy framework (namespace + bucket policies)
accountID := h.getAccountID(r)
bucketPath := GetTableBucketPath(bucketName)
namespacePolicy := ""
bucketPolicy := ""
bucketTags := map[string]string{}
var bucketMetadata tableBucketMetadata
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Fetch bucket metadata to use correct owner for bucket policy evaluation
// (data and err are scoped to this closure rather than shared with the handler)
data, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata)
if err == nil {
if err := json.Unmarshal(data, &bucketMetadata); err != nil {
return fmt.Errorf("failed to unmarshal bucket metadata: %w", err)
}
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket metadata: %v", err)
}
// Fetch namespace policy if it exists
policyData, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyPolicy)
if err == nil {
namespacePolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch namespace policy: %v", err)
}
// Fetch bucket policy if it exists
policyData, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
if err == nil {
bucketPolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket policy: %v", err)
}
if tags, err := h.readTags(r.Context(), client, bucketPath); err != nil {
return err
} else if tags != nil {
bucketTags = tags
}
return nil
})
if err != nil {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to fetch policies: %v", err))
return err
}
bucketARN := h.generateTableBucketARN(bucketMetadata.OwnerAccountID, bucketName)
identityActions := getIdentityActions(r)
nsAllowed := CheckPermissionWithContext("CreateTable", accountID, namespaceMetadata.OwnerAccountID, namespacePolicy, bucketARN, &PolicyContext{
TableBucketName: bucketName,
Namespace: namespaceName,
TableName: tableName,
RequestTags: req.Tags,
TagKeys: mapKeys(req.Tags),
TableBucketTags: bucketTags,
IdentityActions: identityActions,
})
bucketAllowed := CheckPermissionWithContext("CreateTable", accountID, bucketMetadata.OwnerAccountID, bucketPolicy, bucketARN, &PolicyContext{
TableBucketName: bucketName,
Namespace: namespaceName,
TableName: tableName,
RequestTags: req.Tags,
TagKeys: mapKeys(req.Tags),
TableBucketTags: bucketTags,
IdentityActions: identityActions,
})
if !nsAllowed && !bucketAllowed {
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table in this namespace")
return ErrAccessDenied
}
tablePath := GetTablePath(bucketName, namespaceName, tableName)
// Check if table already exists
var existingMetadata tableMetadataInternal
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
data, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
if err != nil {
return err
}
if unmarshalErr := json.Unmarshal(data, &existingMetadata); unmarshalErr != nil {
return fmt.Errorf("failed to parse existing table metadata: %w", unmarshalErr)
}
return nil
})
if err == nil {
tableARN := h.generateTableARN(existingMetadata.OwnerAccountID, bucketName, namespaceName+"/"+tableName)
h.writeJSON(w, http.StatusOK, &CreateTableResponse{
TableARN: tableARN,
VersionToken: existingMetadata.VersionToken,
MetadataLocation: existingMetadata.MetadataLocation,
})
return nil
} else if !errors.Is(err, filer_pb.ErrNotFound) && !errors.Is(err, ErrAttributeNotFound) {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check table: %v", err))
return err
}
// Create the table
now := time.Now()
versionToken := generateVersionToken()
metadata := &tableMetadataInternal{
Name: tableName,
Namespace: namespaceName,
Format: req.Format,
CreatedAt: now,
ModifiedAt: now,
OwnerAccountID: namespaceMetadata.OwnerAccountID, // Inherit namespace owner for consistency
VersionToken: versionToken,
MetadataVersion: max(req.MetadataVersion, 1),
MetadataLocation: req.MetadataLocation,
Metadata: req.Metadata,
}
metadataBytes, err := json.Marshal(metadata)
if err != nil {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to marshal table metadata")
return fmt.Errorf("failed to marshal metadata: %w", err)
}
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Ensure table directory exists (may already be created by object storage clients)
if err := h.ensureDirectory(r.Context(), client, tablePath); err != nil {
return err
}
// Create data subdirectory for Iceberg files
dataPath := tablePath + "/data"
if err := h.ensureDirectory(r.Context(), client, dataPath); err != nil {
return err
}
// Set metadata as extended attribute
if err := h.setExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata, metadataBytes); err != nil {
return err
}
// Set tags if provided
if len(req.Tags) > 0 {
tagsBytes, err := json.Marshal(req.Tags)
if err != nil {
return fmt.Errorf("failed to marshal tags: %w", err)
}
if err := h.setExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyTags, tagsBytes); err != nil {
return err
}
}
if err := h.updateTableLocationMapping(r.Context(), client, "", req.MetadataLocation, tablePath); err != nil {
glog.V(1).Infof("failed to update table location mapping for %s: %v", req.MetadataLocation, err)
}
return nil
})
if err != nil {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to create table")
return err
}
tableARN := h.generateTableARN(metadata.OwnerAccountID, bucketName, namespaceName+"/"+tableName)
resp := &CreateTableResponse{
TableARN: tableARN,
VersionToken: versionToken,
}
h.writeJSON(w, http.StatusOK, resp)
return nil
}
// handleGetTable gets details of a table
func (h *S3TablesHandler) handleGetTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
var req GetTableRequest
if err := h.readRequestBody(r, &req); err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
var bucketName, namespace, tableName string
var err error
// Support getting by ARN or by bucket/namespace/name
if req.TableARN != "" {
bucketName, namespace, tableName, err = parseTableFromARN(req.TableARN)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
} else if req.TableBucketARN != "" && len(req.Namespace) > 0 && req.Name != "" {
bucketName, err = parseBucketNameFromARN(req.TableBucketARN)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
namespace, err = validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
tableName, err = validateTableName(req.Name)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
} else {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "either tableARN or (tableBucketARN, namespace, name) is required")
return fmt.Errorf("missing required parameters")
}
tablePath := GetTablePath(bucketName, namespace, tableName)
var metadata tableMetadataInternal
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
data, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
if err != nil {
return err
}
if err := json.Unmarshal(data, &metadata); err != nil {
return fmt.Errorf("failed to unmarshal table metadata: %w", err)
}
return nil
})
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
} else {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to get table: %v", err))
}
return err
}
// Authorize access to the table using policy framework
accountID := h.getAccountID(r)
bucketPath := GetTableBucketPath(bucketName)
tablePolicy := ""
bucketPolicy := ""
bucketTags := map[string]string{}
tableTags := map[string]string{}
var bucketMetadata tableBucketMetadata
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Fetch bucket metadata to use correct owner for bucket policy evaluation
data, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata)
if err == nil {
if err := json.Unmarshal(data, &bucketMetadata); err != nil {
return fmt.Errorf("failed to unmarshal bucket metadata: %w", err)
}
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket metadata: %v", err)
}
// Fetch table policy if it exists
policyData, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyPolicy)
if err == nil {
tablePolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch table policy: %v", err)
}
if tags, err := h.readTags(r.Context(), client, tablePath); err != nil {
return err
} else if tags != nil {
tableTags = tags
}
// Fetch bucket policy if it exists
policyData, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
if err == nil {
bucketPolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket policy: %v", err)
}
if tags, err := h.readTags(r.Context(), client, bucketPath); err != nil {
return err
} else if tags != nil {
bucketTags = tags
}
return nil
})
if err != nil {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to fetch policies: %v", err))
return err
}
tableARN := h.generateTableARN(metadata.OwnerAccountID, bucketName, namespace+"/"+tableName)
bucketARN := h.generateTableBucketARN(bucketMetadata.OwnerAccountID, bucketName)
identityActions := getIdentityActions(r)
tableAllowed := CheckPermissionWithContext("GetTable", accountID, metadata.OwnerAccountID, tablePolicy, tableARN, &PolicyContext{
TableBucketName: bucketName,
Namespace: namespace,
TableName: tableName,
TableBucketTags: bucketTags,
ResourceTags: tableTags,
IdentityActions: identityActions,
})
bucketAllowed := CheckPermissionWithContext("GetTable", accountID, bucketMetadata.OwnerAccountID, bucketPolicy, bucketARN, &PolicyContext{
TableBucketName: bucketName,
Namespace: namespace,
TableName: tableName,
TableBucketTags: bucketTags,
ResourceTags: tableTags,
IdentityActions: identityActions,
})
if !tableAllowed && !bucketAllowed {
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
return ErrAccessDenied
}
resp := &GetTableResponse{
Name: metadata.Name,
TableARN: tableARN,
Namespace: []string{metadata.Namespace},
Format: metadata.Format,
CreatedAt: metadata.CreatedAt,
ModifiedAt: metadata.ModifiedAt,
OwnerAccountID: metadata.OwnerAccountID,
MetadataLocation: metadata.MetadataLocation,
MetadataVersion: metadata.MetadataVersion,
VersionToken: metadata.VersionToken,
Metadata: metadata.Metadata,
}
h.writeJSON(w, http.StatusOK, resp)
return nil
}
// handleListTables lists all tables in a namespace or bucket
func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
var req ListTablesRequest
if err := h.readRequestBody(r, &req); err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
if req.TableBucketARN == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
return fmt.Errorf("tableBucketARN is required")
}
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
maxTables := req.MaxTables
if maxTables <= 0 {
maxTables = 100
}
// Cap to prevent uint32 overflow when used in uint32(maxTables*2)
const maxTablesLimit = 1000
if maxTables > maxTablesLimit {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "MaxTables exceeds maximum allowed value")
return fmt.Errorf("invalid maxTables value: %d", maxTables)
}
// Pre-validate namespace before calling WithFilerClient to return 400 on validation errors
var namespaceName string
if len(req.Namespace) > 0 {
var err error
namespaceName, err = validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
}
var tables []TableSummary
var paginationToken string
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
var err error
accountID := h.getAccountID(r)
if len(req.Namespace) > 0 {
// Namespace has already been validated above
namespacePath := GetNamespacePath(bucketName, namespaceName)
bucketPath := GetTableBucketPath(bucketName)
var nsMeta namespaceMetadata
var bucketMeta tableBucketMetadata
var namespacePolicy, bucketPolicy string
bucketTags := map[string]string{}
// Fetch namespace metadata and policy
data, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata)
if err != nil {
return err // Not Found handled by caller
}
if err := json.Unmarshal(data, &nsMeta); err != nil {
return err
}
// Fetch namespace policy if it exists
policyData, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyPolicy)
if err == nil {
namespacePolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch namespace policy: %v", err)
}
// Fetch bucket metadata and policy
data, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata)
if err == nil {
if err := json.Unmarshal(data, &bucketMeta); err != nil {
return fmt.Errorf("failed to unmarshal bucket metadata: %w", err)
}
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket metadata: %v", err)
}
policyData, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
if err == nil {
bucketPolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket policy: %v", err)
}
if tags, err := h.readTags(r.Context(), client, bucketPath); err != nil {
return err
} else if tags != nil {
bucketTags = tags
}
bucketARN := h.generateTableBucketARN(bucketMeta.OwnerAccountID, bucketName)
identityActions := getIdentityActions(r)
nsAllowed := CheckPermissionWithContext("ListTables", accountID, nsMeta.OwnerAccountID, namespacePolicy, bucketARN, &PolicyContext{
TableBucketName: bucketName,
Namespace: namespaceName,
TableBucketTags: bucketTags,
IdentityActions: identityActions,
})
bucketAllowed := CheckPermissionWithContext("ListTables", accountID, bucketMeta.OwnerAccountID, bucketPolicy, bucketARN, &PolicyContext{
TableBucketName: bucketName,
Namespace: namespaceName,
TableBucketTags: bucketTags,
IdentityActions: identityActions,
})
if !nsAllowed && !bucketAllowed {
return ErrAccessDenied
}
tables, paginationToken, err = h.listTablesInNamespaceWithClient(r, client, bucketName, namespaceName, req.Prefix, req.ContinuationToken, maxTables)
} else {
// List tables across all namespaces in bucket
bucketPath := GetTableBucketPath(bucketName)
var bucketMeta tableBucketMetadata
var bucketPolicy string
bucketTags := map[string]string{}
// Fetch bucket metadata and policy
data, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata)
if err != nil {
return err
}
if err := json.Unmarshal(data, &bucketMeta); err != nil {
return err
}
// Fetch bucket policy if it exists
policyData, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
if err == nil {
bucketPolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket policy: %v", err)
}
if tags, err := h.readTags(r.Context(), client, bucketPath); err != nil {
return err
} else if tags != nil {
bucketTags = tags
}
bucketARN := h.generateTableBucketARN(bucketMeta.OwnerAccountID, bucketName)
identityActions := getIdentityActions(r)
if !CheckPermissionWithContext("ListTables", accountID, bucketMeta.OwnerAccountID, bucketPolicy, bucketARN, &PolicyContext{
TableBucketName: bucketName,
TableBucketTags: bucketTags,
IdentityActions: identityActions,
}) {
return ErrAccessDenied
}
tables, paginationToken, err = h.listTablesInAllNamespaces(r, client, bucketName, req.Prefix, req.ContinuationToken, maxTables)
}
return err
})
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
// If the bucket or namespace directory is not found, return an empty result
tables = []TableSummary{}
paginationToken = ""
} else if isAuthError(err) {
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "Access Denied")
return err
} else {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to list tables: %v", err))
return err
}
}
resp := &ListTablesResponse{
Tables: tables,
ContinuationToken: paginationToken,
}
h.writeJSON(w, http.StatusOK, resp)
return nil
}

// listTablesInNamespaceWithClient lists tables in a specific namespace
func (h *S3TablesHandler) listTablesInNamespaceWithClient(r *http.Request, client filer_pb.SeaweedFilerClient, bucketName, namespaceName, prefix, continuationToken string, maxTables int) ([]TableSummary, string, error) {
	namespacePath := GetNamespacePath(bucketName, namespaceName)
	return h.listTablesWithClient(r, client, namespacePath, bucketName, namespaceName, prefix, continuationToken, maxTables)
}

// listTablesWithClient lists up to maxTables table directories under dirPath,
// resuming after continuationToken when one is given. It returns the tables and
// the name of the last entry scanned as the next token, or "" when the listing
// is exhausted.
func (h *S3TablesHandler) listTablesWithClient(r *http.Request, client filer_pb.SeaweedFilerClient, dirPath, bucketName, namespaceName, prefix, continuationToken string, maxTables int) ([]TableSummary, string, error) {
	var tables []TableSummary
	lastFileName := continuationToken
	ctx := r.Context()
	for len(tables) < maxTables {
		resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
			Directory:          dirPath,
			Limit:              uint32(maxTables * 2),
			StartFromFileName:  lastFileName,
			InclusiveStartFrom: lastFileName == "" || lastFileName == continuationToken,
		})
		if err != nil {
			return nil, "", err
		}
		hasMore := false
		for {
			entry, respErr := resp.Recv()
			if respErr != nil {
				if respErr == io.EOF {
					break
				}
				return nil, "", respErr
			}
			if entry.Entry == nil {
				continue
			}
			// Skip the start item if it was included in the previous page
			if len(tables) == 0 && continuationToken != "" && entry.Entry.Name == continuationToken {
				continue
			}
			hasMore = true
			lastFileName = entry.Entry.Name
			if !entry.Entry.IsDirectory {
				continue
			}
			// Skip hidden entries
			if strings.HasPrefix(entry.Entry.Name, ".") {
				continue
			}
			// Apply prefix filter
			if prefix != "" && !strings.HasPrefix(entry.Entry.Name, prefix) {
				continue
			}
			// Read table metadata from extended attribute
			data, ok := entry.Entry.Extended[ExtendedKeyMetadata]
			if !ok {
				continue
			}
			var metadata tableMetadataInternal
			if err := json.Unmarshal(data, &metadata); err != nil {
				continue
			}
			// Note: Authorization (ownership or policy-based access) is checked at the handler level
			// before calling this function. No per-table ownership filter is applied here, which
			// allows policy-based sharing. The caller has already been verified to have ListTables
			// permission for this namespace/bucket.
			tableARN := h.generateTableARN(metadata.OwnerAccountID, bucketName, namespaceName+"/"+entry.Entry.Name)
			tables = append(tables, TableSummary{
				Name:       entry.Entry.Name,
				TableARN:   tableARN,
				Namespace:  []string{namespaceName},
				CreatedAt:  metadata.CreatedAt,
				ModifiedAt: metadata.ModifiedAt,
			})
			if len(tables) >= maxTables {
				return tables, lastFileName, nil
			}
		}
		if !hasMore {
			break
		}
	}
	if len(tables) < maxTables {
		lastFileName = ""
	}
	return tables, lastFileName, nil
}

// listTablesInAllNamespaces lists tables across every namespace directory in the
// bucket. The continuation token is either "namespace/table" (resume inside that
// namespace after the given table) or "namespace" (resume with the namespace that
// follows it). Namespaces that fail to list are logged and skipped.
func (h *S3TablesHandler) listTablesInAllNamespaces(r *http.Request, client filer_pb.SeaweedFilerClient, bucketName, prefix, continuationToken string, maxTables int) ([]TableSummary, string, error) {
	bucketPath := GetTableBucketPath(bucketName)
	ctx := r.Context()
	var continuationNamespace string
	var startTableName string
	if continuationToken != "" {
		if parts := strings.SplitN(continuationToken, "/", 2); len(parts) == 2 {
			continuationNamespace = parts[0]
			startTableName = parts[1]
		} else {
			continuationNamespace = continuationToken
		}
	}
	var tables []TableSummary
	lastNamespace := continuationNamespace
	for {
		// List namespaces in batches
		resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
			Directory:          bucketPath,
			Limit:              100,
			StartFromFileName:  lastNamespace,
			InclusiveStartFrom: (lastNamespace == continuationNamespace && startTableName != "") || (lastNamespace == "" && continuationNamespace == ""),
		})
		if err != nil {
			return nil, "", err
		}
		hasMore := false
		for {
			entry, respErr := resp.Recv()
			if respErr != nil {
				if respErr == io.EOF {
					break
				}
				return nil, "", respErr
			}
			if entry.Entry == nil {
				continue
			}
			hasMore = true
			lastNamespace = entry.Entry.Name
			if !entry.Entry.IsDirectory || strings.HasPrefix(entry.Entry.Name, ".") {
				continue
			}
			namespace := entry.Entry.Name
			// When resuming inside the continuation namespace, start after the saved table name
			tableNameFilter := ""
			if namespace == continuationNamespace {
				tableNameFilter = startTableName
			}
			nsTables, nsToken, err := h.listTablesInNamespaceWithClient(r, client, bucketName, namespace, prefix, tableNameFilter, maxTables-len(tables))
			if err != nil {
				glog.Warningf("S3Tables: failed to list tables in namespace %s/%s: %v", bucketName, namespace, err)
				continue
			}
			tables = append(tables, nsTables...)
			if namespace == continuationNamespace {
				startTableName = ""
			}
			if len(tables) >= maxTables {
				paginationToken := namespace + "/" + nsToken
				if nsToken == "" {
					// If we hit the limit exactly at the end of a namespace, the next token should be the next namespace
					paginationToken = namespace // This will start from the NEXT namespace in the outer loop
				}
				return tables, paginationToken, nil
			}
		}
		if !hasMore {
			break
		}
	}
	return tables, "", nil
}

// handleDeleteTable deletes a table from a namespace
func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
var req DeleteTableRequest
if err := h.readRequestBody(r, &req); err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
return fmt.Errorf("missing required parameters")
}
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
tableName, err := validateTableName(req.Name)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
tablePath := GetTablePath(bucketName, namespaceName, tableName)
// Check if table exists and enforce VersionToken if provided
var metadata tableMetadataInternal
var tablePolicy string
var bucketPolicy string
var bucketTags map[string]string
var tableTags map[string]string
var bucketMetadata tableBucketMetadata
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
data, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
if err != nil {
return err
}
if err := json.Unmarshal(data, &metadata); err != nil {
return fmt.Errorf("failed to unmarshal table metadata: %w", err)
}
if req.VersionToken != "" && metadata.VersionToken != req.VersionToken {
return ErrVersionTokenMismatch
}
// Fetch table policy if it exists; a missing policy is treated as empty
policyData, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyPolicy)
if err == nil {
tablePolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch table policy: %w", err)
}
tableTags, err = h.readTags(r.Context(), client, tablePath)
if err != nil {
return err
}
bucketPath := GetTableBucketPath(bucketName)
data, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata)
if err == nil {
if err := json.Unmarshal(data, &bucketMetadata); err != nil {
return fmt.Errorf("failed to unmarshal bucket metadata: %w", err)
}
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket metadata: %w", err)
}
policyData, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
if err == nil {
bucketPolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket policy: %w", err)
}
bucketTags, err = h.readTags(r.Context(), client, bucketPath)
if err != nil {
return err
}
return nil
})
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
} else if errors.Is(err, ErrVersionTokenMismatch) {
h.writeError(w, http.StatusConflict, ErrCodeConflict, "version token mismatch")
} else {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check table: %v", err))
}
return err
}
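// Authorization check: the request is allowed if either the table policy or
// the bucket policy grants DeleteTable
tableARN := h.generateTableARN(metadata.OwnerAccountID, bucketName, namespaceName+"/"+tableName)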
bucketARN := h.generateTableBucketARN(bucketMetadata.OwnerAccountID, bucketName)
principal := h.getAccountID(r)
identityActions := getIdentityActions(r)
tableAllowed := CheckPermissionWithContext("DeleteTable", principal, metadata.OwnerAccountID, tablePolicy, tableARN, &PolicyContext{
TableBucketName: bucketName,
Namespace: namespaceName,
TableName: tableName,
TableBucketTags: bucketTags,
ResourceTags: tableTags,
IdentityActions: identityActions,
})
bucketAllowed := CheckPermissionWithContext("DeleteTable", principal, bucketMetadata.OwnerAccountID, bucketPolicy, bucketARN, &PolicyContext{
TableBucketName: bucketName,
Namespace: namespaceName,
TableName: tableName,
TableBucketTags: bucketTags,
ResourceTags: tableTags,
IdentityActions: identityActions,
})
if !tableAllowed && !bucketAllowed {
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to delete table")
return NewAuthError("DeleteTable", principal, "not authorized to delete table")
}
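// Delete the table directory, then remove its location mapping. Mapping cleanup
// is best-effort: a failure is logged but does not fail the request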
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
if err := h.deleteDirectory(r.Context(), client, tablePath); err != nil {
return err
}
if err := h.deleteTableLocationMapping(r.Context(), client, metadata.MetadataLocation); err != nil {
glog.V(1).Infof("failed to delete table location mapping for %s: %v", metadata.MetadataLocation, err)
}
return nil
})
if err != nil {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table")
return err
}
h.writeJSON(w, http.StatusOK, nil)
return nil
}
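
/*
Illustrative sketch, not part of the handler: how the delete path above turns a
missing table and a version token mismatch into HTTP status codes. The store
type and the errNotFound and errVersionTokenMismatch values are hypothetical
in-memory stand-ins for the filer, filer_pb.ErrNotFound, and
ErrVersionTokenMismatch. The real lookup goes through getExtendedAttribute.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Hypothetical sentinel errors standing in for the ones used by the handler.
var (
	errNotFound             = errors.New("not found")
	errVersionTokenMismatch = errors.New("version token mismatch")
)

// store maps a table name to its current version token (assumed stand-in for the filer).
type store map[string]string

// deleteTable removes name if it exists. A non-empty token must match the stored one.
func (s store) deleteTable(name, token string) error {
	current, ok := s[name]
	if !ok {
		return fmt.Errorf("lookup %s: %w", name, errNotFound)
	}
	if token != "" && token != current {
		return errVersionTokenMismatch
	}
	delete(s, name)
	return nil
}

// statusFor maps an error to a status code in the same order the handler checks them.
func statusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, errNotFound):
		return http.StatusNotFound
	case errors.Is(err, errVersionTokenMismatch):
		return http.StatusConflict
	default:
		return http.StatusInternalServerError
	}
}

func main() {
	s := store{"events": "v1"}
	fmt.Println(statusFor(s.deleteTable("events", "stale"))) // token mismatch
	fmt.Println(statusFor(s.deleteTable("events", "v1")))    // matching token
	fmt.Println(statusFor(s.deleteTable("events", "")))      // table already gone
}
```

Because the not-found error is wrapped with %w, errors.Is still recognizes it,
which is why the handler can wrap errors inside the filer callback.
*/
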
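// handleUpdateTable updates table metadata. Only fields set in the request
// overwrite stored values; every successful update refreshes ModifiedAt and
// issues a new VersionToken.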
func (h *S3TablesHandler) handleUpdateTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
var req UpdateTableRequest
if err := h.readRequestBody(r, &req); err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
return fmt.Errorf("missing required parameters")
}
namespaceName, err := validateNamespace(req.Namespace)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
tableName, err := validateTableName(req.Name)
if err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
return err
}
tablePath := GetTablePath(bucketName, namespaceName, tableName)
// Load existing metadata and policies for authorization
var metadata tableMetadataInternal
var tablePolicy string
var bucketPolicy string
var bucketTags map[string]string
var tableTags map[string]string
var bucketMetadata tableBucketMetadata
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// 1. Get Table Metadata
data, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
if err != nil {
return err
}
if err := json.Unmarshal(data, &metadata); err != nil {
return fmt.Errorf("failed to unmarshal table metadata: %w", err)
}
// 2. Get Table Policy & Tags
policyData, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyPolicy)
if err == nil {
tablePolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch table policy: %w", err)
}
tableTags, err = h.readTags(r.Context(), client, tablePath)
if err != nil {
return err
}
// 3. Get Bucket Metadata, Policy & Tags
bucketPath := GetTableBucketPath(bucketName)
data, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata)
if err == nil {
if err := json.Unmarshal(data, &bucketMetadata); err != nil {
return fmt.Errorf("failed to unmarshal bucket metadata: %w", err)
}
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket metadata: %w", err)
}
policyData, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
if err == nil {
bucketPolicy = string(policyData)
} else if !errors.Is(err, ErrAttributeNotFound) {
return fmt.Errorf("failed to fetch bucket policy: %w", err)
}
bucketTags, err = h.readTags(r.Context(), client, bucketPath)
if err != nil {
return err
}
return nil
})
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
} else {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, err.Error())
}
return err
}
// Authorization Check
tableARN := h.generateTableARN(metadata.OwnerAccountID, bucketName, namespaceName+"/"+tableName)
bucketARN := h.generateTableBucketARN(bucketMetadata.OwnerAccountID, bucketName)
principal := h.getAccountID(r)
identityActions := getIdentityActions(r)
tableAllowed := CheckPermissionWithContext("UpdateTable", principal, metadata.OwnerAccountID, tablePolicy, tableARN, &PolicyContext{
TableBucketName: bucketName,
Namespace: namespaceName,
TableName: tableName,
TableBucketTags: bucketTags,
ResourceTags: tableTags,
IdentityActions: identityActions,
})
bucketAllowed := CheckPermissionWithContext("UpdateTable", principal, bucketMetadata.OwnerAccountID, bucketPolicy, bucketARN, &PolicyContext{
TableBucketName: bucketName,
Namespace: namespaceName,
TableName: tableName,
TableBucketTags: bucketTags,
ResourceTags: tableTags,
IdentityActions: identityActions,
})
if !tableAllowed && !bucketAllowed {
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to update table")
return NewAuthError("UpdateTable", principal, "not authorized to update table")
}
// Check version token if provided
if req.VersionToken != "" && req.VersionToken != metadata.VersionToken {
h.writeError(w, http.StatusConflict, ErrCodeConflict, "version token mismatch")
return ErrVersionTokenMismatch
}
// Capture old metadata location before mutation for stale mapping cleanup
oldMetadataLocation := metadata.MetadataLocation
// Update metadata
if req.Metadata != nil {
if metadata.Metadata == nil {
metadata.Metadata = &TableMetadata{}
}
if req.Metadata.Iceberg != nil {
if metadata.Metadata.Iceberg == nil {
metadata.Metadata.Iceberg = &IcebergMetadata{}
}
if req.Metadata.Iceberg.TableUUID != "" {
metadata.Metadata.Iceberg.TableUUID = req.Metadata.Iceberg.TableUUID
}
}
if len(req.Metadata.FullMetadata) > 0 {
metadata.Metadata.FullMetadata = req.Metadata.FullMetadata
}
}
if req.MetadataLocation != "" {
metadata.MetadataLocation = req.MetadataLocation
}
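// Use the caller-supplied metadata version when given; otherwise initialize
// a version that was never set to 1
if req.MetadataVersion > 0 {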
metadata.MetadataVersion = req.MetadataVersion
} else if metadata.MetadataVersion == 0 {
metadata.MetadataVersion = 1
}
metadata.ModifiedAt = time.Now()
metadata.VersionToken = generateVersionToken()
metadataBytes, err := json.Marshal(metadata)
if err != nil {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to marshal metadata")
return err
}
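// Persist the new metadata. The location mapping update is best-effort: a
// failure is logged but does not fail the request
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {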
if err := h.setExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata, metadataBytes); err != nil {
return err
}
if err := h.updateTableLocationMapping(r.Context(), client, oldMetadataLocation, metadata.MetadataLocation, tablePath); err != nil {
glog.V(1).Infof("failed to update table location mapping for %s -> %s: %v", oldMetadataLocation, metadata.MetadataLocation, err)
}
return nil
})
if err != nil {
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to update metadata")
return err
}
h.writeJSON(w, http.StatusOK, &UpdateTableResponse{
TableARN: tableARN,
MetadataLocation: metadata.MetadataLocation,
VersionToken: metadata.VersionToken,
})
return nil
}
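
/*
Illustrative sketch, not part of the handler: the field-merge rules that
handleUpdateTable applies before persisting. tableMeta and updateReq are
simplified, hypothetical stand-ins for tableMetadataInternal and
UpdateTableRequest (the int type for the version is an assumption), and the
new token is passed in rather than produced by generateVersionToken.

```go
package main

import "fmt"

// tableMeta is a simplified, hypothetical stand-in for the stored metadata.
type tableMeta struct {
	MetadataLocation string
	MetadataVersion  int
	VersionToken     string
}

// updateReq is a simplified, hypothetical stand-in for the update request.
type updateReq struct {
	MetadataLocation string
	MetadataVersion  int
}

// applyUpdate overlays the non-zero request fields on m and assigns newToken.
func applyUpdate(m tableMeta, req updateReq, newToken string) tableMeta {
	if req.MetadataLocation != "" {
		m.MetadataLocation = req.MetadataLocation
	}
	if req.MetadataVersion > 0 {
		m.MetadataVersion = req.MetadataVersion
	} else if m.MetadataVersion == 0 {
		m.MetadataVersion = 1 // initialize a version that was never set
	}
	m.VersionToken = newToken // every update invalidates the previous token
	return m
}

func main() {
	m := tableMeta{MetadataLocation: "s3://b/meta/v1.json", VersionToken: "t1"}

	// An empty request keeps the location, initializes the version, rotates the token.
	m = applyUpdate(m, updateReq{}, "t2")
	fmt.Printf("%+v\n", m)

	// Set fields overwrite the stored values.
	m = applyUpdate(m, updateReq{MetadataLocation: "s3://b/meta/v2.json", MetadataVersion: 2}, "t3")
	fmt.Printf("%+v\n", m)
}
```

Rotating the token on every update is what makes the VersionToken check act as
optimistic concurrency control: a writer holding an older token gets a conflict.
*/
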
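// updateTableLocationMapping points the mapping entry for the new metadata
// location's bucket at tablePath. If the location moved to a different bucket,
// the stale mapping for the old bucket is removed first (best-effort). It is a
// no-op if no table location bucket can be parsed from newMetadataLocation.
func (h *S3TablesHandler) updateTableLocationMapping(ctx context.Context, client filer_pb.SeaweedFilerClient, oldMetadataLocation, newMetadataLocation, tablePath string) error {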
newTableLocationBucket, ok := parseTableLocationBucket(newMetadataLocation)
if !ok {
return nil
}
if err := h.ensureDirectory(ctx, client, GetTableLocationMappingDir()); err != nil {
return err
}
// If the metadata location changed, delete the stale mapping for the old bucket
if oldMetadataLocation != "" && oldMetadataLocation != newMetadataLocation {
oldTableLocationBucket, ok := parseTableLocationBucket(oldMetadataLocation)
if ok && oldTableLocationBucket != newTableLocationBucket {
oldMappingPath := GetTableLocationMappingPath(oldTableLocationBucket)
if err := h.deleteEntryIfExists(ctx, client, oldMappingPath); err != nil {
glog.V(1).Infof("failed to delete stale mapping for %s: %v", oldTableLocationBucket, err)
}
}
}
return h.upsertFile(ctx, client, GetTableLocationMappingPath(newTableLocationBucket), []byte(tablePath))
}
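
/*
Illustrative sketch, not part of the handler: the order of operations in
updateTableLocationMapping, replayed on an in-memory map from bucket to table
path. parseBucket is a hypothetical stand-in for parseTableLocationBucket and
assumes "s3://bucket/key" locations. The real parsing rules and the paths used
here are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// parseBucket returns the bucket of an "s3://bucket/key" location, or false
// if the location has no such prefix or an empty bucket (assumed format).
func parseBucket(location string) (string, bool) {
	if !strings.HasPrefix(location, "s3://") {
		return "", false
	}
	rest := strings.TrimPrefix(location, "s3://")
	bucket := strings.SplitN(rest, "/", 2)[0]
	return bucket, bucket != ""
}

// updateMapping drops the stale entry if the bucket changed, then upserts the new one.
func updateMapping(mappings map[string]string, oldLoc, newLoc, tablePath string) {
	newBucket, ok := parseBucket(newLoc)
	if !ok {
		return // nothing to map; existing entries are left untouched
	}
	if oldLoc != "" && oldLoc != newLoc {
		if oldBucket, ok := parseBucket(oldLoc); ok && oldBucket != newBucket {
			delete(mappings, oldBucket)
		}
	}
	mappings[newBucket] = tablePath
}

func main() {
	mappings := map[string]string{"bucket-a": "/tables/ns/t"}
	updateMapping(mappings, "s3://bucket-a/metadata/v1.json", "s3://bucket-b/metadata/v2.json", "/tables/ns/t")
	fmt.Println(mappings)
}
```

Note that, as in the function above, an unparseable new location returns early,
so a mapping for the old bucket is not cleaned up in that case.
*/
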
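// deleteTableLocationMapping removes the mapping entry for the bucket parsed
// from metadataLocation. It is a no-op if no table location bucket can be
// parsed from it.
func (h *S3TablesHandler) deleteTableLocationMapping(ctx context.Context, client filer_pb.SeaweedFilerClient, metadataLocation string) error {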
tableLocationBucket, ok := parseTableLocationBucket(metadataLocation)
if !ok {
return nil
}
return h.deleteEntryIfExists(ctx, client, GetTableLocationMappingPath(tableLocationBucket))
}