// Package catalog provides integration tests for the Iceberg REST Catalog API. // These tests use DuckDB running in Docker to verify catalog operations. package catalog import ( "bytes" "context" "encoding/json" "flag" "fmt" "io" "net" "net/http" "os" "os/exec" "path/filepath" "strings" "testing" "time" ) // sharedEnv is the single TestEnvironment shared across all tests in this package. var sharedEnv *TestEnvironment // TestMain starts one weed mini instance for the whole package and tears it down // after all tests have run. func TestMain(m *testing.M) { flag.Parse() if os.Getenv("SHORT") != "" || testing.Short() { // Let tests self-skip when run with -short. os.Exit(m.Run()) } env, err := newTestEnvironmentForMain() if err != nil { fmt.Fprintf(os.Stderr, "SKIP: setup failed: %v\n", err) os.Exit(0) // Skip all tests rather than fail } sharedEnv = env if startErr := sharedEnv.startSeaweedFSForMain(); startErr != nil { fmt.Fprintf(os.Stderr, "SKIP: weed mini failed to start: %v\n", startErr) sharedEnv.cleanupForMain() os.Exit(0) } code := m.Run() sharedEnv.cleanupForMain() os.Exit(code) } // TestEnvironment contains the test environment configuration type TestEnvironment struct { seaweedDir string weedBinary string dataDir string s3Port int s3GrpcPort int icebergPort int masterPort int masterGrpcPort int filerPort int filerGrpcPort int volumePort int volumeGrpcPort int weedProcess *exec.Cmd weedCancel context.CancelFunc dockerAvailable bool } // hasDocker checks if Docker is available func hasDocker() bool { cmd := exec.Command("docker", "version") return cmd.Run() == nil } // getFreePort returns an available ephemeral port and its listener func getFreePort() (int, net.Listener, error) { listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { return 0, nil, err } addr := listener.Addr().(*net.TCPAddr) return addr.Port, listener, nil } // newTestEnvironmentForMain creates a TestEnvironment without calling t.Fatalf so it // can be used from TestMain (which has no *testing.T). func newTestEnvironmentForMain() (*TestEnvironment, error) { // Find the SeaweedFS root directory wd, err := os.Getwd() if err != nil { return nil, fmt.Errorf("get working directory: %w", err) } seaweedDir := wd for i := 0; i < 5; i++ { if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil { break } seaweedDir = filepath.Dir(seaweedDir) } // Check for weed binary weedBinary := filepath.Join(seaweedDir, "weed", "weed") if _, err := os.Stat(weedBinary); os.IsNotExist(err) { weedBinary = "weed" if _, err := exec.LookPath(weedBinary); err != nil { return nil, fmt.Errorf("weed binary not found") } } // Create temporary data directory dataDir, err := os.MkdirTemp("", "seaweed-iceberg-test-*") if err != nil { return nil, fmt.Errorf("create temp dir: %w", err) } // Allocate free ephemeral ports for each service var listeners []net.Listener closeListeners := func() { for _, l := range listeners { l.Close() } } var l net.Listener s3Port, l, err := getFreePort() if err != nil { closeListeners() return nil, fmt.Errorf("get free port for S3: %w", err) } listeners = append(listeners, l) icebergPort, l, err := getFreePort() if err != nil { closeListeners() return nil, fmt.Errorf("get free port for Iceberg: %w", err) } listeners = append(listeners, l) s3GrpcPort, l, err := getFreePort() if err != nil { closeListeners() return nil, fmt.Errorf("get free port for S3 gRPC: %w", err) } listeners = append(listeners, l) masterPort, l, err := getFreePort() if err != nil { closeListeners() return nil, fmt.Errorf("get free port for Master: %w", err) } listeners = append(listeners, l) masterGrpcPort, l, err := getFreePort() if err != nil { closeListeners() return nil, fmt.Errorf("get free port for Master gRPC: %w", err) } listeners = append(listeners, l) filerPort, l, err := getFreePort() if err != nil { closeListeners() return nil, fmt.Errorf("get free port for Filer: %w", err) } listeners = append(listeners, l) filerGrpcPort, l, err := getFreePort() if err != nil { closeListeners() return nil, fmt.Errorf("get free port for Filer gRPC: %w", err) } listeners = append(listeners, l) volumePort, l, err := getFreePort() if err != nil { closeListeners() return nil, fmt.Errorf("get free port for Volume: %w", err) } listeners = append(listeners, l) volumeGrpcPort, l, err := getFreePort() if err != nil { closeListeners() return nil, fmt.Errorf("get free port for Volume gRPC: %w", err) } listeners = append(listeners, l) // Release the port reservations so weed mini can bind to them closeListeners() return &TestEnvironment{ seaweedDir: seaweedDir, weedBinary: weedBinary, dataDir: dataDir, s3Port: s3Port, s3GrpcPort: s3GrpcPort, icebergPort: icebergPort, masterPort: masterPort, masterGrpcPort: masterGrpcPort, filerPort: filerPort, filerGrpcPort: filerGrpcPort, volumePort: volumePort, volumeGrpcPort: volumeGrpcPort, dockerAvailable: hasDocker(), }, nil } // startSeaweedFSForMain starts weed mini without a *testing.T (for use in TestMain). func (env *TestEnvironment) startSeaweedFSForMain() error { ctx, cancel := context.WithCancel(context.Background()) env.weedCancel = cancel masterDir := filepath.Join(env.dataDir, "master") filerDir := filepath.Join(env.dataDir, "filer") volumeDir := filepath.Join(env.dataDir, "volume") for _, dir := range []string{masterDir, filerDir, volumeDir} { if err := os.MkdirAll(dir, 0755); err != nil { cancel() return fmt.Errorf("create directory %s: %w", dir, err) } } cmd := exec.CommandContext(ctx, env.weedBinary, "mini", "-master.port", fmt.Sprintf("%d", env.masterPort), "-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort), "-volume.port", fmt.Sprintf("%d", env.volumePort), "-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort), "-filer.port", fmt.Sprintf("%d", env.filerPort), "-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort), "-s3.port", fmt.Sprintf("%d", env.s3Port), "-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort), "-s3.port.iceberg", fmt.Sprintf("%d", env.icebergPort), "-ip.bind", "0.0.0.0", "-dir", env.dataDir, ) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { cancel() return fmt.Errorf("start SeaweedFS: %w", err) } env.weedProcess = cmd if !env.waitForService(fmt.Sprintf("http://127.0.0.1:%d/v1/config", env.icebergPort), 30*time.Second) { cancel() cmd.Wait() return fmt.Errorf("Iceberg REST API did not become ready") } return nil } // cleanupForMain stops SeaweedFS and cleans up resources (no *testing.T needed). func (env *TestEnvironment) cleanupForMain() { if env.weedCancel != nil { env.weedCancel() } if env.weedProcess != nil { time.Sleep(2 * time.Second) env.weedProcess.Wait() } if env.dataDir != "" { os.RemoveAll(env.dataDir) } } // waitForService waits for a service to become available func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool { client := &http.Client{Timeout: 2 * time.Second} deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { resp, err := client.Get(url) if err == nil { resp.Body.Close() if resp.StatusCode == http.StatusOK { return true } } time.Sleep(500 * time.Millisecond) } return false } // IcebergURL returns the Iceberg REST Catalog URL func (env *TestEnvironment) IcebergURL() string { return fmt.Sprintf("http://127.0.0.1:%d", env.icebergPort) } // TestIcebergConfig tests the /v1/config endpoint func TestIcebergConfig(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } env := sharedEnv // Test GET /v1/config resp, err := http.Get(env.IcebergURL() + "/v1/config") if err != nil { t.Fatalf("Failed to get config: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { t.Errorf("Expected status 200, got %d", resp.StatusCode) } body, err := io.ReadAll(resp.Body) if err != nil { t.Fatalf("Failed to read response body: %v", err) } // Verify response contains required fields bodyStr := string(body) if !strings.Contains(bodyStr, "defaults") || !strings.Contains(bodyStr, "overrides") { t.Errorf("Config response missing required fields: %s", bodyStr) } } // TestIcebergNamespaces tests namespace operations func TestIcebergNamespaces(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } env := sharedEnv // Create the default table bucket first via S3 bucketName := "warehouse-ns-" + randomSuffix() createTableBucket(t, env, bucketName) // Test GET /v1/namespaces (should return empty list initially) resp, err := http.Get(env.IcebergURL() + icebergPath(bucketName, "/v1/namespaces")) if err != nil { t.Fatalf("Failed to list namespaces: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) t.Fatalf("Expected status 200, got %d: %s", resp.StatusCode, body) } } // TestStageCreateAndFinalizeFlow verifies staged create remains invisible until assert-create commit finalizes table creation. func TestStageCreateAndFinalizeFlow(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } env := sharedEnv bucketName := "warehouse-stage-" + randomSuffix() createTableBucket(t, env, bucketName) namespace := "stage_ns_" + randomSuffix() tableName := "orders" status, _, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, "/v1/namespaces"), map[string]any{ "namespace": []string{namespace}, }) if err != nil { t.Fatalf("Create namespace request failed: %v", err) } if status != http.StatusOK && status != http.StatusConflict { t.Fatalf("Create namespace status = %d, want 200 or 409", status) } status, badReqResp, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables", namespace)), map[string]any{ "stage-create": true, }) if err != nil { t.Fatalf("Stage create missing-name request failed: %v", err) } if status != http.StatusBadRequest { t.Fatalf("Stage create missing-name status = %d, want 400", status) } errorObj, _ := badReqResp["error"].(map[string]any) if got := errorObj["type"]; got != "BadRequestException" { t.Fatalf("error.type = %v, want BadRequestException", got) } msg, _ := errorObj["message"].(string) if !strings.Contains(strings.ToLower(msg), "table name is required") { t.Fatalf("error.message = %v, want it to include %q", errorObj["message"], "table name is required") } status, stageResp, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables", namespace)), map[string]any{ "name": tableName, "stage-create": true, }) if err != nil { t.Fatalf("Stage create request failed: %v", err) } if status != http.StatusOK { t.Fatalf("Stage create status = %d, want 200", status) } stageLocation, _ := stageResp["metadata-location"].(string) if !strings.HasSuffix(stageLocation, "/metadata/v1.metadata.json") { t.Fatalf("stage metadata-location = %q, want suffix /metadata/v1.metadata.json", stageLocation) } status, _, err = doIcebergJSONRequest(env, http.MethodGet, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), nil) if err != nil { t.Fatalf("Load staged table request failed: %v", err) } if status != http.StatusNotFound { t.Fatalf("Load staged table status = %d, want 404", status) } status, commitResp, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), map[string]any{ "requirements": []map[string]any{ {"type": "assert-create"}, }, "updates": []any{}, }) if err != nil { t.Fatalf("Finalize commit request failed: %v", err) } if status != http.StatusOK { t.Fatalf("Finalize commit status = %d, want 200", status) } commitLocation, _ := commitResp["metadata-location"].(string) if !strings.HasSuffix(commitLocation, "/metadata/v1.metadata.json") { t.Fatalf("final metadata-location = %q, want suffix /metadata/v1.metadata.json", commitLocation) } status, loadResp, err := doIcebergJSONRequest(env, http.MethodGet, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), nil) if err != nil { t.Fatalf("Load finalized table request failed: %v", err) } if status != http.StatusOK { t.Fatalf("Load finalized table status = %d, want 200", status) } loadLocation, _ := loadResp["metadata-location"].(string) if loadLocation != commitLocation { t.Fatalf("loaded metadata-location = %q, want %q", loadLocation, commitLocation) } } // TestCommitMissingTableWithoutAssertCreate ensures missing-table commits still require assert-create for creation. func TestCommitMissingTableWithoutAssertCreate(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } env := sharedEnv bucketName := "warehouse-missing-" + randomSuffix() createTableBucket(t, env, bucketName) namespace := "stage_missing_assert_ns_" + randomSuffix() tableName := "missing_table" status, _, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, "/v1/namespaces"), map[string]any{ "namespace": []string{namespace}, }) if err != nil { t.Fatalf("Create namespace request failed: %v", err) } if status != http.StatusOK && status != http.StatusConflict { t.Fatalf("Create namespace status = %d, want 200 or 409", status) } status, _, err = doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), map[string]any{ "requirements": []any{}, "updates": []any{}, }) if err != nil { t.Fatalf("Commit missing table request failed: %v", err) } if status != http.StatusNotFound { t.Fatalf("Commit missing table status = %d, want 404", status) } } // doIcebergJSONRequest decodes JSON object responses used by catalog tests. func doIcebergJSONRequest(env *TestEnvironment, method, path string, payload any) (int, map[string]any, error) { url := env.IcebergURL() + path var bodyReader io.Reader if payload != nil { data, err := json.Marshal(payload) if err != nil { return 0, nil, err } bodyReader = bytes.NewReader(data) } req, err := http.NewRequest(method, url, bodyReader) if err != nil { return 0, nil, err } if payload != nil { req.Header.Set("Content-Type", "application/json") } resp, err := http.DefaultClient.Do(req) if err != nil { return 0, nil, err } defer resp.Body.Close() data, err := io.ReadAll(resp.Body) if err != nil { return resp.StatusCode, nil, err } if len(data) == 0 { return resp.StatusCode, nil, nil } var decoded map[string]any if err := json.Unmarshal(data, &decoded); err != nil { return resp.StatusCode, nil, fmt.Errorf("failed to decode %s %s response: %w body=%s", method, path, err, string(data)) } return resp.StatusCode, decoded, nil } // icebergPath inserts the table bucket prefix into Iceberg REST API paths. // For example, "/v1/namespaces" with prefix "my-bucket" becomes // "/v1/my-bucket/namespaces". func icebergPath(prefix, path string) string { if prefix == "" { return path } const base = "/v1/" if !strings.HasPrefix(path, base) { return path } withPrefix := base + prefix + "/" + strings.TrimPrefix(path, base) return withPrefix } // createTableBucket creates a table bucket via the S3Tables REST API func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { t.Helper() // Use S3Tables REST API to create the bucket endpoint := fmt.Sprintf("http://localhost:%d/buckets", env.s3Port) reqBody := fmt.Sprintf(`{"name":"%s"}`, bucketName) req, err := http.NewRequest(http.MethodPut, endpoint, strings.NewReader(reqBody)) if err != nil { t.Fatalf("Failed to create request: %v", err) } req.Header.Set("Content-Type", "application/x-amz-json-1.1") resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("Failed to create table bucket %s: %v", bucketName, err) } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) t.Logf("Create table bucket %s response: status=%d, body=%s", bucketName, resp.StatusCode, string(body)) if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusConflict { t.Fatalf("Failed to create table bucket %s, status %d: %s", bucketName, resp.StatusCode, body) } t.Logf("Created table bucket %s", bucketName) } // randomSuffix returns a short random hex suffix for unique resource naming. func randomSuffix() string { return fmt.Sprintf("%x", time.Now().UnixNano()&0xffffffff) } // TestDuckDBIntegration tests Iceberg catalog operations using DuckDB // This test requires Docker to be available func TestDuckDBIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } env := sharedEnv if !env.dockerAvailable { t.Skip("Docker not available, skipping DuckDB integration test") } // Create a temporary SQL file for DuckDB to execute sqlFile := filepath.Join(env.dataDir, "test.sql") sqlContent := fmt.Sprintf(` -- Install and load Iceberg extension INSTALL iceberg; LOAD iceberg; -- List namespaces via Iceberg REST catalog (basic connectivity test) -- Note: Full operations require setting up S3 credentials which is beyond this test SELECT 'Iceberg extension loaded successfully' as result; `) if err := os.WriteFile(sqlFile, []byte(sqlContent), 0644); err != nil { t.Fatalf("Failed to write SQL file: %v", err) } // Run DuckDB in Docker to test Iceberg connectivity // Use host.docker.internal to connect to the host's Iceberg port cmd := exec.Command("docker", "run", "--rm", "-v", fmt.Sprintf("%s:/test", env.dataDir), "--add-host", "host.docker.internal:host-gateway", "--entrypoint", "duckdb", "duckdb/duckdb:latest", "-init", "/test/test.sql", "-c", "SELECT 1", ) output, err := cmd.CombinedOutput() if err != nil { t.Logf("DuckDB output: %s", output) // Check for expected errors in certain CI environments outputStr := string(output) if strings.Contains(outputStr, "iceberg extension is not available") || strings.Contains(outputStr, "Failed to load") { t.Skip("Skipping DuckDB test: Iceberg extension not available in Docker image") } // Any other error is unexpected t.Fatalf("DuckDB command failed unexpectedly. Output: %s\nError: %v", output, err) } // Verify the test completed successfully outputStr := string(output) t.Logf("DuckDB output: %s", outputStr) if !strings.Contains(outputStr, "Iceberg extension loaded successfully") { t.Errorf("Expected success message in output, got: %s", outputStr) } }