* feat: Add Iceberg REST Catalog server

Implement Iceberg REST Catalog API on a separate port (default 8181) that exposes S3 Tables metadata through the Apache Iceberg REST protocol.

- Add new weed/s3api/iceberg package with REST handlers
- Implement /v1/config endpoint returning catalog configuration
- Implement namespace endpoints (list/create/get/head/delete)
- Implement table endpoints (list/create/load/head/delete/update)
- Add -port.iceberg flag to S3 standalone server (s3.go)
- Add -s3.port.iceberg flag to combined server mode (server.go)
- Add -s3.port.iceberg flag to mini cluster mode (mini.go)
- Support prefix-based routing for multiple catalogs

The Iceberg REST server reuses S3 Tables metadata storage under /table-buckets and enables DuckDB, Spark, and other Iceberg clients to connect to SeaweedFS as a catalog.

* feat: Add Iceberg Catalog pages to admin UI

Add admin UI pages to browse Iceberg catalogs, namespaces, and tables.

- Add Iceberg Catalog menu item under Object Store navigation
- Create iceberg_catalog.templ showing catalog overview with REST info
- Create iceberg_namespaces.templ listing namespaces in a catalog
- Create iceberg_tables.templ listing tables in a namespace
- Add handlers and routes in admin_handlers.go
- Add Iceberg data provider methods in s3tables_management.go
- Add Iceberg data types in types.go

The Iceberg Catalog pages provide visibility into the same S3 Tables data through an Iceberg-centric lens, including REST endpoint examples for DuckDB and PyIceberg.

* test: Add Iceberg catalog integration tests and reorg s3tables tests

- Reorganize existing s3tables tests to test/s3tables/table-buckets/
- Add new test/s3tables/catalog/ for Iceberg REST catalog tests
- Add TestIcebergConfig to verify /v1/config endpoint
- Add TestIcebergNamespaces to verify namespace listing
- Add TestDuckDBIntegration for DuckDB connectivity (requires Docker)
- Update CI workflow to use new test paths

* fix: Generate proper random UUIDs for Iceberg tables

Address code review feedback:
- Replace placeholder UUID with crypto/rand-based UUID v4 generation
- Add detailed TODO comments for handleUpdateTable stub explaining the required atomic metadata swap implementation

* fix: Serve Iceberg on localhost listener when binding to different interface

Address code review feedback: properly serve the localhost listener when the Iceberg server is bound to a non-localhost interface.

* ci: Add Iceberg catalog integration tests to CI

Add new job to run Iceberg catalog tests in CI, along with:
- Iceberg package build verification
- Iceberg unit tests
- Iceberg go vet checks
- Iceberg format checks

* fix: Address code review feedback for Iceberg implementation

- fix: Replace hardcoded account ID with s3_constants.AccountAdminId in buildTableBucketARN()
- fix: Improve UUID generation error handling with deterministic fallback (timestamp + PID + counter)
- fix: Update handleUpdateTable to return HTTP 501 Not Implemented instead of fake success
- fix: Better error handling in handleNamespaceExists to distinguish 404 from 500 errors
- fix: Use relative URL in template instead of hardcoded localhost:8181
- fix: Add HTTP timeout to test's waitForService function to avoid hangs
- fix: Use dynamic ephemeral ports in integration tests to avoid flaky parallel failures
- fix: Add Iceberg port to final port configuration logging in mini.go

* fix: Address critical issues in Iceberg implementation

- fix: Cache table UUIDs to ensure persistence across LoadTable calls
  The UUID now remains stable for the lifetime of the server session.
  TODO: For production, UUIDs should be persisted in S3 Tables metadata.
- fix: Remove redundant URL-encoded namespace parsing
  mux router already decodes %1F to \x1F before passing to handlers.
  Redundant ReplaceAll call could cause bugs with literal %1F in namespace.

* fix: Improve test robustness and reduce code duplication

- fix: Make DuckDB test more robust by failing on unexpected errors
  Instead of silently logging errors, now explicitly check for expected conditions (extension not available) and skip the test appropriately.
- fix: Extract username helper method to reduce duplication
  Created getUsername() helper in AdminHandlers to avoid duplicating the username retrieval logic across Iceberg page handlers.

* fix: Add mutex protection to table UUID cache

Protects concurrent access to the tableUUIDs map with sync.RWMutex. Uses read-lock for fast path when UUID already cached, and write-lock for generating new UUIDs. Includes double-check pattern to handle race condition between read-unlock and write-lock.

* style: fix go fmt errors

* feat(iceberg): persist table UUID in S3 Tables metadata

* feat(admin): configure Iceberg port in Admin UI and commands

* refactor: address review comments (flags, tests, handlers)

