feat: Add Iceberg REST Catalog server and admin UI (#8175)
* feat: Add Iceberg REST Catalog server Implement Iceberg REST Catalog API on a separate port (default 8181) that exposes S3 Tables metadata through the Apache Iceberg REST protocol. - Add new weed/s3api/iceberg package with REST handlers - Implement /v1/config endpoint returning catalog configuration - Implement namespace endpoints (list/create/get/head/delete) - Implement table endpoints (list/create/load/head/delete/update) - Add -port.iceberg flag to S3 standalone server (s3.go) - Add -s3.port.iceberg flag to combined server mode (server.go) - Add -s3.port.iceberg flag to mini cluster mode (mini.go) - Support prefix-based routing for multiple catalogs The Iceberg REST server reuses S3 Tables metadata storage under /table-buckets and enables DuckDB, Spark, and other Iceberg clients to connect to SeaweedFS as a catalog. * feat: Add Iceberg Catalog pages to admin UI Add admin UI pages to browse Iceberg catalogs, namespaces, and tables. - Add Iceberg Catalog menu item under Object Store navigation - Create iceberg_catalog.templ showing catalog overview with REST info - Create iceberg_namespaces.templ listing namespaces in a catalog - Create iceberg_tables.templ listing tables in a namespace - Add handlers and routes in admin_handlers.go - Add Iceberg data provider methods in s3tables_management.go - Add Iceberg data types in types.go The Iceberg Catalog pages provide visibility into the same S3 Tables data through an Iceberg-centric lens, including REST endpoint examples for DuckDB and PyIceberg. * test: Add Iceberg catalog integration tests and reorg s3tables tests - Reorganize existing s3tables tests to test/s3tables/table-buckets/ - Add new test/s3tables/catalog/ for Iceberg REST catalog tests - Add TestIcebergConfig to verify /v1/config endpoint - Add TestIcebergNamespaces to verify namespace listing - Add TestDuckDBIntegration for DuckDB connectivity (requires Docker) - Update CI workflow to use new test paths * fix: Generate proper random UUIDs for Iceberg tables Address code review feedback: - Replace placeholder UUID with crypto/rand-based UUID v4 generation - Add detailed TODO comments for handleUpdateTable stub explaining the required atomic metadata swap implementation * fix: Serve Iceberg on localhost listener when binding to different interface Address code review feedback: properly serve the localhost listener when the Iceberg server is bound to a non-localhost interface. * ci: Add Iceberg catalog integration tests to CI Add new job to run Iceberg catalog tests in CI, along with: - Iceberg package build verification - Iceberg unit tests - Iceberg go vet checks - Iceberg format checks * fix: Address code review feedback for Iceberg implementation - fix: Replace hardcoded account ID with s3_constants.AccountAdminId in buildTableBucketARN() - fix: Improve UUID generation error handling with deterministic fallback (timestamp + PID + counter) - fix: Update handleUpdateTable to return HTTP 501 Not Implemented instead of fake success - fix: Better error handling in handleNamespaceExists to distinguish 404 from 500 errors - fix: Use relative URL in template instead of hardcoded localhost:8181 - fix: Add HTTP timeout to test's waitForService function to avoid hangs - fix: Use dynamic ephemeral ports in integration tests to avoid flaky parallel failures - fix: Add Iceberg port to final port configuration logging in mini.go * fix: Address critical issues in Iceberg implementation - fix: Cache table UUIDs to ensure persistence across LoadTable calls The UUID now remains stable for the lifetime of the server session. TODO: For production, UUIDs should be persisted in S3 Tables metadata. - fix: Remove redundant URL-encoded namespace parsing mux router already decodes %1F to \x1F before passing to handlers. Redundant ReplaceAll call could cause bugs with literal %1F in namespace. * fix: Improve test robustness and reduce code duplication - fix: Make DuckDB test more robust by failing on unexpected errors Instead of silently logging errors, now explicitly check for expected conditions (extension not available) and skip the test appropriately. - fix: Extract username helper method to reduce duplication Created getUsername() helper in AdminHandlers to avoid duplicating the username retrieval logic across Iceberg page handlers. * fix: Add mutex protection to table UUID cache Protects concurrent access to the tableUUIDs map with sync.RWMutex. Uses read-lock for fast path when UUID already cached, and write-lock for generating new UUIDs. Includes double-check pattern to handle race condition between read-unlock and write-lock. * style: fix go fmt errors * feat(iceberg): persist table UUID in S3 Tables metadata * feat(admin): configure Iceberg port in Admin UI and commands * refactor: address review comments (flags, tests, handlers) - command/mini: fix tracking of explicit s3.port.iceberg flag - command/admin: add explicit -iceberg.port flag - admin/handlers: reuse getUsername helper - tests: use 127.0.0.1 for ephemeral ports and os.Stat for file size check * test: check error from FileStat in verify_gc_empty_test
This commit is contained in:
320
test/s3tables/catalog/iceberg_catalog_test.go
Normal file
320
test/s3tables/catalog/iceberg_catalog_test.go
Normal file
@@ -0,0 +1,320 @@
|
||||
// Package catalog provides integration tests for the Iceberg REST Catalog API.
|
||||
// These tests use DuckDB running in Docker to verify catalog operations.
|
||||
package catalog
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestEnvironment contains the test environment configuration
|
||||
type TestEnvironment struct {
|
||||
seaweedDir string
|
||||
weedBinary string
|
||||
dataDir string
|
||||
s3Port int
|
||||
icebergPort int
|
||||
masterPort int
|
||||
filerPort int
|
||||
volumePort int
|
||||
weedProcess *exec.Cmd
|
||||
weedCancel context.CancelFunc
|
||||
dockerAvailable bool
|
||||
}
|
||||
|
||||
// hasDocker checks if Docker is available
|
||||
func hasDocker() bool {
|
||||
cmd := exec.Command("docker", "version")
|
||||
return cmd.Run() == nil
|
||||
}
|
||||
|
||||
// getFreePort returns an available ephemeral port
|
||||
func getFreePort() (int, error) {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer listener.Close()
|
||||
|
||||
addr := listener.Addr().(*net.TCPAddr)
|
||||
return addr.Port, nil
|
||||
}
|
||||
|
||||
// NewTestEnvironment creates a new test environment
|
||||
func NewTestEnvironment(t *testing.T) *TestEnvironment {
|
||||
t.Helper()
|
||||
|
||||
// Find the SeaweedFS root directory
|
||||
wd, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get working directory: %v", err)
|
||||
}
|
||||
|
||||
// Navigate up to find the SeaweedFS root (contains go.mod)
|
||||
seaweedDir := wd
|
||||
for i := 0; i < 5; i++ {
|
||||
if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil {
|
||||
break
|
||||
}
|
||||
seaweedDir = filepath.Dir(seaweedDir)
|
||||
}
|
||||
|
||||
// Check for weed binary
|
||||
weedBinary := filepath.Join(seaweedDir, "weed", "weed")
|
||||
if _, err := os.Stat(weedBinary); os.IsNotExist(err) {
|
||||
// Try system PATH
|
||||
weedBinary = "weed"
|
||||
if _, err := exec.LookPath(weedBinary); err != nil {
|
||||
t.Skip("weed binary not found, skipping integration test")
|
||||
}
|
||||
}
|
||||
|
||||
// Create temporary data directory
|
||||
dataDir, err := os.MkdirTemp("", "seaweed-iceberg-test-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create temp dir: %v", err)
|
||||
}
|
||||
|
||||
// Allocate free ephemeral ports for each service
|
||||
s3Port, err := getFreePort()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get free port for S3: %v", err)
|
||||
}
|
||||
icebergPort, err := getFreePort()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get free port for Iceberg: %v", err)
|
||||
}
|
||||
masterPort, err := getFreePort()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get free port for Master: %v", err)
|
||||
}
|
||||
filerPort, err := getFreePort()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get free port for Filer: %v", err)
|
||||
}
|
||||
volumePort, err := getFreePort()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get free port for Volume: %v", err)
|
||||
}
|
||||
|
||||
return &TestEnvironment{
|
||||
seaweedDir: seaweedDir,
|
||||
weedBinary: weedBinary,
|
||||
dataDir: dataDir,
|
||||
s3Port: s3Port,
|
||||
icebergPort: icebergPort,
|
||||
masterPort: masterPort,
|
||||
filerPort: filerPort,
|
||||
volumePort: volumePort,
|
||||
dockerAvailable: hasDocker(),
|
||||
}
|
||||
}
|
||||
|
||||
// StartSeaweedFS starts a SeaweedFS mini cluster
|
||||
func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
env.weedCancel = cancel
|
||||
|
||||
masterDir := filepath.Join(env.dataDir, "master")
|
||||
filerDir := filepath.Join(env.dataDir, "filer")
|
||||
volumeDir := filepath.Join(env.dataDir, "volume")
|
||||
|
||||
for _, dir := range []string{masterDir, filerDir, volumeDir} {
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
t.Fatalf("Failed to create directory %s: %v", dir, err)
|
||||
}
|
||||
}
|
||||
|
||||
cmd := exec.CommandContext(ctx, env.weedBinary, "mini",
|
||||
"-master.port", fmt.Sprintf("%d", env.masterPort),
|
||||
"-volume.port", fmt.Sprintf("%d", env.volumePort),
|
||||
"-filer.port", fmt.Sprintf("%d", env.filerPort),
|
||||
"-s3.port", fmt.Sprintf("%d", env.s3Port),
|
||||
"-s3.port.iceberg", fmt.Sprintf("%d", env.icebergPort),
|
||||
"-dir", env.dataDir,
|
||||
)
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatalf("Failed to start SeaweedFS: %v", err)
|
||||
}
|
||||
env.weedProcess = cmd
|
||||
|
||||
// Wait for services to be ready
|
||||
if !env.waitForService(fmt.Sprintf("http://127.0.0.1:%d/v1/config", env.icebergPort), 30*time.Second) {
|
||||
t.Fatalf("Iceberg REST API did not become ready")
|
||||
}
|
||||
}
|
||||
|
||||
// waitForService waits for a service to become available
|
||||
func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool {
|
||||
client := &http.Client{Timeout: 2 * time.Second}
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
resp, err := client.Get(url)
|
||||
if err == nil {
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
return true
|
||||
}
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Cleanup stops SeaweedFS and cleans up resources
|
||||
func (env *TestEnvironment) Cleanup(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
if env.weedCancel != nil {
|
||||
env.weedCancel()
|
||||
}
|
||||
|
||||
if env.weedProcess != nil {
|
||||
// Give process time to shut down gracefully
|
||||
time.Sleep(2 * time.Second)
|
||||
env.weedProcess.Wait()
|
||||
}
|
||||
|
||||
if env.dataDir != "" {
|
||||
os.RemoveAll(env.dataDir)
|
||||
}
|
||||
}
|
||||
|
||||
// IcebergURL returns the Iceberg REST Catalog URL
|
||||
func (env *TestEnvironment) IcebergURL() string {
|
||||
return fmt.Sprintf("http://127.0.0.1:%d", env.icebergPort)
|
||||
}
|
||||
|
||||
// TestIcebergConfig tests the /v1/config endpoint
|
||||
func TestIcebergConfig(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
env := NewTestEnvironment(t)
|
||||
defer env.Cleanup(t)
|
||||
|
||||
env.StartSeaweedFS(t)
|
||||
|
||||
// Test GET /v1/config
|
||||
resp, err := http.Get(env.IcebergURL() + "/v1/config")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get config: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Errorf("Expected status 200, got %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read response body: %v", err)
|
||||
}
|
||||
|
||||
// Verify response contains required fields
|
||||
bodyStr := string(body)
|
||||
if !strings.Contains(bodyStr, "defaults") || !strings.Contains(bodyStr, "overrides") {
|
||||
t.Errorf("Config response missing required fields: %s", bodyStr)
|
||||
}
|
||||
}
|
||||
|
||||
// TestIcebergNamespaces tests namespace operations
|
||||
func TestIcebergNamespaces(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
env := NewTestEnvironment(t)
|
||||
defer env.Cleanup(t)
|
||||
|
||||
env.StartSeaweedFS(t)
|
||||
|
||||
// Test GET /v1/namespaces (should return empty list initially)
|
||||
resp, err := http.Get(env.IcebergURL() + "/v1/namespaces")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to list namespaces: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
t.Fatalf("Expected status 200, got %d: %s", resp.StatusCode, body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDuckDBIntegration tests Iceberg catalog operations using DuckDB
|
||||
// This test requires Docker to be available
|
||||
func TestDuckDBIntegration(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
env := NewTestEnvironment(t)
|
||||
defer env.Cleanup(t)
|
||||
|
||||
if !env.dockerAvailable {
|
||||
t.Skip("Docker not available, skipping DuckDB integration test")
|
||||
}
|
||||
|
||||
env.StartSeaweedFS(t)
|
||||
|
||||
// Create a temporary SQL file for DuckDB to execute
|
||||
sqlFile := filepath.Join(env.dataDir, "test.sql")
|
||||
sqlContent := fmt.Sprintf(`
|
||||
-- Install and load Iceberg extension
|
||||
INSTALL iceberg;
|
||||
LOAD iceberg;
|
||||
|
||||
-- List namespaces via Iceberg REST catalog (basic connectivity test)
|
||||
-- Note: Full operations require setting up S3 credentials which is beyond this test
|
||||
SELECT 'Iceberg extension loaded successfully' as result;
|
||||
`)
|
||||
|
||||
if err := os.WriteFile(sqlFile, []byte(sqlContent), 0644); err != nil {
|
||||
t.Fatalf("Failed to write SQL file: %v", err)
|
||||
}
|
||||
|
||||
// Run DuckDB in Docker to test Iceberg connectivity
|
||||
// Use host.docker.internal to connect to the host's Iceberg port
|
||||
cmd := exec.Command("docker", "run", "--rm",
|
||||
"-v", fmt.Sprintf("%s:/test", env.dataDir),
|
||||
"--add-host", "host.docker.internal:host-gateway",
|
||||
"duckdb/duckdb:latest",
|
||||
"-c", ".read /test/test.sql",
|
||||
)
|
||||
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
t.Logf("DuckDB output: %s", output)
|
||||
// Check for expected errors in certain CI environments
|
||||
outputStr := string(output)
|
||||
if strings.Contains(outputStr, "iceberg extension is not available") ||
|
||||
strings.Contains(outputStr, "Failed to load") {
|
||||
t.Skip("Skipping DuckDB test: Iceberg extension not available in Docker image")
|
||||
}
|
||||
// Any other error is unexpected
|
||||
t.Fatalf("DuckDB command failed unexpectedly. Output: %s\nError: %v", output, err)
|
||||
}
|
||||
|
||||
// Verify the test completed successfully
|
||||
outputStr := string(output)
|
||||
t.Logf("DuckDB output: %s", outputStr)
|
||||
if !strings.Contains(outputStr, "Iceberg extension loaded successfully") {
|
||||
t.Errorf("Expected success message in output, got: %s", outputStr)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user