s3: change s3 tables to use RESTful API (#8169)
* s3: refactor s3 tables to use RESTful API * test/s3tables: guard empty namespaces * s3api: document tag parsing and validate get-table * s3api: limit S3Tables REST body size * Update weed/s3api/s3api_tables.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/s3api/s3tables/handler.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * s3api: accept encoded table bucket ARNs * s3api: validate namespaces and close body * s3api: match encoded table bucket ARNs * s3api: scope table bucket ARN routes * s3api: dedupe table bucket request builders * test/s3tables: allow list tables without namespace * s3api: validate table params and tag ARN * s3api: tighten tag handling and get-table params * s3api: loosen tag ARN route matching * Fix S3 Tables REST routing and tests * Adjust S3 Tables request parsing * Gate S3 Tables target routing * Avoid double decoding namespaces --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -2,15 +2,31 @@ package s3tables
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go-v2/aws"
|
||||||
|
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *S3TablesClient) doRequest(operation string, body interface{}) (*http.Response, error) {
|
func getFirstNamespace(namespace []string) (string, error) {
|
||||||
|
if len(namespace) == 0 {
|
||||||
|
return "", fmt.Errorf("namespace must not be empty")
|
||||||
|
}
|
||||||
|
return namespace[0], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *S3TablesClient) doRestRequest(method, path string, body interface{}) (*http.Response, error) {
|
||||||
var bodyBytes []byte
|
var bodyBytes []byte
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
@@ -21,19 +37,92 @@ func (c *S3TablesClient) doRequest(operation string, body interface{}) (*http.Re
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodPost, c.endpoint, bytes.NewReader(bodyBytes))
|
req, err := http.NewRequest(method, c.endpoint+path, bytes.NewReader(bodyBytes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
req.Header.Set("Content-Type", "application/x-amz-json-1.1")
|
if body != nil {
|
||||||
req.Header.Set("X-Amz-Target", "S3Tables."+operation)
|
req.Header.Set("Content-Type", "application/x-amz-json-1.1")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.signRequest(req, bodyBytes); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return c.client.Do(req)
|
return c.client.Do(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) doRequestAndDecode(operation string, reqBody interface{}, respBody interface{}) error {
|
func (c *S3TablesClient) doTargetRequest(operation string, body interface{}) (*http.Response, error) {
|
||||||
resp, err := c.doRequest(operation, reqBody)
|
var bodyBytes []byte
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if body != nil {
|
||||||
|
bodyBytes, err = json.Marshal(body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to marshal request body: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodPost, c.endpoint+"/", bytes.NewReader(bodyBytes))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req.URL.RawPath = "/"
|
||||||
|
req.Header.Set("Content-Type", "application/x-amz-json-1.1")
|
||||||
|
req.Header.Set("X-Amz-Target", "S3Tables."+operation)
|
||||||
|
|
||||||
|
if err := c.signRequest(req, bodyBytes); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.client.Do(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *S3TablesClient) doTargetRequestAndDecode(operation string, reqBody interface{}, respBody interface{}) error {
|
||||||
|
resp, err := c.doTargetRequest(operation, reqBody)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
bodyBytes, readErr := io.ReadAll(resp.Body)
|
||||||
|
if readErr != nil {
|
||||||
|
return fmt.Errorf("%s failed with status %d and could not read error response body: %v", operation, resp.StatusCode, readErr)
|
||||||
|
}
|
||||||
|
var errResp s3tables.S3TablesError
|
||||||
|
if err := json.Unmarshal(bodyBytes, &errResp); err != nil {
|
||||||
|
return fmt.Errorf("%s failed with status %d, could not decode error response: %v. Body: %s", operation, resp.StatusCode, err, string(bodyBytes))
|
||||||
|
}
|
||||||
|
return fmt.Errorf("%s failed: %s - %s", operation, errResp.Type, errResp.Message)
|
||||||
|
}
|
||||||
|
|
||||||
|
if respBody != nil {
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(respBody); err != nil {
|
||||||
|
return fmt.Errorf("failed to decode %s response: %w", operation, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *S3TablesClient) signRequest(req *http.Request, body []byte) error {
|
||||||
|
creds := aws.Credentials{
|
||||||
|
AccessKeyID: c.accessKey,
|
||||||
|
SecretAccessKey: c.secretKey,
|
||||||
|
}
|
||||||
|
if req.Host == "" {
|
||||||
|
req.Host = req.URL.Host
|
||||||
|
}
|
||||||
|
req.Header.Set("Host", req.URL.Host)
|
||||||
|
payloadHash := sha256.Sum256(body)
|
||||||
|
return v4.NewSigner().SignHTTP(context.Background(), creds, req, hex.EncodeToString(payloadHash[:]), "s3tables", c.region, time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *S3TablesClient) doRestRequestAndDecode(operation, method, path string, reqBody interface{}, respBody interface{}) error {
|
||||||
|
resp, err := c.doRestRequest(method, path, reqBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -68,233 +157,275 @@ func (c *S3TablesClient) CreateTableBucket(name string, tags map[string]string)
|
|||||||
Tags: tags,
|
Tags: tags,
|
||||||
}
|
}
|
||||||
var result s3tables.CreateTableBucketResponse
|
var result s3tables.CreateTableBucketResponse
|
||||||
if err := c.doRequestAndDecode("CreateTableBucket", req, &result); err != nil {
|
if err := c.doRestRequestAndDecode("CreateTableBucket", http.MethodPut, "/buckets", req, &result); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) GetTableBucket(arn string) (*s3tables.GetTableBucketResponse, error) {
|
func (c *S3TablesClient) GetTableBucket(arn string) (*s3tables.GetTableBucketResponse, error) {
|
||||||
req := &s3tables.GetTableBucketRequest{
|
path := "/buckets/" + url.PathEscape(arn)
|
||||||
TableBucketARN: arn,
|
|
||||||
}
|
|
||||||
var result s3tables.GetTableBucketResponse
|
var result s3tables.GetTableBucketResponse
|
||||||
if err := c.doRequestAndDecode("GetTableBucket", req, &result); err != nil {
|
if err := c.doRestRequestAndDecode("GetTableBucket", http.MethodGet, path, nil, &result); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) ListTableBuckets(prefix, continuationToken string, maxBuckets int) (*s3tables.ListTableBucketsResponse, error) {
|
func (c *S3TablesClient) ListTableBuckets(prefix, continuationToken string, maxBuckets int) (*s3tables.ListTableBucketsResponse, error) {
|
||||||
req := &s3tables.ListTableBucketsRequest{
|
query := url.Values{}
|
||||||
Prefix: prefix,
|
if prefix != "" {
|
||||||
ContinuationToken: continuationToken,
|
query.Set("prefix", prefix)
|
||||||
MaxBuckets: maxBuckets,
|
}
|
||||||
|
if continuationToken != "" {
|
||||||
|
query.Set("continuationToken", continuationToken)
|
||||||
|
}
|
||||||
|
if maxBuckets > 0 {
|
||||||
|
query.Set("maxBuckets", strconv.Itoa(maxBuckets))
|
||||||
|
}
|
||||||
|
path := "/buckets"
|
||||||
|
if encoded := query.Encode(); encoded != "" {
|
||||||
|
path = path + "?" + encoded
|
||||||
}
|
}
|
||||||
var result s3tables.ListTableBucketsResponse
|
var result s3tables.ListTableBucketsResponse
|
||||||
if err := c.doRequestAndDecode("ListTableBuckets", req, &result); err != nil {
|
if err := c.doRestRequestAndDecode("ListTableBuckets", http.MethodGet, path, nil, &result); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) DeleteTableBucket(arn string) error {
|
func (c *S3TablesClient) DeleteTableBucket(arn string) error {
|
||||||
req := &s3tables.DeleteTableBucketRequest{
|
path := "/buckets/" + url.PathEscape(arn)
|
||||||
TableBucketARN: arn,
|
return c.doRestRequestAndDecode("DeleteTableBucket", http.MethodDelete, path, nil, nil)
|
||||||
}
|
|
||||||
return c.doRequestAndDecode("DeleteTableBucket", req, nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Namespace operations
|
// Namespace operations
|
||||||
|
|
||||||
func (c *S3TablesClient) CreateNamespace(bucketARN string, namespace []string) (*s3tables.CreateNamespaceResponse, error) {
|
func (c *S3TablesClient) CreateNamespace(bucketARN string, namespace []string) (*s3tables.CreateNamespaceResponse, error) {
|
||||||
req := &s3tables.CreateNamespaceRequest{
|
if len(namespace) == 0 {
|
||||||
TableBucketARN: bucketARN,
|
return nil, fmt.Errorf("CreateNamespace requires namespace")
|
||||||
Namespace: namespace,
|
|
||||||
}
|
}
|
||||||
|
req := &s3tables.CreateNamespaceRequest{
|
||||||
|
Namespace: namespace,
|
||||||
|
}
|
||||||
|
path := "/namespaces/" + url.PathEscape(bucketARN)
|
||||||
var result s3tables.CreateNamespaceResponse
|
var result s3tables.CreateNamespaceResponse
|
||||||
if err := c.doRequestAndDecode("CreateNamespace", req, &result); err != nil {
|
if err := c.doRestRequestAndDecode("CreateNamespace", http.MethodPut, path, req, &result); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) GetNamespace(bucketARN string, namespace []string) (*s3tables.GetNamespaceResponse, error) {
|
func (c *S3TablesClient) GetNamespace(bucketARN string, namespace []string) (*s3tables.GetNamespaceResponse, error) {
|
||||||
req := &s3tables.GetNamespaceRequest{
|
name, err := getFirstNamespace(namespace)
|
||||||
TableBucketARN: bucketARN,
|
if err != nil {
|
||||||
Namespace: namespace,
|
return nil, fmt.Errorf("GetNamespace requires namespace: %w", err)
|
||||||
}
|
}
|
||||||
|
path := "/namespaces/" + url.PathEscape(bucketARN) + "/" + url.PathEscape(name)
|
||||||
var result s3tables.GetNamespaceResponse
|
var result s3tables.GetNamespaceResponse
|
||||||
if err := c.doRequestAndDecode("GetNamespace", req, &result); err != nil {
|
if err := c.doRestRequestAndDecode("GetNamespace", http.MethodGet, path, nil, &result); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) ListNamespaces(bucketARN, prefix, continuationToken string, maxNamespaces int) (*s3tables.ListNamespacesResponse, error) {
|
func (c *S3TablesClient) ListNamespaces(bucketARN, prefix, continuationToken string, maxNamespaces int) (*s3tables.ListNamespacesResponse, error) {
|
||||||
req := &s3tables.ListNamespacesRequest{
|
query := url.Values{}
|
||||||
TableBucketARN: bucketARN,
|
if prefix != "" {
|
||||||
Prefix: prefix,
|
query.Set("prefix", prefix)
|
||||||
ContinuationToken: continuationToken,
|
}
|
||||||
MaxNamespaces: maxNamespaces,
|
if continuationToken != "" {
|
||||||
|
query.Set("continuationToken", continuationToken)
|
||||||
|
}
|
||||||
|
if maxNamespaces > 0 {
|
||||||
|
query.Set("maxNamespaces", strconv.Itoa(maxNamespaces))
|
||||||
|
}
|
||||||
|
path := "/namespaces/" + url.PathEscape(bucketARN)
|
||||||
|
if encoded := query.Encode(); encoded != "" {
|
||||||
|
path = path + "?" + encoded
|
||||||
}
|
}
|
||||||
var result s3tables.ListNamespacesResponse
|
var result s3tables.ListNamespacesResponse
|
||||||
if err := c.doRequestAndDecode("ListNamespaces", req, &result); err != nil {
|
if err := c.doRestRequestAndDecode("ListNamespaces", http.MethodGet, path, nil, &result); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) DeleteNamespace(bucketARN string, namespace []string) error {
|
func (c *S3TablesClient) DeleteNamespace(bucketARN string, namespace []string) error {
|
||||||
req := &s3tables.DeleteNamespaceRequest{
|
name, err := getFirstNamespace(namespace)
|
||||||
TableBucketARN: bucketARN,
|
if err != nil {
|
||||||
Namespace: namespace,
|
return fmt.Errorf("DeleteNamespace requires namespace: %w", err)
|
||||||
}
|
}
|
||||||
return c.doRequestAndDecode("DeleteNamespace", req, nil)
|
path := "/namespaces/" + url.PathEscape(bucketARN) + "/" + url.PathEscape(name)
|
||||||
|
return c.doRestRequestAndDecode("DeleteNamespace", http.MethodDelete, path, nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Table operations
|
// Table operations
|
||||||
|
|
||||||
func (c *S3TablesClient) CreateTable(bucketARN string, namespace []string, name, format string, metadata *s3tables.TableMetadata, tags map[string]string) (*s3tables.CreateTableResponse, error) {
|
func (c *S3TablesClient) CreateTable(bucketARN string, namespace []string, name, format string, metadata *s3tables.TableMetadata, tags map[string]string) (*s3tables.CreateTableResponse, error) {
|
||||||
req := &s3tables.CreateTableRequest{
|
nameSpace, err := getFirstNamespace(namespace)
|
||||||
TableBucketARN: bucketARN,
|
if err != nil {
|
||||||
Namespace: namespace,
|
return nil, fmt.Errorf("CreateTable requires namespace: %w", err)
|
||||||
Name: name,
|
|
||||||
Format: format,
|
|
||||||
Metadata: metadata,
|
|
||||||
Tags: tags,
|
|
||||||
}
|
}
|
||||||
|
req := &s3tables.CreateTableRequest{
|
||||||
|
Name: name,
|
||||||
|
Format: format,
|
||||||
|
Metadata: metadata,
|
||||||
|
Tags: tags,
|
||||||
|
}
|
||||||
|
path := "/tables/" + url.PathEscape(bucketARN) + "/" + url.PathEscape(nameSpace)
|
||||||
var result s3tables.CreateTableResponse
|
var result s3tables.CreateTableResponse
|
||||||
if err := c.doRequestAndDecode("CreateTable", req, &result); err != nil {
|
if err := c.doRestRequestAndDecode("CreateTable", http.MethodPut, path, req, &result); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) GetTable(bucketARN string, namespace []string, name string) (*s3tables.GetTableResponse, error) {
|
func (c *S3TablesClient) GetTable(bucketARN string, namespace []string, name string) (*s3tables.GetTableResponse, error) {
|
||||||
req := &s3tables.GetTableRequest{
|
nameSpace, err := getFirstNamespace(namespace)
|
||||||
TableBucketARN: bucketARN,
|
if err != nil {
|
||||||
Namespace: namespace,
|
return nil, fmt.Errorf("GetTable requires namespace: %w", err)
|
||||||
Name: name,
|
|
||||||
}
|
}
|
||||||
|
query := url.Values{}
|
||||||
|
query.Set("tableBucketARN", bucketARN)
|
||||||
|
query.Set("namespace", nameSpace)
|
||||||
|
query.Set("name", name)
|
||||||
|
path := "/get-table?" + query.Encode()
|
||||||
var result s3tables.GetTableResponse
|
var result s3tables.GetTableResponse
|
||||||
if err := c.doRequestAndDecode("GetTable", req, &result); err != nil {
|
if err := c.doRestRequestAndDecode("GetTable", http.MethodGet, path, nil, &result); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) ListTables(bucketARN string, namespace []string, prefix, continuationToken string, maxTables int) (*s3tables.ListTablesResponse, error) {
|
func (c *S3TablesClient) ListTables(bucketARN string, namespace []string, prefix, continuationToken string, maxTables int) (*s3tables.ListTablesResponse, error) {
|
||||||
req := &s3tables.ListTablesRequest{
|
query := url.Values{}
|
||||||
TableBucketARN: bucketARN,
|
if len(namespace) > 0 {
|
||||||
Namespace: namespace,
|
nameSpace, err := getFirstNamespace(namespace)
|
||||||
Prefix: prefix,
|
if err != nil {
|
||||||
ContinuationToken: continuationToken,
|
return nil, fmt.Errorf("ListTables requires namespace: %w", err)
|
||||||
MaxTables: maxTables,
|
}
|
||||||
|
query.Set("namespace", nameSpace)
|
||||||
|
}
|
||||||
|
if prefix != "" {
|
||||||
|
query.Set("prefix", prefix)
|
||||||
|
}
|
||||||
|
if continuationToken != "" {
|
||||||
|
query.Set("continuationToken", continuationToken)
|
||||||
|
}
|
||||||
|
if maxTables > 0 {
|
||||||
|
query.Set("maxTables", strconv.Itoa(maxTables))
|
||||||
|
}
|
||||||
|
path := "/tables/" + url.PathEscape(bucketARN)
|
||||||
|
if encoded := query.Encode(); encoded != "" {
|
||||||
|
path = path + "?" + encoded
|
||||||
}
|
}
|
||||||
var result s3tables.ListTablesResponse
|
var result s3tables.ListTablesResponse
|
||||||
if err := c.doRequestAndDecode("ListTables", req, &result); err != nil {
|
if err := c.doRestRequestAndDecode("ListTables", http.MethodGet, path, nil, &result); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) DeleteTable(bucketARN string, namespace []string, name string) error {
|
func (c *S3TablesClient) DeleteTable(bucketARN string, namespace []string, name string) error {
|
||||||
req := &s3tables.DeleteTableRequest{
|
nameSpace, err := getFirstNamespace(namespace)
|
||||||
TableBucketARN: bucketARN,
|
if err != nil {
|
||||||
Namespace: namespace,
|
return fmt.Errorf("DeleteTable requires namespace: %w", err)
|
||||||
Name: name,
|
|
||||||
}
|
}
|
||||||
return c.doRequestAndDecode("DeleteTable", req, nil)
|
path := "/tables/" + url.PathEscape(bucketARN) + "/" + url.PathEscape(nameSpace) + "/" + url.PathEscape(name)
|
||||||
|
return c.doRestRequestAndDecode("DeleteTable", http.MethodDelete, path, nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Policy operations
|
// Policy operations
|
||||||
|
|
||||||
func (c *S3TablesClient) PutTableBucketPolicy(bucketARN, policy string) error {
|
func (c *S3TablesClient) PutTableBucketPolicy(bucketARN, policy string) error {
|
||||||
req := &s3tables.PutTableBucketPolicyRequest{
|
req := &s3tables.PutTableBucketPolicyRequest{
|
||||||
TableBucketARN: bucketARN,
|
|
||||||
ResourcePolicy: policy,
|
ResourcePolicy: policy,
|
||||||
}
|
}
|
||||||
return c.doRequestAndDecode("PutTableBucketPolicy", req, nil)
|
path := "/buckets/" + url.PathEscape(bucketARN) + "/policy"
|
||||||
|
return c.doRestRequestAndDecode("PutTableBucketPolicy", http.MethodPut, path, req, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) GetTableBucketPolicy(bucketARN string) (*s3tables.GetTableBucketPolicyResponse, error) {
|
func (c *S3TablesClient) GetTableBucketPolicy(bucketARN string) (*s3tables.GetTableBucketPolicyResponse, error) {
|
||||||
req := &s3tables.GetTableBucketPolicyRequest{
|
path := "/buckets/" + url.PathEscape(bucketARN) + "/policy"
|
||||||
TableBucketARN: bucketARN,
|
|
||||||
}
|
|
||||||
var result s3tables.GetTableBucketPolicyResponse
|
var result s3tables.GetTableBucketPolicyResponse
|
||||||
if err := c.doRequestAndDecode("GetTableBucketPolicy", req, &result); err != nil {
|
if err := c.doRestRequestAndDecode("GetTableBucketPolicy", http.MethodGet, path, nil, &result); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) DeleteTableBucketPolicy(bucketARN string) error {
|
func (c *S3TablesClient) DeleteTableBucketPolicy(bucketARN string) error {
|
||||||
req := &s3tables.DeleteTableBucketPolicyRequest{
|
path := "/buckets/" + url.PathEscape(bucketARN) + "/policy"
|
||||||
TableBucketARN: bucketARN,
|
return c.doRestRequestAndDecode("DeleteTableBucketPolicy", http.MethodDelete, path, nil, nil)
|
||||||
}
|
|
||||||
return c.doRequestAndDecode("DeleteTableBucketPolicy", req, nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Table Policy operations
|
// Table Policy operations
|
||||||
|
|
||||||
func (c *S3TablesClient) PutTablePolicy(bucketARN string, namespace []string, name, policy string) error {
|
func (c *S3TablesClient) PutTablePolicy(bucketARN string, namespace []string, name, policy string) error {
|
||||||
|
nameSpace, err := getFirstNamespace(namespace)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("PutTablePolicy requires namespace: %w", err)
|
||||||
|
}
|
||||||
req := &s3tables.PutTablePolicyRequest{
|
req := &s3tables.PutTablePolicyRequest{
|
||||||
TableBucketARN: bucketARN,
|
|
||||||
Namespace: namespace,
|
|
||||||
Name: name,
|
|
||||||
ResourcePolicy: policy,
|
ResourcePolicy: policy,
|
||||||
}
|
}
|
||||||
return c.doRequestAndDecode("PutTablePolicy", req, nil)
|
path := "/tables/" + url.PathEscape(bucketARN) + "/" + url.PathEscape(nameSpace) + "/" + url.PathEscape(name) + "/policy"
|
||||||
|
return c.doRestRequestAndDecode("PutTablePolicy", http.MethodPut, path, req, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) GetTablePolicy(bucketARN string, namespace []string, name string) (*s3tables.GetTablePolicyResponse, error) {
|
func (c *S3TablesClient) GetTablePolicy(bucketARN string, namespace []string, name string) (*s3tables.GetTablePolicyResponse, error) {
|
||||||
req := &s3tables.GetTablePolicyRequest{
|
nameSpace, err := getFirstNamespace(namespace)
|
||||||
TableBucketARN: bucketARN,
|
if err != nil {
|
||||||
Namespace: namespace,
|
return nil, fmt.Errorf("GetTablePolicy requires namespace: %w", err)
|
||||||
Name: name,
|
|
||||||
}
|
}
|
||||||
|
path := "/tables/" + url.PathEscape(bucketARN) + "/" + url.PathEscape(nameSpace) + "/" + url.PathEscape(name) + "/policy"
|
||||||
var result s3tables.GetTablePolicyResponse
|
var result s3tables.GetTablePolicyResponse
|
||||||
if err := c.doRequestAndDecode("GetTablePolicy", req, &result); err != nil {
|
if err := c.doRestRequestAndDecode("GetTablePolicy", http.MethodGet, path, nil, &result); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) DeleteTablePolicy(bucketARN string, namespace []string, name string) error {
|
func (c *S3TablesClient) DeleteTablePolicy(bucketARN string, namespace []string, name string) error {
|
||||||
req := &s3tables.DeleteTablePolicyRequest{
|
nameSpace, err := getFirstNamespace(namespace)
|
||||||
TableBucketARN: bucketARN,
|
if err != nil {
|
||||||
Namespace: namespace,
|
return fmt.Errorf("DeleteTablePolicy requires namespace: %w", err)
|
||||||
Name: name,
|
|
||||||
}
|
}
|
||||||
return c.doRequestAndDecode("DeleteTablePolicy", req, nil)
|
path := "/tables/" + url.PathEscape(bucketARN) + "/" + url.PathEscape(nameSpace) + "/" + url.PathEscape(name) + "/policy"
|
||||||
|
return c.doRestRequestAndDecode("DeleteTablePolicy", http.MethodDelete, path, nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tagging operations
|
// Tagging operations
|
||||||
|
|
||||||
func (c *S3TablesClient) TagResource(resourceARN string, tags map[string]string) error {
|
func (c *S3TablesClient) TagResource(resourceARN string, tags map[string]string) error {
|
||||||
req := &s3tables.TagResourceRequest{
|
req := &s3tables.TagResourceRequest{
|
||||||
ResourceARN: resourceARN,
|
Tags: tags,
|
||||||
Tags: tags,
|
|
||||||
}
|
}
|
||||||
return c.doRequestAndDecode("TagResource", req, nil)
|
path := "/tag/" + url.PathEscape(resourceARN)
|
||||||
|
return c.doRestRequestAndDecode("TagResource", http.MethodPost, path, req, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) ListTagsForResource(resourceARN string) (*s3tables.ListTagsForResourceResponse, error) {
|
func (c *S3TablesClient) ListTagsForResource(resourceARN string) (*s3tables.ListTagsForResourceResponse, error) {
|
||||||
req := &s3tables.ListTagsForResourceRequest{
|
path := "/tag/" + url.PathEscape(resourceARN)
|
||||||
ResourceARN: resourceARN,
|
|
||||||
}
|
|
||||||
var result s3tables.ListTagsForResourceResponse
|
var result s3tables.ListTagsForResourceResponse
|
||||||
if err := c.doRequestAndDecode("ListTagsForResource", req, &result); err != nil {
|
if err := c.doRestRequestAndDecode("ListTagsForResource", http.MethodGet, path, nil, &result); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *S3TablesClient) UntagResource(resourceARN string, tagKeys []string) error {
|
func (c *S3TablesClient) UntagResource(resourceARN string, tagKeys []string) error {
|
||||||
req := &s3tables.UntagResourceRequest{
|
if len(tagKeys) == 0 {
|
||||||
ResourceARN: resourceARN,
|
return fmt.Errorf("tagKeys cannot be empty")
|
||||||
TagKeys: tagKeys,
|
|
||||||
}
|
}
|
||||||
return c.doRequestAndDecode("UntagResource", req, nil)
|
query := url.Values{}
|
||||||
|
for _, key := range tagKeys {
|
||||||
|
query.Add("tagKeys", key)
|
||||||
|
}
|
||||||
|
path := "/tag/" + url.PathEscape(resourceARN)
|
||||||
|
if encoded := query.Encode(); encoded != "" {
|
||||||
|
path = path + "?" + encoded
|
||||||
|
}
|
||||||
|
return c.doRestRequestAndDecode("UntagResource", http.MethodDelete, path, nil, nil)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -64,6 +64,10 @@ func TestS3TablesIntegration(t *testing.T) {
|
|||||||
t.Run("Tagging", func(t *testing.T) {
|
t.Run("Tagging", func(t *testing.T) {
|
||||||
testTagging(t, client)
|
testTagging(t, client)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("TargetOperations", func(t *testing.T) {
|
||||||
|
testTargetOperations(t, client)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func testTableBucketLifecycle(t *testing.T, client *S3TablesClient) {
|
func testTableBucketLifecycle(t *testing.T, client *S3TablesClient) {
|
||||||
@@ -355,6 +359,125 @@ func testTagging(t *testing.T, client *S3TablesClient) {
|
|||||||
t.Logf("✓ Verified tag removal")
|
t.Logf("✓ Verified tag removal")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testTargetOperations(t *testing.T, client *S3TablesClient) {
|
||||||
|
bucketName := "test-target-bucket-" + randomString(8)
|
||||||
|
|
||||||
|
var createResp s3tables.CreateTableBucketResponse
|
||||||
|
err := client.doTargetRequestAndDecode("CreateTableBucket", &s3tables.CreateTableBucketRequest{
|
||||||
|
Name: bucketName,
|
||||||
|
}, &createResp)
|
||||||
|
require.NoError(t, err, "Failed to create table bucket via target")
|
||||||
|
defer client.doTargetRequestAndDecode("DeleteTableBucket", &s3tables.DeleteTableBucketRequest{
|
||||||
|
TableBucketARN: createResp.ARN,
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
var listResp s3tables.ListTableBucketsResponse
|
||||||
|
err = client.doTargetRequestAndDecode("ListTableBuckets", &s3tables.ListTableBucketsRequest{}, &listResp)
|
||||||
|
require.NoError(t, err, "Failed to list table buckets via target")
|
||||||
|
found := false
|
||||||
|
for _, b := range listResp.TableBuckets {
|
||||||
|
if b.Name == bucketName {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert.True(t, found, "Created bucket should appear in target list")
|
||||||
|
|
||||||
|
var getResp s3tables.GetTableBucketResponse
|
||||||
|
err = client.doTargetRequestAndDecode("GetTableBucket", &s3tables.GetTableBucketRequest{
|
||||||
|
TableBucketARN: createResp.ARN,
|
||||||
|
}, &getResp)
|
||||||
|
require.NoError(t, err, "Failed to get table bucket via target")
|
||||||
|
assert.Equal(t, bucketName, getResp.Name)
|
||||||
|
|
||||||
|
namespaceName := "target_ns"
|
||||||
|
var createNsResp s3tables.CreateNamespaceResponse
|
||||||
|
err = client.doTargetRequestAndDecode("CreateNamespace", &s3tables.CreateNamespaceRequest{
|
||||||
|
TableBucketARN: createResp.ARN,
|
||||||
|
Namespace: []string{namespaceName},
|
||||||
|
}, &createNsResp)
|
||||||
|
require.NoError(t, err, "Failed to create namespace via target")
|
||||||
|
defer client.doTargetRequestAndDecode("DeleteNamespace", &s3tables.DeleteNamespaceRequest{
|
||||||
|
TableBucketARN: createResp.ARN,
|
||||||
|
Namespace: []string{namespaceName},
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
var listNsResp s3tables.ListNamespacesResponse
|
||||||
|
err = client.doTargetRequestAndDecode("ListNamespaces", &s3tables.ListNamespacesRequest{
|
||||||
|
TableBucketARN: createResp.ARN,
|
||||||
|
}, &listNsResp)
|
||||||
|
require.NoError(t, err, "Failed to list namespaces via target")
|
||||||
|
|
||||||
|
tableName := "target_table"
|
||||||
|
var createTableResp s3tables.CreateTableResponse
|
||||||
|
err = client.doTargetRequestAndDecode("CreateTable", &s3tables.CreateTableRequest{
|
||||||
|
TableBucketARN: createResp.ARN,
|
||||||
|
Namespace: []string{namespaceName},
|
||||||
|
Name: tableName,
|
||||||
|
Format: "ICEBERG",
|
||||||
|
}, &createTableResp)
|
||||||
|
require.NoError(t, err, "Failed to create table via target")
|
||||||
|
defer client.doTargetRequestAndDecode("DeleteTable", &s3tables.DeleteTableRequest{
|
||||||
|
TableBucketARN: createResp.ARN,
|
||||||
|
Namespace: []string{namespaceName},
|
||||||
|
Name: tableName,
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
var listTablesResp s3tables.ListTablesResponse
|
||||||
|
err = client.doTargetRequestAndDecode("ListTables", &s3tables.ListTablesRequest{
|
||||||
|
TableBucketARN: createResp.ARN,
|
||||||
|
Namespace: []string{namespaceName},
|
||||||
|
}, &listTablesResp)
|
||||||
|
require.NoError(t, err, "Failed to list tables via target")
|
||||||
|
|
||||||
|
var getTableResp s3tables.GetTableResponse
|
||||||
|
err = client.doTargetRequestAndDecode("GetTable", &s3tables.GetTableRequest{
|
||||||
|
TableBucketARN: createResp.ARN,
|
||||||
|
Namespace: []string{namespaceName},
|
||||||
|
Name: tableName,
|
||||||
|
}, &getTableResp)
|
||||||
|
require.NoError(t, err, "Failed to get table via target")
|
||||||
|
assert.Equal(t, tableName, getTableResp.Name)
|
||||||
|
|
||||||
|
policy := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":"*","Action":"s3tables:*","Resource":"*"}]}`
|
||||||
|
err = client.doTargetRequestAndDecode("PutTableBucketPolicy", &s3tables.PutTableBucketPolicyRequest{
|
||||||
|
TableBucketARN: createResp.ARN,
|
||||||
|
ResourcePolicy: policy,
|
||||||
|
}, nil)
|
||||||
|
require.NoError(t, err, "Failed to put bucket policy via target")
|
||||||
|
|
||||||
|
var getPolicyResp s3tables.GetTableBucketPolicyResponse
|
||||||
|
err = client.doTargetRequestAndDecode("GetTableBucketPolicy", &s3tables.GetTableBucketPolicyRequest{
|
||||||
|
TableBucketARN: createResp.ARN,
|
||||||
|
}, &getPolicyResp)
|
||||||
|
require.NoError(t, err, "Failed to get bucket policy via target")
|
||||||
|
assert.Equal(t, policy, getPolicyResp.ResourcePolicy)
|
||||||
|
|
||||||
|
err = client.doTargetRequestAndDecode("DeleteTableBucketPolicy", &s3tables.DeleteTableBucketPolicyRequest{
|
||||||
|
TableBucketARN: createResp.ARN,
|
||||||
|
}, nil)
|
||||||
|
require.NoError(t, err, "Failed to delete bucket policy via target")
|
||||||
|
|
||||||
|
err = client.doTargetRequestAndDecode("TagResource", &s3tables.TagResourceRequest{
|
||||||
|
ResourceARN: createResp.ARN,
|
||||||
|
Tags: map[string]string{"Environment": "test"},
|
||||||
|
}, nil)
|
||||||
|
require.NoError(t, err, "Failed to tag resource via target")
|
||||||
|
|
||||||
|
var listTagsResp s3tables.ListTagsForResourceResponse
|
||||||
|
err = client.doTargetRequestAndDecode("ListTagsForResource", &s3tables.ListTagsForResourceRequest{
|
||||||
|
ResourceARN: createResp.ARN,
|
||||||
|
}, &listTagsResp)
|
||||||
|
require.NoError(t, err, "Failed to list tags via target")
|
||||||
|
assert.Equal(t, "test", listTagsResp.Tags["Environment"])
|
||||||
|
|
||||||
|
err = client.doTargetRequestAndDecode("UntagResource", &s3tables.UntagResourceRequest{
|
||||||
|
ResourceARN: createResp.ARN,
|
||||||
|
TagKeys: []string{"Environment"},
|
||||||
|
}, nil)
|
||||||
|
require.NoError(t, err, "Failed to untag resource via target")
|
||||||
|
}
|
||||||
|
|
||||||
// Helper functions
|
// Helper functions
|
||||||
|
|
||||||
// findAvailablePort finds an available port by binding to port 0
|
// findAvailablePort finds an available port by binding to port 0
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
@@ -284,8 +285,12 @@ func (iam *IdentityAccessManagement) verifyV4Signature(r *http.Request, shouldCh
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 8. Verify the signature, trying with X-Forwarded-Prefix first
|
// 8. Verify the signature, trying with X-Forwarded-Prefix first
|
||||||
|
pathForSignature := r.URL.EscapedPath()
|
||||||
|
if pathForSignature == "" {
|
||||||
|
pathForSignature = r.URL.Path
|
||||||
|
}
|
||||||
if forwardedPrefix := r.Header.Get("X-Forwarded-Prefix"); forwardedPrefix != "" {
|
if forwardedPrefix := r.Header.Get("X-Forwarded-Prefix"); forwardedPrefix != "" {
|
||||||
cleanedPath := buildPathWithForwardedPrefix(forwardedPrefix, r.URL.Path)
|
cleanedPath := buildPathWithForwardedPrefix(forwardedPrefix, pathForSignature)
|
||||||
calculatedSignature, errCode = verify(cleanedPath)
|
calculatedSignature, errCode = verify(cleanedPath)
|
||||||
if errCode == s3err.ErrNone {
|
if errCode == s3err.ErrNone {
|
||||||
return identity, cred, calculatedSignature, authInfo, s3err.ErrNone
|
return identity, cred, calculatedSignature, authInfo, s3err.ErrNone
|
||||||
@@ -293,12 +298,20 @@ func (iam *IdentityAccessManagement) verifyV4Signature(r *http.Request, shouldCh
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 9. Verify with the original path
|
// 9. Verify with the original path
|
||||||
calculatedSignature, errCode = verify(r.URL.Path)
|
calculatedSignature, errCode = verify(pathForSignature)
|
||||||
if errCode != s3err.ErrNone {
|
if errCode == s3err.ErrNone {
|
||||||
return nil, nil, "", nil, errCode
|
return identity, cred, calculatedSignature, authInfo, s3err.ErrNone
|
||||||
}
|
}
|
||||||
|
|
||||||
return identity, cred, calculatedSignature, authInfo, s3err.ErrNone
|
// 10. Retry with decoded path if signature used raw path encoding
|
||||||
|
if decodedPath, decodeErr := url.PathUnescape(pathForSignature); decodeErr == nil && decodedPath != pathForSignature {
|
||||||
|
calculatedSignature, errCode = verify(decodedPath)
|
||||||
|
if errCode == s3err.ErrNone {
|
||||||
|
return identity, cred, calculatedSignature, authInfo, s3err.ErrNone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil, "", nil, errCode
|
||||||
}
|
}
|
||||||
|
|
||||||
// validateSTSSessionToken validates an STS session token and extracts temporary credentials
|
// validateSTSSessionToken validates an STS session token and extracts temporary credentials
|
||||||
@@ -464,7 +477,7 @@ func extractV4AuthInfoFromHeader(r *http.Request) (*v4AuthInfo, s3err.ErrorCode)
|
|||||||
}
|
}
|
||||||
|
|
||||||
hashedPayload := getContentSha256Cksum(r)
|
hashedPayload := getContentSha256Cksum(r)
|
||||||
if signV4Values.Credential.scope.service != "s3" && hashedPayload == emptySHA256 && r.Body != nil {
|
if signV4Values.Credential.scope.service != "s3" && signV4Values.Credential.scope.service != "s3tables" && hashedPayload == emptySHA256 && r.Body != nil {
|
||||||
var hashErr error
|
var hashErr error
|
||||||
hashedPayload, hashErr = streamHashRequestBody(r, iamRequestBodyLimit)
|
hashedPayload, hashErr = streamHashRequestBody(r, iamRequestBodyLimit)
|
||||||
if hashErr != nil {
|
if hashErr != nil {
|
||||||
|
|||||||
@@ -428,6 +428,11 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
|
|||||||
// API Router
|
// API Router
|
||||||
apiRouter := router.PathPrefix("/").Subrouter()
|
apiRouter := router.PathPrefix("/").Subrouter()
|
||||||
|
|
||||||
|
// S3 Tables API endpoint
|
||||||
|
// POST / with X-Amz-Target: S3Tables.<OperationName>
|
||||||
|
// plus REST-style endpoints for AWS CLI
|
||||||
|
s3a.registerS3TablesRoutes(apiRouter)
|
||||||
|
|
||||||
// Readiness Probe
|
// Readiness Probe
|
||||||
apiRouter.Methods(http.MethodGet).Path("/status").HandlerFunc(s3a.StatusHandler)
|
apiRouter.Methods(http.MethodGet).Path("/status").HandlerFunc(s3a.StatusHandler)
|
||||||
apiRouter.Methods(http.MethodGet).Path("/healthz").HandlerFunc(s3a.StatusHandler)
|
apiRouter.Methods(http.MethodGet).Path("/healthz").HandlerFunc(s3a.StatusHandler)
|
||||||
@@ -658,10 +663,6 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// S3 Tables API endpoint
|
|
||||||
// POST / with X-Amz-Target: S3Tables.<OperationName>
|
|
||||||
s3a.registerS3TablesRoutes(apiRouter)
|
|
||||||
|
|
||||||
// STS API endpoint for AssumeRoleWithWebIdentity
|
// STS API endpoint for AssumeRoleWithWebIdentity
|
||||||
// POST /?Action=AssumeRoleWithWebIdentity&WebIdentityToken=...
|
// POST /?Action=AssumeRoleWithWebIdentity&WebIdentityToken=...
|
||||||
if s3a.stsHandlers != nil {
|
if s3a.stsHandlers != nil {
|
||||||
|
|||||||
@@ -1,7 +1,13 @@
|
|||||||
package s3api
|
package s3api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
@@ -13,31 +19,6 @@ import (
|
|||||||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
|
||||||
)
|
)
|
||||||
|
|
||||||
// s3TablesActionsMap contains all valid S3 Tables operations for O(1) lookup
|
|
||||||
var s3TablesActionsMap = map[string]struct{}{
|
|
||||||
"CreateTableBucket": {},
|
|
||||||
"GetTableBucket": {},
|
|
||||||
"ListTableBuckets": {},
|
|
||||||
"DeleteTableBucket": {},
|
|
||||||
"PutTableBucketPolicy": {},
|
|
||||||
"GetTableBucketPolicy": {},
|
|
||||||
"DeleteTableBucketPolicy": {},
|
|
||||||
"CreateNamespace": {},
|
|
||||||
"GetNamespace": {},
|
|
||||||
"ListNamespaces": {},
|
|
||||||
"DeleteNamespace": {},
|
|
||||||
"CreateTable": {},
|
|
||||||
"GetTable": {},
|
|
||||||
"ListTables": {},
|
|
||||||
"DeleteTable": {},
|
|
||||||
"PutTablePolicy": {},
|
|
||||||
"GetTablePolicy": {},
|
|
||||||
"DeleteTablePolicy": {},
|
|
||||||
"TagResource": {},
|
|
||||||
"ListTagsForResource": {},
|
|
||||||
"UntagResource": {},
|
|
||||||
}
|
|
||||||
|
|
||||||
// S3TablesApiServer wraps the S3 Tables handler with S3ApiServer's filer access
|
// S3TablesApiServer wraps the S3 Tables handler with S3ApiServer's filer access
|
||||||
type S3TablesApiServer struct {
|
type S3TablesApiServer struct {
|
||||||
s3a *S3ApiServer
|
s3a *S3ApiServer
|
||||||
@@ -77,41 +58,570 @@ func (s3a *S3ApiServer) registerS3TablesRoutes(router *mux.Router) {
|
|||||||
// Create S3 Tables handler
|
// Create S3 Tables handler
|
||||||
s3TablesApi := NewS3TablesApiServer(s3a)
|
s3TablesApi := NewS3TablesApiServer(s3a)
|
||||||
|
|
||||||
// S3 Tables API uses POST with x-amz-target header
|
// REST-style S3 Tables API routes (used by AWS CLI)
|
||||||
// The AWS CLI sends requests with:
|
targetMatcher := func(r *http.Request, rm *mux.RouteMatch) bool {
|
||||||
// - Content-Type: application/x-amz-json-1.1
|
return strings.HasPrefix(r.Header.Get("X-Amz-Target"), "S3Tables.")
|
||||||
// - X-Amz-Target: S3Tables.<OperationName>
|
|
||||||
|
|
||||||
// Matcher function to identify S3 Tables requests
|
|
||||||
s3TablesMatcher := func(r *http.Request, rm *mux.RouteMatch) bool {
|
|
||||||
// Check for X-Amz-Target header with S3Tables prefix
|
|
||||||
target := r.Header.Get("X-Amz-Target")
|
|
||||||
if target != "" && strings.HasPrefix(target, "S3Tables.") {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Also check for specific S3 Tables actions in query string (CLI fallback)
|
|
||||||
action := r.URL.Query().Get("Action")
|
|
||||||
if isS3TablesAction(action) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
router.Methods(http.MethodPost).Path("/").MatcherFunc(targetMatcher).
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.S3TablesHandler), "S3Tables-Target"))
|
||||||
|
router.Methods(http.MethodPut).Path("/buckets").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("CreateTableBucket", buildCreateTableBucketRequest)), "S3Tables-CreateTableBucket"))
|
||||||
|
router.Methods(http.MethodGet).Path("/buckets").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("ListTableBuckets", buildListTableBucketsRequest)), "S3Tables-ListTableBuckets"))
|
||||||
|
router.Methods(http.MethodGet).Path("/buckets/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("GetTableBucket", buildTableBucketArnRequest)), "S3Tables-GetTableBucket"))
|
||||||
|
router.Methods(http.MethodDelete).Path("/buckets/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("DeleteTableBucket", buildDeleteTableBucketRequest)), "S3Tables-DeleteTableBucket"))
|
||||||
|
router.Methods(http.MethodPut).Path("/buckets/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}/policy").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("PutTableBucketPolicy", buildPutTableBucketPolicyRequest)), "S3Tables-PutTableBucketPolicy"))
|
||||||
|
router.Methods(http.MethodGet).Path("/buckets/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}/policy").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("GetTableBucketPolicy", buildGetTableBucketPolicyRequest)), "S3Tables-GetTableBucketPolicy"))
|
||||||
|
router.Methods(http.MethodDelete).Path("/buckets/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}/policy").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("DeleteTableBucketPolicy", buildDeleteTableBucketPolicyRequest)), "S3Tables-DeleteTableBucketPolicy"))
|
||||||
|
|
||||||
// Register the S3 Tables handler wrapped with IAM authentication
|
router.Methods(http.MethodPut).Path("/namespaces/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}").
|
||||||
router.Methods(http.MethodPost).Path("/").MatcherFunc(s3TablesMatcher).
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("CreateNamespace", buildCreateNamespaceRequest)), "S3Tables-CreateNamespace"))
|
||||||
HandlerFunc(track(s3a.authenticateS3Tables(func(w http.ResponseWriter, r *http.Request) {
|
router.Methods(http.MethodGet).Path("/namespaces/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}").
|
||||||
s3TablesApi.S3TablesHandler(w, r)
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("ListNamespaces", buildListNamespacesRequest)), "S3Tables-ListNamespaces"))
|
||||||
}), "S3Tables"))
|
router.Methods(http.MethodGet).Path("/namespaces/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}/{namespace}").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("GetNamespace", buildGetNamespaceRequest)), "S3Tables-GetNamespace"))
|
||||||
|
router.Methods(http.MethodDelete).Path("/namespaces/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}/{namespace}").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("DeleteNamespace", buildDeleteNamespaceRequest)), "S3Tables-DeleteNamespace"))
|
||||||
|
|
||||||
|
router.Methods(http.MethodPut).Path("/tables/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}/{namespace}").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("CreateTable", buildCreateTableRequest)), "S3Tables-CreateTable"))
|
||||||
|
router.Methods(http.MethodGet).Path("/tables/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("ListTables", buildListTablesRequest)), "S3Tables-ListTables"))
|
||||||
|
router.Methods(http.MethodDelete).Path("/tables/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}/{namespace}/{name}").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("DeleteTable", buildDeleteTableRequest)), "S3Tables-DeleteTable"))
|
||||||
|
|
||||||
|
router.Methods(http.MethodPut).Path("/tables/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}/{namespace}/{name}/policy").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("PutTablePolicy", buildPutTablePolicyRequest)), "S3Tables-PutTablePolicy"))
|
||||||
|
router.Methods(http.MethodGet).Path("/tables/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}/{namespace}/{name}/policy").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("GetTablePolicy", buildGetTablePolicyRequest)), "S3Tables-GetTablePolicy"))
|
||||||
|
router.Methods(http.MethodDelete).Path("/tables/{tableBucketARN:arn:aws:s3tables:[^/]+:[^/]+:bucket/[^/]+}/{namespace}/{name}/policy").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("DeleteTablePolicy", buildDeleteTablePolicyRequest)), "S3Tables-DeleteTablePolicy"))
|
||||||
|
|
||||||
|
router.Methods(http.MethodPost).Path("/tag/{resourceArn:arn:aws:s3tables:.*}").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("TagResource", buildTagResourceRequest)), "S3Tables-TagResource"))
|
||||||
|
router.Methods(http.MethodGet).Path("/tag/{resourceArn:arn:aws:s3tables:.*}").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("ListTagsForResource", buildListTagsForResourceRequest)), "S3Tables-ListTagsForResource"))
|
||||||
|
router.Methods(http.MethodDelete).Path("/tag/{resourceArn:arn:aws:s3tables:.*}").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("UntagResource", buildUntagResourceRequest)), "S3Tables-UntagResource"))
|
||||||
|
|
||||||
|
router.Methods(http.MethodGet).Path("/get-table").
|
||||||
|
HandlerFunc(track(s3a.authenticateS3Tables(s3TablesApi.handleRestOperation("GetTable", buildGetTableRequest)), "S3Tables-GetTable"))
|
||||||
|
|
||||||
glog.V(1).Infof("S3 Tables API enabled")
|
glog.V(1).Infof("S3 Tables API enabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
// isS3TablesAction checks if the action is an S3 Tables operation using O(1) map lookup
|
type s3tablesRequestBuilder func(r *http.Request) (interface{}, error)
|
||||||
func isS3TablesAction(action string) bool {
|
|
||||||
_, ok := s3TablesActionsMap[action]
|
func (st *S3TablesApiServer) handleRestOperation(operation string, builder s3tablesRequestBuilder) http.HandlerFunc {
|
||||||
return ok
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
payload, err := builder(r)
|
||||||
|
if err != nil {
|
||||||
|
writeS3TablesError(w, http.StatusBadRequest, s3tables.ErrCodeInvalidRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := setS3TablesRequestBody(r, payload); err != nil {
|
||||||
|
writeS3TablesError(w, http.StatusInternalServerError, s3tables.ErrCodeInternalError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r.Header.Set("X-Amz-Target", "S3Tables."+operation)
|
||||||
|
st.S3TablesHandler(w, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setS3TablesRequestBody(r *http.Request, payload interface{}) error {
|
||||||
|
body, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.Body = io.NopCloser(bytes.NewReader(body))
|
||||||
|
r.ContentLength = int64(len(body))
|
||||||
|
r.Header.Set("Content-Type", "application/x-amz-json-1.1")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readS3TablesJSONBody(r *http.Request, v interface{}) error {
|
||||||
|
if r.Body == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
defer r.Body.Close()
|
||||||
|
const maxRequestBodySize = 10 * 1024 * 1024
|
||||||
|
if r.ContentLength > maxRequestBodySize {
|
||||||
|
return fmt.Errorf("request body too large: exceeds maximum size of %d bytes", maxRequestBodySize)
|
||||||
|
}
|
||||||
|
limitedReader := io.LimitReader(r.Body, maxRequestBodySize+1)
|
||||||
|
body, err := io.ReadAll(limitedReader)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(body) > maxRequestBodySize {
|
||||||
|
return fmt.Errorf("request body too large: exceeds maximum size of %d bytes", maxRequestBodySize)
|
||||||
|
}
|
||||||
|
if len(bytes.TrimSpace(body)) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return json.Unmarshal(body, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeS3TablesError(w http.ResponseWriter, status int, code, message string) {
|
||||||
|
w.Header().Set("Content-Type", "application/x-amz-json-1.1")
|
||||||
|
w.WriteHeader(status)
|
||||||
|
errorResponse := map[string]interface{}{
|
||||||
|
"__type": code,
|
||||||
|
"message": message,
|
||||||
|
}
|
||||||
|
if err := json.NewEncoder(w).Encode(errorResponse); err != nil {
|
||||||
|
glog.Errorf("failed to encode S3Tables error response (status=%d, code=%s, message=%q): %v", status, code, message, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getDecodedPathParam(r *http.Request, name string) (string, error) {
|
||||||
|
value := mux.Vars(r)[name]
|
||||||
|
if value == "" {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
decoded, err := url.PathUnescape(value)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if decoded == ".." || strings.Contains(decoded, "../") || strings.Contains(decoded, `..\`) || strings.Contains(decoded, "\x00") {
|
||||||
|
return "", fmt.Errorf("invalid path parameter %s", name)
|
||||||
|
}
|
||||||
|
return decoded, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildTableBucketRequestWithARN(r *http.Request, constructor func(string) interface{}) (interface{}, error) {
|
||||||
|
arn, err := getDecodedPathParam(r, "tableBucketARN")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if arn == "" {
|
||||||
|
return nil, fmt.Errorf("tableBucketARN is required")
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ParseBucketNameFromARN(arn); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return constructor(arn), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseOptionalIntParam(r *http.Request, name string) (int, error) {
|
||||||
|
value := r.URL.Query().Get(name)
|
||||||
|
if value == "" {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
parsed, err := strconv.Atoi(value)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("%s must be an integer", name)
|
||||||
|
}
|
||||||
|
if parsed <= 0 {
|
||||||
|
return 0, fmt.Errorf("%s must be a positive integer", name)
|
||||||
|
}
|
||||||
|
return parsed, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseOptionalNamespace(r *http.Request, name string) []string {
|
||||||
|
value := r.URL.Query().Get(name)
|
||||||
|
if value == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ValidateNamespace([]string{value}); err != nil {
|
||||||
|
glog.V(1).Infof("invalid namespace value for %s: %q: %v", name, value, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return []string{value}
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseTagKeys handles tag key parsing from query parameters.
|
||||||
|
// If a single value contains commas, it is split into multiple keys (e.g., "key1,key2,key3").
|
||||||
|
// Otherwise, multiple query values are returned as-is.
|
||||||
|
func parseTagKeys(values []string) []string {
|
||||||
|
if len(values) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
out := make([]string, 0, len(values))
|
||||||
|
for _, value := range values {
|
||||||
|
for _, part := range strings.Split(value, ",") {
|
||||||
|
part = strings.TrimSpace(part)
|
||||||
|
if part != "" {
|
||||||
|
out = append(out, part)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(out) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildCreateTableBucketRequest(r *http.Request) (interface{}, error) {
|
||||||
|
var req s3tables.CreateTableBucketRequest
|
||||||
|
if err := readS3TablesJSONBody(r, &req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildListTableBucketsRequest(r *http.Request) (interface{}, error) {
|
||||||
|
maxBuckets, err := parseOptionalIntParam(r, "maxBuckets")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &s3tables.ListTableBucketsRequest{
|
||||||
|
Prefix: r.URL.Query().Get("prefix"),
|
||||||
|
ContinuationToken: r.URL.Query().Get("continuationToken"),
|
||||||
|
MaxBuckets: maxBuckets,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildTableBucketArnRequest(r *http.Request) (interface{}, error) {
|
||||||
|
return buildTableBucketRequestWithARN(r, func(arn string) interface{} {
|
||||||
|
return &s3tables.GetTableBucketRequest{TableBucketARN: arn}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildDeleteTableBucketRequest(r *http.Request) (interface{}, error) {
|
||||||
|
return buildTableBucketRequestWithARN(r, func(arn string) interface{} {
|
||||||
|
return &s3tables.DeleteTableBucketRequest{TableBucketARN: arn}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildPutTableBucketPolicyRequest(r *http.Request) (interface{}, error) {
|
||||||
|
var req s3tables.PutTableBucketPolicyRequest
|
||||||
|
if err := readS3TablesJSONBody(r, &req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tableBucketARN, err := getDecodedPathParam(r, "tableBucketARN")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.TableBucketARN = tableBucketARN
|
||||||
|
return &req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildGetTableBucketPolicyRequest(r *http.Request) (interface{}, error) {
|
||||||
|
return buildTableBucketRequestWithARN(r, func(arn string) interface{} {
|
||||||
|
return &s3tables.GetTableBucketPolicyRequest{TableBucketARN: arn}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildDeleteTableBucketPolicyRequest(r *http.Request) (interface{}, error) {
|
||||||
|
return buildTableBucketRequestWithARN(r, func(arn string) interface{} {
|
||||||
|
return &s3tables.DeleteTableBucketPolicyRequest{TableBucketARN: arn}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildCreateNamespaceRequest(r *http.Request) (interface{}, error) {
|
||||||
|
var req s3tables.CreateNamespaceRequest
|
||||||
|
if err := readS3TablesJSONBody(r, &req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tableBucketARN, err := getDecodedPathParam(r, "tableBucketARN")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.TableBucketARN = tableBucketARN
|
||||||
|
return &req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildListNamespacesRequest(r *http.Request) (interface{}, error) {
|
||||||
|
tableBucketARN, err := getDecodedPathParam(r, "tableBucketARN")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
maxNamespaces, err := parseOptionalIntParam(r, "maxNamespaces")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &s3tables.ListNamespacesRequest{
|
||||||
|
TableBucketARN: tableBucketARN,
|
||||||
|
Prefix: r.URL.Query().Get("prefix"),
|
||||||
|
ContinuationToken: r.URL.Query().Get("continuationToken"),
|
||||||
|
MaxNamespaces: maxNamespaces,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildGetNamespaceRequest(r *http.Request) (interface{}, error) {
|
||||||
|
tableBucketARN, err := getDecodedPathParam(r, "tableBucketARN")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
namespace, err := getDecodedPathParam(r, "namespace")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if namespace == "" {
|
||||||
|
return nil, fmt.Errorf("namespace is required")
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ValidateNamespace([]string{namespace}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &s3tables.GetNamespaceRequest{
|
||||||
|
TableBucketARN: tableBucketARN,
|
||||||
|
Namespace: []string{namespace},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildDeleteNamespaceRequest(r *http.Request) (interface{}, error) {
|
||||||
|
tableBucketARN, err := getDecodedPathParam(r, "tableBucketARN")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
namespace, err := getDecodedPathParam(r, "namespace")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if namespace == "" {
|
||||||
|
return nil, fmt.Errorf("namespace is required")
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ValidateNamespace([]string{namespace}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &s3tables.DeleteNamespaceRequest{
|
||||||
|
TableBucketARN: tableBucketARN,
|
||||||
|
Namespace: []string{namespace},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildCreateTableRequest(r *http.Request) (interface{}, error) {
|
||||||
|
var req s3tables.CreateTableRequest
|
||||||
|
if err := readS3TablesJSONBody(r, &req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tableBucketARN, err := getDecodedPathParam(r, "tableBucketARN")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
namespace, err := getDecodedPathParam(r, "namespace")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if namespace == "" {
|
||||||
|
return nil, fmt.Errorf("namespace is required")
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ValidateNamespace([]string{namespace}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.TableBucketARN = tableBucketARN
|
||||||
|
req.Namespace = []string{namespace}
|
||||||
|
return &req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildListTablesRequest(r *http.Request) (interface{}, error) {
|
||||||
|
tableBucketARN, err := getDecodedPathParam(r, "tableBucketARN")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
maxTables, err := parseOptionalIntParam(r, "maxTables")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &s3tables.ListTablesRequest{
|
||||||
|
TableBucketARN: tableBucketARN,
|
||||||
|
Namespace: parseOptionalNamespace(r, "namespace"),
|
||||||
|
Prefix: r.URL.Query().Get("prefix"),
|
||||||
|
ContinuationToken: r.URL.Query().Get("continuationToken"),
|
||||||
|
MaxTables: maxTables,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildGetTableRequest(r *http.Request) (interface{}, error) {
|
||||||
|
query := r.URL.Query()
|
||||||
|
tableARN := query.Get("tableArn")
|
||||||
|
req := &s3tables.GetTableRequest{
|
||||||
|
TableARN: tableARN,
|
||||||
|
}
|
||||||
|
if tableARN == "" {
|
||||||
|
req.TableBucketARN = query.Get("tableBucketARN")
|
||||||
|
req.Namespace = parseOptionalNamespace(r, "namespace")
|
||||||
|
req.Name = query.Get("name")
|
||||||
|
if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" {
|
||||||
|
return nil, fmt.Errorf("either tableArn or (tableBucketARN, namespace, name) must be provided")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildDeleteTableRequest(r *http.Request) (interface{}, error) {
|
||||||
|
tableBucketARN, err := getDecodedPathParam(r, "tableBucketARN")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
namespace, err := getDecodedPathParam(r, "namespace")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if namespace == "" {
|
||||||
|
return nil, fmt.Errorf("namespace is required")
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ValidateNamespace([]string{namespace}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
name, err := getDecodedPathParam(r, "name")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if name == "" {
|
||||||
|
return nil, fmt.Errorf("name is required")
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ValidateTableName(name); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &s3tables.DeleteTableRequest{
|
||||||
|
TableBucketARN: tableBucketARN,
|
||||||
|
Namespace: []string{namespace},
|
||||||
|
Name: name,
|
||||||
|
VersionToken: r.URL.Query().Get("versionToken"),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildPutTablePolicyRequest(r *http.Request) (interface{}, error) {
|
||||||
|
var req s3tables.PutTablePolicyRequest
|
||||||
|
if err := readS3TablesJSONBody(r, &req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tableBucketARN, err := getDecodedPathParam(r, "tableBucketARN")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
namespace, err := getDecodedPathParam(r, "namespace")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if namespace == "" {
|
||||||
|
return nil, fmt.Errorf("namespace is required")
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ValidateNamespace([]string{namespace}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
name, err := getDecodedPathParam(r, "name")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if name == "" {
|
||||||
|
return nil, fmt.Errorf("name is required")
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ValidateTableName(name); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.TableBucketARN = tableBucketARN
|
||||||
|
req.Namespace = []string{namespace}
|
||||||
|
req.Name = name
|
||||||
|
return &req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildGetTablePolicyRequest(r *http.Request) (interface{}, error) {
|
||||||
|
tableBucketARN, err := getDecodedPathParam(r, "tableBucketARN")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
namespace, err := getDecodedPathParam(r, "namespace")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if namespace == "" {
|
||||||
|
return nil, fmt.Errorf("namespace is required")
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ValidateNamespace([]string{namespace}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
name, err := getDecodedPathParam(r, "name")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if name == "" {
|
||||||
|
return nil, fmt.Errorf("name is required")
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ValidateTableName(name); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &s3tables.GetTablePolicyRequest{
|
||||||
|
TableBucketARN: tableBucketARN,
|
||||||
|
Namespace: []string{namespace},
|
||||||
|
Name: name,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildDeleteTablePolicyRequest(r *http.Request) (interface{}, error) {
|
||||||
|
tableBucketARN, err := getDecodedPathParam(r, "tableBucketARN")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
namespace, err := getDecodedPathParam(r, "namespace")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if namespace == "" {
|
||||||
|
return nil, fmt.Errorf("namespace is required")
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ValidateNamespace([]string{namespace}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
name, err := getDecodedPathParam(r, "name")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if name == "" {
|
||||||
|
return nil, fmt.Errorf("name is required")
|
||||||
|
}
|
||||||
|
if _, err := s3tables.ValidateTableName(name); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &s3tables.DeleteTablePolicyRequest{
|
||||||
|
TableBucketARN: tableBucketARN,
|
||||||
|
Namespace: []string{namespace},
|
||||||
|
Name: name,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildTagResourceRequest(r *http.Request) (interface{}, error) {
|
||||||
|
var req s3tables.TagResourceRequest
|
||||||
|
if err := readS3TablesJSONBody(r, &req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resourceARN, err := getDecodedPathParam(r, "resourceArn")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if resourceARN == "" {
|
||||||
|
return nil, fmt.Errorf("resourceArn is required")
|
||||||
|
}
|
||||||
|
req.ResourceARN = resourceARN
|
||||||
|
return &req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildListTagsForResourceRequest(r *http.Request) (interface{}, error) {
|
||||||
|
resourceARN, err := getDecodedPathParam(r, "resourceArn")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if resourceARN == "" {
|
||||||
|
return nil, fmt.Errorf("resourceArn is required")
|
||||||
|
}
|
||||||
|
return &s3tables.ListTagsForResourceRequest{
|
||||||
|
ResourceARN: resourceARN,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildUntagResourceRequest(r *http.Request) (interface{}, error) {
|
||||||
|
resourceARN, err := getDecodedPathParam(r, "resourceArn")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if resourceARN == "" {
|
||||||
|
return nil, fmt.Errorf("resourceArn is required")
|
||||||
|
}
|
||||||
|
tagKeys := parseTagKeys(r.URL.Query()["tagKeys"])
|
||||||
|
if len(tagKeys) == 0 {
|
||||||
|
return nil, fmt.Errorf("tagKeys is required for %s", resourceARN)
|
||||||
|
}
|
||||||
|
return &s3tables.UntagResourceRequest{
|
||||||
|
ResourceARN: resourceARN,
|
||||||
|
TagKeys: tagKeys,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// authenticateS3Tables wraps the handler with IAM authentication using AuthSignatureOnly
|
// authenticateS3Tables wraps the handler with IAM authentication using AuthSignatureOnly
|
||||||
|
|||||||
@@ -74,17 +74,16 @@ type FilerClient interface {
|
|||||||
|
|
||||||
// HandleRequest is the main entry point for S3 Tables API requests
|
// HandleRequest is the main entry point for S3 Tables API requests
|
||||||
func (h *S3TablesHandler) HandleRequest(w http.ResponseWriter, r *http.Request, filerClient FilerClient) {
|
func (h *S3TablesHandler) HandleRequest(w http.ResponseWriter, r *http.Request, filerClient FilerClient) {
|
||||||
// S3 Tables API uses x-amz-target header to specify the operation
|
operation := r.Header.Get("X-Amz-Target")
|
||||||
target := r.Header.Get("X-Amz-Target")
|
if operation != "" {
|
||||||
if target == "" {
|
if idx := strings.LastIndex(operation, "."); idx != -1 {
|
||||||
// Try to get from query parameter for CLI compatibility
|
operation = operation[idx+1:]
|
||||||
target = r.URL.Query().Get("Action")
|
}
|
||||||
}
|
}
|
||||||
|
if operation == "" {
|
||||||
// Extract operation name (e.g., "S3Tables.CreateTableBucket" -> "CreateTableBucket")
|
glog.V(1).Infof("S3Tables: missing X-Amz-Target header")
|
||||||
operation := target
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "Missing X-Amz-Target header")
|
||||||
if idx := strings.LastIndex(target, "."); idx != -1 {
|
return
|
||||||
operation = target[idx+1:]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(3).Infof("S3Tables: handling operation %s", operation)
|
glog.V(3).Infof("S3Tables: handling operation %s", operation)
|
||||||
|
|||||||
@@ -38,6 +38,11 @@ func parseBucketNameFromARN(arn string) (string, error) {
|
|||||||
return bucketName, nil
|
return bucketName, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ParseBucketNameFromARN is a wrapper to validate bucket ARN for other packages.
|
||||||
|
func ParseBucketNameFromARN(arn string) (string, error) {
|
||||||
|
return parseBucketNameFromARN(arn)
|
||||||
|
}
|
||||||
|
|
||||||
// parseTableFromARN extracts bucket name, namespace, and table name from ARN
|
// parseTableFromARN extracts bucket name, namespace, and table name from ARN
|
||||||
// ARN format: arn:aws:s3tables:{region}:{account}:bucket/{bucket-name}/table/{namespace}/{table-name}
|
// ARN format: arn:aws:s3tables:{region}:{account}:bucket/{bucket-name}/table/{namespace}/{table-name}
|
||||||
func parseTableFromARN(arn string) (bucketName, namespace, tableName string, err error) {
|
func parseTableFromARN(arn string) (bucketName, namespace, tableName string, err error) {
|
||||||
@@ -240,6 +245,11 @@ func validateNamespace(namespace []string) (string, error) {
|
|||||||
return name, nil
|
return name, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ValidateNamespace is a wrapper to validate namespace for other packages.
|
||||||
|
func ValidateNamespace(namespace []string) (string, error) {
|
||||||
|
return validateNamespace(namespace)
|
||||||
|
}
|
||||||
|
|
||||||
// validateTableName validates a table name
|
// validateTableName validates a table name
|
||||||
func validateTableName(name string) (string, error) {
|
func validateTableName(name string) (string, error) {
|
||||||
if len(name) < 1 || len(name) > 255 {
|
if len(name) < 1 || len(name) > 255 {
|
||||||
@@ -265,6 +275,11 @@ func validateTableName(name string) (string, error) {
|
|||||||
return name, nil
|
return name, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ValidateTableName is a wrapper to validate table name for other packages.
|
||||||
|
func ValidateTableName(name string) (string, error) {
|
||||||
|
return validateTableName(name)
|
||||||
|
}
|
||||||
|
|
||||||
// flattenNamespace joins namespace elements into a single string (using dots as per AWS S3 Tables)
|
// flattenNamespace joins namespace elements into a single string (using dots as per AWS S3 Tables)
|
||||||
func flattenNamespace(namespace []string) string {
|
func flattenNamespace(namespace []string) string {
|
||||||
if len(namespace) == 0 {
|
if len(namespace) == 0 {
|
||||||
|
|||||||
Reference in New Issue
Block a user