s3tables: complete s3tables package implementation
- namespace.go: namespace CRUD operations (310 lines) - table.go: table CRUD operations with Iceberg schema support (409 lines) - policy.go: resource policies and tagging operations (419 lines) - types.go: request/response types and error definitions (290 lines) - All handlers updated to use standalone utilities from utils.go - All files follow single responsibility principle
This commit is contained in:
310
weed/s3api/s3tables/namespace.go
Normal file
310
weed/s3api/s3tables/namespace.go
Normal file
@@ -0,0 +1,310 @@
|
|||||||
|
package s3tables
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// handleCreateNamespace creates a new namespace in a table bucket
|
||||||
|
func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req CreateNamespaceRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
||||||
|
return fmt.Errorf("tableBucketARN is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(req.Namespace) == 0 {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "namespace is required")
|
||||||
|
return fmt.Errorf("namespace is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// For simplicity, use the first namespace element as the directory name
|
||||||
|
namespaceName := req.Namespace[0]
|
||||||
|
|
||||||
|
// Validate namespace name
|
||||||
|
if len(namespaceName) < 1 || len(namespaceName) > 255 {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "namespace name must be between 1 and 255 characters")
|
||||||
|
return fmt.Errorf("invalid namespace name length")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if table bucket exists
|
||||||
|
bucketPath := getTableBucketPath(bucketName)
|
||||||
|
var bucketExists bool
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
_, err := h.getExtendedAttribute(client, bucketPath, ExtendedKeyMetadata)
|
||||||
|
bucketExists = err == nil
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if !bucketExists {
|
||||||
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchBucket, fmt.Sprintf("table bucket %s not found", bucketName))
|
||||||
|
return fmt.Errorf("bucket not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
namespacePath := getNamespacePath(bucketName, namespaceName)
|
||||||
|
|
||||||
|
// Check if namespace already exists
|
||||||
|
exists := false
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
_, err := h.getExtendedAttribute(client, namespacePath, ExtendedKeyMetadata)
|
||||||
|
exists = err == nil
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if exists {
|
||||||
|
h.writeError(w, http.StatusConflict, ErrCodeNamespaceAlreadyExists, fmt.Sprintf("namespace %s already exists", namespaceName))
|
||||||
|
return fmt.Errorf("namespace already exists")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the namespace
|
||||||
|
now := time.Now()
|
||||||
|
metadata := &namespaceMetadata{
|
||||||
|
Namespace: req.Namespace,
|
||||||
|
CreatedAt: now,
|
||||||
|
OwnerID: h.accountID,
|
||||||
|
}
|
||||||
|
|
||||||
|
metadataBytes, _ := json.Marshal(metadata)
|
||||||
|
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
// Create namespace directory
|
||||||
|
if err := h.createDirectory(client, namespacePath); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set metadata as extended attribute
|
||||||
|
if err := h.setExtendedAttribute(client, namespacePath, ExtendedKeyMetadata, metadataBytes); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to create namespace")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &CreateNamespaceResponse{
|
||||||
|
Namespace: req.Namespace,
|
||||||
|
TableBucketARN: req.TableBucketARN,
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, resp)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleGetNamespace gets details of a namespace
|
||||||
|
func (h *S3TablesHandler) handleGetNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req GetNamespaceRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" || req.Namespace == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN and namespace are required")
|
||||||
|
return fmt.Errorf("tableBucketARN and namespace are required")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
namespacePath := getNamespacePath(bucketName, req.Namespace)
|
||||||
|
|
||||||
|
var metadata namespaceMetadata
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
data, err := h.getExtendedAttribute(client, namespacePath, ExtendedKeyMetadata)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return json.Unmarshal(data, &metadata)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", req.Namespace))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &GetNamespaceResponse{
|
||||||
|
Namespace: metadata.Namespace,
|
||||||
|
CreatedAt: metadata.CreatedAt,
|
||||||
|
OwnerAccountID: metadata.OwnerID,
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, resp)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleListNamespaces lists all namespaces in a table bucket
|
||||||
|
func (h *S3TablesHandler) handleListNamespaces(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req ListNamespacesRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
||||||
|
return fmt.Errorf("tableBucketARN is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
maxNamespaces := req.MaxNamespaces
|
||||||
|
if maxNamespaces <= 0 {
|
||||||
|
maxNamespaces = 100
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketPath := getTableBucketPath(bucketName)
|
||||||
|
var namespaces []NamespaceSummary
|
||||||
|
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
||||||
|
Directory: bucketPath,
|
||||||
|
Limit: uint32(maxNamespaces),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
entry, err := resp.Recv()
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry.Entry == nil || !entry.Entry.IsDirectory {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip hidden entries
|
||||||
|
if strings.HasPrefix(entry.Entry.Name, ".") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply prefix filter
|
||||||
|
if req.Prefix != "" && !strings.HasPrefix(entry.Entry.Name, req.Prefix) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read metadata from extended attribute
|
||||||
|
data, ok := entry.Entry.Extended[ExtendedKeyMetadata]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var metadata namespaceMetadata
|
||||||
|
if err := json.Unmarshal(data, &metadata); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
namespaces = append(namespaces, NamespaceSummary{
|
||||||
|
Namespace: metadata.Namespace,
|
||||||
|
CreatedAt: metadata.CreatedAt,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
namespaces = []NamespaceSummary{}
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &ListNamespacesResponse{
|
||||||
|
Namespaces: namespaces,
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, resp)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleDeleteNamespace deletes a namespace from a table bucket
|
||||||
|
func (h *S3TablesHandler) handleDeleteNamespace(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req DeleteNamespaceRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" || req.Namespace == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN and namespace are required")
|
||||||
|
return fmt.Errorf("tableBucketARN and namespace are required")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
namespacePath := getNamespacePath(bucketName, req.Namespace)
|
||||||
|
|
||||||
|
// Check if namespace exists and is empty
|
||||||
|
hasChildren := false
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
||||||
|
Directory: namespacePath,
|
||||||
|
Limit: 10,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
entry, err := resp.Recv()
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if entry.Entry != nil && !strings.HasPrefix(entry.Entry.Name, ".") {
|
||||||
|
hasChildren = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if hasChildren {
|
||||||
|
h.writeError(w, http.StatusConflict, ErrCodeNamespaceNotEmpty, "namespace is not empty")
|
||||||
|
return fmt.Errorf("namespace not empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the namespace
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
return h.deleteDirectory(client, namespacePath)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete namespace")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
419
weed/s3api/s3tables/policy.go
Normal file
419
weed/s3api/s3tables/policy.go
Normal file
@@ -0,0 +1,419 @@
|
|||||||
|
package s3tables
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// handlePutTableBucketPolicy puts a policy on a table bucket
|
||||||
|
func (h *S3TablesHandler) handlePutTableBucketPolicy(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req PutTableBucketPolicyRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
||||||
|
return fmt.Errorf("tableBucketARN is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.ResourcePolicy == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourcePolicy is required")
|
||||||
|
return fmt.Errorf("resourcePolicy is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if bucket exists
|
||||||
|
bucketPath := getTableBucketPath(bucketName)
|
||||||
|
var bucketExists bool
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
_, err := h.getExtendedAttribute(client, bucketPath, ExtendedKeyMetadata)
|
||||||
|
bucketExists = err == nil
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if !bucketExists {
|
||||||
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchBucket, fmt.Sprintf("table bucket %s not found", bucketName))
|
||||||
|
return fmt.Errorf("bucket not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write policy
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
return h.setExtendedAttribute(client, bucketPath, ExtendedKeyPolicy, []byte(req.ResourcePolicy))
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to put table bucket policy")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleGetTableBucketPolicy gets the policy of a table bucket
|
||||||
|
func (h *S3TablesHandler) handleGetTableBucketPolicy(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req GetTableBucketPolicyRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
||||||
|
return fmt.Errorf("tableBucketARN is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketPath := getTableBucketPath(bucketName)
|
||||||
|
var policy []byte
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
var readErr error
|
||||||
|
policy, readErr = h.getExtendedAttribute(client, bucketPath, ExtendedKeyPolicy)
|
||||||
|
return readErr
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchPolicy, "table bucket policy not found")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &GetTableBucketPolicyResponse{
|
||||||
|
ResourcePolicy: string(policy),
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, resp)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleDeleteTableBucketPolicy deletes the policy of a table bucket
|
||||||
|
func (h *S3TablesHandler) handleDeleteTableBucketPolicy(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req DeleteTableBucketPolicyRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
||||||
|
return fmt.Errorf("tableBucketARN is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketPath := getTableBucketPath(bucketName)
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
return h.deleteExtendedAttribute(client, bucketPath, ExtendedKeyPolicy)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
// Ignore error if policy doesn't exist
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handlePutTablePolicy puts a policy on a table
|
||||||
|
func (h *S3TablesHandler) handlePutTablePolicy(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req PutTablePolicyRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
|
||||||
|
return fmt.Errorf("missing required parameters")
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.ResourcePolicy == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourcePolicy is required")
|
||||||
|
return fmt.Errorf("resourcePolicy is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if table exists
|
||||||
|
tablePath := getTablePath(bucketName, req.Namespace, req.Name)
|
||||||
|
var tableExists bool
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
_, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata)
|
||||||
|
tableExists = err == nil
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if !tableExists {
|
||||||
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", req.Name))
|
||||||
|
return fmt.Errorf("table not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write policy
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
return h.setExtendedAttribute(client, tablePath, ExtendedKeyPolicy, []byte(req.ResourcePolicy))
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to put table policy")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleGetTablePolicy gets the policy of a table
|
||||||
|
func (h *S3TablesHandler) handleGetTablePolicy(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req GetTablePolicyRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
|
||||||
|
return fmt.Errorf("missing required parameters")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tablePath := getTablePath(bucketName, req.Namespace, req.Name)
|
||||||
|
var policy []byte
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
var readErr error
|
||||||
|
policy, readErr = h.getExtendedAttribute(client, tablePath, ExtendedKeyPolicy)
|
||||||
|
return readErr
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchPolicy, "table policy not found")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &GetTablePolicyResponse{
|
||||||
|
ResourcePolicy: string(policy),
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, resp)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleDeleteTablePolicy deletes the policy of a table
|
||||||
|
func (h *S3TablesHandler) handleDeleteTablePolicy(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req DeleteTablePolicyRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
|
||||||
|
return fmt.Errorf("missing required parameters")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tablePath := getTablePath(bucketName, req.Namespace, req.Name)
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
return h.deleteExtendedAttribute(client, tablePath, ExtendedKeyPolicy)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
// Ignore error if policy doesn't exist
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleTagResource adds tags to a resource
|
||||||
|
func (h *S3TablesHandler) handleTagResource(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req TagResourceRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.ResourceARN == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourceArn is required")
|
||||||
|
return fmt.Errorf("resourceArn is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(req.Tags) == 0 {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tags are required")
|
||||||
|
return fmt.Errorf("tags are required")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse resource ARN to determine if it's a bucket or table
|
||||||
|
resourcePath, extendedKey, err := h.resolveResourcePath(req.ResourceARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read existing tags and merge
|
||||||
|
existingTags := make(map[string]string)
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
data, err := h.getExtendedAttribute(client, resourcePath, extendedKey)
|
||||||
|
if err == nil {
|
||||||
|
json.Unmarshal(data, &existingTags)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// Merge new tags
|
||||||
|
for k, v := range req.Tags {
|
||||||
|
existingTags[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write merged tags
|
||||||
|
tagsBytes, _ := json.Marshal(existingTags)
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
return h.setExtendedAttribute(client, resourcePath, extendedKey, tagsBytes)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to tag resource")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleListTagsForResource lists tags for a resource
|
||||||
|
func (h *S3TablesHandler) handleListTagsForResource(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req ListTagsForResourceRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.ResourceARN == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourceArn is required")
|
||||||
|
return fmt.Errorf("resourceArn is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
resourcePath, extendedKey, err := h.resolveResourcePath(req.ResourceARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := make(map[string]string)
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
data, err := h.getExtendedAttribute(client, resourcePath, extendedKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil // Return empty tags if not found
|
||||||
|
}
|
||||||
|
return json.Unmarshal(data, &tags)
|
||||||
|
})
|
||||||
|
|
||||||
|
resp := &ListTagsForResourceResponse{
|
||||||
|
Tags: tags,
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, resp)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleUntagResource removes tags from a resource
|
||||||
|
func (h *S3TablesHandler) handleUntagResource(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req UntagResourceRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.ResourceARN == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "resourceArn is required")
|
||||||
|
return fmt.Errorf("resourceArn is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(req.TagKeys) == 0 {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tagKeys are required")
|
||||||
|
return fmt.Errorf("tagKeys are required")
|
||||||
|
}
|
||||||
|
|
||||||
|
resourcePath, extendedKey, err := h.resolveResourcePath(req.ResourceARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read existing tags
|
||||||
|
tags := make(map[string]string)
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
data, err := h.getExtendedAttribute(client, resourcePath, extendedKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return json.Unmarshal(data, &tags)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Remove specified tags
|
||||||
|
for _, key := range req.TagKeys {
|
||||||
|
delete(tags, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write updated tags
|
||||||
|
tagsBytes, _ := json.Marshal(tags)
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
return h.setExtendedAttribute(client, resourcePath, extendedKey, tagsBytes)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to untag resource")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// resolveResourcePath determines the resource path and extended attribute key from a resource ARN
|
||||||
|
func (h *S3TablesHandler) resolveResourcePath(resourceARN string) (path string, key string, err error) {
|
||||||
|
// Try parsing as table ARN first
|
||||||
|
bucketName, namespace, tableName, err := parseTableFromARN(resourceARN)
|
||||||
|
if err == nil {
|
||||||
|
return getTablePath(bucketName, namespace, tableName), ExtendedKeyTags, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try parsing as bucket ARN
|
||||||
|
bucketName, err = parseBucketNameFromARN(resourceARN)
|
||||||
|
if err == nil {
|
||||||
|
return getTableBucketPath(bucketName), ExtendedKeyTags, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return "", "", fmt.Errorf("invalid resource ARN: %s", resourceARN)
|
||||||
|
}
|
||||||
409
weed/s3api/s3tables/table.go
Normal file
409
weed/s3api/s3tables/table.go
Normal file
@@ -0,0 +1,409 @@
|
|||||||
|
package s3tables
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// handleCreateTable creates a new table in a namespace
|
||||||
|
func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req CreateTableRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
||||||
|
return fmt.Errorf("tableBucketARN is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.Namespace == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "namespace is required")
|
||||||
|
return fmt.Errorf("namespace is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.Name == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "name is required")
|
||||||
|
return fmt.Errorf("name is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.Format == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "format is required")
|
||||||
|
return fmt.Errorf("format is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate format
|
||||||
|
if req.Format != "ICEBERG" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "only ICEBERG format is supported")
|
||||||
|
return fmt.Errorf("invalid format")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate table name
|
||||||
|
if len(req.Name) < 1 || len(req.Name) > 255 {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "table name must be between 1 and 255 characters")
|
||||||
|
return fmt.Errorf("invalid table name length")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if namespace exists
|
||||||
|
namespacePath := getNamespacePath(bucketName, req.Namespace)
|
||||||
|
var namespaceExists bool
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
_, err := h.getExtendedAttribute(client, namespacePath, ExtendedKeyMetadata)
|
||||||
|
namespaceExists = err == nil
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if !namespaceExists {
|
||||||
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", req.Namespace))
|
||||||
|
return fmt.Errorf("namespace not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
tablePath := getTablePath(bucketName, req.Namespace, req.Name)
|
||||||
|
|
||||||
|
// Check if table already exists
|
||||||
|
exists := false
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
_, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata)
|
||||||
|
exists = err == nil
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if exists {
|
||||||
|
h.writeError(w, http.StatusConflict, ErrCodeTableAlreadyExists, fmt.Sprintf("table %s already exists", req.Name))
|
||||||
|
return fmt.Errorf("table already exists")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the table
|
||||||
|
now := time.Now()
|
||||||
|
versionToken := generateVersionToken()
|
||||||
|
|
||||||
|
metadata := &tableMetadataInternal{
|
||||||
|
Name: req.Name,
|
||||||
|
Namespace: req.Namespace,
|
||||||
|
Format: req.Format,
|
||||||
|
CreatedAt: now,
|
||||||
|
ModifiedAt: now,
|
||||||
|
OwnerID: h.accountID,
|
||||||
|
VersionToken: versionToken,
|
||||||
|
Schema: req.Metadata,
|
||||||
|
}
|
||||||
|
|
||||||
|
metadataBytes, _ := json.Marshal(metadata)
|
||||||
|
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
// Create table directory
|
||||||
|
if err := h.createDirectory(client, tablePath); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create data subdirectory for Iceberg files
|
||||||
|
dataPath := tablePath + "/data"
|
||||||
|
if err := h.createDirectory(client, dataPath); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set metadata as extended attribute
|
||||||
|
if err := h.setExtendedAttribute(client, tablePath, ExtendedKeyMetadata, metadataBytes); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set tags if provided
|
||||||
|
if len(req.Tags) > 0 {
|
||||||
|
tagsBytes, _ := json.Marshal(req.Tags)
|
||||||
|
if err := h.setExtendedAttribute(client, tablePath, ExtendedKeyTags, tagsBytes); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to create table")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tableARN := h.generateTableARN(bucketName, req.Namespace, req.Name)
|
||||||
|
|
||||||
|
resp := &CreateTableResponse{
|
||||||
|
TableARN: tableARN,
|
||||||
|
VersionToken: versionToken,
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, resp)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleGetTable gets details of a table
|
||||||
|
func (h *S3TablesHandler) handleGetTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req GetTableRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var bucketName, namespace, tableName string
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// Support getting by ARN or by bucket/namespace/name
|
||||||
|
if req.TableARN != "" {
|
||||||
|
bucketName, namespace, tableName, err = parseTableFromARN(req.TableARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else if req.TableBucketARN != "" && req.Namespace != "" && req.Name != "" {
|
||||||
|
bucketName, err = parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
namespace = req.Namespace
|
||||||
|
tableName = req.Name
|
||||||
|
} else {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "either tableARN or (tableBucketARN, namespace, name) is required")
|
||||||
|
return fmt.Errorf("missing required parameters")
|
||||||
|
}
|
||||||
|
|
||||||
|
tablePath := getTablePath(bucketName, namespace, tableName)
|
||||||
|
|
||||||
|
var metadata tableMetadataInternal
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
data, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return json.Unmarshal(data, &metadata)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tableARN := h.generateTableARN(bucketName, namespace, tableName)
|
||||||
|
|
||||||
|
resp := &GetTableResponse{
|
||||||
|
Name: metadata.Name,
|
||||||
|
TableARN: tableARN,
|
||||||
|
Namespace: []string{metadata.Namespace},
|
||||||
|
Format: metadata.Format,
|
||||||
|
CreatedAt: metadata.CreatedAt,
|
||||||
|
ModifiedAt: metadata.ModifiedAt,
|
||||||
|
OwnerAccountID: metadata.OwnerID,
|
||||||
|
MetadataLocation: metadata.MetadataLocation,
|
||||||
|
VersionToken: metadata.VersionToken,
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, resp)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleListTables lists all tables in a namespace or bucket
|
||||||
|
func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req ListTablesRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
||||||
|
return fmt.Errorf("tableBucketARN is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
maxTables := req.MaxTables
|
||||||
|
if maxTables <= 0 {
|
||||||
|
maxTables = 100
|
||||||
|
}
|
||||||
|
|
||||||
|
var tables []TableSummary
|
||||||
|
|
||||||
|
// If namespace is specified, list tables in that namespace only
|
||||||
|
if req.Namespace != "" {
|
||||||
|
err = h.listTablesInNamespace(filerClient, bucketName, req.Namespace, req.Prefix, maxTables, &tables)
|
||||||
|
} else {
|
||||||
|
// List tables in all namespaces
|
||||||
|
err = h.listTablesInAllNamespaces(filerClient, bucketName, req.Prefix, maxTables, &tables)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
tables = []TableSummary{}
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &ListTablesResponse{
|
||||||
|
Tables: tables,
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, resp)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *S3TablesHandler) listTablesInNamespace(filerClient FilerClient, bucketName, namespace, prefix string, maxTables int, tables *[]TableSummary) error {
|
||||||
|
namespacePath := getNamespacePath(bucketName, namespace)
|
||||||
|
|
||||||
|
return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
||||||
|
Directory: namespacePath,
|
||||||
|
Limit: uint32(maxTables),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
entry, err := resp.Recv()
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry.Entry == nil || !entry.Entry.IsDirectory {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip hidden entries
|
||||||
|
if strings.HasPrefix(entry.Entry.Name, ".") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply prefix filter
|
||||||
|
if prefix != "" && !strings.HasPrefix(entry.Entry.Name, prefix) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read table metadata from extended attribute
|
||||||
|
data, ok := entry.Entry.Extended[ExtendedKeyMetadata]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var metadata tableMetadataInternal
|
||||||
|
if err := json.Unmarshal(data, &metadata); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
tableARN := h.generateTableARN(bucketName, namespace, entry.Entry.Name)
|
||||||
|
|
||||||
|
*tables = append(*tables, TableSummary{
|
||||||
|
Name: metadata.Name,
|
||||||
|
TableARN: tableARN,
|
||||||
|
Namespace: []string{namespace},
|
||||||
|
CreatedAt: metadata.CreatedAt,
|
||||||
|
ModifiedAt: metadata.ModifiedAt,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *S3TablesHandler) listTablesInAllNamespaces(filerClient FilerClient, bucketName, prefix string, maxTables int, tables *[]TableSummary) error {
|
||||||
|
bucketPath := getTableBucketPath(bucketName)
|
||||||
|
|
||||||
|
return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
// List all namespaces first
|
||||||
|
resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
||||||
|
Directory: bucketPath,
|
||||||
|
Limit: 1000,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
entry, err := resp.Recv()
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry.Entry == nil || !entry.Entry.IsDirectory {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip hidden entries
|
||||||
|
if strings.HasPrefix(entry.Entry.Name, ".") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace := entry.Entry.Name
|
||||||
|
|
||||||
|
// List tables in this namespace
|
||||||
|
if err := h.listTablesInNamespace(filerClient, bucketName, namespace, prefix, maxTables-len(*tables), tables); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(*tables) >= maxTables {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleDeleteTable deletes a table from a namespace
|
||||||
|
func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||||
|
var req DeleteTableRequest
|
||||||
|
if err := h.readRequestBody(r, &req); err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
|
||||||
|
return fmt.Errorf("missing required parameters")
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tablePath := getTablePath(bucketName, req.Namespace, req.Name)
|
||||||
|
|
||||||
|
// Check if table exists
|
||||||
|
var tableExists bool
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
_, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata)
|
||||||
|
tableExists = err == nil
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if !tableExists {
|
||||||
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", req.Name))
|
||||||
|
return fmt.Errorf("table not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the table
|
||||||
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
return h.deleteDirectory(client, tablePath)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
h.writeJSON(w, http.StatusOK, nil)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
290
weed/s3api/s3tables/types.go
Normal file
290
weed/s3api/s3tables/types.go
Normal file
@@ -0,0 +1,290 @@
|
|||||||
|
package s3tables
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
// Table bucket types
|
||||||
|
|
||||||
|
type TableBucket struct {
|
||||||
|
ARN string `json:"arn"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
OwnerID string `json:"ownerAccountId"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CreateTableBucketRequest struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Tags map[string]string `json:"tags,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CreateTableBucketResponse struct {
|
||||||
|
ARN string `json:"arn"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetTableBucketRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetTableBucketResponse struct {
|
||||||
|
ARN string `json:"arn"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
OwnerAccountID string `json:"ownerAccountId"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListTableBucketsRequest struct {
|
||||||
|
Prefix string `json:"prefix,omitempty"`
|
||||||
|
ContinuationToken string `json:"continuationToken,omitempty"`
|
||||||
|
MaxBuckets int `json:"maxBuckets,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type TableBucketSummary struct {
|
||||||
|
ARN string `json:"arn"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListTableBucketsResponse struct {
|
||||||
|
TableBuckets []TableBucketSummary `json:"tableBuckets"`
|
||||||
|
ContinuationToken string `json:"continuationToken,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeleteTableBucketRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Table bucket policy types
|
||||||
|
|
||||||
|
type PutTableBucketPolicyRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
ResourcePolicy string `json:"resourcePolicy"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetTableBucketPolicyRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetTableBucketPolicyResponse struct {
|
||||||
|
ResourcePolicy string `json:"resourcePolicy"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeleteTableBucketPolicyRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Namespace types
|
||||||
|
|
||||||
|
type Namespace struct {
|
||||||
|
Namespace []string `json:"namespace"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
OwnerAccountID string `json:"ownerAccountId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CreateNamespaceRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
Namespace []string `json:"namespace"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CreateNamespaceResponse struct {
|
||||||
|
Namespace []string `json:"namespace"`
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetNamespaceRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
Namespace string `json:"namespace"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetNamespaceResponse struct {
|
||||||
|
Namespace []string `json:"namespace"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
OwnerAccountID string `json:"ownerAccountId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListNamespacesRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
Prefix string `json:"prefix,omitempty"`
|
||||||
|
ContinuationToken string `json:"continuationToken,omitempty"`
|
||||||
|
MaxNamespaces int `json:"maxNamespaces,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type NamespaceSummary struct {
|
||||||
|
Namespace []string `json:"namespace"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListNamespacesResponse struct {
|
||||||
|
Namespaces []NamespaceSummary `json:"namespaces"`
|
||||||
|
ContinuationToken string `json:"continuationToken,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeleteNamespaceRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
Namespace string `json:"namespace"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Table types
|
||||||
|
|
||||||
|
type IcebergSchemaField struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Required bool `json:"required,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type IcebergSchema struct {
|
||||||
|
Fields []IcebergSchemaField `json:"fields"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type IcebergMetadata struct {
|
||||||
|
Schema IcebergSchema `json:"schema"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type TableMetadata struct {
|
||||||
|
Iceberg *IcebergMetadata `json:"iceberg,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Table struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
TableARN string `json:"tableARN"`
|
||||||
|
Namespace []string `json:"namespace"`
|
||||||
|
Format string `json:"format"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
ModifiedAt time.Time `json:"modifiedAt"`
|
||||||
|
OwnerAccountID string `json:"ownerAccountId"`
|
||||||
|
MetadataLocation string `json:"metadataLocation,omitempty"`
|
||||||
|
Metadata *TableMetadata `json:"metadata,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CreateTableRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
Namespace string `json:"namespace"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Format string `json:"format"`
|
||||||
|
Metadata *TableMetadata `json:"metadata,omitempty"`
|
||||||
|
Tags map[string]string `json:"tags,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type CreateTableResponse struct {
|
||||||
|
TableARN string `json:"tableARN"`
|
||||||
|
VersionToken string `json:"versionToken"`
|
||||||
|
MetadataLocation string `json:"metadataLocation,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetTableRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN,omitempty"`
|
||||||
|
Namespace string `json:"namespace,omitempty"`
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
TableARN string `json:"tableARN,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetTableResponse struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
TableARN string `json:"tableARN"`
|
||||||
|
Namespace []string `json:"namespace"`
|
||||||
|
Format string `json:"format"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
ModifiedAt time.Time `json:"modifiedAt"`
|
||||||
|
OwnerAccountID string `json:"ownerAccountId"`
|
||||||
|
MetadataLocation string `json:"metadataLocation,omitempty"`
|
||||||
|
VersionToken string `json:"versionToken"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListTablesRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
Namespace string `json:"namespace,omitempty"`
|
||||||
|
Prefix string `json:"prefix,omitempty"`
|
||||||
|
ContinuationToken string `json:"continuationToken,omitempty"`
|
||||||
|
MaxTables int `json:"maxTables,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type TableSummary struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
TableARN string `json:"tableARN"`
|
||||||
|
Namespace []string `json:"namespace"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
ModifiedAt time.Time `json:"modifiedAt"`
|
||||||
|
MetadataLocation string `json:"metadataLocation,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListTablesResponse struct {
|
||||||
|
Tables []TableSummary `json:"tables"`
|
||||||
|
ContinuationToken string `json:"continuationToken,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeleteTableRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
Namespace string `json:"namespace"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
VersionToken string `json:"versionToken,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Table policy types
|
||||||
|
|
||||||
|
type PutTablePolicyRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
Namespace string `json:"namespace"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
ResourcePolicy string `json:"resourcePolicy"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetTablePolicyRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
Namespace string `json:"namespace"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type GetTablePolicyResponse struct {
|
||||||
|
ResourcePolicy string `json:"resourcePolicy"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeleteTablePolicyRequest struct {
|
||||||
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
Namespace string `json:"namespace"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tagging types
|
||||||
|
|
||||||
|
type TagResourceRequest struct {
|
||||||
|
ResourceARN string `json:"resourceArn"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListTagsForResourceRequest struct {
|
||||||
|
ResourceARN string `json:"resourceArn"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ListTagsForResourceResponse struct {
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type UntagResourceRequest struct {
|
||||||
|
ResourceARN string `json:"resourceArn"`
|
||||||
|
TagKeys []string `json:"tagKeys"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error types
|
||||||
|
|
||||||
|
type S3TablesError struct {
|
||||||
|
Type string `json:"__type"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *S3TablesError) Error() string {
|
||||||
|
return e.Message
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error codes
|
||||||
|
const (
|
||||||
|
ErrCodeBucketAlreadyExists = "BucketAlreadyExists"
|
||||||
|
ErrCodeBucketNotEmpty = "BucketNotEmpty"
|
||||||
|
ErrCodeNoSuchBucket = "NoSuchBucket"
|
||||||
|
ErrCodeNoSuchNamespace = "NoSuchNamespace"
|
||||||
|
ErrCodeNoSuchTable = "NoSuchTable"
|
||||||
|
ErrCodeNamespaceAlreadyExists = "NamespaceAlreadyExists"
|
||||||
|
ErrCodeNamespaceNotEmpty = "NamespaceNotEmpty"
|
||||||
|
ErrCodeTableAlreadyExists = "TableAlreadyExists"
|
||||||
|
ErrCodeAccessDenied = "AccessDenied"
|
||||||
|
ErrCodeInvalidRequest = "InvalidRequest"
|
||||||
|
ErrCodeInternalError = "InternalError"
|
||||||
|
ErrCodeNoSuchPolicy = "NoSuchPolicy"
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user