- command/mini: fix tracking of explicit s3.port.iceberg flag
- command/admin: add explicit -iceberg.port flag
- admin/handlers: reuse getUsername helper
- tests: use 127.0.0.1 for ephemeral ports and os.Stat for file size check

* test: check error from FileStat in verify_gc_empty_test
321 lines
8.5 KiB
Go
// Package catalog provides integration tests for the Iceberg REST Catalog API.
// Each test starts a SeaweedFS mini cluster with the Iceberg port enabled;
// TestDuckDBIntegration additionally runs DuckDB in Docker.
package catalog

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"testing"
	"time"
)

// TestEnvironment contains the test environment configuration
type TestEnvironment struct {
	seaweedDir      string
	weedBinary      string
	dataDir         string
	s3Port          int
	icebergPort     int
	masterPort      int
	filerPort       int
	volumePort      int
	weedProcess     *exec.Cmd
	weedCancel      context.CancelFunc
	dockerAvailable bool
}

// hasDocker checks if Docker is available
func hasDocker() bool {
	cmd := exec.Command("docker", "version")
	return cmd.Run() == nil
}

// getFreePort returns an available ephemeral port on the loopback interface.
// The listener is closed before returning, so another process could in
// principle claim the port before SeaweedFS binds it.
func getFreePort() (int, error) {
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer listener.Close()

	addr := listener.Addr().(*net.TCPAddr)
	return addr.Port, nil
}

// NewTestEnvironment creates a new test environment
func NewTestEnvironment(t *testing.T) *TestEnvironment {
	t.Helper()

	// Find the SeaweedFS root directory
	wd, err := os.Getwd()
	if err != nil {
		t.Fatalf("Failed to get working directory: %v", err)
	}

	// Navigate up to find the SeaweedFS root (contains go.mod)
	seaweedDir := wd
	for i := 0; i < 5; i++ {
		if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil {
			break
		}
		seaweedDir = filepath.Dir(seaweedDir)
	}

	// Prefer a locally built weed binary; fall back to the system PATH if it
	// is missing or cannot be stat'ed
	weedBinary := filepath.Join(seaweedDir, "weed", "weed")
	if _, err := os.Stat(weedBinary); err != nil {
		weedBinary = "weed"
		if _, err := exec.LookPath(weedBinary); err != nil {
			t.Skip("weed binary not found, skipping integration test")
		}
	}

	// Create temporary data directory
	dataDir, err := os.MkdirTemp("", "seaweed-iceberg-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}

	// Allocate free ephemeral ports for each service
	s3Port, err := getFreePort()
	if err != nil {
		t.Fatalf("Failed to get free port for S3: %v", err)
	}
	icebergPort, err := getFreePort()
	if err != nil {
		t.Fatalf("Failed to get free port for Iceberg: %v", err)
	}
	masterPort, err := getFreePort()
	if err != nil {
		t.Fatalf("Failed to get free port for Master: %v", err)
	}
	filerPort, err := getFreePort()
	if err != nil {
		t.Fatalf("Failed to get free port for Filer: %v", err)
	}
	volumePort, err := getFreePort()
	if err != nil {
		t.Fatalf("Failed to get free port for Volume: %v", err)
	}

	return &TestEnvironment{
		seaweedDir:      seaweedDir,
		weedBinary:      weedBinary,
		dataDir:         dataDir,
		s3Port:          s3Port,
		icebergPort:     icebergPort,
		masterPort:      masterPort,
		filerPort:       filerPort,
		volumePort:      volumePort,
		dockerAvailable: hasDocker(),
	}
}

// StartSeaweedFS starts a SeaweedFS mini cluster
func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
	t.Helper()

	ctx, cancel := context.WithCancel(context.Background())
	env.weedCancel = cancel

	masterDir := filepath.Join(env.dataDir, "master")
	filerDir := filepath.Join(env.dataDir, "filer")
	volumeDir := filepath.Join(env.dataDir, "volume")

	for _, dir := range []string{masterDir, filerDir, volumeDir} {
		if err := os.MkdirAll(dir, 0755); err != nil {
			t.Fatalf("Failed to create directory %s: %v", dir, err)
		}
	}

	cmd := exec.CommandContext(ctx, env.weedBinary, "mini",
		"-master.port", fmt.Sprintf("%d", env.masterPort),
		"-volume.port", fmt.Sprintf("%d", env.volumePort),
		"-filer.port", fmt.Sprintf("%d", env.filerPort),
		"-s3.port", fmt.Sprintf("%d", env.s3Port),
		"-s3.port.iceberg", fmt.Sprintf("%d", env.icebergPort),
		"-dir", env.dataDir,
	)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	if err := cmd.Start(); err != nil {
		t.Fatalf("Failed to start SeaweedFS: %v", err)
	}
	env.weedProcess = cmd

	// Wait for services to be ready
	if !env.waitForService(fmt.Sprintf("http://127.0.0.1:%d/v1/config", env.icebergPort), 30*time.Second) {
		t.Fatalf("Iceberg REST API did not become ready")
	}
}

