Files
seaweedFS/weed/admin/dash/s3tables_management.go
Chris Lu e6ee293c17 Add table operations test (#8241)
* Add Trino blog operations test

* Update test/s3tables/catalog_trino/trino_blog_operations_test.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* feat: add table bucket path helpers and filer operations

- Add table object root and table location mapping directories
- Implement ensureDirectory, upsertFile, deleteEntryIfExists helpers
- Support table location bucket mapping for S3 access

* feat: manage table bucket object roots on creation/deletion

- Create .objects directory for table buckets on creation
- Clean up table object bucket paths on deletion
- Enable S3 operations on table bucket object roots

* feat: add table location mapping for Iceberg REST

- Track table location bucket mappings when tables are created/updated/deleted
- Enable location-based routing for S3 operations on table data

* feat: route S3 operations to table bucket object roots

- Route table-s3 bucket names to mapped table paths
- Route table buckets to object root directories
- Support table location bucket mapping lookup

* feat: emit table-s3 locations from Iceberg REST

- Generate unique table-s3 bucket names with UUID suffix
- Store table metadata under table bucket paths
- Return table-s3 locations for Trino compatibility

* fix: handle missing directories in S3 list operations

- Propagate ErrNotFound from ListEntries for non-existent directories
- Treat missing directories as empty results for list operations
- Fixes Trino non-empty location checks on table creation

* test: improve Trino CSV parsing for single-value results

- Sanitize Trino output to skip jline warnings
- Handle single-value CSV results without header rows
- Strip quotes from numeric values in tests

* refactor: use bucket path helpers throughout S3 API

- Replace direct bucket path operations with helper functions
- Leverage centralized table bucket routing logic
- Improve maintainability with consistent path resolution

* fix: add table bucket cache and improve filer error handling

- Cache table bucket lookups to reduce filer overhead on repeated checks
- Use filer_pb.CreateEntry and filer_pb.UpdateEntry helpers to check resp.Error
- Fix delete order in handler_bucket_get_list_delete: delete table object before directory
- Make location mapping errors best-effort: log and continue, don't fail API
- Update table location mappings to delete stale prior bucket mappings on update
- Add 1-second sleep before timestamp time travel query to ensure timestamps are in past
- Fix CSV parsing: examine all lines, not skip first; handle single-value rows

* fix: properly handle stale metadata location mapping cleanup

- Capture oldMetadataLocation before mutation in handleUpdateTable
- Update updateTableLocationMapping to accept both old and new locations
- Use passed-in oldMetadataLocation to detect location changes
- Delete stale mapping only when location actually changes
- Pass empty string for oldLocation in handleCreateTable (new tables have no prior mapping)
- Improve logging to show old -> new location transitions

* refactor: cleanup imports and cache design

- Remove unused 'sync' import from bucket_paths.go
- Use filer_pb.UpdateEntry helper in setExtendedAttribute and deleteExtendedAttribute for consistent error handling
- Add dedicated tableBucketCache map[string]bool to BucketRegistry instead of mixing concerns with metadataCache
- Improve cache separation: table buckets cache is now separate from bucket metadata cache

* fix: improve cache invalidation and add transient error handling

Cache invalidation (critical fix):
- Add tableLocationCache to BucketRegistry for location mapping lookups
- Clear tableBucketCache and tableLocationCache in RemoveBucketMetadata
- Prevents stale cache entries when buckets are deleted/recreated

Transient error handling:
- Only cache table bucket lookups when conclusive (found or ErrNotFound)
- Skip caching on transient errors (network, permission, etc)
- Prevents marking real table buckets as non-table due to transient failures

Performance optimization:
- Cache tableLocationDir results to avoid repeated filer RPCs on hot paths
- tableLocationDir now checks cache before making expensive filer lookups
- Cache stores empty string for 'not found' to avoid redundant lookups

Code clarity:
- Add comment to deleteDirectory explaining DeleteEntry response lacks Error field

* go fmt

* fix: mirror transient error handling in tableLocationDir and optimize bucketDir

Transient error handling:
- tableLocationDir now only caches definitive results
- Mirrors isTableBucket behavior to prevent treating transient errors as permanent misses
- Improves reliability on flaky systems or during recovery

Performance optimization:
- bucketDir avoids redundant isTableBucket call via bucketRoot
- Directly use s3a.option.BucketsPath for regular buckets
- Saves one cache lookup for every non-table bucket operation

* fix: revert bucketDir optimization to preserve bucketRoot logic

The optimization to directly use BucketsPath bypassed bucketRoot's logic
and caused issues with S3 list operations on delimiter+prefix cases.

Revert to using path.Join(s3a.bucketRoot(bucket), bucket) which properly
handles all bucket types and ensures consistent path resolution across
the codebase.

The slight performance cost of an extra cache lookup is worth the correctness
and consistency benefits.

* feat: move table buckets under /buckets

Add a table-bucket marker attribute, reuse bucket metadata cache for table bucket detection, and update list/validation/UI/test paths to treat table buckets as /buckets entries.

* Fix S3 Tables code review issues

- handler_bucket_create.go: Fix bucket existence check to properly validate
  entryResp.Entry before setting s3BucketExists flag (nil Entry should not
  indicate existing bucket)
- bucket_paths.go: Add clarifying comment to bucketRoot() explaining unified
  buckets root path for all bucket types
- file_browser_data.go: Optimize by extracting table bucket check early to
  avoid redundant WithFilerClient call

* Fix list prefix delimiter handling

* Handle list errors conservatively

* Fix Trino FOR TIMESTAMP query - use past timestamp

Iceberg requires the timestamp to be strictly in the past.
Use current_timestamp - interval '1' second instead of current_timestamp.

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2026-02-07 13:27:47 -08:00

688 lines
22 KiB
Go

package dash
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
)
// S3Tables data structures for admin UI
type S3TablesBucketsData struct {
Username string `json:"username"`
Buckets []S3TablesBucketSummary `json:"buckets"`
TotalBuckets int `json:"total_buckets"`
LastUpdated time.Time `json:"last_updated"`
}
type S3TablesBucketSummary struct {
ARN string `json:"arn"`
Name string `json:"name"`
OwnerAccountID string `json:"ownerAccountId"`
CreatedAt time.Time `json:"createdAt"`
}
type S3TablesNamespacesData struct {
Username string `json:"username"`
BucketARN string `json:"bucket_arn"`
Namespaces []s3tables.NamespaceSummary `json:"namespaces"`
TotalNamespaces int `json:"total_namespaces"`
LastUpdated time.Time `json:"last_updated"`
}
type S3TablesTablesData struct {
Username string `json:"username"`
BucketARN string `json:"bucket_arn"`
Namespace string `json:"namespace"`
Tables []s3tables.TableSummary `json:"tables"`
TotalTables int `json:"total_tables"`
LastUpdated time.Time `json:"last_updated"`
}
type tableBucketMetadata struct {
Name string `json:"name"`
CreatedAt time.Time `json:"createdAt"`
OwnerAccountID string `json:"ownerAccountId"`
}
// S3Tables manager helpers
const s3TablesAdminListLimit = 1000
func newS3TablesManager() *s3tables.Manager {
manager := s3tables.NewManager()
manager.SetAccountID(s3_constants.AccountAdminId)
return manager
}
func (s *AdminServer) executeS3TablesOperation(ctx context.Context, operation string, req interface{}, resp interface{}) error {
return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
mgrClient := s3tables.NewManagerClient(client)
return s.s3TablesManager.Execute(ctx, mgrClient, operation, req, resp, s3_constants.AccountAdminId)
})
}
// S3Tables data retrieval for pages
func (s *AdminServer) GetS3TablesBucketsData(ctx context.Context) (S3TablesBucketsData, error) {
var buckets []S3TablesBucketSummary
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
Directory: s3tables.TablesPath,
Limit: uint32(s3TablesAdminListLimit * 2),
InclusiveStartFrom: true,
})
if err != nil {
return err
}
for len(buckets) < s3TablesAdminListLimit {
entry, recvErr := resp.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
}
return recvErr
}
if entry.Entry == nil || !entry.Entry.IsDirectory {
continue
}
if strings.HasPrefix(entry.Entry.Name, ".") {
continue
}
if !s3tables.IsTableBucketEntry(entry.Entry) {
continue
}
metaBytes, ok := entry.Entry.Extended[s3tables.ExtendedKeyMetadata]
if !ok {
continue
}
var metadata tableBucketMetadata
if err := json.Unmarshal(metaBytes, &metadata); err != nil {
glog.V(1).Infof("S3Tables: failed to decode table bucket metadata for %s: %v", entry.Entry.Name, err)
continue
}
arn, err := s3tables.BuildBucketARN(s3tables.DefaultRegion, metadata.OwnerAccountID, entry.Entry.Name)
if err != nil {
glog.V(1).Infof("S3Tables: failed to build table bucket ARN for %s: %v", entry.Entry.Name, err)
continue
}
buckets = append(buckets, S3TablesBucketSummary{
ARN: arn,
Name: entry.Entry.Name,
OwnerAccountID: metadata.OwnerAccountID,
CreatedAt: metadata.CreatedAt,
})
}
return nil
})
if err != nil {
return S3TablesBucketsData{}, err
}
return S3TablesBucketsData{
Buckets: buckets,
TotalBuckets: len(buckets),
LastUpdated: time.Now(),
}, nil
}
func (s *AdminServer) GetS3TablesNamespacesData(ctx context.Context, bucketArn string) (S3TablesNamespacesData, error) {
var resp s3tables.ListNamespacesResponse
req := &s3tables.ListNamespacesRequest{TableBucketARN: bucketArn, MaxNamespaces: s3TablesAdminListLimit}
if err := s.executeS3TablesOperation(ctx, "ListNamespaces", req, &resp); err != nil {
return S3TablesNamespacesData{}, err
}
return S3TablesNamespacesData{
BucketARN: bucketArn,
Namespaces: resp.Namespaces,
TotalNamespaces: len(resp.Namespaces),
LastUpdated: time.Now(),
}, nil
}
func (s *AdminServer) GetS3TablesTablesData(ctx context.Context, bucketArn, namespace string) (S3TablesTablesData, error) {
var resp s3tables.ListTablesResponse
var ns []string
if namespace != "" {
ns = []string{namespace}
}
req := &s3tables.ListTablesRequest{TableBucketARN: bucketArn, Namespace: ns, MaxTables: s3TablesAdminListLimit}
if err := s.executeS3TablesOperation(ctx, "ListTables", req, &resp); err != nil {
return S3TablesTablesData{}, err
}
return S3TablesTablesData{
BucketARN: bucketArn,
Namespace: namespace,
Tables: resp.Tables,
TotalTables: len(resp.Tables),
LastUpdated: time.Now(),
}, nil
}
// Iceberg Catalog data providers
// GetIcebergCatalogData returns the Iceberg catalog overview data.
// Each S3 Table Bucket is exposed as an Iceberg catalog.
func (s *AdminServer) GetIcebergCatalogData(ctx context.Context) (IcebergCatalogData, error) {
bucketsData, err := s.GetS3TablesBucketsData(ctx)
if err != nil {
return IcebergCatalogData{}, err
}
catalogs := make([]IcebergCatalogInfo, 0, len(bucketsData.Buckets))
for _, bucket := range bucketsData.Buckets {
catalogs = append(catalogs, IcebergCatalogInfo{
Name: bucket.Name,
ARN: bucket.ARN,
OwnerAccountID: bucket.OwnerAccountID,
CreatedAt: bucket.CreatedAt,
})
}
return IcebergCatalogData{
Catalogs: catalogs,
TotalCatalogs: len(catalogs),
IcebergPort: s.icebergPort, // Use the port passed to AdminServer
LastUpdated: time.Now(),
}, nil
}
// GetIcebergNamespacesData returns namespaces for an Iceberg catalog.
func (s *AdminServer) GetIcebergNamespacesData(ctx context.Context, catalogName, bucketArn string) (IcebergNamespacesData, error) {
nsData, err := s.GetS3TablesNamespacesData(ctx, bucketArn)
if err != nil {
return IcebergNamespacesData{}, err
}
namespaces := make([]IcebergNamespaceInfo, 0, len(nsData.Namespaces))
for _, ns := range nsData.Namespaces {
name := ""
if len(ns.Namespace) > 0 {
name = strings.Join(ns.Namespace, ".")
}
namespaces = append(namespaces, IcebergNamespaceInfo{
Name: name,
CreatedAt: ns.CreatedAt,
})
}
return IcebergNamespacesData{
CatalogName: catalogName,
Namespaces: namespaces,
TotalNamespaces: len(namespaces),
LastUpdated: time.Now(),
}, nil
}
// GetIcebergTablesData returns tables for an Iceberg namespace.
func (s *AdminServer) GetIcebergTablesData(ctx context.Context, catalogName, bucketArn, namespace string) (IcebergTablesData, error) {
tablesData, err := s.GetS3TablesTablesData(ctx, bucketArn, namespace)
if err != nil {
return IcebergTablesData{}, err
}
tables := make([]IcebergTableInfo, 0, len(tablesData.Tables))
for _, t := range tablesData.Tables {
tables = append(tables, IcebergTableInfo{
Name: t.Name,
CreatedAt: t.CreatedAt,
})
}
return IcebergTablesData{
CatalogName: catalogName,
NamespaceName: namespace,
Tables: tables,
TotalTables: len(tables),
LastUpdated: time.Now(),
}, nil
}
// API handlers
func (s *AdminServer) ListS3TablesBucketsAPI(c *gin.Context) {
data, err := s.GetS3TablesBucketsData(c.Request.Context())
if err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, data)
}
func (s *AdminServer) CreateS3TablesBucket(c *gin.Context) {
var req struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Owner string `json:"owner"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.Name == "" {
c.JSON(400, gin.H{"error": "Bucket name is required"})
return
}
owner := strings.TrimSpace(req.Owner)
if len(owner) > MaxOwnerNameLength {
c.JSON(400, gin.H{"error": fmt.Sprintf("Owner name must be %d characters or less", MaxOwnerNameLength)})
return
}
if len(req.Tags) > 0 {
if err := s3tables.ValidateTags(req.Tags); err != nil {
c.JSON(400, gin.H{"error": "Invalid tags: " + err.Error()})
return
}
}
createReq := &s3tables.CreateTableBucketRequest{Name: req.Name, Tags: req.Tags}
var resp s3tables.CreateTableBucketResponse
if err := s.executeS3TablesOperation(c.Request.Context(), "CreateTableBucket", createReq, &resp); err != nil {
writeS3TablesError(c, err)
return
}
if owner != "" {
if err := s.SetTableBucketOwner(c.Request.Context(), req.Name, owner); err != nil {
deleteReq := &s3tables.DeleteTableBucketRequest{TableBucketARN: resp.ARN}
if deleteErr := s.executeS3TablesOperation(c.Request.Context(), "DeleteTableBucket", deleteReq, nil); deleteErr != nil {
c.JSON(500, gin.H{"error": fmt.Sprintf("Failed to set table bucket owner: %v; rollback delete failed: %v", err, deleteErr)})
return
}
writeS3TablesError(c, err)
return
}
}
c.JSON(201, gin.H{"arn": resp.ARN})
}
func (s *AdminServer) SetTableBucketOwner(ctx context.Context, bucketName, owner string) error {
return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
Directory: s3tables.TablesPath,
Name: bucketName,
})
if err != nil {
return fmt.Errorf("lookup table bucket %s: %w", bucketName, err)
}
if resp.Entry == nil {
return fmt.Errorf("table bucket %s not found", bucketName)
}
entry := resp.Entry
if entry.Extended == nil {
return fmt.Errorf("table bucket %s metadata missing", bucketName)
}
metaBytes, ok := entry.Extended[s3tables.ExtendedKeyMetadata]
if !ok {
return fmt.Errorf("table bucket %s metadata missing", bucketName)
}
var metadata tableBucketMetadata
if err := json.Unmarshal(metaBytes, &metadata); err != nil {
return fmt.Errorf("failed to parse table bucket metadata: %w", err)
}
metadata.OwnerAccountID = owner
updated, err := json.Marshal(&metadata)
if err != nil {
return fmt.Errorf("failed to marshal table bucket metadata: %w", err)
}
entry.Extended[s3tables.ExtendedKeyMetadata] = updated
if _, err := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
Directory: s3tables.TablesPath,
Entry: entry,
}); err != nil {
return fmt.Errorf("failed to update table bucket owner: %w", err)
}
return nil
})
}
func (s *AdminServer) DeleteS3TablesBucket(c *gin.Context) {
bucketArn := c.Query("bucket")
if bucketArn == "" {
c.JSON(400, gin.H{"error": "Bucket ARN is required"})
return
}
req := &s3tables.DeleteTableBucketRequest{TableBucketARN: bucketArn}
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteTableBucket", req, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Bucket deleted"})
}
func (s *AdminServer) ListS3TablesNamespacesAPI(c *gin.Context) {
bucketArn := c.Query("bucket")
if bucketArn == "" {
c.JSON(400, gin.H{"error": "bucket query parameter is required"})
return
}
data, err := s.GetS3TablesNamespacesData(c.Request.Context(), bucketArn)
if err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, data)
}
func (s *AdminServer) CreateS3TablesNamespace(c *gin.Context) {
var req struct {
BucketARN string `json:"bucket_arn"`
Name string `json:"name"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.BucketARN == "" || req.Name == "" {
c.JSON(400, gin.H{"error": "bucket_arn and name are required"})
return
}
createReq := &s3tables.CreateNamespaceRequest{TableBucketARN: req.BucketARN, Namespace: []string{req.Name}}
var resp s3tables.CreateNamespaceResponse
if err := s.executeS3TablesOperation(c.Request.Context(), "CreateNamespace", createReq, &resp); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(201, gin.H{"namespace": resp.Namespace})
}
func (s *AdminServer) DeleteS3TablesNamespace(c *gin.Context) {
bucketArn := c.Query("bucket")
namespace := c.Query("name")
if bucketArn == "" || namespace == "" {
c.JSON(400, gin.H{"error": "bucket and name query parameters are required"})
return
}
req := &s3tables.DeleteNamespaceRequest{TableBucketARN: bucketArn, Namespace: []string{namespace}}
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteNamespace", req, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Namespace deleted"})
}
func (s *AdminServer) ListS3TablesTablesAPI(c *gin.Context) {
bucketArn := c.Query("bucket")
if bucketArn == "" {
c.JSON(400, gin.H{"error": "bucket query parameter is required"})
return
}
namespace := c.Query("namespace")
data, err := s.GetS3TablesTablesData(c.Request.Context(), bucketArn, namespace)
if err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, data)
}
func (s *AdminServer) CreateS3TablesTable(c *gin.Context) {
var req struct {
BucketARN string `json:"bucket_arn"`
Namespace string `json:"namespace"`
Name string `json:"name"`
Format string `json:"format"`
Tags map[string]string `json:"tags"`
Metadata *s3tables.TableMetadata `json:"metadata"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.BucketARN == "" || req.Namespace == "" || req.Name == "" {
c.JSON(400, gin.H{"error": "bucket_arn, namespace, and name are required"})
return
}
format := req.Format
if format == "" {
format = "ICEBERG"
}
if len(req.Tags) > 0 {
if err := s3tables.ValidateTags(req.Tags); err != nil {
c.JSON(400, gin.H{"error": "Invalid tags: " + err.Error()})
return
}
}
createReq := &s3tables.CreateTableRequest{
TableBucketARN: req.BucketARN,
Namespace: []string{req.Namespace},
Name: req.Name,
Format: format,
Tags: req.Tags,
Metadata: req.Metadata,
}
var resp s3tables.CreateTableResponse
if err := s.executeS3TablesOperation(c.Request.Context(), "CreateTable", createReq, &resp); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(201, gin.H{"table_arn": resp.TableARN, "version_token": resp.VersionToken})
}
func (s *AdminServer) DeleteS3TablesTable(c *gin.Context) {
bucketArn := c.Query("bucket")
namespace := c.Query("namespace")
name := c.Query("name")
version := c.Query("version")
if bucketArn == "" || namespace == "" || name == "" {
c.JSON(400, gin.H{"error": "bucket, namespace, and name query parameters are required"})
return
}
req := &s3tables.DeleteTableRequest{TableBucketARN: bucketArn, Namespace: []string{namespace}, Name: name, VersionToken: version}
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteTable", req, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Table deleted"})
}
func (s *AdminServer) PutS3TablesBucketPolicy(c *gin.Context) {
var req struct {
BucketARN string `json:"bucket_arn"`
Policy string `json:"policy"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.BucketARN == "" || req.Policy == "" {
c.JSON(400, gin.H{"error": "bucket_arn and policy are required"})
return
}
putReq := &s3tables.PutTableBucketPolicyRequest{TableBucketARN: req.BucketARN, ResourcePolicy: req.Policy}
if err := s.executeS3TablesOperation(c.Request.Context(), "PutTableBucketPolicy", putReq, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Policy updated"})
}
func (s *AdminServer) GetS3TablesBucketPolicy(c *gin.Context) {
bucketArn := c.Query("bucket")
if bucketArn == "" {
c.JSON(400, gin.H{"error": "bucket query parameter is required"})
return
}
getReq := &s3tables.GetTableBucketPolicyRequest{TableBucketARN: bucketArn}
var resp s3tables.GetTableBucketPolicyResponse
if err := s.executeS3TablesOperation(c.Request.Context(), "GetTableBucketPolicy", getReq, &resp); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"policy": resp.ResourcePolicy})
}
func (s *AdminServer) DeleteS3TablesBucketPolicy(c *gin.Context) {
bucketArn := c.Query("bucket")
if bucketArn == "" {
c.JSON(400, gin.H{"error": "bucket query parameter is required"})
return
}
deleteReq := &s3tables.DeleteTableBucketPolicyRequest{TableBucketARN: bucketArn}
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteTableBucketPolicy", deleteReq, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Policy deleted"})
}
func (s *AdminServer) PutS3TablesTablePolicy(c *gin.Context) {
var req struct {
BucketARN string `json:"bucket_arn"`
Namespace string `json:"namespace"`
Name string `json:"name"`
Policy string `json:"policy"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.BucketARN == "" || req.Namespace == "" || req.Name == "" || req.Policy == "" {
c.JSON(400, gin.H{"error": "bucket_arn, namespace, name, and policy are required"})
return
}
putReq := &s3tables.PutTablePolicyRequest{TableBucketARN: req.BucketARN, Namespace: []string{req.Namespace}, Name: req.Name, ResourcePolicy: req.Policy}
if err := s.executeS3TablesOperation(c.Request.Context(), "PutTablePolicy", putReq, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Policy updated"})
}
func (s *AdminServer) GetS3TablesTablePolicy(c *gin.Context) {
bucketArn := c.Query("bucket")
namespace := c.Query("namespace")
name := c.Query("name")
if bucketArn == "" || namespace == "" || name == "" {
c.JSON(400, gin.H{"error": "bucket, namespace, and name query parameters are required"})
return
}
getReq := &s3tables.GetTablePolicyRequest{TableBucketARN: bucketArn, Namespace: []string{namespace}, Name: name}
var resp s3tables.GetTablePolicyResponse
if err := s.executeS3TablesOperation(c.Request.Context(), "GetTablePolicy", getReq, &resp); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"policy": resp.ResourcePolicy})
}
func (s *AdminServer) DeleteS3TablesTablePolicy(c *gin.Context) {
bucketArn := c.Query("bucket")
namespace := c.Query("namespace")
name := c.Query("name")
if bucketArn == "" || namespace == "" || name == "" {
c.JSON(400, gin.H{"error": "bucket, namespace, and name query parameters are required"})
return
}
deleteReq := &s3tables.DeleteTablePolicyRequest{TableBucketARN: bucketArn, Namespace: []string{namespace}, Name: name}
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteTablePolicy", deleteReq, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Policy deleted"})
}
func (s *AdminServer) TagS3TablesResource(c *gin.Context) {
var req struct {
ResourceARN string `json:"resource_arn"`
Tags map[string]string `json:"tags"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.ResourceARN == "" || len(req.Tags) == 0 {
c.JSON(400, gin.H{"error": "resource_arn and tags are required"})
return
}
if err := s3tables.ValidateTags(req.Tags); err != nil {
c.JSON(400, gin.H{"error": "Invalid tags: " + err.Error()})
return
}
tagReq := &s3tables.TagResourceRequest{ResourceARN: req.ResourceARN, Tags: req.Tags}
if err := s.executeS3TablesOperation(c.Request.Context(), "TagResource", tagReq, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Tags updated"})
}
func (s *AdminServer) ListS3TablesTags(c *gin.Context) {
resourceArn := c.Query("arn")
if resourceArn == "" {
c.JSON(400, gin.H{"error": "arn query parameter is required"})
return
}
listReq := &s3tables.ListTagsForResourceRequest{ResourceARN: resourceArn}
var resp s3tables.ListTagsForResourceResponse
if err := s.executeS3TablesOperation(c.Request.Context(), "ListTagsForResource", listReq, &resp); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, resp)
}
func (s *AdminServer) UntagS3TablesResource(c *gin.Context) {
var req struct {
ResourceARN string `json:"resource_arn"`
TagKeys []string `json:"tag_keys"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "Invalid request: " + err.Error()})
return
}
if req.ResourceARN == "" || len(req.TagKeys) == 0 {
c.JSON(400, gin.H{"error": "resource_arn and tag_keys are required"})
return
}
untagReq := &s3tables.UntagResourceRequest{ResourceARN: req.ResourceARN, TagKeys: req.TagKeys}
if err := s.executeS3TablesOperation(c.Request.Context(), "UntagResource", untagReq, nil); err != nil {
writeS3TablesError(c, err)
return
}
c.JSON(200, gin.H{"message": "Tags removed"})
}
func parseS3TablesErrorMessage(err error) string {
if err == nil {
return ""
}
var s3Err *s3tables.S3TablesError
if errors.As(err, &s3Err) {
if s3Err.Message != "" {
return fmt.Sprintf("%s: %s", s3Err.Type, s3Err.Message)
}
return s3Err.Type
}
return err.Error()
}
func writeS3TablesError(c *gin.Context, err error) {
c.JSON(s3TablesErrorStatus(err), gin.H{"error": parseS3TablesErrorMessage(err)})
}
func s3TablesErrorStatus(err error) int {
var s3Err *s3tables.S3TablesError
if errors.As(err, &s3Err) {
switch s3Err.Type {
case s3tables.ErrCodeInvalidRequest:
return http.StatusBadRequest
case s3tables.ErrCodeNoSuchBucket, s3tables.ErrCodeNoSuchNamespace, s3tables.ErrCodeNoSuchTable, s3tables.ErrCodeNoSuchPolicy:
return http.StatusNotFound
case s3tables.ErrCodeAccessDenied:
return http.StatusForbidden
case s3tables.ErrCodeBucketAlreadyExists, s3tables.ErrCodeNamespaceAlreadyExists, s3tables.ErrCodeTableAlreadyExists, s3tables.ErrCodeConflict:
return http.StatusConflict
}
}
return http.StatusInternalServerError
}