s3tables: redesign Iceberg REST Catalog using iceberg-go and automate integration tests (#8197)
* full integration with iceberg-go * Table Commit Operations (handleUpdateTable) * s3tables: fix Iceberg v2 compliance and namespace properties This commit ensures SeaweedFS Iceberg REST Catalog is compliant with Iceberg Format Version 2 by: - Using iceberg-go's table.NewMetadataWithUUID for strict v2 compliance. - Explicitly initializing namespace properties to empty maps. - Removing omitempty from required Iceberg response fields. - Fixing CommitTableRequest unmarshaling using table.Requirements and table.Updates. * s3tables: automate Iceberg integration tests - Added Makefile for local test execution and cluster management. - Added docker-compose for PyIceberg compatibility kit. - Added Go integration test harness for PyIceberg. - Updated GitHub CI to run Iceberg catalog tests automatically. * s3tables: update PyIceberg test suite for compatibility - Updated test_rest_catalog.py to use latest PyIceberg transaction APIs. - Updated Dockerfile to include pyarrow and pandas dependencies. - Improved namespace and table handling in integration tests. * s3tables: address review feedback on Iceberg Catalog - Implemented robust metadata version parsing and incrementing. - Ensured table metadata changes are persisted during commit (handleUpdateTable). - Standardized namespace property initialization for consistency. - Fixed unused variable and incorrect struct field build errors. * s3tables: finalize Iceberg REST Catalog and optimize tests - Implemented robust metadata versioning and persistence. - Standardized namespace property initialization. - Optimized integration tests using pre-built Docker image. - Added strict property persistence validation to test suite. - Fixed build errors from previous partial updates. * Address PR review: fix Table UUID stability, implement S3Tables UpdateTable, and support full metadata persistence individually * fix: Iceberg catalog stable UUIDs, metadata persistence, and file writing - Ensure table UUIDs are stable (do not regenerate on load). - Persist full table metadata (Iceberg JSON) in s3tables extended attributes. - Add `MetadataVersion` to explicitly track version numbers, replacing regex parsing. - Implement `saveMetadataFile` to persist metadata JSON files to the Filer on commit. - Update `CreateTable` and `UpdateTable` handlers to use the new logic. * test: bind weed mini to 0.0.0.0 in integration tests to fix Docker connectivity * Iceberg: fix metadata handling in REST catalog - Add nil guard in createTable - Fix updateTable to correctly load existing metadata from storage - Ensure full metadata persistence on updates - Populate loadTable result with parsed metadata * S3Tables: add auth checks and fix response fields in UpdateTable - Add CheckPermissionWithContext to UpdateTable handler - Include TableARN and MetadataLocation in UpdateTable response - Use ErrCodeConflict (409) for version token mismatches * Tests: improve Iceberg catalog test infrastructure and cleanup - Makefile: use PID file for precise process killing - test_rest_catalog.py: remove unused variables and fix f-strings * Iceberg: fix variable shadowing in UpdateTable - Rename inner loop variable `req` to `requirement` to avoid shadowing outer request variable * S3Tables: simplify MetadataVersion initialization - Use `max(req.MetadataVersion, 1)` instead of anonymous function * Tests: remove unicode characters from S3 tables integration test logs - Remove unicode checkmarks from test output for cleaner logs * Iceberg: improve metadata persistence robustness - Fix MetadataLocation in LoadTableResult to fallback to generated location - Improve saveMetadataFile to ensure directory hierarchy existence and robust error handling
This commit is contained in:
@@ -127,6 +127,8 @@ func (h *S3TablesHandler) HandleRequest(w http.ResponseWriter, r *http.Request,
|
||||
err = h.handleGetTable(w, r, filerClient)
|
||||
case "ListTables":
|
||||
err = h.handleListTables(w, r, filerClient)
|
||||
case "UpdateTable":
|
||||
err = h.handleUpdateTable(w, r, filerClient)
|
||||
case "DeleteTable":
|
||||
err = h.handleDeleteTable(w, r, filerClient)
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http
|
||||
return err
|
||||
}
|
||||
|
||||
bucketPath := getTableBucketPath(req.Name)
|
||||
bucketPath := GetTableBucketPath(req.Name)
|
||||
|
||||
// Check if bucket already exists and ensure no conflict with object store buckets
|
||||
tableBucketExists := false
|
||||
|
||||
@@ -32,7 +32,7 @@ func (h *S3TablesHandler) handleGetTableBucket(w http.ResponseWriter, r *http.Re
|
||||
return err
|
||||
}
|
||||
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
|
||||
var metadata tableBucketMetadata
|
||||
var bucketPolicy string
|
||||
@@ -177,7 +177,7 @@ func (h *S3TablesHandler) handleListTableBuckets(w http.ResponseWriter, r *http.
|
||||
continue
|
||||
}
|
||||
|
||||
bucketPath := getTableBucketPath(entry.Entry.Name)
|
||||
bucketPath := GetTableBucketPath(entry.Entry.Name)
|
||||
bucketPolicy := ""
|
||||
policyData, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
|
||||
if err != nil {
|
||||
@@ -261,7 +261,7 @@ func (h *S3TablesHandler) handleDeleteTableBucket(w http.ResponseWriter, r *http
|
||||
return err
|
||||
}
|
||||
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
|
||||
// Check if bucket exists and perform ownership + emptiness check in one block
|
||||
var metadata tableBucketMetadata
|
||||
|
||||
@@ -43,7 +43,7 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R
|
||||
}
|
||||
|
||||
// Check if table bucket exists
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
var bucketMetadata tableBucketMetadata
|
||||
var bucketPolicy string
|
||||
var bucketTags map[string]string
|
||||
@@ -93,7 +93,7 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R
|
||||
return ErrAccessDenied
|
||||
}
|
||||
|
||||
namespacePath := getNamespacePath(bucketName, namespaceName)
|
||||
namespacePath := GetNamespacePath(bucketName, namespaceName)
|
||||
|
||||
// Check if namespace already exists
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
@@ -177,8 +177,8 @@ func (h *S3TablesHandler) handleGetNamespace(w http.ResponseWriter, r *http.Requ
|
||||
return err
|
||||
}
|
||||
|
||||
namespacePath := getNamespacePath(bucketName, namespaceName)
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
namespacePath := GetNamespacePath(bucketName, namespaceName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
|
||||
// Get namespace and bucket policy
|
||||
var metadata namespaceMetadata
|
||||
@@ -264,7 +264,7 @@ func (h *S3TablesHandler) handleListNamespaces(w http.ResponseWriter, r *http.Re
|
||||
maxNamespaces = 100
|
||||
}
|
||||
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
|
||||
// Check permission (check bucket ownership)
|
||||
var bucketMetadata tableBucketMetadata
|
||||
@@ -446,8 +446,8 @@ func (h *S3TablesHandler) handleDeleteNamespace(w http.ResponseWriter, r *http.R
|
||||
return err
|
||||
}
|
||||
|
||||
namespacePath := getNamespacePath(bucketName, namespaceName)
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
namespacePath := GetNamespacePath(bucketName, namespaceName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
|
||||
// Check if namespace exists and get metadata for permission check
|
||||
var metadata namespaceMetadata
|
||||
|
||||
@@ -66,7 +66,7 @@ func (h *S3TablesHandler) handlePutTableBucketPolicy(w http.ResponseWriter, r *h
|
||||
}
|
||||
|
||||
// Check if bucket exists and get metadata for ownership check
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
var bucketMetadata tableBucketMetadata
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
data, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata)
|
||||
@@ -133,7 +133,7 @@ func (h *S3TablesHandler) handleGetTableBucketPolicy(w http.ResponseWriter, r *h
|
||||
return err
|
||||
}
|
||||
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
var policy []byte
|
||||
var bucketMetadata tableBucketMetadata
|
||||
|
||||
@@ -204,7 +204,7 @@ func (h *S3TablesHandler) handleDeleteTableBucketPolicy(w http.ResponseWriter, r
|
||||
return err
|
||||
}
|
||||
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
|
||||
// Check if bucket exists and get metadata for ownership check
|
||||
var bucketMetadata tableBucketMetadata
|
||||
@@ -301,8 +301,8 @@ func (h *S3TablesHandler) handlePutTablePolicy(w http.ResponseWriter, r *http.Re
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
tablePath := getTablePath(bucketName, namespaceName, tableName)
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
tablePath := GetTablePath(bucketName, namespaceName, tableName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
|
||||
var metadata tableMetadataInternal
|
||||
var bucketPolicy string
|
||||
@@ -396,8 +396,8 @@ func (h *S3TablesHandler) handleGetTablePolicy(w http.ResponseWriter, r *http.Re
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
tablePath := getTablePath(bucketName, namespaceName, tableName)
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
tablePath := GetTablePath(bucketName, namespaceName, tableName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
var policy []byte
|
||||
var metadata tableMetadataInternal
|
||||
var bucketPolicy string
|
||||
@@ -497,8 +497,8 @@ func (h *S3TablesHandler) handleDeleteTablePolicy(w http.ResponseWriter, r *http
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
tablePath := getTablePath(bucketName, namespaceName, tableName)
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
tablePath := GetTablePath(bucketName, namespaceName, tableName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
|
||||
// Check if table exists
|
||||
var metadata tableMetadataInternal
|
||||
@@ -604,7 +604,7 @@ func (h *S3TablesHandler) handleTagResource(w http.ResponseWriter, r *http.Reque
|
||||
|
||||
// Fetch bucket policy if we have a bucket name
|
||||
if bucketName != "" {
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
policyData, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
|
||||
if err != nil {
|
||||
if !errors.Is(err, ErrAttributeNotFound) {
|
||||
@@ -722,7 +722,7 @@ func (h *S3TablesHandler) handleListTagsForResource(w http.ResponseWriter, r *ht
|
||||
|
||||
// Fetch bucket policy if we have a bucket name
|
||||
if bucketName != "" {
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
policyData, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
|
||||
if err != nil {
|
||||
if !errors.Is(err, ErrAttributeNotFound) {
|
||||
@@ -828,7 +828,7 @@ func (h *S3TablesHandler) handleUntagResource(w http.ResponseWriter, r *http.Req
|
||||
|
||||
// Fetch bucket policy if we have a bucket name
|
||||
if bucketName != "" {
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
policyData, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
|
||||
if err != nil {
|
||||
if !errors.Is(err, ErrAttributeNotFound) {
|
||||
@@ -914,13 +914,13 @@ func (h *S3TablesHandler) resolveResourcePath(resourceARN string) (path string,
|
||||
// Try parsing as table ARN first
|
||||
bucketName, namespace, tableName, err := parseTableFromARN(resourceARN)
|
||||
if err == nil {
|
||||
return getTablePath(bucketName, namespace, tableName), ExtendedKeyTags, ResourceTypeTable, nil
|
||||
return GetTablePath(bucketName, namespace, tableName), ExtendedKeyTags, ResourceTypeTable, nil
|
||||
}
|
||||
|
||||
// Try parsing as bucket ARN
|
||||
bucketName, err = parseBucketNameFromARN(resourceARN)
|
||||
if err == nil {
|
||||
return getTableBucketPath(bucketName), ExtendedKeyTags, ResourceTypeBucket, nil
|
||||
return GetTableBucketPath(bucketName), ExtendedKeyTags, ResourceTypeBucket, nil
|
||||
}
|
||||
|
||||
return "", "", "", fmt.Errorf("invalid resource ARN: %s", resourceARN)
|
||||
|
||||
@@ -63,7 +63,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
|
||||
}
|
||||
|
||||
// Check if namespace exists
|
||||
namespacePath := getNamespacePath(bucketName, namespaceName)
|
||||
namespacePath := GetNamespacePath(bucketName, namespaceName)
|
||||
var namespaceMetadata namespaceMetadata
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
data, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata)
|
||||
@@ -87,7 +87,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
|
||||
|
||||
// Authorize table creation using policy framework (namespace + bucket policies)
|
||||
accountID := h.getAccountID(r)
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
namespacePolicy := ""
|
||||
bucketPolicy := ""
|
||||
bucketTags := map[string]string{}
|
||||
@@ -160,7 +160,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
|
||||
return ErrAccessDenied
|
||||
}
|
||||
|
||||
tablePath := getTablePath(bucketName, namespaceName, tableName)
|
||||
tablePath := GetTablePath(bucketName, namespaceName, tableName)
|
||||
|
||||
// Check if table already exists
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
@@ -171,7 +171,7 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
|
||||
if err == nil {
|
||||
h.writeError(w, http.StatusConflict, ErrCodeTableAlreadyExists, fmt.Sprintf("table %s already exists", tableName))
|
||||
return fmt.Errorf("table already exists")
|
||||
} else if !errors.Is(err, filer_pb.ErrNotFound) {
|
||||
} else if !errors.Is(err, filer_pb.ErrNotFound) && !errors.Is(err, ErrAttributeNotFound) {
|
||||
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check table: %v", err))
|
||||
return err
|
||||
}
|
||||
@@ -181,14 +181,16 @@ func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Reque
|
||||
versionToken := generateVersionToken()
|
||||
|
||||
metadata := &tableMetadataInternal{
|
||||
Name: tableName,
|
||||
Namespace: namespaceName,
|
||||
Format: req.Format,
|
||||
CreatedAt: now,
|
||||
ModifiedAt: now,
|
||||
OwnerAccountID: namespaceMetadata.OwnerAccountID, // Inherit namespace owner for consistency
|
||||
VersionToken: versionToken,
|
||||
Metadata: req.Metadata,
|
||||
Name: tableName,
|
||||
Namespace: namespaceName,
|
||||
Format: req.Format,
|
||||
CreatedAt: now,
|
||||
ModifiedAt: now,
|
||||
OwnerAccountID: namespaceMetadata.OwnerAccountID, // Inherit namespace owner for consistency
|
||||
VersionToken: versionToken,
|
||||
MetadataVersion: max(req.MetadataVersion, 1),
|
||||
MetadataLocation: req.MetadataLocation,
|
||||
Metadata: req.Metadata,
|
||||
}
|
||||
|
||||
metadataBytes, err := json.Marshal(metadata)
|
||||
@@ -284,7 +286,7 @@ func (h *S3TablesHandler) handleGetTable(w http.ResponseWriter, r *http.Request,
|
||||
return fmt.Errorf("missing required parameters")
|
||||
}
|
||||
|
||||
tablePath := getTablePath(bucketName, namespace, tableName)
|
||||
tablePath := GetTablePath(bucketName, namespace, tableName)
|
||||
|
||||
var metadata tableMetadataInternal
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
@@ -309,7 +311,7 @@ func (h *S3TablesHandler) handleGetTable(w http.ResponseWriter, r *http.Request,
|
||||
|
||||
// Authorize access to the table using policy framework
|
||||
accountID := h.getAccountID(r)
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
tablePolicy := ""
|
||||
bucketPolicy := ""
|
||||
bucketTags := map[string]string{}
|
||||
@@ -395,6 +397,7 @@ func (h *S3TablesHandler) handleGetTable(w http.ResponseWriter, r *http.Request,
|
||||
ModifiedAt: metadata.ModifiedAt,
|
||||
OwnerAccountID: metadata.OwnerAccountID,
|
||||
MetadataLocation: metadata.MetadataLocation,
|
||||
MetadataVersion: metadata.MetadataVersion,
|
||||
VersionToken: metadata.VersionToken,
|
||||
Metadata: metadata.Metadata,
|
||||
}
|
||||
@@ -454,8 +457,8 @@ func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Reques
|
||||
|
||||
if len(req.Namespace) > 0 {
|
||||
// Namespace has already been validated above
|
||||
namespacePath := getNamespacePath(bucketName, namespaceName)
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
namespacePath := GetNamespacePath(bucketName, namespaceName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
var nsMeta namespaceMetadata
|
||||
var bucketMeta tableBucketMetadata
|
||||
var namespacePolicy, bucketPolicy string
|
||||
@@ -521,7 +524,7 @@ func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Reques
|
||||
tables, paginationToken, err = h.listTablesInNamespaceWithClient(r, client, bucketName, namespaceName, req.Prefix, req.ContinuationToken, maxTables)
|
||||
} else {
|
||||
// List tables across all namespaces in bucket
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
var bucketMeta tableBucketMetadata
|
||||
var bucketPolicy string
|
||||
bucketTags := map[string]string{}
|
||||
@@ -588,7 +591,7 @@ func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Reques
|
||||
|
||||
// listTablesInNamespaceWithClient lists tables in a specific namespace
|
||||
func (h *S3TablesHandler) listTablesInNamespaceWithClient(r *http.Request, client filer_pb.SeaweedFilerClient, bucketName, namespaceName, prefix, continuationToken string, maxTables int) ([]TableSummary, string, error) {
|
||||
namespacePath := getNamespacePath(bucketName, namespaceName)
|
||||
namespacePath := GetNamespacePath(bucketName, namespaceName)
|
||||
return h.listTablesWithClient(r, client, namespacePath, bucketName, namespaceName, prefix, continuationToken, maxTables)
|
||||
}
|
||||
|
||||
@@ -685,7 +688,7 @@ func (h *S3TablesHandler) listTablesWithClient(r *http.Request, client filer_pb.
|
||||
}
|
||||
|
||||
func (h *S3TablesHandler) listTablesInAllNamespaces(r *http.Request, client filer_pb.SeaweedFilerClient, bucketName, prefix, continuationToken string, maxTables int) ([]TableSummary, string, error) {
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
ctx := r.Context()
|
||||
|
||||
var continuationNamespace string
|
||||
@@ -801,7 +804,7 @@ func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Reque
|
||||
return err
|
||||
}
|
||||
|
||||
tablePath := getTablePath(bucketName, namespaceName, tableName)
|
||||
tablePath := GetTablePath(bucketName, namespaceName, tableName)
|
||||
|
||||
// Check if table exists and enforce VersionToken if provided
|
||||
var metadata tableMetadataInternal
|
||||
@@ -843,7 +846,7 @@ func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Reque
|
||||
return err
|
||||
}
|
||||
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
data, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata)
|
||||
if err == nil {
|
||||
if err := json.Unmarshal(data, &bucketMetadata); err != nil {
|
||||
@@ -917,3 +920,184 @@ func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Reque
|
||||
h.writeJSON(w, http.StatusOK, nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleUpdateTable updates table metadata
|
||||
func (h *S3TablesHandler) handleUpdateTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||
var req UpdateTableRequest
|
||||
if err := h.readRequestBody(r, &req); err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
|
||||
return fmt.Errorf("missing required parameters")
|
||||
}
|
||||
|
||||
namespaceName, err := validateNamespace(req.Namespace)
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
tableName, err := validateTableName(req.Name)
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
tablePath := GetTablePath(bucketName, namespaceName, tableName)
|
||||
|
||||
// Load existing metadata and policies for authorization
|
||||
var metadata tableMetadataInternal
|
||||
var tablePolicy string
|
||||
var bucketPolicy string
|
||||
var bucketTags map[string]string
|
||||
var tableTags map[string]string
|
||||
var bucketMetadata tableBucketMetadata
|
||||
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
// 1. Get Table Metadata
|
||||
data, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := json.Unmarshal(data, &metadata); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal table metadata: %w", err)
|
||||
}
|
||||
|
||||
// 2. Get Table Policy & Tags
|
||||
policyData, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyPolicy)
|
||||
if err == nil {
|
||||
tablePolicy = string(policyData)
|
||||
} else if !errors.Is(err, ErrAttributeNotFound) {
|
||||
return fmt.Errorf("failed to fetch table policy: %w", err)
|
||||
}
|
||||
tableTags, err = h.readTags(r.Context(), client, tablePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 3. Get Bucket Metadata, Policy & Tags
|
||||
bucketPath := GetTableBucketPath(bucketName)
|
||||
data, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata)
|
||||
if err == nil {
|
||||
if err := json.Unmarshal(data, &bucketMetadata); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal bucket metadata: %w", err)
|
||||
}
|
||||
} else if !errors.Is(err, ErrAttributeNotFound) {
|
||||
return fmt.Errorf("failed to fetch bucket metadata: %w", err)
|
||||
}
|
||||
policyData, err = h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyPolicy)
|
||||
if err == nil {
|
||||
bucketPolicy = string(policyData)
|
||||
} else if !errors.Is(err, ErrAttributeNotFound) {
|
||||
return fmt.Errorf("failed to fetch bucket policy: %w", err)
|
||||
}
|
||||
bucketTags, err = h.readTags(r.Context(), client, bucketPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, filer_pb.ErrNotFound) {
|
||||
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, "table not found")
|
||||
} else {
|
||||
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, err.Error())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Authorization Check
|
||||
tableARN := h.generateTableARN(metadata.OwnerAccountID, bucketName, namespaceName+"/"+tableName)
|
||||
bucketARN := h.generateTableBucketARN(bucketMetadata.OwnerAccountID, bucketName)
|
||||
principal := h.getAccountID(r)
|
||||
identityActions := getIdentityActions(r)
|
||||
|
||||
tableAllowed := CheckPermissionWithContext("UpdateTable", principal, metadata.OwnerAccountID, tablePolicy, tableARN, &PolicyContext{
|
||||
TableBucketName: bucketName,
|
||||
Namespace: namespaceName,
|
||||
TableName: tableName,
|
||||
TableBucketTags: bucketTags,
|
||||
ResourceTags: tableTags,
|
||||
IdentityActions: identityActions,
|
||||
})
|
||||
bucketAllowed := CheckPermissionWithContext("UpdateTable", principal, bucketMetadata.OwnerAccountID, bucketPolicy, bucketARN, &PolicyContext{
|
||||
TableBucketName: bucketName,
|
||||
Namespace: namespaceName,
|
||||
TableName: tableName,
|
||||
TableBucketTags: bucketTags,
|
||||
ResourceTags: tableTags,
|
||||
IdentityActions: identityActions,
|
||||
})
|
||||
|
||||
if !tableAllowed && !bucketAllowed {
|
||||
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to update table")
|
||||
return NewAuthError("UpdateTable", principal, "not authorized to update table")
|
||||
}
|
||||
|
||||
// Check version token if provided
|
||||
if req.VersionToken != "" && req.VersionToken != metadata.VersionToken {
|
||||
h.writeError(w, http.StatusConflict, ErrCodeConflict, "Version token mismatch")
|
||||
return ErrVersionTokenMismatch
|
||||
}
|
||||
|
||||
// Update metadata
|
||||
if req.Metadata != nil {
|
||||
if metadata.Metadata == nil {
|
||||
metadata.Metadata = &TableMetadata{}
|
||||
}
|
||||
if req.Metadata.Iceberg != nil {
|
||||
if metadata.Metadata.Iceberg == nil {
|
||||
metadata.Metadata.Iceberg = &IcebergMetadata{}
|
||||
}
|
||||
if req.Metadata.Iceberg.TableUUID != "" {
|
||||
metadata.Metadata.Iceberg.TableUUID = req.Metadata.Iceberg.TableUUID
|
||||
}
|
||||
}
|
||||
if len(req.Metadata.FullMetadata) > 0 {
|
||||
metadata.Metadata.FullMetadata = req.Metadata.FullMetadata
|
||||
}
|
||||
}
|
||||
if req.MetadataLocation != "" {
|
||||
metadata.MetadataLocation = req.MetadataLocation
|
||||
}
|
||||
if req.MetadataVersion > 0 {
|
||||
metadata.MetadataVersion = req.MetadataVersion
|
||||
} else if metadata.MetadataVersion == 0 {
|
||||
metadata.MetadataVersion = 1
|
||||
}
|
||||
metadata.ModifiedAt = time.Now()
|
||||
metadata.VersionToken = generateVersionToken()
|
||||
|
||||
metadataBytes, err := json.Marshal(metadata)
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to marshal metadata")
|
||||
return err
|
||||
}
|
||||
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
return h.setExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata, metadataBytes)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to update metadata")
|
||||
return err
|
||||
}
|
||||
|
||||
h.writeJSON(w, http.StatusOK, &UpdateTableResponse{
|
||||
TableARN: tableARN,
|
||||
MetadataLocation: metadata.MetadataLocation,
|
||||
VersionToken: metadata.VersionToken,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -358,7 +358,7 @@ func (v *TableBucketFileValidator) ValidateTableBucketUploadWithClient(
|
||||
}
|
||||
|
||||
// Verify the table exists and has ICEBERG format by checking its metadata
|
||||
tablePath := getTablePath(bucket, namespace, table)
|
||||
tablePath := GetTablePath(bucket, namespace, table)
|
||||
dir, name := splitPath(tablePath)
|
||||
|
||||
resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package s3tables
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Table bucket types
|
||||
|
||||
@@ -140,7 +143,8 @@ type IcebergMetadata struct {
|
||||
}
|
||||
|
||||
type TableMetadata struct {
|
||||
Iceberg *IcebergMetadata `json:"iceberg,omitempty"`
|
||||
Iceberg *IcebergMetadata `json:"iceberg,omitempty"`
|
||||
FullMetadata json.RawMessage `json:"fullMetadata,omitempty"`
|
||||
}
|
||||
|
||||
type Table struct {
|
||||
@@ -156,12 +160,14 @@ type Table struct {
|
||||
}
|
||||
|
||||
type CreateTableRequest struct {
|
||||
TableBucketARN string `json:"tableBucketARN"`
|
||||
Namespace []string `json:"namespace"`
|
||||
Name string `json:"name"`
|
||||
Format string `json:"format"`
|
||||
Metadata *TableMetadata `json:"metadata,omitempty"`
|
||||
Tags map[string]string `json:"tags,omitempty"`
|
||||
TableBucketARN string `json:"tableBucketARN"`
|
||||
Namespace []string `json:"namespace"`
|
||||
Name string `json:"name"`
|
||||
Format string `json:"format"`
|
||||
Metadata *TableMetadata `json:"metadata,omitempty"`
|
||||
MetadataVersion int `json:"metadataVersion,omitempty"`
|
||||
MetadataLocation string `json:"metadataLocation,omitempty"`
|
||||
Tags map[string]string `json:"tags,omitempty"`
|
||||
}
|
||||
|
||||
type CreateTableResponse struct {
|
||||
@@ -187,6 +193,7 @@ type GetTableResponse struct {
|
||||
OwnerAccountID string `json:"ownerAccountId"`
|
||||
MetadataLocation string `json:"metadataLocation,omitempty"`
|
||||
VersionToken string `json:"versionToken"`
|
||||
MetadataVersion int `json:"metadataVersion"`
|
||||
Metadata *TableMetadata `json:"metadata,omitempty"`
|
||||
}
|
||||
|
||||
@@ -219,6 +226,22 @@ type DeleteTableRequest struct {
|
||||
VersionToken string `json:"versionToken,omitempty"`
|
||||
}
|
||||
|
||||
type UpdateTableRequest struct {
|
||||
TableBucketARN string `json:"tableBucketARN"`
|
||||
Namespace []string `json:"namespace"`
|
||||
Name string `json:"name"`
|
||||
VersionToken string `json:"versionToken,omitempty"`
|
||||
Metadata *TableMetadata `json:"metadata,omitempty"`
|
||||
MetadataVersion int `json:"metadataVersion,omitempty"`
|
||||
MetadataLocation string `json:"metadataLocation,omitempty"`
|
||||
}
|
||||
|
||||
type UpdateTableResponse struct {
|
||||
TableARN string `json:"tableARN"`
|
||||
VersionToken string `json:"versionToken"`
|
||||
MetadataLocation string `json:"metadataLocation,omitempty"`
|
||||
}
|
||||
|
||||
// Table policy types
|
||||
|
||||
type PutTablePolicyRequest struct {
|
||||
|
||||
@@ -79,18 +79,18 @@ func parseTableFromARN(arn string) (bucketName, namespace, tableName string, err
|
||||
|
||||
// Path helpers
|
||||
|
||||
// getTableBucketPath returns the filer path for a table bucket
|
||||
func getTableBucketPath(bucketName string) string {
|
||||
// GetTableBucketPath returns the filer path for a table bucket
|
||||
func GetTableBucketPath(bucketName string) string {
|
||||
return path.Join(TablesPath, bucketName)
|
||||
}
|
||||
|
||||
// getNamespacePath returns the filer path for a namespace
|
||||
func getNamespacePath(bucketName, namespace string) string {
|
||||
// GetNamespacePath returns the filer path for a namespace
|
||||
func GetNamespacePath(bucketName, namespace string) string {
|
||||
return path.Join(TablesPath, bucketName, namespace)
|
||||
}
|
||||
|
||||
// getTablePath returns the filer path for a table
|
||||
func getTablePath(bucketName, namespace, tableName string) string {
|
||||
// GetTablePath returns the filer path for a table
|
||||
func GetTablePath(bucketName, namespace, tableName string) string {
|
||||
return path.Join(TablesPath, bucketName, namespace, tableName)
|
||||
}
|
||||
|
||||
@@ -118,6 +118,7 @@ type tableMetadataInternal struct {
|
||||
ModifiedAt time.Time `json:"modifiedAt"`
|
||||
OwnerAccountID string `json:"ownerAccountId"`
|
||||
VersionToken string `json:"versionToken"`
|
||||
MetadataVersion int `json:"metadataVersion"`
|
||||
MetadataLocation string `json:"metadataLocation,omitempty"`
|
||||
Metadata *TableMetadata `json:"metadata,omitempty"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user