// waitForService waits for a service to become available
func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool {
	client := &http.Client{Timeout: 2 * time.Second}
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		resp, err := client.Get(url)
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return true
			}
		}
		time.Sleep(500 * time.Millisecond)
	}
	return false
}

// Cleanup stops SeaweedFS and cleans up resources
func (env *TestEnvironment) Cleanup(t *testing.T) {
	t.Helper()

	// Cancelling the context makes exec.CommandContext kill the weed process
	if env.weedCancel != nil {
		env.weedCancel()
	}

	if env.weedProcess != nil {
		// Block until the process has exited; the returned error normally
		// just reports the kill signal, so it is ignored
		_ = env.weedProcess.Wait()
	}

	if env.dataDir != "" {
		os.RemoveAll(env.dataDir)
	}
}

// IcebergURL returns the Iceberg REST Catalog URL
func (env *TestEnvironment) IcebergURL() string {
	return fmt.Sprintf("http://127.0.0.1:%d", env.icebergPort)
}

// TestIcebergConfig tests the /v1/config endpoint
func TestIcebergConfig(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	env := NewTestEnvironment(t)
	defer env.Cleanup(t)

	env.StartSeaweedFS(t)

	// Test GET /v1/config
	resp, err := http.Get(env.IcebergURL() + "/v1/config")
	if err != nil {
		t.Fatalf("Failed to get config: %v", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		t.Errorf("Expected status 200, got %d", resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		t.Fatalf("Failed to read response body: %v", err)
	}

	// Verify response contains required fields
	bodyStr := string(body)
	if !strings.Contains(bodyStr, "defaults") || !strings.Contains(bodyStr, "overrides") {
		t.Errorf("Config response missing required fields: %s", bodyStr)
	}
}

// TestIcebergNamespaces tests the namespace listing endpoint
func TestIcebergNamespaces(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	env := NewTestEnvironment(t)
	defer env.Cleanup(t)

	env.StartSeaweedFS(t)

	// Test GET /v1/namespaces (a fresh cluster has no namespaces; only the
	// status code is checked here)
	resp, err := http.Get(env.IcebergURL() + "/v1/namespaces")
	if err != nil {
		t.Fatalf("Failed to list namespaces: %v", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		t.Fatalf("Expected status 200, got %d: %s", resp.StatusCode, body)
	}
}

// TestDuckDBIntegration checks that DuckDB's Iceberg extension can be
// installed and loaded while the catalog is running.
// This test requires Docker to be available.
func TestDuckDBIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	env := NewTestEnvironment(t)
	defer env.Cleanup(t)

	if !env.dockerAvailable {
		t.Skip("Docker not available, skipping DuckDB integration test")
	}

	env.StartSeaweedFS(t)

	// Create a temporary SQL file for DuckDB to execute. A plain raw string is
	// used because the script has no format verbs.
	sqlFile := filepath.Join(env.dataDir, "test.sql")
	sqlContent := `
-- Install and load Iceberg extension
INSTALL iceberg;
LOAD iceberg;

-- Basic check only: this does not contact the Iceberg REST catalog.
-- Full operations require setting up S3 credentials, which is beyond this test.
SELECT 'Iceberg extension loaded successfully' as result;
`

	if err := os.WriteFile(sqlFile, []byte(sqlContent), 0644); err != nil {
		t.Fatalf("Failed to write SQL file: %v", err)
	}

	// Run DuckDB in Docker. The --add-host mapping lets the container reach the
	// host as host.docker.internal, although the script above does not yet
	// connect to the Iceberg port.
	cmd := exec.Command("docker", "run", "--rm",
		"-v", fmt.Sprintf("%s:/test", env.dataDir),
		"--add-host", "host.docker.internal:host-gateway",
		"duckdb/duckdb:latest",
		"-c", ".read /test/test.sql",
	)

	output, err := cmd.CombinedOutput()
	outputStr := string(output)
	if err != nil {
		t.Logf("DuckDB output: %s", outputStr)
		// Tolerate a missing Iceberg extension, which occurs in some CI environments
		if strings.Contains(outputStr, "iceberg extension is not available") ||
			strings.Contains(outputStr, "Failed to load") {
			t.Skip("Skipping DuckDB test: Iceberg extension not available in Docker image")
		}
		// Any other error is unexpected
		t.Fatalf("DuckDB command failed unexpectedly. Output: %s\nError: %v", outputStr, err)
	}

	// Verify the test completed successfully
	t.Logf("DuckDB output: %s", outputStr)
	if !strings.Contains(outputStr, "Iceberg extension loaded successfully") {
		t.Errorf("Expected success message in output, got: %s", outputStr)
	}
}