s3tables: support multi-level namespaces in parser/admin paths (#8273)
* s3tables: support multi-level namespace normalization * admin: handle namespace parsing errors centrally * admin: clean namespace validation duplication
This commit is contained in:
@@ -63,6 +63,19 @@ type tableBucketMetadata struct {
|
||||
|
||||
const s3TablesAdminListLimit = 1000
|
||||
|
||||
func parseNamespaceInput(namespace string) ([]string, error) {
|
||||
return s3tables.ParseNamespace(namespace)
|
||||
}
|
||||
|
||||
func (s *AdminServer) parseNamespaceFromGin(c *gin.Context, namespace string) ([]string, bool) {
|
||||
parts, err := parseNamespaceInput(namespace)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid namespace: " + err.Error()})
|
||||
return nil, false
|
||||
}
|
||||
return parts, true
|
||||
}
|
||||
|
||||
func newS3TablesManager() *s3tables.Manager {
|
||||
manager := s3tables.NewManager()
|
||||
manager.SetAccountID(s3_constants.AccountAdminId)
|
||||
@@ -158,7 +171,11 @@ func (s *AdminServer) GetS3TablesTablesData(ctx context.Context, bucketArn, name
|
||||
var resp s3tables.ListTablesResponse
|
||||
var ns []string
|
||||
if namespace != "" {
|
||||
ns = []string{namespace}
|
||||
parts, err := parseNamespaceInput(namespace)
|
||||
if err != nil {
|
||||
return S3TablesTablesData{}, err
|
||||
}
|
||||
ns = parts
|
||||
}
|
||||
req := &s3tables.ListTablesRequest{TableBucketARN: bucketArn, Namespace: ns, MaxTables: s3TablesAdminListLimit}
|
||||
if err := s.executeS3TablesOperation(ctx, "ListTables", req, &resp); err != nil {
|
||||
@@ -257,9 +274,13 @@ func (s *AdminServer) GetIcebergTablesData(ctx context.Context, catalogName, buc
|
||||
// GetIcebergTableDetailsData returns Iceberg table metadata and snapshot information.
|
||||
func (s *AdminServer) GetIcebergTableDetailsData(ctx context.Context, catalogName, bucketArn, namespace, tableName string) (IcebergTableDetailsData, error) {
|
||||
var resp s3tables.GetTableResponse
|
||||
namespaceParts, err := parseNamespaceInput(namespace)
|
||||
if err != nil {
|
||||
return IcebergTableDetailsData{}, err
|
||||
}
|
||||
req := &s3tables.GetTableRequest{
|
||||
TableBucketARN: bucketArn,
|
||||
Namespace: []string{namespace},
|
||||
Namespace: namespaceParts,
|
||||
Name: tableName,
|
||||
}
|
||||
if err := s.executeS3TablesOperation(ctx, "GetTable", req, &resp); err != nil {
|
||||
@@ -686,7 +707,11 @@ func (s *AdminServer) CreateS3TablesNamespace(c *gin.Context) {
|
||||
c.JSON(400, gin.H{"error": "bucket_arn and name are required"})
|
||||
return
|
||||
}
|
||||
createReq := &s3tables.CreateNamespaceRequest{TableBucketARN: req.BucketARN, Namespace: []string{req.Name}}
|
||||
namespaceParts, ok := s.parseNamespaceFromGin(c, req.Name)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
createReq := &s3tables.CreateNamespaceRequest{TableBucketARN: req.BucketARN, Namespace: namespaceParts}
|
||||
var resp s3tables.CreateNamespaceResponse
|
||||
if err := s.executeS3TablesOperation(c.Request.Context(), "CreateNamespace", createReq, &resp); err != nil {
|
||||
writeS3TablesError(c, err)
|
||||
@@ -705,7 +730,11 @@ func (s *AdminServer) DeleteS3TablesNamespace(c *gin.Context) {
|
||||
c.JSON(400, gin.H{"error": "bucket and name query parameters are required"})
|
||||
return
|
||||
}
|
||||
req := &s3tables.DeleteNamespaceRequest{TableBucketARN: bucketArn, Namespace: []string{namespace}}
|
||||
namespaceParts, ok := s.parseNamespaceFromGin(c, namespace)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
req := &s3tables.DeleteNamespaceRequest{TableBucketARN: bucketArn, Namespace: namespaceParts}
|
||||
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteNamespace", req, nil); err != nil {
|
||||
writeS3TablesError(c, err)
|
||||
return
|
||||
@@ -745,6 +774,10 @@ func (s *AdminServer) CreateS3TablesTable(c *gin.Context) {
|
||||
c.JSON(400, gin.H{"error": "bucket_arn, namespace, and name are required"})
|
||||
return
|
||||
}
|
||||
namespaceParts, ok := s.parseNamespaceFromGin(c, req.Namespace)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
format := req.Format
|
||||
if format == "" {
|
||||
format = "ICEBERG"
|
||||
@@ -757,7 +790,7 @@ func (s *AdminServer) CreateS3TablesTable(c *gin.Context) {
|
||||
}
|
||||
createReq := &s3tables.CreateTableRequest{
|
||||
TableBucketARN: req.BucketARN,
|
||||
Namespace: []string{req.Namespace},
|
||||
Namespace: namespaceParts,
|
||||
Name: req.Name,
|
||||
Format: format,
|
||||
Tags: req.Tags,
|
||||
@@ -780,7 +813,11 @@ func (s *AdminServer) DeleteS3TablesTable(c *gin.Context) {
|
||||
c.JSON(400, gin.H{"error": "bucket, namespace, and name query parameters are required"})
|
||||
return
|
||||
}
|
||||
req := &s3tables.DeleteTableRequest{TableBucketARN: bucketArn, Namespace: []string{namespace}, Name: name, VersionToken: version}
|
||||
namespaceParts, ok := s.parseNamespaceFromGin(c, namespace)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
req := &s3tables.DeleteTableRequest{TableBucketARN: bucketArn, Namespace: namespaceParts, Name: name, VersionToken: version}
|
||||
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteTable", req, nil); err != nil {
|
||||
writeS3TablesError(c, err)
|
||||
return
|
||||
@@ -853,7 +890,11 @@ func (s *AdminServer) PutS3TablesTablePolicy(c *gin.Context) {
|
||||
c.JSON(400, gin.H{"error": "bucket_arn, namespace, name, and policy are required"})
|
||||
return
|
||||
}
|
||||
putReq := &s3tables.PutTablePolicyRequest{TableBucketARN: req.BucketARN, Namespace: []string{req.Namespace}, Name: req.Name, ResourcePolicy: req.Policy}
|
||||
namespaceParts, ok := s.parseNamespaceFromGin(c, req.Namespace)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
putReq := &s3tables.PutTablePolicyRequest{TableBucketARN: req.BucketARN, Namespace: namespaceParts, Name: req.Name, ResourcePolicy: req.Policy}
|
||||
if err := s.executeS3TablesOperation(c.Request.Context(), "PutTablePolicy", putReq, nil); err != nil {
|
||||
writeS3TablesError(c, err)
|
||||
return
|
||||
@@ -869,7 +910,11 @@ func (s *AdminServer) GetS3TablesTablePolicy(c *gin.Context) {
|
||||
c.JSON(400, gin.H{"error": "bucket, namespace, and name query parameters are required"})
|
||||
return
|
||||
}
|
||||
getReq := &s3tables.GetTablePolicyRequest{TableBucketARN: bucketArn, Namespace: []string{namespace}, Name: name}
|
||||
namespaceParts, ok := s.parseNamespaceFromGin(c, namespace)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
getReq := &s3tables.GetTablePolicyRequest{TableBucketARN: bucketArn, Namespace: namespaceParts, Name: name}
|
||||
var resp s3tables.GetTablePolicyResponse
|
||||
if err := s.executeS3TablesOperation(c.Request.Context(), "GetTablePolicy", getReq, &resp); err != nil {
|
||||
writeS3TablesError(c, err)
|
||||
@@ -886,7 +931,11 @@ func (s *AdminServer) DeleteS3TablesTablePolicy(c *gin.Context) {
|
||||
c.JSON(400, gin.H{"error": "bucket, namespace, and name query parameters are required"})
|
||||
return
|
||||
}
|
||||
deleteReq := &s3tables.DeleteTablePolicyRequest{TableBucketARN: bucketArn, Namespace: []string{namespace}, Name: name}
|
||||
namespaceParts, ok := s.parseNamespaceFromGin(c, namespace)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
deleteReq := &s3tables.DeleteTablePolicyRequest{TableBucketARN: bucketArn, Namespace: namespaceParts, Name: name}
|
||||
if err := s.executeS3TablesOperation(c.Request.Context(), "DeleteTablePolicy", deleteReq, nil); err != nil {
|
||||
writeS3TablesError(c, err)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user