Remove premature HTTP error writes from within WithFilerClient closure to prevent duplicate status code responses. Error handling is now consistently performed at the top level using isAuthError.
625 lines
19 KiB
Go
625 lines
19 KiB
Go
package s3tables
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
)
|
|
|
|
// handleCreateTable creates a new table in a namespace
|
|
func (h *S3TablesHandler) handleCreateTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
|
|
|
var req CreateTableRequest
|
|
if err := h.readRequestBody(r, &req); err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
if req.TableBucketARN == "" {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
|
return fmt.Errorf("tableBucketARN is required")
|
|
}
|
|
|
|
namespaceName, err := validateNamespace(req.Namespace)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
if req.Name == "" {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "name is required")
|
|
return fmt.Errorf("name is required")
|
|
}
|
|
|
|
if req.Format == "" {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "format is required")
|
|
return fmt.Errorf("format is required")
|
|
}
|
|
|
|
// Validate format
|
|
if req.Format != "ICEBERG" {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "only ICEBERG format is supported")
|
|
return fmt.Errorf("invalid format")
|
|
}
|
|
|
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
// Validate table name
|
|
tableName, err := validateTableName(req.Name)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
// Check if namespace exists
|
|
namespacePath := getNamespacePath(bucketName, namespaceName)
|
|
var namespaceMetadata namespaceMetadata
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
data, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := json.Unmarshal(data, &namespaceMetadata); err != nil {
|
|
return fmt.Errorf("failed to unmarshal namespace metadata: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchNamespace, fmt.Sprintf("namespace %s not found", namespaceName))
|
|
} else {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check namespace: %v", err))
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Check ownership
|
|
if accountID := h.getAccountID(r); accountID != namespaceMetadata.OwnerAccountID {
|
|
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table in this namespace")
|
|
return ErrAccessDenied
|
|
}
|
|
|
|
tablePath := getTablePath(bucketName, namespaceName, tableName)
|
|
|
|
// Check if table already exists
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
_, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
|
|
return err
|
|
})
|
|
|
|
if err == nil {
|
|
h.writeError(w, http.StatusConflict, ErrCodeTableAlreadyExists, fmt.Sprintf("table %s already exists", tableName))
|
|
return fmt.Errorf("table already exists")
|
|
} else if !errors.Is(err, filer_pb.ErrNotFound) {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check table: %v", err))
|
|
return err
|
|
}
|
|
|
|
// Create the table
|
|
now := time.Now()
|
|
versionToken := generateVersionToken()
|
|
|
|
metadata := &tableMetadataInternal{
|
|
Name: tableName,
|
|
Namespace: namespaceName,
|
|
Format: req.Format,
|
|
CreatedAt: now,
|
|
ModifiedAt: now,
|
|
OwnerAccountID: h.getAccountID(r),
|
|
VersionToken: versionToken,
|
|
Schema: req.Metadata,
|
|
}
|
|
|
|
metadataBytes, err := json.Marshal(metadata)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to marshal table metadata")
|
|
return fmt.Errorf("failed to marshal metadata: %w", err)
|
|
}
|
|
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
// Create table directory
|
|
if err := h.createDirectory(r.Context(), client, tablePath); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create data subdirectory for Iceberg files
|
|
dataPath := tablePath + "/data"
|
|
if err := h.createDirectory(r.Context(), client, dataPath); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Set metadata as extended attribute
|
|
if err := h.setExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata, metadataBytes); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Set tags if provided
|
|
if len(req.Tags) > 0 {
|
|
tagsBytes, err := json.Marshal(req.Tags)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal tags: %w", err)
|
|
}
|
|
if err := h.setExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyTags, tagsBytes); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to create table")
|
|
return err
|
|
}
|
|
|
|
tableARN := h.generateTableARN(r, bucketName, namespaceName+"/"+tableName)
|
|
|
|
resp := &CreateTableResponse{
|
|
TableARN: tableARN,
|
|
VersionToken: versionToken,
|
|
}
|
|
|
|
h.writeJSON(w, http.StatusOK, resp)
|
|
return nil
|
|
}
|
|
|
|
// handleGetTable gets details of a table
|
|
func (h *S3TablesHandler) handleGetTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
|
|
|
var req GetTableRequest
|
|
if err := h.readRequestBody(r, &req); err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
var bucketName, namespace, tableName string
|
|
var err error
|
|
|
|
// Support getting by ARN or by bucket/namespace/name
|
|
if req.TableARN != "" {
|
|
bucketName, namespace, tableName, err = parseTableFromARN(req.TableARN)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
} else if req.TableBucketARN != "" && len(req.Namespace) > 0 && req.Name != "" {
|
|
bucketName, err = parseBucketNameFromARN(req.TableBucketARN)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
namespace, err = validateNamespace(req.Namespace)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
tableName, err = validateTableName(req.Name)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
} else {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "either tableARN or (tableBucketARN, namespace, name) is required")
|
|
return fmt.Errorf("missing required parameters")
|
|
}
|
|
|
|
tablePath := getTablePath(bucketName, namespace, tableName)
|
|
|
|
var metadata tableMetadataInternal
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
data, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := json.Unmarshal(data, &metadata); err != nil {
|
|
return fmt.Errorf("failed to unmarshal table metadata: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
|
|
} else {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to get table: %v", err))
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Check ownership
|
|
if accountID := h.getAccountID(r); accountID != metadata.OwnerAccountID {
|
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
|
|
return ErrAccessDenied
|
|
}
|
|
|
|
tableARN := h.generateTableARN(r, bucketName, namespace+"/"+tableName)
|
|
|
|
resp := &GetTableResponse{
|
|
Name: metadata.Name,
|
|
TableARN: tableARN,
|
|
Namespace: []string{metadata.Namespace},
|
|
Format: metadata.Format,
|
|
CreatedAt: metadata.CreatedAt,
|
|
ModifiedAt: metadata.ModifiedAt,
|
|
OwnerAccountID: metadata.OwnerAccountID,
|
|
MetadataLocation: metadata.MetadataLocation,
|
|
VersionToken: metadata.VersionToken,
|
|
}
|
|
|
|
h.writeJSON(w, http.StatusOK, resp)
|
|
return nil
|
|
}
|
|
|
|
// handleListTables lists all tables in a namespace or bucket
|
|
func (h *S3TablesHandler) handleListTables(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
|
|
|
var req ListTablesRequest
|
|
if err := h.readRequestBody(r, &req); err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
if req.TableBucketARN == "" {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN is required")
|
|
return fmt.Errorf("tableBucketARN is required")
|
|
}
|
|
|
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
maxTables := req.MaxTables
|
|
if maxTables <= 0 {
|
|
maxTables = 100
|
|
}
|
|
|
|
var tables []TableSummary
|
|
var paginationToken string
|
|
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
var err error
|
|
if len(req.Namespace) > 0 {
|
|
namespaceName, err := validateNamespace(req.Namespace)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check permission (check namespace ownership)
|
|
namespacePath := getNamespacePath(bucketName, namespaceName)
|
|
var nsMeta namespaceMetadata
|
|
data, err := h.getExtendedAttribute(r.Context(), client, namespacePath, ExtendedKeyMetadata)
|
|
if err != nil {
|
|
return err // Not Found handled by caller
|
|
}
|
|
if err := json.Unmarshal(data, &nsMeta); err != nil {
|
|
return err
|
|
}
|
|
if accountID := h.getAccountID(r); accountID != nsMeta.OwnerAccountID {
|
|
return ErrAccessDenied
|
|
}
|
|
|
|
tables, paginationToken, err = h.listTablesInNamespaceWithClient(r, client, bucketName, namespaceName, req.Prefix, req.ContinuationToken, maxTables)
|
|
} else {
|
|
// Check permission (check bucket ownership)
|
|
bucketPath := getTableBucketPath(bucketName)
|
|
var bucketMeta tableBucketMetadata
|
|
data, err := h.getExtendedAttribute(r.Context(), client, bucketPath, ExtendedKeyMetadata)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := json.Unmarshal(data, &bucketMeta); err != nil {
|
|
return err
|
|
}
|
|
if accountID := h.getAccountID(r); accountID != bucketMeta.OwnerAccountID {
|
|
return ErrAccessDenied
|
|
}
|
|
|
|
tables, paginationToken, err = h.listTablesInAllNamespaces(r, client, bucketName, req.Prefix, req.ContinuationToken, maxTables)
|
|
}
|
|
return err
|
|
})
|
|
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
// If the bucket or namespace directory is not found, return an empty result
|
|
tables = []TableSummary{}
|
|
paginationToken = ""
|
|
} else if isAuthError(err) {
|
|
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "Access Denied")
|
|
} else {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to list tables: %v", err))
|
|
}
|
|
return err
|
|
}
|
|
|
|
resp := &ListTablesResponse{
|
|
Tables: tables,
|
|
ContinuationToken: paginationToken,
|
|
}
|
|
|
|
h.writeJSON(w, http.StatusOK, resp)
|
|
return nil
|
|
}
|
|
|
|
// listTablesInNamespaceWithClient lists tables in a specific namespace
|
|
func (h *S3TablesHandler) listTablesInNamespaceWithClient(r *http.Request, client filer_pb.SeaweedFilerClient, bucketName, namespaceName, prefix, continuationToken string, maxTables int) ([]TableSummary, string, error) {
|
|
namespacePath := getNamespacePath(bucketName, namespaceName)
|
|
return h.listTablesWithClient(r, client, namespacePath, bucketName, namespaceName, prefix, continuationToken, maxTables)
|
|
}
|
|
|
|
func (h *S3TablesHandler) listTablesWithClient(r *http.Request, client filer_pb.SeaweedFilerClient, dirPath, bucketName, namespaceName, prefix, continuationToken string, maxTables int) ([]TableSummary, string, error) {
|
|
var tables []TableSummary
|
|
lastFileName := continuationToken
|
|
ctx := r.Context()
|
|
|
|
for len(tables) < maxTables {
|
|
resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
|
|
Directory: dirPath,
|
|
Limit: uint32(maxTables * 2),
|
|
StartFromFileName: lastFileName,
|
|
InclusiveStartFrom: lastFileName == "" || lastFileName == continuationToken,
|
|
})
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
|
|
hasMore := false
|
|
for {
|
|
entry, respErr := resp.Recv()
|
|
if respErr != nil {
|
|
if respErr == io.EOF {
|
|
break
|
|
}
|
|
return nil, "", respErr
|
|
}
|
|
if entry.Entry == nil {
|
|
continue
|
|
}
|
|
|
|
// Skip the start item if it was included in the previous page
|
|
if len(tables) == 0 && continuationToken != "" && entry.Entry.Name == continuationToken {
|
|
continue
|
|
}
|
|
|
|
hasMore = true
|
|
lastFileName = entry.Entry.Name
|
|
|
|
if !entry.Entry.IsDirectory {
|
|
continue
|
|
}
|
|
|
|
// Skip hidden entries
|
|
if strings.HasPrefix(entry.Entry.Name, ".") {
|
|
continue
|
|
}
|
|
|
|
// Apply prefix filter
|
|
if prefix != "" && !strings.HasPrefix(entry.Entry.Name, prefix) {
|
|
continue
|
|
}
|
|
|
|
// Read table metadata from extended attribute
|
|
data, ok := entry.Entry.Extended[ExtendedKeyMetadata]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
var metadata tableMetadataInternal
|
|
if err := json.Unmarshal(data, &metadata); err != nil {
|
|
continue
|
|
}
|
|
|
|
if metadata.OwnerAccountID != h.getAccountID(r) {
|
|
continue
|
|
}
|
|
|
|
tableARN := h.generateTableARN(r, bucketName, namespaceName+"/"+entry.Entry.Name)
|
|
|
|
tables = append(tables, TableSummary{
|
|
Name: entry.Entry.Name,
|
|
TableARN: tableARN,
|
|
Namespace: []string{namespaceName},
|
|
CreatedAt: metadata.CreatedAt,
|
|
ModifiedAt: metadata.ModifiedAt,
|
|
})
|
|
|
|
if len(tables) >= maxTables {
|
|
return tables, lastFileName, nil
|
|
}
|
|
}
|
|
|
|
if !hasMore {
|
|
break
|
|
}
|
|
}
|
|
|
|
if len(tables) < maxTables {
|
|
lastFileName = ""
|
|
}
|
|
return tables, lastFileName, nil
|
|
}
|
|
|
|
func (h *S3TablesHandler) listTablesInAllNamespaces(r *http.Request, client filer_pb.SeaweedFilerClient, bucketName, prefix, continuationToken string, maxTables int) ([]TableSummary, string, error) {
|
|
bucketPath := getTableBucketPath(bucketName)
|
|
ctx := r.Context()
|
|
|
|
var continuationNamespace string
|
|
var startTableName string
|
|
if continuationToken != "" {
|
|
if parts := strings.SplitN(continuationToken, "/", 2); len(parts) == 2 {
|
|
continuationNamespace = parts[0]
|
|
startTableName = parts[1]
|
|
} else {
|
|
continuationNamespace = continuationToken
|
|
}
|
|
}
|
|
|
|
var tables []TableSummary
|
|
lastNamespace := continuationNamespace
|
|
for {
|
|
// List namespaces in batches
|
|
resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
|
|
Directory: bucketPath,
|
|
Limit: 100,
|
|
StartFromFileName: lastNamespace,
|
|
InclusiveStartFrom: (lastNamespace == continuationNamespace && startTableName != "") || (lastNamespace == "" && continuationNamespace == ""),
|
|
})
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
|
|
hasMore := false
|
|
for {
|
|
entry, respErr := resp.Recv()
|
|
if respErr != nil {
|
|
if respErr == io.EOF {
|
|
break
|
|
}
|
|
return nil, "", respErr
|
|
}
|
|
if entry.Entry == nil {
|
|
continue
|
|
}
|
|
|
|
hasMore = true
|
|
lastNamespace = entry.Entry.Name
|
|
|
|
if !entry.Entry.IsDirectory || strings.HasPrefix(entry.Entry.Name, ".") {
|
|
continue
|
|
}
|
|
|
|
namespace := entry.Entry.Name
|
|
tableNameFilter := ""
|
|
if namespace == continuationNamespace {
|
|
tableNameFilter = startTableName
|
|
}
|
|
|
|
nsTables, nsToken, err := h.listTablesInNamespaceWithClient(r, client, bucketName, namespace, prefix, tableNameFilter, maxTables-len(tables))
|
|
if err != nil {
|
|
glog.Warningf("S3Tables: failed to list tables in namespace %s/%s: %v", bucketName, namespace, err)
|
|
continue
|
|
}
|
|
|
|
tables = append(tables, nsTables...)
|
|
|
|
if namespace == continuationNamespace {
|
|
startTableName = ""
|
|
}
|
|
|
|
if len(tables) >= maxTables {
|
|
paginationToken := namespace + "/" + nsToken
|
|
if nsToken == "" {
|
|
// If we hit the limit exactly at the end of a namespace, the next token should be the next namespace
|
|
paginationToken = namespace // This will start from the NEXT namespace in the outer loop
|
|
}
|
|
return tables, paginationToken, nil
|
|
}
|
|
}
|
|
|
|
if !hasMore {
|
|
break
|
|
}
|
|
}
|
|
|
|
return tables, "", nil
|
|
}
|
|
|
|
// handleDeleteTable deletes a table from a namespace
|
|
func (h *S3TablesHandler) handleDeleteTable(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
|
|
|
|
var req DeleteTableRequest
|
|
if err := h.readRequestBody(r, &req); err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
if req.TableBucketARN == "" || len(req.Namespace) == 0 || req.Name == "" {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, "tableBucketARN, namespace, and name are required")
|
|
return fmt.Errorf("missing required parameters")
|
|
}
|
|
|
|
namespaceName, err := validateNamespace(req.Namespace)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
bucketName, err := parseBucketNameFromARN(req.TableBucketARN)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
tableName, err := validateTableName(req.Name)
|
|
if err != nil {
|
|
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
|
|
return err
|
|
}
|
|
|
|
tablePath := getTablePath(bucketName, namespaceName, tableName)
|
|
|
|
// Check if table exists and enforce VersionToken if provided
|
|
var metadata tableMetadataInternal
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
data, err := h.getExtendedAttribute(r.Context(), client, tablePath, ExtendedKeyMetadata)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := json.Unmarshal(data, &metadata); err != nil {
|
|
return fmt.Errorf("failed to unmarshal table metadata: %w", err)
|
|
}
|
|
|
|
if req.VersionToken != "" {
|
|
if metadata.VersionToken != req.VersionToken {
|
|
return ErrVersionTokenMismatch
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
if errors.Is(err, filer_pb.ErrNotFound) {
|
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
|
|
} else if errors.Is(err, ErrVersionTokenMismatch) {
|
|
h.writeError(w, http.StatusConflict, ErrCodeConflict, "version token mismatch")
|
|
} else {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, fmt.Sprintf("failed to check table: %v", err))
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Check ownership
|
|
if accountID := h.getAccountID(r); accountID != metadata.OwnerAccountID {
|
|
h.writeError(w, http.StatusNotFound, ErrCodeNoSuchTable, fmt.Sprintf("table %s not found", tableName))
|
|
return ErrAccessDenied
|
|
}
|
|
|
|
// Delete the table
|
|
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
return h.deleteDirectory(r.Context(), client, tablePath)
|
|
})
|
|
|
|
if err != nil {
|
|
h.writeError(w, http.StatusInternalServerError, ErrCodeInternalError, "failed to delete table")
|
|
return err
|
|
}
|
|
|
|
h.writeJSON(w, http.StatusOK, nil)
|
|
return nil
|
|
}
|