iceberg: persist namespace properties for create/get (#8276)
* iceberg: persist namespace properties via s3tables metadata * iceberg: simplify namespace properties normalization * s3tables: broaden namespace properties round-trip test * adjust logs * adjust logs
This commit is contained in:
@@ -401,6 +401,13 @@ func parsePagination(r *http.Request) (pageToken string, pageSize int, err error
|
|||||||
return pageToken, parsedPageSize, nil
|
return pageToken, parsedPageSize, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func normalizeNamespaceProperties(properties map[string]string) map[string]string {
|
||||||
|
if properties == nil {
|
||||||
|
return map[string]string{}
|
||||||
|
}
|
||||||
|
return properties
|
||||||
|
}
|
||||||
|
|
||||||
// handleConfig returns catalog configuration.
|
// handleConfig returns catalog configuration.
|
||||||
func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
@@ -482,36 +489,29 @@ func (s *Server) handleCreateNamespace(w http.ResponseWriter, r *http.Request) {
|
|||||||
createReq := &s3tables.CreateNamespaceRequest{
|
createReq := &s3tables.CreateNamespaceRequest{
|
||||||
TableBucketARN: bucketARN,
|
TableBucketARN: bucketARN,
|
||||||
Namespace: req.Namespace,
|
Namespace: req.Namespace,
|
||||||
|
Properties: normalizeNamespaceProperties(req.Properties),
|
||||||
}
|
}
|
||||||
var createResp s3tables.CreateNamespaceResponse
|
var createResp s3tables.CreateNamespaceResponse
|
||||||
|
|
||||||
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
mgrClient := s3tables.NewManagerClient(client)
|
mgrClient := s3tables.NewManagerClient(client)
|
||||||
glog.Errorf("Iceberg: handleCreateNamespace calling Execute with identityName=%s", identityName)
|
glog.V(2).Infof("Iceberg: handleCreateNamespace calling Execute with identityName=%s", identityName)
|
||||||
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateNamespace", createReq, &createResp, identityName)
|
return s.tablesManager.Execute(r.Context(), mgrClient, "CreateNamespace", createReq, &createResp, identityName)
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Iceberg: handleCreateNamespace error: %v", err)
|
|
||||||
|
|
||||||
if strings.Contains(err.Error(), "already exists") {
|
if strings.Contains(err.Error(), "already exists") {
|
||||||
writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error())
|
writeError(w, http.StatusConflict, "AlreadyExistsException", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
glog.Infof("Iceberg: CreateNamespace error: %v", err)
|
glog.Errorf("Iceberg: CreateNamespace error: %v", err)
|
||||||
writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error())
|
writeError(w, http.StatusInternalServerError, "InternalServerError", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Standardize property initialization for consistency with GetNamespace
|
|
||||||
props := req.Properties
|
|
||||||
if props == nil {
|
|
||||||
props = make(map[string]string)
|
|
||||||
}
|
|
||||||
|
|
||||||
result := CreateNamespaceResponse{
|
result := CreateNamespaceResponse{
|
||||||
Namespace: req.Namespace,
|
Namespace: Namespace(createResp.Namespace),
|
||||||
Properties: props,
|
Properties: normalizeNamespaceProperties(createResp.Properties),
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, result)
|
writeJSON(w, http.StatusOK, result)
|
||||||
}
|
}
|
||||||
@@ -554,8 +554,8 @@ func (s *Server) handleGetNamespace(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
result := GetNamespaceResponse{
|
result := GetNamespaceResponse{
|
||||||
Namespace: namespace,
|
Namespace: Namespace(getResp.Namespace),
|
||||||
Properties: make(map[string]string),
|
Properties: normalizeNamespaceProperties(getResp.Properties),
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, result)
|
writeJSON(w, http.StatusOK, result)
|
||||||
}
|
}
|
||||||
|
|||||||
29
weed/s3api/iceberg/iceberg_namespace_properties_test.go
Normal file
29
weed/s3api/iceberg/iceberg_namespace_properties_test.go
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
package iceberg
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestNormalizeNamespacePropertiesNil(t *testing.T) {
|
||||||
|
properties := normalizeNamespaceProperties(nil)
|
||||||
|
if properties == nil {
|
||||||
|
t.Fatalf("normalizeNamespaceProperties(nil) returned nil map")
|
||||||
|
}
|
||||||
|
if len(properties) != 0 {
|
||||||
|
t.Fatalf("normalizeNamespaceProperties(nil) length = %d, want 0", len(properties))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNormalizeNamespacePropertiesReturnsInputWhenSet(t *testing.T) {
|
||||||
|
input := map[string]string{
|
||||||
|
"owner": "analytics",
|
||||||
|
}
|
||||||
|
|
||||||
|
properties := normalizeNamespaceProperties(input)
|
||||||
|
if properties["owner"] != "analytics" {
|
||||||
|
t.Fatalf("normalized properties value = %q, want %q", properties["owner"], "analytics")
|
||||||
|
}
|
||||||
|
|
||||||
|
input["owner"] = "updated"
|
||||||
|
if properties["owner"] != "updated" {
|
||||||
|
t.Fatalf("normalizeNamespaceProperties should reuse the input map when non-nil")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -147,6 +147,7 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R
|
|||||||
Namespace: req.Namespace,
|
Namespace: req.Namespace,
|
||||||
CreatedAt: now,
|
CreatedAt: now,
|
||||||
OwnerAccountID: bucketMetadata.OwnerAccountID,
|
OwnerAccountID: bucketMetadata.OwnerAccountID,
|
||||||
|
Properties: req.Properties,
|
||||||
}
|
}
|
||||||
|
|
||||||
metadataBytes, err := json.Marshal(metadata)
|
metadataBytes, err := json.Marshal(metadata)
|
||||||
@@ -177,6 +178,7 @@ func (h *S3TablesHandler) handleCreateNamespace(w http.ResponseWriter, r *http.R
|
|||||||
resp := &CreateNamespaceResponse{
|
resp := &CreateNamespaceResponse{
|
||||||
Namespace: req.Namespace,
|
Namespace: req.Namespace,
|
||||||
TableBucketARN: req.TableBucketARN,
|
TableBucketARN: req.TableBucketARN,
|
||||||
|
Properties: req.Properties,
|
||||||
}
|
}
|
||||||
|
|
||||||
h.writeJSON(w, http.StatusOK, resp)
|
h.writeJSON(w, http.StatusOK, resp)
|
||||||
@@ -265,6 +267,7 @@ func (h *S3TablesHandler) handleGetNamespace(w http.ResponseWriter, r *http.Requ
|
|||||||
Namespace: metadata.Namespace,
|
Namespace: metadata.Namespace,
|
||||||
CreatedAt: metadata.CreatedAt,
|
CreatedAt: metadata.CreatedAt,
|
||||||
OwnerAccountID: metadata.OwnerAccountID,
|
OwnerAccountID: metadata.OwnerAccountID,
|
||||||
|
Properties: metadata.Properties,
|
||||||
}
|
}
|
||||||
|
|
||||||
h.writeJSON(w, http.StatusOK, resp)
|
h.writeJSON(w, http.StatusOK, resp)
|
||||||
|
|||||||
@@ -77,19 +77,22 @@ type DeleteTableBucketPolicyRequest struct {
|
|||||||
// Namespace types
|
// Namespace types
|
||||||
|
|
||||||
type Namespace struct {
|
type Namespace struct {
|
||||||
Namespace []string `json:"namespace"`
|
Namespace []string `json:"namespace"`
|
||||||
CreatedAt time.Time `json:"createdAt"`
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
OwnerAccountID string `json:"ownerAccountId"`
|
OwnerAccountID string `json:"ownerAccountId"`
|
||||||
|
Properties map[string]string `json:"properties,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CreateNamespaceRequest struct {
|
type CreateNamespaceRequest struct {
|
||||||
TableBucketARN string `json:"tableBucketARN"`
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
Namespace []string `json:"namespace"`
|
Namespace []string `json:"namespace"`
|
||||||
|
Properties map[string]string `json:"properties,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CreateNamespaceResponse struct {
|
type CreateNamespaceResponse struct {
|
||||||
Namespace []string `json:"namespace"`
|
Namespace []string `json:"namespace"`
|
||||||
TableBucketARN string `json:"tableBucketARN"`
|
TableBucketARN string `json:"tableBucketARN"`
|
||||||
|
Properties map[string]string `json:"properties,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type GetNamespaceRequest struct {
|
type GetNamespaceRequest struct {
|
||||||
@@ -98,9 +101,10 @@ type GetNamespaceRequest struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type GetNamespaceResponse struct {
|
type GetNamespaceResponse struct {
|
||||||
Namespace []string `json:"namespace"`
|
Namespace []string `json:"namespace"`
|
||||||
CreatedAt time.Time `json:"createdAt"`
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
OwnerAccountID string `json:"ownerAccountId"`
|
OwnerAccountID string `json:"ownerAccountId"`
|
||||||
|
Properties map[string]string `json:"properties,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ListNamespacesRequest struct {
|
type ListNamespacesRequest struct {
|
||||||
|
|||||||
@@ -128,9 +128,10 @@ type tableBucketMetadata struct {
|
|||||||
|
|
||||||
// namespaceMetadata stores metadata for a namespace
|
// namespaceMetadata stores metadata for a namespace
|
||||||
type namespaceMetadata struct {
|
type namespaceMetadata struct {
|
||||||
Namespace []string `json:"namespace"`
|
Namespace []string `json:"namespace"`
|
||||||
CreatedAt time.Time `json:"createdAt"`
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
OwnerAccountID string `json:"ownerAccountId"`
|
OwnerAccountID string `json:"ownerAccountId"`
|
||||||
|
Properties map[string]string `json:"properties,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// tableMetadataInternal stores metadata for a table
|
// tableMetadataInternal stores metadata for a table
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
package s3tables
|
package s3tables
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
@@ -124,3 +126,61 @@ func TestExpandNamespace(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNamespaceMetadataPropertiesRoundTrip(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
metadata namespaceMetadata
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "with properties",
|
||||||
|
metadata: namespaceMetadata{
|
||||||
|
Namespace: []string{"analytics"},
|
||||||
|
Properties: map[string]string{"owner": "finance"},
|
||||||
|
OwnerAccountID: "123456789012",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "nil properties",
|
||||||
|
metadata: namespaceMetadata{
|
||||||
|
Namespace: []string{"analytics"},
|
||||||
|
Properties: nil,
|
||||||
|
OwnerAccountID: "123456789012",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty properties",
|
||||||
|
metadata: namespaceMetadata{
|
||||||
|
Namespace: []string{"analytics"},
|
||||||
|
Properties: map[string]string{},
|
||||||
|
OwnerAccountID: "123456789012",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
data, err := json.Marshal(tc.metadata)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("json.Marshal(metadata) returned error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var decoded namespaceMetadata
|
||||||
|
if err := json.Unmarshal(data, &decoded); err != nil {
|
||||||
|
t.Fatalf("json.Unmarshal(data) returned error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Due to `omitempty`, nil and empty maps are unmarshaled as nil.
|
||||||
|
if len(tc.metadata.Properties) == 0 {
|
||||||
|
if decoded.Properties != nil {
|
||||||
|
t.Fatalf("expected nil properties for empty/nil input, got %v", decoded.Properties)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(decoded.Properties, tc.metadata.Properties) {
|
||||||
|
t.Fatalf("decoded.Properties = %v, want %v", decoded.Properties, tc.metadata.Properties)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user