Files
seaweedFS/test/s3tables/catalog/iceberg_catalog_test.go
2026-02-11 12:23:35 -08:00

584 lines
17 KiB
Go

// Package catalog provides integration tests for the Iceberg REST Catalog API.
// Most tests exercise the catalog over plain HTTP against a locally started
// SeaweedFS instance; TestDuckDBIntegration additionally runs DuckDB in Docker.
package catalog
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
)
// TestEnvironment bundles everything a catalog integration test needs:
// filesystem locations, the port assigned to each SeaweedFS service, and the
// handles used to stop the spawned weed process.
type TestEnvironment struct {
	// Filesystem locations.
	seaweedDir string
	weedBinary string
	dataDir    string

	// Ports passed to "weed mini" when the cluster is started.
	s3Port         int
	s3GrpcPort     int
	icebergPort    int
	masterPort     int
	masterGrpcPort int
	filerPort      int
	filerGrpcPort  int
	volumePort     int
	volumeGrpcPort int

	// Running weed process and the cancel function of its context.
	weedProcess *exec.Cmd
	weedCancel  context.CancelFunc

	// Result of the Docker probe taken when the environment was built.
	dockerAvailable bool
}
// hasDocker reports whether running "docker version" exits successfully.
func hasDocker() bool {
	err := exec.Command("docker", "version").Run()
	return err == nil
}
// getFreePort binds a TCP listener to 127.0.0.1 on an OS-chosen port and
// returns that port together with the listener. The listener is returned
// still open, so the caller decides when to release the port.
func getFreePort() (int, net.Listener, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, nil, err
	}
	return ln.Addr().(*net.TCPAddr).Port, ln, nil
}
// NewTestEnvironment locates the weed binary, reserves one free port per
// service and creates a scratch data directory for a single test.
//
// The test is skipped when no weed binary can be found. It fails when the
// working directory, a port, or the data directory cannot be obtained. The
// returned environment owns the data directory; callers release it with
// Cleanup.
func NewTestEnvironment(t *testing.T) *TestEnvironment {
	t.Helper()

	wd, err := os.Getwd()
	if err != nil {
		t.Fatalf("Failed to get working directory: %v", err)
	}

	// Walk up at most five levels looking for the directory that holds go.mod.
	seaweedDir := wd
	for i := 0; i < 5; i++ {
		if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil {
			break
		}
		seaweedDir = filepath.Dir(seaweedDir)
	}

	// Prefer a binary built inside the repository; otherwise fall back to PATH.
	weedBinary := filepath.Join(seaweedDir, "weed", "weed")
	if _, err := os.Stat(weedBinary); os.IsNotExist(err) {
		weedBinary = "weed"
		if _, err := exec.LookPath(weedBinary); err != nil {
			t.Skip("weed binary not found, skipping integration test")
		}
	}

	env := &TestEnvironment{
		seaweedDir: seaweedDir,
		weedBinary: weedBinary,
	}

	// One entry per service port, in allocation order. The label is used in
	// the failure message only.
	ports := []struct {
		label string
		dst   *int
	}{
		{"S3", &env.s3Port},
		{"Iceberg", &env.icebergPort},
		{"S3 gRPC", &env.s3GrpcPort},
		{"Master", &env.masterPort},
		{"Master gRPC", &env.masterGrpcPort},
		{"Filer", &env.filerPort},
		{"Filer gRPC", &env.filerGrpcPort},
		{"Volume", &env.volumePort},
		{"Volume gRPC", &env.volumeGrpcPort},
	}

	// Every listener stays open until all ports are chosen so that no port
	// can be handed out twice; they are released together on return (this
	// also runs when t.Fatalf aborts the function).
	listeners := make([]net.Listener, 0, len(ports))
	defer func() {
		for _, l := range listeners {
			// Best effort: nothing useful can be done if closing fails.
			_ = l.Close()
		}
	}()
	for _, p := range ports {
		port, l, err := getFreePort()
		if err != nil {
			t.Fatalf("Failed to get free port for %s: %v", p.label, err)
		}
		listeners = append(listeners, l)
		*p.dst = port
	}

	// Create the data directory last: every failure path above aborts before
	// anything exists on disk, so no temporary directory is left behind.
	dataDir, err := os.MkdirTemp("", "seaweed-iceberg-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	env.dataDir = dataDir
	env.dockerAvailable = hasDocker()

	return env
}
// StartSeaweedFS launches "weed mini" with the environment's ports and data
// directory, then blocks until the Iceberg /v1/config endpoint answers 200.
// The test fails if the process cannot be started or the endpoint is not
// ready within 30 seconds. The process and its cancel function are stored on
// env.
func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
	t.Helper()

	ctx, cancel := context.WithCancel(context.Background())
	env.weedCancel = cancel

	// Per-component directories underneath the data directory.
	for _, component := range []string{"master", "filer", "volume"} {
		dir := filepath.Join(env.dataDir, component)
		if err := os.MkdirAll(dir, 0755); err != nil {
			t.Fatalf("Failed to create directory %s: %v", dir, err)
		}
	}

	portArg := func(port int) string { return fmt.Sprintf("%d", port) }
	args := []string{
		"mini",
		"-master.port", portArg(env.masterPort),
		"-master.port.grpc", portArg(env.masterGrpcPort),
		"-volume.port", portArg(env.volumePort),
		"-volume.port.grpc", portArg(env.volumeGrpcPort),
		"-filer.port", portArg(env.filerPort),
		"-filer.port.grpc", portArg(env.filerGrpcPort),
		"-s3.port", portArg(env.s3Port),
		"-s3.port.grpc", portArg(env.s3GrpcPort),
		"-s3.port.iceberg", portArg(env.icebergPort),
		"-ip.bind", "0.0.0.0",
		"-dir", env.dataDir,
	}

	weed := exec.CommandContext(ctx, env.weedBinary, args...)
	weed.Stdout, weed.Stderr = os.Stdout, os.Stderr
	if err := weed.Start(); err != nil {
		t.Fatalf("Failed to start SeaweedFS: %v", err)
	}
	env.weedProcess = weed

	// Readiness is judged by the Iceberg config endpoint.
	readyURL := fmt.Sprintf("http://127.0.0.1:%d/v1/config", env.icebergPort)
	if ready := env.waitForService(readyURL, 30*time.Second); !ready {
		t.Fatalf("Iceberg REST API did not become ready")
	}
}
// waitForService polls url with GET requests (2s timeout each, 500ms pause
// between attempts) until it returns 200 OK or timeout has elapsed. It
// reports whether a 200 response was seen.
func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool {
	const pollInterval = 500 * time.Millisecond

	probe := &http.Client{Timeout: 2 * time.Second}
	for stopAt := time.Now().Add(timeout); time.Now().Before(stopAt); time.Sleep(pollInterval) {
		resp, err := probe.Get(url)
		if err != nil {
			continue
		}
		resp.Body.Close()
		if resp.StatusCode == http.StatusOK {
			return true
		}
	}
	return false
}
// Cleanup stops the weed process (if one was started) and removes the data
// directory. It is safe to call when StartSeaweedFS was never invoked.
func (env *TestEnvironment) Cleanup(t *testing.T) {
	t.Helper()

	if env.weedCancel != nil {
		env.weedCancel()
	}
	if env.weedProcess != nil {
		// The process was created with exec.CommandContext, so cancelling its
		// context kills it; Wait blocks until it has actually exited, which
		// makes an additional fixed sleep unnecessary. A non-nil error is the
		// expected outcome for a killed process and is deliberately ignored.
		_ = env.weedProcess.Wait()
	}
	if env.dataDir != "" {
		if err := os.RemoveAll(env.dataDir); err != nil {
			// Report but do not fail: leftover scratch data should not turn a
			// passing test into a failing one.
			t.Logf("Failed to remove data directory %s: %v", env.dataDir, err)
		}
	}
}
// IcebergURL returns the base URL (scheme, loopback host and port, no
// trailing slash) of the Iceberg REST endpoint for this environment.
func (env *TestEnvironment) IcebergURL() string {
	port := env.icebergPort
	return fmt.Sprintf("http://127.0.0.1:%d", port)
}
// TestIcebergConfig checks that GET /v1/config answers 200 with a JSON object
// that has top-level "defaults" and "overrides" fields.
func TestIcebergConfig(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	env := NewTestEnvironment(t)
	defer env.Cleanup(t)
	env.StartSeaweedFS(t)

	// Bounded client so a stalled server fails the test instead of hanging it.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Get(env.IcebergURL() + "/v1/config")
	if err != nil {
		t.Fatalf("Failed to get config: %v", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		t.Errorf("Expected status 200, got %d", resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		t.Fatalf("Failed to read response body: %v", err)
	}

	// Decode rather than substring-match: an error message that merely
	// mentions the field names must not satisfy the check.
	var config map[string]json.RawMessage
	if err := json.Unmarshal(body, &config); err != nil {
		t.Fatalf("Config response is not a JSON object: %v (body: %s)", err, body)
	}
	for _, field := range []string{"defaults", "overrides"} {
		if _, ok := config[field]; !ok {
			t.Errorf("Config response missing required field %q: %s", field, body)
		}
	}
}
// TestIcebergNamespaces creates the "warehouse" table bucket and verifies
// that GET /v1/namespaces answers 200. Only the status code is checked; the
// response body is read solely to enrich the failure message.
func TestIcebergNamespaces(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	env := NewTestEnvironment(t)
	defer env.Cleanup(t)
	env.StartSeaweedFS(t)

	// The table bucket has to exist before namespaces are listed.
	createTableBucket(t, env, "warehouse")

	listURL := env.IcebergURL() + "/v1/namespaces"
	resp, err := http.Get(listURL)
	if err != nil {
		t.Fatalf("Failed to list namespaces: %v", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		return
	}
	body, _ := io.ReadAll(resp.Body)
	t.Fatalf("Expected status 200, got %d: %s", resp.StatusCode, body)
}
// TestStageCreateAndFinalizeFlow walks through the staged-create lifecycle:
// a stage-create without a table name is rejected with 400, a valid
// stage-create returns a metadata location while the table still loads as
// 404, and an assert-create commit makes the table loadable at the committed
// metadata location.
func TestStageCreateAndFinalizeFlow(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	env := NewTestEnvironment(t)
	defer env.Cleanup(t)
	env.StartSeaweedFS(t)
	createTableBucket(t, env, "warehouse")

	const (
		namespace = "stage_ns"
		tableName = "orders"
	)
	tablesPath := fmt.Sprintf("/v1/namespaces/%s/tables", namespace)
	tablePath := fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)

	// Namespace setup; an already existing namespace is acceptable.
	nsStatus, _, err := doIcebergJSONRequest(env, http.MethodPost, "/v1/namespaces", map[string]any{
		"namespace": []string{namespace},
	})
	if err != nil {
		t.Fatalf("Create namespace request failed: %v", err)
	}
	if nsStatus != http.StatusOK && nsStatus != http.StatusConflict {
		t.Fatalf("Create namespace status = %d, want 200 or 409", nsStatus)
	}

	// Stage-create without a name must be rejected with a structured error.
	badStatus, badBody, err := doIcebergJSONRequest(env, http.MethodPost, tablesPath, map[string]any{
		"stage-create": true,
	})
	if err != nil {
		t.Fatalf("Stage create missing-name request failed: %v", err)
	}
	if badStatus != http.StatusBadRequest {
		t.Fatalf("Stage create missing-name status = %d, want 400", badStatus)
	}
	// A missing or mistyped "error" member yields a nil map; lookups on it
	// return nil and therefore fail the checks below.
	errBody, _ := badBody["error"].(map[string]any)
	if errType := errBody["type"]; errType != "BadRequestException" {
		t.Fatalf("error.type = %v, want BadRequestException", errType)
	}
	errMsg, _ := errBody["message"].(string)
	if lowered := strings.ToLower(errMsg); !strings.Contains(lowered, "table name is required") {
		t.Fatalf("error.message = %v, want it to include %q", errBody["message"], "table name is required")
	}

	// Valid stage-create.
	stageStatus, stageBody, err := doIcebergJSONRequest(env, http.MethodPost, tablesPath, map[string]any{
		"name":         tableName,
		"stage-create": true,
	})
	if err != nil {
		t.Fatalf("Stage create request failed: %v", err)
	}
	if stageStatus != http.StatusOK {
		t.Fatalf("Stage create status = %d, want 200", stageStatus)
	}
	stagedAt, _ := stageBody["metadata-location"].(string)
	if !strings.HasSuffix(stagedAt, "/metadata/v1.metadata.json") {
		t.Fatalf("stage metadata-location = %q, want suffix /metadata/v1.metadata.json", stagedAt)
	}

	// The staged table must not be loadable yet.
	stagedLoadStatus, _, err := doIcebergJSONRequest(env, http.MethodGet, tablePath, nil)
	if err != nil {
		t.Fatalf("Load staged table request failed: %v", err)
	}
	if stagedLoadStatus != http.StatusNotFound {
		t.Fatalf("Load staged table status = %d, want 404", stagedLoadStatus)
	}

	// Finalize through an assert-create commit.
	commitStatus, commitBody, err := doIcebergJSONRequest(env, http.MethodPost, tablePath, map[string]any{
		"requirements": []map[string]any{
			{"type": "assert-create"},
		},
		"updates": []any{},
	})
	if err != nil {
		t.Fatalf("Finalize commit request failed: %v", err)
	}
	if commitStatus != http.StatusOK {
		t.Fatalf("Finalize commit status = %d, want 200", commitStatus)
	}
	committedAt, _ := commitBody["metadata-location"].(string)
	if !strings.HasSuffix(committedAt, "/metadata/v1.metadata.json") {
		t.Fatalf("final metadata-location = %q, want suffix /metadata/v1.metadata.json", committedAt)
	}

	// The table is now loadable and reports the committed location.
	loadStatus, loadBody, err := doIcebergJSONRequest(env, http.MethodGet, tablePath, nil)
	if err != nil {
		t.Fatalf("Load finalized table request failed: %v", err)
	}
	if loadStatus != http.StatusOK {
		t.Fatalf("Load finalized table status = %d, want 200", loadStatus)
	}
	loadedAt, _ := loadBody["metadata-location"].(string)
	if loadedAt != committedAt {
		t.Fatalf("loaded metadata-location = %q, want %q", loadedAt, committedAt)
	}
}
// TestCommitMissingTableWithoutAssertCreate verifies that committing to a
// table that does not exist, with empty requirements and updates, is answered
// with 404.
func TestCommitMissingTableWithoutAssertCreate(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	env := NewTestEnvironment(t)
	defer env.Cleanup(t)
	env.StartSeaweedFS(t)
	createTableBucket(t, env, "warehouse")

	const (
		namespace = "stage_missing_assert_ns"
		tableName = "missing_table"
	)

	// Namespace setup; an already existing namespace is acceptable.
	nsPayload := map[string]any{
		"namespace": []string{namespace},
	}
	nsStatus, _, err := doIcebergJSONRequest(env, http.MethodPost, "/v1/namespaces", nsPayload)
	if err != nil {
		t.Fatalf("Create namespace request failed: %v", err)
	}
	if nsStatus != http.StatusOK && nsStatus != http.StatusConflict {
		t.Fatalf("Create namespace status = %d, want 200 or 409", nsStatus)
	}

	// Commit against the missing table without an assert-create requirement.
	commitPath := fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)
	commitPayload := map[string]any{
		"requirements": []any{},
		"updates":      []any{},
	}
	commitStatus, _, err := doIcebergJSONRequest(env, http.MethodPost, commitPath, commitPayload)
	if err != nil {
		t.Fatalf("Commit missing table request failed: %v", err)
	}
	if commitStatus != http.StatusNotFound {
		t.Fatalf("Commit missing table status = %d, want 404", commitStatus)
	}
}
// doIcebergJSONRequest sends method/path to the environment's Iceberg REST
// endpoint and decodes the response as a JSON object.
//
// A non-nil payload is JSON-encoded and sent with Content-Type
// application/json. The whole exchange, including reading the body, is
// bounded by a 30 second deadline so a stalled server cannot hang the test.
//
// It returns the HTTP status code, the decoded object (nil when the response
// body is empty), and an error for encoding, transport, read, or decode
// failures. The status code is 0 when no response was received.
func doIcebergJSONRequest(env *TestEnvironment, method, path string, payload any) (int, map[string]any, error) {
	url := env.IcebergURL() + path

	var bodyReader io.Reader
	if payload != nil {
		data, err := json.Marshal(payload)
		if err != nil {
			return 0, nil, err
		}
		bodyReader = bytes.NewReader(data)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, method, url, bodyReader)
	if err != nil {
		return 0, nil, err
	}
	if payload != nil {
		req.Header.Set("Content-Type", "application/json")
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, nil, err
	}
	defer resp.Body.Close()

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return resp.StatusCode, nil, err
	}
	if len(data) == 0 {
		return resp.StatusCode, nil, nil
	}

	var decoded map[string]any
	if err := json.Unmarshal(data, &decoded); err != nil {
		return resp.StatusCode, nil, fmt.Errorf("failed to decode %s %s response: %w body=%s", method, path, err, string(data))
	}
	return resp.StatusCode, decoded, nil
}
// createTableBucket issues PUT /buckets on the environment's S3 port to
// create the table bucket bucketName. Both 200 and 409 (bucket already
// present) are accepted; any other outcome fails the test.
func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) {
	t.Helper()

	// Encode with encoding/json so names containing quotes or backslashes
	// still produce a valid request body.
	reqBody, err := json.Marshal(map[string]string{"name": bucketName})
	if err != nil {
		t.Fatalf("Failed to encode request body: %v", err)
	}

	// Same loopback address the other helpers in this file use.
	endpoint := fmt.Sprintf("http://127.0.0.1:%d/buckets", env.s3Port)
	req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(reqBody))
	if err != nil {
		t.Fatalf("Failed to create request: %v", err)
	}
	req.Header.Set("Content-Type", "application/x-amz-json-1.1")

	// Bounded client so a stalled server fails the test instead of hanging it.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		t.Fatalf("Failed to create table bucket %s: %v", bucketName, err)
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		t.Logf("Created table bucket %s", bucketName)
	case http.StatusConflict:
		t.Logf("Table bucket %s already exists", bucketName)
	default:
		// The read error is ignored: the body only enriches the message.
		body, _ := io.ReadAll(resp.Body)
		t.Fatalf("Failed to create table bucket %s, status %d: %s", bucketName, resp.StatusCode, body)
	}
}
// TestDuckDBIntegration runs DuckDB in a Docker container and checks that it
// can install and load the Iceberg extension. The SQL executed here does not
// contact the catalog itself. The test is skipped when Docker is unavailable
// or when the output indicates the extension cannot be loaded.
func TestDuckDBIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	env := NewTestEnvironment(t)
	defer env.Cleanup(t)
	if !env.dockerAvailable {
		t.Skip("Docker not available, skipping DuckDB integration test")
	}
	env.StartSeaweedFS(t)

	// The script lives in the data directory, which is mounted into the
	// container at /test. It is a constant, so no formatting is needed.
	sqlFile := filepath.Join(env.dataDir, "test.sql")
	sqlContent := `
-- Install and load Iceberg extension
INSTALL iceberg;
LOAD iceberg;
-- List namespaces via Iceberg REST catalog (basic connectivity test)
-- Note: Full operations require setting up S3 credentials which is beyond this test
SELECT 'Iceberg extension loaded successfully' as result;
`
	if err := os.WriteFile(sqlFile, []byte(sqlContent), 0644); err != nil {
		t.Fatalf("Failed to write SQL file: %v", err)
	}

	// host.docker.internal is mapped to the host gateway so the container
	// could reach services on the host; the current script does not use it.
	cmd := exec.Command("docker", "run", "--rm",
		"-v", fmt.Sprintf("%s:/test", env.dataDir),
		"--add-host", "host.docker.internal:host-gateway",
		"--entrypoint", "duckdb",
		"duckdb/duckdb:latest",
		"-init", "/test/test.sql",
		"-c", "SELECT 1",
	)
	output, err := cmd.CombinedOutput()
	if err != nil {
		t.Logf("DuckDB output: %s", output)
		// A missing extension is an environment limitation, not a failure.
		outputStr := string(output)
		if strings.Contains(outputStr, "iceberg extension is not available") ||
			strings.Contains(outputStr, "Failed to load") {
			t.Skip("Skipping DuckDB test: Iceberg extension not available in Docker image")
		}
		// Any other error is unexpected.
		t.Fatalf("DuckDB command failed unexpectedly. Output: %s\nError: %v", output, err)
	}

	// The script's final SELECT prints the success marker.
	outputStr := string(output)
	t.Logf("DuckDB output: %s", outputStr)
	if !strings.Contains(outputStr, "Iceberg extension loaded successfully") {
		t.Errorf("Expected success message in output, got: %s", outputStr)
	}
}