s3tables: add handler_ prefix to operation handler files
- Rename bucket_create.go → handler_bucket_create.go - Rename bucket_get_list_delete.go → handler_bucket_get_list_delete.go - Rename namespace.go → handler_namespace.go - Rename table.go → handler_table.go - Rename policy.go → handler_policy.go Improves file organization by clearly identifying handler implementations. No code changes, refactoring only.
This commit is contained in:
409
weed/s3api/s3tables/handler_table.go
Normal file
409
weed/s3api/s3tables/handler_table.go
Normal file
@@ -0,0 +1,409 @@
|
||||
package s3tables
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
||||
)
|
||||
|
||||
// handleCreateTable creates a new table in a namespace
|
||||
func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||
var req CreateTableRequest
|
||||
if err := h.readRequestBody(r, &req); err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
if req.TableBucketARN == "" {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
||||
return fmt.Errorf("tableBucketARN is required")
|
||||
}
|
||||
|
||||
if req.Namespace == "" {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "namespace is required")
|
||||
return fmt.Errorf("namespace is required")
|
||||
}
|
||||
|
||||
if req.Name == "" {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "name is required")
|
||||
return fmt.Errorf("name is required")
|
||||
}
|
||||
|
||||
if req.Format == "" {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "format is required")
|
||||
return fmt.Errorf("format is required")
|
||||
}
|
||||
|
||||
// Validate format
|
||||
if req.Format != "ICEBERG" {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "only ICEBERG format is supported")
|
||||
return fmt.Errorf("invalid format")
|
||||
}
|
||||
|
||||
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// Validate table name
|
||||
if len(req.Name) < 1 || len(req.Name) > 255 {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "table name must be between 1 and 255 characters")
|
||||
return fmt.Errorf("invalid table name length")
|
||||
}
|
||||
|
||||
// Check if namespace exists
|
||||
namespacePath := getNamespacePath(bucketName, req.Namespace)
|
||||
var namespaceExists bool
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
_, err := h.getExtendedAttribute(client, namespacePath, ExtendedKeyMetadata)
|
||||
namespaceExists = err == nil
|
||||
return nil
|
||||
})
|
||||
|
||||
if !namespaceExists {
|
||||
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", req.Namespace))
|
||||
return fmt.Errorf("namespace not found")
|
||||
}
|
||||
|
||||
tablePath := getTablePath(bucketName, req.Namespace, req.Name)
|
||||
|
||||
// Check if table already exists
|
||||
exists := false
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
_, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata)
|
||||
exists = err == nil
|
||||
return nil
|
||||
})
|
||||
|
||||
if exists {
|
||||
h.writeError(w, http.StatusConflict, ErrCodeTableAlreadyExists, fmt.Sprintf("table %s already exists", req.Name))
|
||||
return fmt.Errorf("table already exists")
|
||||
}
|
||||
|
||||
// Create the table
|
||||
now := time.Now()
|
||||
versionToken := generateVersionToken()
|
||||
|
||||
metadata := &tableMetadataInternal{
|
||||
Name: req.Name,
|
||||
Namespace: req.Namespace,
|
||||
Format: req.Format,
|
||||
CreatedAt: now,
|
||||
ModifiedAt: now,
|
||||
OwnerID: h.accountID,
|
||||
VersionToken: versionToken,
|
||||
Schema: req.Metadata,
|
||||
}
|
||||
|
||||
metadataBytes, _ := json.Marshal(metadata)
|
||||
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
// Create table directory
|
||||
if err := h.createDirectory(client, tablePath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create data subdirectory for Iceberg files
|
||||
dataPath := tablePath + "/data"
|
||||
if err := h.createDirectory(client, dataPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set metadata as extended attribute
|
||||
if err := h.setExtendedAttribute(client, tablePath, ExtendedKeyMetadata, metadataBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set tags if provided
|
||||
if len(req.Tags) > 0 {
|
||||
tagsBytes, _ := json.Marshal(req.Tags)
|
||||
if err := h.setExtendedAttribute(client, tablePath, ExtendedKeyTags, tagsBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to create table")
|
||||
return err
|
||||
}
|
||||
|
||||
tableARN := h.generateTableARN(bucketName, req.Namespace, req.Name)
|
||||
|
||||
resp := &CreateTableResponse{
|
||||
TableARN: tableARN,
|
||||
VersionToken: versionToken,
|
||||
}
|
||||
|
||||
h.writeJSON(w, http.StatusOK, resp)
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleGetTable gets details of a table
|
||||
func (h *S3TablesHandler) handleGetTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||
var req GetTableRequest
|
||||
if err := h.readRequestBody(r, &req); err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
var bucketName, namespace, tableName string
|
||||
var err error
|
||||
|
||||
// Support getting by ARN or by bucket/namespace/name
|
||||
if req.TableARN != "" {
|
||||
bucketName, namespace, tableName, err = parseTableFromARN(req.TableARN)
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
} else if req.TableBucketARN != "" && req.Namespace != "" && req.Name != "" {
|
||||
bucketName, err = parseBucketNameFromARN(req.TableBucketARN)
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
namespace = req.Namespace
|
||||
tableName = req.Name
|
||||
} else {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "either tableARN or (tableBucketARN, namespace, name) is required")
|
||||
return fmt.Errorf("missing required parameters")
|
||||
}
|
||||
|
||||
tablePath := getTablePath(bucketName, namespace, tableName)
|
||||
|
||||
var metadata tableMetadataInternal
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
data, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return json.Unmarshal(data, &metadata)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
|
||||
return err
|
||||
}
|
||||
|
||||
tableARN := h.generateTableARN(bucketName, namespace, tableName)
|
||||
|
||||
resp := &GetTableResponse{
|
||||
Name: metadata.Name,
|
||||
TableARN: tableARN,
|
||||
Namespace: []string{metadata.Namespace},
|
||||
Format: metadata.Format,
|
||||
CreatedAt: metadata.CreatedAt,
|
||||
ModifiedAt: metadata.ModifiedAt,
|
||||
OwnerAccountID: metadata.OwnerID,
|
||||
MetadataLocation: metadata.MetadataLocation,
|
||||
VersionToken: metadata.VersionToken,
|
||||
}
|
||||
|
||||
h.writeJSON(w, http.StatusOK, resp)
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleListTables lists all tables in a namespace or bucket
|
||||
func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||
var req ListTablesRequest
|
||||
if err := h.readRequestBody(r, &req); err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
if req.TableBucketARN == "" {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
||||
return fmt.Errorf("tableBucketARN is required")
|
||||
}
|
||||
|
||||
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
maxTables := req.MaxTables
|
||||
if maxTables <= 0 {
|
||||
maxTables = 100
|
||||
}
|
||||
|
||||
var tables []TableSummary
|
||||
|
||||
// If namespace is specified, list tables in that namespace only
|
||||
if req.Namespace != "" {
|
||||
err = h.listTablesInNamespace(filerClient, bucketName, req.Namespace, req.Prefix, maxTables, &tables)
|
||||
} else {
|
||||
// List tables in all namespaces
|
||||
err = h.listTablesInAllNamespaces(filerClient, bucketName, req.Prefix, maxTables, &tables)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
tables = []TableSummary{}
|
||||
}
|
||||
|
||||
resp := &ListTablesResponse{
|
||||
Tables: tables,
|
||||
}
|
||||
|
||||
h.writeJSON(w, http.StatusOK, resp)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *S3TablesHandler) listTablesInNamespace(filerClient FilerClient, bucketName, namespace, prefix string, maxTables int, tables *[]TableSummary) error {
|
||||
namespacePath := getNamespacePath(bucketName, namespace)
|
||||
|
||||
return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
||||
Directory: namespacePath,
|
||||
Limit: uint32(maxTables),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
entry, err := resp.Recv()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if entry.Entry == nil || !entry.Entry.IsDirectory {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip hidden entries
|
||||
if strings.HasPrefix(entry.Entry.Name, ".") {
|
||||
continue
|
||||
}
|
||||
|
||||
// Apply prefix filter
|
||||
if prefix != "" && !strings.HasPrefix(entry.Entry.Name, prefix) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Read table metadata from extended attribute
|
||||
data, ok := entry.Entry.Extended[ExtendedKeyMetadata]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
var metadata tableMetadataInternal
|
||||
if err := json.Unmarshal(data, &metadata); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
tableARN := h.generateTableARN(bucketName, namespace, entry.Entry.Name)
|
||||
|
||||
*tables = append(*tables, TableSummary{
|
||||
Name: metadata.Name,
|
||||
TableARN: tableARN,
|
||||
Namespace: []string{namespace},
|
||||
CreatedAt: metadata.CreatedAt,
|
||||
ModifiedAt: metadata.ModifiedAt,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (h *S3TablesHandler) listTablesInAllNamespaces(filerClient FilerClient, bucketName, prefix string, maxTables int, tables *[]TableSummary) error {
|
||||
bucketPath := getTableBucketPath(bucketName)
|
||||
|
||||
return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
// List all namespaces first
|
||||
resp, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
|
||||
Directory: bucketPath,
|
||||
Limit: 1000,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
entry, err := resp.Recv()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if entry.Entry == nil || !entry.Entry.IsDirectory {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip hidden entries
|
||||
if strings.HasPrefix(entry.Entry.Name, ".") {
|
||||
continue
|
||||
}
|
||||
|
||||
namespace := entry.Entry.Name
|
||||
|
||||
// List tables in this namespace
|
||||
if err := h.listTablesInNamespace(filerClient, bucketName, namespace, prefix, maxTables-len(*tables), tables); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(*tables) >= maxTables {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// handleDeleteTable deletes a table from a namespace
|
||||
func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
||||
var req DeleteTableRequest
|
||||
if err := h.readRequestBody(r, &req); err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
if req.TableBucketARN == "" || req.Namespace == "" || req.Name == "" {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
|
||||
return fmt.Errorf("missing required parameters")
|
||||
}
|
||||
|
||||
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
tablePath := getTablePath(bucketName, req.Namespace, req.Name)
|
||||
|
||||
// Check if table exists
|
||||
var tableExists bool
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
_, err := h.getExtendedAttribute(client, tablePath, ExtendedKeyMetadata)
|
||||
tableExists = err == nil
|
||||
return nil
|
||||
})
|
||||
|
||||
if !tableExists {
|
||||
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", req.Name))
|
||||
return fmt.Errorf("table not found")
|
||||
}
|
||||
|
||||
// Delete the table
|
||||
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||
return h.deleteDirectory(client, tablePath)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table")
|
||||
return err
|
||||
}
|
||||
|
||||
h.writeJSON(w, http.StatusOK, nil)
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user