seaweedFS/weed/admin/dash/s3tables_management.go
Chris Lu 2bb21ea276 feat: Add Iceberg REST Catalog server and admin UI (#8175)
* feat: Add Iceberg REST Catalog server

Implement Iceberg REST Catalog API on a separate port (default 8181)
that exposes S3 Tables metadata through the Apache Iceberg REST protocol.

- Add new weed/s3api/iceberg package with REST handlers
- Implement /v1/config endpoint returning catalog configuration
- Implement namespace endpoints (list/create/get/head/delete)
- Implement table endpoints (list/create/load/head/delete/update)
- Add -port.iceberg flag to S3 standalone server (s3.go)
- Add -s3.port.iceberg flag to combined server mode (server.go)
- Add -s3.port.iceberg flag to mini cluster mode (mini.go)
- Support prefix-based routing for multiple catalogs

The Iceberg REST server reuses S3 Tables metadata storage under
/table-buckets and enables DuckDB, Spark, and other Iceberg clients
to connect to SeaweedFS as a catalog.

* feat: Add Iceberg Catalog pages to admin UI

Add admin UI pages to browse Iceberg catalogs, namespaces, and tables.

- Add Iceberg Catalog menu item under Object Store navigation
- Create iceberg_catalog.templ showing catalog overview with REST info
- Create iceberg_namespaces.templ listing namespaces in a catalog
- Create iceberg_tables.templ listing tables in a namespace
- Add handlers and routes in admin_handlers.go
- Add Iceberg data provider methods in s3tables_management.go
- Add Iceberg data types in types.go

The Iceberg Catalog pages provide visibility into the same S3 Tables
data through an Iceberg-centric lens, including REST endpoint examples
for DuckDB and PyIceberg.

* test: Add Iceberg catalog integration tests and reorg s3tables tests

- Reorganize existing s3tables tests to test/s3tables/table-buckets/
- Add new test/s3tables/catalog/ for Iceberg REST catalog tests
- Add TestIcebergConfig to verify /v1/config endpoint
- Add TestIcebergNamespaces to verify namespace listing
- Add TestDuckDBIntegration for DuckDB connectivity (requires Docker)
- Update CI workflow to use new test paths

* fix: Generate proper random UUIDs for Iceberg tables

Address code review feedback:
- Replace placeholder UUID with crypto/rand-based UUID v4 generation
- Add detailed TODO comments for handleUpdateTable stub explaining
  the required atomic metadata swap implementation
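
The crypto/rand-based UUID v4 generation mentioned above can be sketched roughly as follows. This is an illustration under assumptions, not the code from this commit: the helper name `newUUIDv4` is hypothetical, and only the standard library is used.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newUUIDv4 is a hypothetical helper (not taken from the commit). It fills
// 16 bytes from crypto/rand and sets the version and variant bits that
// RFC 4122 defines for a random (version 4) UUID.
func newUUIDv4() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // high nibble 0100: version 4
	b[8] = (b[8] & 0x3f) | 0x80 // top bits 10: RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x",
		b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

func main() {
	id, err := newUUIDv4()
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```

Returning the error instead of hiding it leaves the caller free to choose a fallback.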

* fix: Serve Iceberg on localhost listener when binding to different interface

Address code review feedback: properly serve the localhost listener
when the Iceberg server is bound to a non-localhost interface.

* ci: Add Iceberg catalog integration tests to CI

Add new job to run Iceberg catalog tests in CI, along with:
- Iceberg package build verification
- Iceberg unit tests
- Iceberg go vet checks
- Iceberg format checks

* fix: Address code review feedback for Iceberg implementation

- fix: Replace hardcoded account ID with s3_constants.AccountAdminId in buildTableBucketARN()
- fix: Improve UUID generation error handling with deterministic fallback (timestamp + PID + counter)
- fix: Update handleUpdateTable to return HTTP 501 Not Implemented instead of fake success
- fix: Better error handling in handleNamespaceExists to distinguish 404 from 500 errors
- fix: Use relative URL in template instead of hardcoded localhost:8181
- fix: Add HTTP timeout to test's waitForService function to avoid hangs
- fix: Use dynamic ephemeral ports in integration tests to avoid flaky parallel failures
- fix: Add Iceberg port to final port configuration logging in mini.go
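
Two of the test fixes above, dynamic ephemeral ports and an HTTP timeout while waiting for a service, can be sketched as follows. This is a minimal illustration, not the test code from this commit: `freePort` is a hypothetical name, and this `waitForService` is a simplified stand-in that assumes any HTTP response means the service is up.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"net/http/httptest"
	"time"
)

// freePort (hypothetical helper) asks the kernel for an unused TCP port on
// loopback by binding to port 0, then releases it for the caller to reuse.
// Another process could grab the port in between, so this only reduces
// collisions; it does not eliminate them.
func freePort() (int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer l.Close()
	return l.Addr().(*net.TCPAddr).Port, nil
}

// waitForService polls url until it answers or the deadline passes. The
// per-request timeout keeps a hung connection from blocking the loop.
func waitForService(url string, deadline time.Duration) bool {
	client := &http.Client{Timeout: 2 * time.Second}
	stop := time.Now().Add(deadline)
	for time.Now().Before(stop) {
		resp, err := client.Get(url)
		if err == nil {
			resp.Body.Close()
			return true
		}
		time.Sleep(100 * time.Millisecond)
	}
	return false
}

func main() {
	port, err := freePort()
	if err != nil {
		panic(err)
	}
	fmt.Println("picked port", port)

	// A throwaway server stands in for the service under test.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()
	fmt.Println("ready:", waitForService(srv.URL, 5*time.Second))
}
```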

* fix: Address critical issues in Iceberg implementation

- fix: Cache table UUIDs to ensure persistence across LoadTable calls
  The UUID now remains stable for the lifetime of the server session.
  TODO: For production, UUIDs should be persisted in S3 Tables metadata.

- fix: Remove redundant URL-encoded namespace parsing
  mux router already decodes %1F to \x1F before passing to handlers.
  Redundant ReplaceAll call could cause bugs with literal %1F in namespace.
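
The risk of decoding twice can be illustrated with a small sketch. It assumes namespace levels are joined with the 0x1F unit separator and uses `net/url` as a stand-in for the router's decoding step; `parseNamespace` is a hypothetical helper, not the handler code from this commit.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseNamespace (hypothetical) splits an already-decoded path segment into
// namespace levels on the 0x1F unit separator. It performs no decoding.
func parseNamespace(decoded string) []string {
	if decoded == "" {
		return nil
	}
	return strings.Split(decoded, "\x1f")
}

func main() {
	// "%1F" on the wire is decoded once, before the handler sees it.
	decoded, err := url.PathUnescape("analytics%1Fdaily")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", parseNamespace(decoded))

	// A namespace whose name contains the literal text "%1F" arrives as
	// "%251F" and decodes to "%1F". It must remain a single level.
	literal, err := url.PathUnescape("a%251Fb")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", parseNamespace(literal))

	// A second, redundant replacement would wrongly split that name in two.
	twice := strings.ReplaceAll(literal, "%1F", "\x1f")
	fmt.Printf("%q\n", parseNamespace(twice))
}
```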

* fix: Improve test robustness and reduce code duplication

- fix: Make DuckDB test more robust by failing on unexpected errors
  Instead of silently logging errors, now explicitly check for expected
  conditions (extension not available) and skip the test appropriately.

- fix: Extract username helper method to reduce duplication
  Created getUsername() helper in AdminHandlers to avoid duplicating
  the username retrieval logic across Iceberg page handlers.

* fix: Add mutex protection to table UUID cache

Protects concurrent access to the tableUUIDs map with sync.RWMutex.
Uses read-lock for fast path when UUID already cached, and write-lock
for generating new UUIDs. Includes double-check pattern to handle race
condition between read-unlock and write-lock.
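
The read-lock fast path and the double-check under the write lock can be sketched as follows. This is an illustration of the pattern only: the `uuidCache` type, its fields, and the injected generator are hypothetical and are not the types used in this commit.

```go
package main

import (
	"fmt"
	"sync"
)

// uuidCache is a hypothetical stand-in for the table UUID cache.
type uuidCache struct {
	mu    sync.RWMutex
	byKey map[string]string
	gen   func() string // injected so the sketch stays self-contained
}

// get returns the cached value for key, generating it at most once.
func (c *uuidCache) get(key string) string {
	// Fast path: a shared read lock is enough when the value exists.
	c.mu.RLock()
	id, ok := c.byKey[key]
	c.mu.RUnlock()
	if ok {
		return id
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	// Double-check: another goroutine may have stored the value between
	// RUnlock and Lock.
	if id, ok := c.byKey[key]; ok {
		return id
	}
	id = c.gen()
	c.byKey[key] = id
	return id
}

func main() {
	n := 0
	c := &uuidCache{
		byKey: make(map[string]string),
		gen:   func() string { n++; return fmt.Sprintf("uuid-%d", n) },
	}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.get("ns.table")
		}()
	}
	wg.Wait()
	fmt.Println(c.get("ns.table"), "generated", n, "time(s)")
}
```

Without the second lookup, two goroutines that both miss on the read path would each generate a value, and the later write would replace the earlier one.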

* style: fix go fmt errors

* feat(iceberg): persist table UUID in S3 Tables metadata

* feat(admin): configure Iceberg port in Admin UI and commands

* refactor: address review comments (flags, tests, handlers)

- command/mini: fix tracking of explicit s3.port.iceberg flag
- command/admin: add explicit -iceberg.port flag
- admin/handlers: reuse getUsername helper
- tests: use 127.0.0.1 for ephemeral ports and os.Stat for file size check

* test: check error from FileStat in verify_gc_empty_test
2026-02-02 23:12:13 -08:00


package dash

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
)

// S3Tables data structures for admin UI
type S3TablesBucketsData struct {
Username string `json:"username"`
Buckets []S3TablesBucketSummary `json:"buckets"`
TotalBuckets int `json:"total_buckets"`
LastUpdated time.Time `json:"last_updated"`
}
type S3TablesBucketSummary struct {
ARN string `json:"arn"`
Name string `json:"name"`
OwnerAccountID string `json:"ownerAccountId"`
CreatedAt time.Time `json:"createdAt"`
}
type S3TablesNamespacesData struct {
Username string `json:"username"`
BucketARN string `json:"bucket_arn"`
Namespaces []s3tables.NamespaceSummary `json:"namespaces"`
TotalNamespaces int `json:"total_namespaces"`
LastUpdated time.Time `json:"last_updated"`
}
type S3TablesTablesData struct {
Username string `json:"username"`
BucketARN string `json:"bucket_arn"`
Namespace string `json:"namespace"`
Tables []s3tables.TableSummary `json:"tables"`
TotalTables int `json:"total_tables"`
LastUpdated time.Time `json:"last_updated"`
}
type tableBucketMetadata struct {
Name string `json:"name"`
CreatedAt time.Time `json:"createdAt"`
OwnerAccountID string `json:"ownerAccountId"`
}
// S3Tables manager helpers
const s3TablesAdminListLimit = 1000
// newS3TablesManager returns an S3 Tables manager configured with the
// admin account ID.
func newS3TablesManager() *s3tables.Manager {
manager := s3tables.NewManager()
manager.SetAccountID(s3_constants.AccountAdminId)
return manager
}
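// executeS3TablesOperation runs the named S3 Tables operation through a filer
// client, passing s3_constants.AccountAdminId to the manager. resp may be nil
// for operations that return no payload.
func (s *AdminServer) executeS3TablesOperation(ctx context.Context, operation string, req interface{}, resp interface{}) error {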
return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.s3TablesManager.Execute(ctx, mgrClient, operation, req, resp, s3_constants.AccountAdminId)
})
}
// S3Tables data retrieval for pages
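// GetS3TablesBucketsData lists table buckets by reading directory entries
// under s3tables.TablesPath. It returns at most s3TablesAdminListLimit buckets
// and skips entries that are not directories, whose names start with ".", or
// whose bucket metadata is missing or cannot be decoded.
func (s *AdminServer) GetS3TablesBucketsData(ctx context.Context) (S3TablesBucketsData, error) {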
var buckets []S3TablesBucketSummary
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: s3tables.TablesPath,
// Request more entries than the display limit because some are filtered out below.
Limit: uint32(s3TablesAdminListLimit * 2),
InclusiveStartFrom: true,
})
if err != nil {
return err
}
for len(buckets) < s3TablesAdminListLimit {
entry, recvErr := resp.Recv()
if recvErr != nil {
if errors.Is(recvErr, io.EOF) {
break
}
return recvErr
}
if entry.Entry == nil || !entry.Entry.IsDirectory {
continue
}
if strings.HasPrefix(entry.Entry.Name, ".") {
continue
}
metaBytes, ok := entry.Entry.Extended[s3tables.ExtendedKeyMetadata]
if !ok {
continue
}
var metadata tableBucketMetadata
if err := json.Unmarshal(metaBytes, &metadata); err != nil {
glog.V(1).Infof("S3Tables: failed to decode table bucket metadata for %s: %v", entry.Entry.Name, err)
continue
}
arn, err := s3tables.BuildBucketARN(s3tables.DefaultRegion, metadata.OwnerAccountID, entry.Entry.Name)
if err != nil {
glog.V(1).Infof("S3Tables: failed to build table bucket ARN for %s: %v", entry.Entry.Name, err)
continue
}
buckets = append(buckets, S3TablesBucketSummary{
ARN: arn,
Name: entry.Entry.Name,
OwnerAccountID: metadata.OwnerAccountID,
CreatedAt: metadata.CreatedAt,
})
}
return nil
})
if err != nil {
return S3TablesBucketsData{}, err
}
return S3TablesBucketsData{
Buckets: buckets,
TotalBuckets: len(buckets),
LastUpdated: time.Now(),
}, nil
}
func (s *AdminServer) GetS3TablesNamespacesData(ctx context.Context, bucketArn string) (S3TablesNamespacesData, error) {
var resp s3tables.ListNamespacesResponse
req := &s3tables.ListNamespacesRequest{TableBucketARN: bucketArn, MaxNamespaces: s3TablesAdminListLimit}
if err := s.executeS3TablesOperation(ctx, "ListNamespaces", req, &resp); err != nil {
return S3TablesNamespacesData{}, err
}
return S3TablesNamespacesData{
BucketARN: bucketArn,
Namespaces: resp.Namespaces,
TotalNamespaces: len(resp.Namespaces),
LastUpdated: time.Now(),
}, nil
}
func (s *AdminServer) GetS3TablesTablesData(ctx context.Context, bucketArn, namespace string) (S3TablesTablesData, error) {
var resp s3tables.ListTablesResponse
var ns []string
if namespace != "" {
ns = []string{namespace}
}
req := &s3tables.ListTablesRequest{TableBucketARN: bucketArn, Namespace: ns, MaxTables: s3TablesAdminListLimit}
if err := s.executeS3TablesOperation(ctx, "ListTables", req, &resp); err != nil {
return S3TablesTablesData{}, err
}
return S3TablesTablesData{
BucketARN: bucketArn,
Namespace: namespace,
Tables: resp.Tables,
TotalTables: len(resp.Tables),
LastUpdated: time.Now(),
}, nil
}
// Iceberg Catalog data providers
// GetIcebergCatalogData returns the Iceberg catalog overview data.
// Each S3 Table Bucket is exposed as an Iceberg catalog.
func (s *AdminServer) GetIcebergCatalogData(ctx context.Context) (IcebergCatalogData, error) {
bucketsData, err := s.GetS3TablesBucketsData(ctx)
if err != nil {
return IcebergCatalogData{}, err
}
catalogs := make([]IcebergCatalogInfo, 0, len(bucketsData.Buckets))
for _, bucket := range bucketsData.Buckets {
catalogs = append(catalogs, IcebergCatalogInfo{
Name: bucket.Name,
ARN: bucket.ARN,
OwnerAccountID: bucket.OwnerAccountID,
CreatedAt: bucket.CreatedAt,
})
}
return IcebergCatalogData{
Catalogs: catalogs,
TotalCatalogs: len(catalogs),
IcebergPort: s.icebergPort, // Use the port passed to AdminServer
LastUpdated: time.Now(),
}, nil
}
// GetIcebergNamespacesData returns namespaces for an Iceberg catalog.
func (s *AdminServer) GetIcebergNamespacesData(ctx context.Context, catalogName, bucketArn string) (IcebergNamespacesData, error) {
nsData, err := s.GetS3TablesNamespacesData(ctx, bucketArn)
if err != nil {
return IcebergNamespacesData{}, err
}
namespaces := make([]IcebergNamespaceInfo, 0, len(nsData.Namespaces))
for _, ns := range nsData.Namespaces {
// strings.Join returns "" for an empty slice, so no length check is needed.
name := strings.Join(ns.Namespace, ".")
namespaces = append(namespaces, IcebergNamespaceInfo{
Name: name,
CreatedAt: ns.CreatedAt,
})
}
return IcebergNamespacesData{
CatalogName: catalogName,
Namespaces: namespaces,
TotalNamespaces: len(namespaces),
LastUpdated: time.Now(),
}, nil
}
// GetIcebergTablesData returns tables for an Iceberg namespace.
func (s *AdminServer) GetIcebergTablesData(ctx context.Context, catalogName, bucketArn, namespace string) (IcebergTablesData, error) {
tablesData, err := s.GetS3TablesTablesData(ctx, bucketArn, namespace)
if err != nil {
return IcebergTablesData{}, err
}
tables := make([]IcebergTableInfo, 0, len(tablesData.Tables))
for _, t := range tablesData.Tables {
tables = append(tables, IcebergTableInfo{
Name: t.Name,
CreatedAt: t.CreatedAt,
})
}
return IcebergTablesData{
CatalogName: catalogName,
NamespaceName: namespace,
Tables: tables,
TotalTables: len(tables),
LastUpdated: time.Now(),
}, nil
}
// API handlers
func (s *AdminServer) ListS3TablesBucketsAPI(c *gin.Context) {
data, err := s.GetS3TablesBucketsData(c.Request.Context())
if err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, data)
}
func (s *AdminServer) CreateS3TablesBucket(c *gin.Context) {
var req struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Owner string `json:"owner"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.Name == "" {
c.JSON(400, gin.H{"error": "Bucket name is required"})
return
}
owner := strings.TrimSpace(req.Owner)
if len(owner) > MaxOwnerNameLength {
c.JSON(400, gin.H{"error": fmt.Sprintf("Owner name must be %d characters or less", MaxOwnerNameLength)})
return
}
if len(req.Tags) > 0 {
if err := s3tables.ValidateTags(req.Tags); err != nil {
c.JSON(400, gin.H{"error": "Invalid tags: " + err.Error()})
return
}
}
createReq := &s3tables.CreateTableBucketRequest{Name: req.Name, Tags: req.Tags}
var resp s3tables.CreateTableBucketResponse
if err := s.executeS3TablesOperation(c.Request.Context(), "CreateTableBucket", createReq, &resp); err != nil {
writeS3TablesError(c, err)
return
}
if owner != "" {
if err := s.SetTableBucketOwner(c.Request.Context(), req.Name, owner); err != nil {
deleteReq := &s3tables.DeleteTableBucketRequest{TableBucketARN: resp.ARN}
if deleteErr := s.executeS3TablesOperation(c.Request.Context(), "DeleteTableBucket", deleteReq, nil); deleteErr != nil {
c.JSON(500, gin.H{"error": fmt.Sprintf("Failed to set table bucket owner: %v; rollback delete failed: %v", err, deleteErr)})
return
}
writeS3TablesError(c, err)
return
}
}
c.JSON(201, gin.H{"arn": resp.ARN})
}
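// SetTableBucketOwner rewrites the owner account ID stored in the table
// bucket's extended metadata. It returns an error if the bucket entry or its
// metadata is missing.
func (s *AdminServer) SetTableBucketOwner(ctx context.Context, bucketName, owner string) error {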
return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
Directory: s3tables.TablesPath,
Name: bucketName,
})
if err != nil {
return fmt.Errorf("lookup table bucket %s: %w", bucketName, err)
}
if resp.Entry == nil {
return fmt.Errorf("table bucket %s not found", bucketName)
}
entry := resp.Entry
if entry.Extended == nil {
return fmt.Errorf("table bucket %s metadata missing", bucketName)
}
metaBytes, ok := entry.Extended[s3tables.ExtendedKeyMetadata]
if !ok {
return fmt.Errorf("table bucket %s metadata missing", bucketName)
}
var metadata tableBucketMetadata
if err := json.Unmarshal(metaBytes, &metadata); err != nil {
return fmt.Errorf("failed to parse table bucket metadata: %w", err)
}
metadata.OwnerAccountID = owner
updated, err := json.Marshal(&metadata)
if err != nil {
return fmt.Errorf("failed to marshal table bucket metadata: %w", err)
}
entry.Extended[s3tables.ExtendedKeyMetadata] = updated
if _, err := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
Directory: s3tables.TablesPath,
Entry: entry,
}); err != nil {
return fmt.Errorf("failed to update table bucket owner: %w", err)
}
return nil
})
}
func (s *AdminServer) DeleteS3TablesBucket(c *gin.Context) {
bucketArn := c.Query("bucket")
if bucketArn == "" {
c.JSON(400, gin.H{"error": "Bucket ARN is required"})
return
}
req := &s3tables.DeleteTableBucketRequest{TableBucketARN: bucketArn}
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteTableBucket", req, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Bucket deleted"})
}
func (s *AdminServer) ListS3TablesNamespacesAPI(c *gin.Context) {
bucketArn := c.Query("bucket")
if bucketArn == "" {
c.JSON(400, gin.H{"error": "bucket query parameter is required"})
return
}
data, err := s.GetS3TablesNamespacesData(c.Request.Context(), bucketArn)
if err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, data)
}
func (s *AdminServer) CreateS3TablesNamespace(c *gin.Context) {
var req struct {
BucketARN string `json:"bucket_arn"`
Name string `json:"name"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.BucketARN == "" || req.Name == "" {
c.JSON(400, gin.H{"error": "bucket_arn and name are required"})
return
}
createReq := &s3tables.CreateNamespaceRequest{TableBucketARN: req.BucketARN, Namespace: []string{req.Name}}
var resp s3tables.CreateNamespaceResponse
if err := s.executeS3TablesOperation(c.Request.Context(), "CreateNamespace", createReq, &resp); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(201, gin.H{"namespace": resp.Namespace})
}
func (s *AdminServer) DeleteS3TablesNamespace(c *gin.Context) {
bucketArn := c.Query("bucket")
namespace := c.Query("name")
if bucketArn == "" || namespace == "" {
c.JSON(400, gin.H{"error": "bucket and name query parameters are required"})
return
}
req := &s3tables.DeleteNamespaceRequest{TableBucketARN: bucketArn, Namespace: []string{namespace}}
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteNamespace", req, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Namespace deleted"})
}
func (s *AdminServer) ListS3TablesTablesAPI(c *gin.Context) {
bucketArn := c.Query("bucket")
if bucketArn == "" {
c.JSON(400, gin.H{"error": "bucket query parameter is required"})
return
}
namespace := c.Query("namespace")
data, err := s.GetS3TablesTablesData(c.Request.Context(), bucketArn, namespace)
if err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, data)
}
func (s *AdminServer) CreateS3TablesTable(c *gin.Context) {
var req struct {
BucketARN string `json:"bucket_arn"`
Namespace string `json:"namespace"`
Name string `json:"name"`
Format string `json:"format"`
Tags map[string]string `json:"tags"`
Metadata *s3tables.TableMetadata `json:"metadata"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.BucketARN == "" || req.Namespace == "" || req.Name == "" {
c.JSON(400, gin.H{"error": "bucket_arn, namespace, and name are required"})
return
}
format := req.Format
if format == "" {
format = "ICEBERG"
}
if len(req.Tags) > 0 {
if err := s3tables.ValidateTags(req.Tags); err != nil {
c.JSON(400, gin.H{"error": "Invalid tags: " + err.Error()})
return
}
}
createReq := &s3tables.CreateTableRequest{
TableBucketARN: req.BucketARN,
Namespace: []string{req.Namespace},
Name: req.Name,
Format: format,
Tags: req.Tags,
Metadata: req.Metadata,
}
var resp s3tables.CreateTableResponse
if err := s.executeS3TablesOperation(c.Request.Context(), "CreateTable", createReq, &resp); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(201, gin.H{"table_arn": resp.TableARN, "version_token": resp.VersionToken})
}
func (s *AdminServer) DeleteS3TablesTable(c *gin.Context) {
bucketArn := c.Query("bucket")
namespace := c.Query("namespace")
name := c.Query("name")
version := c.Query("version")
if bucketArn == "" || namespace == "" || name == "" {
c.JSON(400, gin.H{"error": "bucket, namespace, and name query parameters are required"})
return
}
req := &s3tables.DeleteTableRequest{TableBucketARN: bucketArn, Namespace: []string{namespace}, Name: name, VersionToken: version}
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteTable", req, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Table deleted"})
}
func (s *AdminServer) PutS3TablesBucketPolicy(c *gin.Context) {
var req struct {
BucketARN string `json:"bucket_arn"`
Policy string `json:"policy"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.BucketARN == "" || req.Policy == "" {
c.JSON(400, gin.H{"error": "bucket_arn and policy are required"})
return
}
putReq := &s3tables.PutTableBucketPolicyRequest{TableBucketARN: req.BucketARN, ResourcePolicy: req.Policy}
if err := s.executeS3TablesOperation(c.Request.Context(), "PutTableBucketPolicy", putReq, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Policy updated"})
}
func (s *AdminServer) GetS3TablesBucketPolicy(c *gin.Context) {
bucketArn := c.Query("bucket")
if bucketArn == "" {
c.JSON(400, gin.H{"error": "bucket query parameter is required"})
return
}
getReq := &s3tables.GetTableBucketPolicyRequest{TableBucketARN: bucketArn}
var resp s3tables.GetTableBucketPolicyResponse
if err := s.executeS3TablesOperation(c.Request.Context(), "GetTableBucketPolicy", getReq, &resp); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"policy": resp.ResourcePolicy})
}
func (s *AdminServer) DeleteS3TablesBucketPolicy(c *gin.Context) {
bucketArn := c.Query("bucket")
if bucketArn == "" {
c.JSON(400, gin.H{"error": "bucket query parameter is required"})
return
}
deleteReq := &s3tables.DeleteTableBucketPolicyRequest{TableBucketARN: bucketArn}
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteTableBucketPolicy", deleteReq, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Policy deleted"})
}
func (s *AdminServer) PutS3TablesTablePolicy(c *gin.Context) {
var req struct {
BucketARN string `json:"bucket_arn"`
Namespace string `json:"namespace"`
Name string `json:"name"`
Policy string `json:"policy"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.BucketARN == "" || req.Namespace == "" || req.Name == "" || req.Policy == "" {
c.JSON(400, gin.H{"error": "bucket_arn, namespace, name, and policy are required"})
return
}
putReq := &s3tables.PutTablePolicyRequest{TableBucketARN: req.BucketARN, Namespace: []string{req.Namespace}, Name: req.Name, ResourcePolicy: req.Policy}
if err := s.executeS3TablesOperation(c.Request.Context(), "PutTablePolicy", putReq, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Policy updated"})
}
func (s *AdminServer) GetS3TablesTablePolicy(c *gin.Context) {
bucketArn := c.Query("bucket")
namespace := c.Query("namespace")
name := c.Query("name")
if bucketArn == "" || namespace == "" || name == "" {
c.JSON(400, gin.H{"error": "bucket, namespace, and name query parameters are required"})
return
}
getReq := &s3tables.GetTablePolicyRequest{TableBucketARN: bucketArn, Namespace: []string{namespace}, Name: name}
var resp s3tables.GetTablePolicyResponse
if err := s.executeS3TablesOperation(c.Request.Context(), "GetTablePolicy", getReq, &resp); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"policy": resp.ResourcePolicy})
}
func (s *AdminServer) DeleteS3TablesTablePolicy(c *gin.Context) {
bucketArn := c.Query("bucket")
namespace := c.Query("namespace")
name := c.Query("name")
if bucketArn == "" || namespace == "" || name == "" {
c.JSON(400, gin.H{"error": "bucket, namespace, and name query parameters are required"})
return
}
deleteReq := &s3tables.DeleteTablePolicyRequest{TableBucketARN: bucketArn, Namespace: []string{namespace}, Name: name}
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteTablePolicy", deleteReq, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Policy deleted"})
}
func (s *AdminServer) TagS3TablesResource(c *gin.Context) {
var req struct {
ResourceARN string `json:"resource_arn"`
Tags map[string]string `json:"tags"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.ResourceARN == "" || len(req.Tags) == 0 {
c.JSON(400, gin.H{"error": "resource_arn and tags are required"})
return
}
if err := s3tables.ValidateTags(req.Tags); err != nil {
c.JSON(400, gin.H{"error": "Invalid tags: " + err.Error()})
return
}
tagReq := &s3tables.TagResourceRequest{ResourceARN: req.ResourceARN, Tags: req.Tags}
if err := s.executeS3TablesOperation(c.Request.Context(), "TagResource", tagReq, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Tags updated"})
}
func (s *AdminServer) ListS3TablesTags(c *gin.Context) {
resourceArn := c.Query("arn")
if resourceArn == "" {
c.JSON(400, gin.H{"error": "arn query parameter is required"})
return
}
listReq := &s3tables.ListTagsForResourceRequest{ResourceARN: resourceArn}
var resp s3tables.ListTagsForResourceResponse
if err := s.executeS3TablesOperation(c.Request.Context(), "ListTagsForResource", listReq, &resp); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, resp)
}
func (s *AdminServer) UntagS3TablesResource(c *gin.Context) {
var req struct {
ResourceARN string `json:"resource_arn"`
TagKeys []string `json:"tag_keys"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.ResourceARN == "" || len(req.TagKeys) == 0 {
c.JSON(400, gin.H{"error": "resource_arn and tag_keys are required"})
return
}
untagReq := &s3tables.UntagResourceRequest{ResourceARN: req.ResourceARN, TagKeys: req.TagKeys}
if err := s.executeS3TablesOperation(c.Request.Context(), "UntagResource", untagReq, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Tags removed"})
}
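// parseS3TablesErrorMessage formats err for API responses. An S3TablesError is
// rendered as "Type: Message", or as the type alone when the message is empty.
// Any other error is rendered with err.Error().
func parseS3TablesErrorMessage(err error) string {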
if err == nil {
return ""
}
var s3Err *s3tables.S3TablesError
if errors.As(err, &s3Err) {
if s3Err.Message != "" {
return fmt.Sprintf("%s: %s", s3Err.Type, s3Err.Message)
}
return s3Err.Type
}
return err.Error()
}
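// writeS3TablesError writes err as a JSON error body with the HTTP status
// chosen by s3TablesErrorStatus.
func writeS3TablesError(c *gin.Context, err error) {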
c.JSON(s3TablesErrorStatus(err), gin.H{"error": parseS3TablesErrorMessage(err)})
}
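// s3TablesErrorStatus maps an S3 Tables error code to an HTTP status. Errors
// that are not S3TablesError, or that carry an unrecognized code, map to 500.
func s3TablesErrorStatus(err error) int {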
var s3Err *s3tables.S3TablesError
if errors.As(err, &s3Err) {
switch s3Err.Type {
case s3tables.ErrCodeInvalidRequest:
return http.StatusBadRequest
case s3tables.ErrCodeNoSuchBucket, s3tables.ErrCodeNoSuchNamespace, s3tables.ErrCodeNoSuchTable, s3tables.ErrCodeNoSuchPolicy:
return http.StatusNotFound
case s3tables.ErrCodeAccessDenied:
return http.StatusForbidden
case s3tables.ErrCodeBucketAlreadyExists, s3tables.ErrCodeNamespaceAlreadyExists, s3tables.ErrCodeTableAlreadyExists, s3tables.ErrCodeConflict:
return http.StatusConflict
}
}
return http.StatusInternalServerError
}