iceberg: wire pagination for list namespaces/tables REST APIs (#8275)
* s3api/iceberg: wire list pagination tokens and page size * fmt * Update weed/s3api/iceberg/iceberg.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -368,6 +369,38 @@ func buildTableBucketARN(bucketName string) string {
|
|||||||
return arn
|
return arn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultListPageSize = 1000
|
||||||
|
maxListPageSize = 1000
|
||||||
|
)
|
||||||
|
|
||||||
|
func getPaginationQueryParam(r *http.Request, primary, fallback string) string {
|
||||||
|
if v := strings.TrimSpace(r.URL.Query().Get(primary)); v != "" {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
return strings.TrimSpace(r.URL.Query().Get(fallback))
|
||||||
|
}
|
||||||
|
|
||||||
|
func parsePagination(r *http.Request) (pageToken string, pageSize int, err error) {
|
||||||
|
pageToken = getPaginationQueryParam(r, "pageToken", "page-token")
|
||||||
|
pageSize = defaultListPageSize
|
||||||
|
|
||||||
|
pageSizeValue := getPaginationQueryParam(r, "pageSize", "page-size")
|
||||||
|
if pageSizeValue == "" {
|
||||||
|
return pageToken, pageSize, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
parsedPageSize, parseErr := strconv.Atoi(pageSizeValue)
|
||||||
|
if parseErr != nil || parsedPageSize <= 0 {
|
||||||
|
return pageToken, 0, fmt.Errorf("invalid pageSize %q: must be a positive integer", pageSizeValue)
|
||||||
|
}
|
||||||
|
if parsedPageSize > maxListPageSize {
|
||||||
|
return pageToken, 0, fmt.Errorf("invalid pageSize %q: must be <= %d", pageSizeValue, maxListPageSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
return pageToken, parsedPageSize, nil
|
||||||
|
}
|
||||||
|
|
||||||
// handleConfig returns catalog configuration.
|
// handleConfig returns catalog configuration.
|
||||||
func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
@@ -388,14 +421,21 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) {
|
|||||||
// Extract identity from context
|
// Extract identity from context
|
||||||
identityName := s3_constants.GetIdentityNameFromContext(r)
|
identityName := s3_constants.GetIdentityNameFromContext(r)
|
||||||
|
|
||||||
|
pageToken, pageSize, err := parsePagination(r)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, "BadRequestException", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Use S3 Tables manager to list namespaces
|
// Use S3 Tables manager to list namespaces
|
||||||
var resp s3tables.ListNamespacesResponse
|
var resp s3tables.ListNamespacesResponse
|
||||||
req := &s3tables.ListNamespacesRequest{
|
req := &s3tables.ListNamespacesRequest{
|
||||||
TableBucketARN: bucketARN,
|
TableBucketARN: bucketARN,
|
||||||
MaxNamespaces: 1000,
|
ContinuationToken: pageToken,
|
||||||
|
MaxNamespaces: pageSize,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
mgrClient := s3tables.NewManagerClient(client)
|
mgrClient := s3tables.NewManagerClient(client)
|
||||||
return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, identityName)
|
return s.tablesManager.Execute(r.Context(), mgrClient, "ListNamespaces", req, &resp, identityName)
|
||||||
})
|
})
|
||||||
@@ -413,7 +453,8 @@ func (s *Server) handleListNamespaces(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
result := ListNamespacesResponse{
|
result := ListNamespacesResponse{
|
||||||
Namespaces: namespaces,
|
NextPageToken: resp.ContinuationToken,
|
||||||
|
Namespaces: namespaces,
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, result)
|
writeJSON(w, http.StatusOK, result)
|
||||||
}
|
}
|
||||||
@@ -615,14 +656,21 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) {
|
|||||||
// Extract identity from context
|
// Extract identity from context
|
||||||
identityName := s3_constants.GetIdentityNameFromContext(r)
|
identityName := s3_constants.GetIdentityNameFromContext(r)
|
||||||
|
|
||||||
|
pageToken, pageSize, err := parsePagination(r)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, "BadRequestException", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
listReq := &s3tables.ListTablesRequest{
|
listReq := &s3tables.ListTablesRequest{
|
||||||
TableBucketARN: bucketARN,
|
TableBucketARN: bucketARN,
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
MaxTables: 1000,
|
ContinuationToken: pageToken,
|
||||||
|
MaxTables: pageSize,
|
||||||
}
|
}
|
||||||
var listResp s3tables.ListTablesResponse
|
var listResp s3tables.ListTablesResponse
|
||||||
|
|
||||||
err := s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
err = s.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
||||||
mgrClient := s3tables.NewManagerClient(client)
|
mgrClient := s3tables.NewManagerClient(client)
|
||||||
return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, identityName)
|
return s.tablesManager.Execute(r.Context(), mgrClient, "ListTables", listReq, &listResp, identityName)
|
||||||
})
|
})
|
||||||
@@ -647,7 +695,8 @@ func (s *Server) handleListTables(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
result := ListTablesResponse{
|
result := ListTablesResponse{
|
||||||
Identifiers: identifiers,
|
NextPageToken: listResp.ContinuationToken,
|
||||||
|
Identifiers: identifiers,
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, result)
|
writeJSON(w, http.StatusOK, result)
|
||||||
}
|
}
|
||||||
|
|||||||
69
weed/s3api/iceberg/iceberg_pagination_test.go
Normal file
69
weed/s3api/iceberg/iceberg_pagination_test.go
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
package iceberg
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParsePaginationDefaultValues(t *testing.T) {
|
||||||
|
req := httptest.NewRequest("GET", "/v1/namespaces", nil)
|
||||||
|
|
||||||
|
pageToken, pageSize, err := parsePagination(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("parsePagination() error = %v", err)
|
||||||
|
}
|
||||||
|
if pageToken != "" {
|
||||||
|
t.Fatalf("pageToken = %q, want empty", pageToken)
|
||||||
|
}
|
||||||
|
if pageSize != defaultListPageSize {
|
||||||
|
t.Fatalf("pageSize = %d, want %d", pageSize, defaultListPageSize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParsePaginationUsesCamelCaseParameters(t *testing.T) {
|
||||||
|
req := httptest.NewRequest("GET", "/v1/namespaces?pageToken=abc&pageSize=25", nil)
|
||||||
|
|
||||||
|
pageToken, pageSize, err := parsePagination(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("parsePagination() error = %v", err)
|
||||||
|
}
|
||||||
|
if pageToken != "abc" {
|
||||||
|
t.Fatalf("pageToken = %q, want %q", pageToken, "abc")
|
||||||
|
}
|
||||||
|
if pageSize != 25 {
|
||||||
|
t.Fatalf("pageSize = %d, want %d", pageSize, 25)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParsePaginationSupportsHyphenatedFallback(t *testing.T) {
|
||||||
|
req := httptest.NewRequest("GET", "/v1/namespaces?page-token=abc&page-size=17", nil)
|
||||||
|
|
||||||
|
pageToken, pageSize, err := parsePagination(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("parsePagination() error = %v", err)
|
||||||
|
}
|
||||||
|
if pageToken != "abc" {
|
||||||
|
t.Fatalf("pageToken = %q, want %q", pageToken, "abc")
|
||||||
|
}
|
||||||
|
if pageSize != 17 {
|
||||||
|
t.Fatalf("pageSize = %d, want %d", pageSize, 17)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParsePaginationRejectsInvalidPageSize(t *testing.T) {
|
||||||
|
testCases := []string{
|
||||||
|
"/v1/namespaces?pageSize=0",
|
||||||
|
"/v1/namespaces?pageSize=-1",
|
||||||
|
"/v1/namespaces?pageSize=foo",
|
||||||
|
"/v1/namespaces?pageSize=1001",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, rawURL := range testCases {
|
||||||
|
t.Run(rawURL, func(t *testing.T) {
|
||||||
|
req := httptest.NewRequest("GET", rawURL, nil)
|
||||||
|
if _, _, err := parsePagination(req); err == nil {
|
||||||
|
t.Fatalf("parsePagination() expected error for url %q", rawURL)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -54,4 +54,3 @@ func TestCheckEcVolumeStatusCountOnlyDataShards(t *testing.T) {
|
|||||||
t.Fatalf("expected shardCount=3, got %d", shardCount)
|
t.Fatalf("expected shardCount=3, got %d", shardCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user