Files
seaweedFS/test/s3tables/catalog/iceberg_catalog_test.go
Chris Lu 36c469e34e Enforce IAM for S3 Tables bucket creation (#8388)
* Enforce IAM for s3tables bucket creation

* Prefer IAM path when policies exist

* Ensure IAM enforcement honors default allow

* address comments

* Reused the precomputed principal when setting tableBucketMetadata.OwnerAccountID, avoiding the redundant getAccountID call.

* get identity

* fix

* dedup

* fix

* comments

* fix tests

* update iam config

* go fmt

* fix ports

* fix flags

* mini clean shutdown

* Revert "update iam config"

This reverts commit ca48fdbb0afa45657823d98657556c0bbf24f239.

Revert "mini clean shutdown"

This reverts commit 9e17f6baffd5dd7cc404d831d18dd618b9fe5049.

Revert "fix flags"

This reverts commit e9e7b29d2f77ee5cb82147d50621255410695ee3.

Revert "go fmt"

This reverts commit bd3241960b1d9484b7900190773b0ecb3f762c9a.

* test/s3tables: share single weed mini per test package via TestMain

Previously each top-level test function in the catalog and s3tables
package started and stopped its own weed mini instance. This caused
failures when a prior instance wasn't cleanly stopped before the next
one started (port conflicts, leaked global state).

Changes:
- catalog/iceberg_catalog_test.go: introduce TestMain that starts one
  shared TestEnvironment (external weed binary) before all tests and
  tears it down after. All individual test functions now use sharedEnv.
  Added randomSuffix() for unique resource names across tests.
- catalog/pyiceberg_test.go: updated to use sharedEnv instead of
  per-test environments.
- catalog/pyiceberg_test_helpers.go -> pyiceberg_test_helpers_test.go:
  renamed to a _test.go file so it can access TestEnvironment which is
  defined in a test file.
- table-buckets/setup.go: add package-level sharedCluster variable.
- table-buckets/s3tables_integration_test.go: introduce TestMain that
  starts one shared TestCluster before all tests. TestS3TablesIntegration
  now uses sharedCluster. Extract startMiniClusterInDir (no *testing.T)
  for TestMain use. TestS3TablesCreateBucketIAMPolicy keeps its own
  cluster (different IAM config). Remove miniClusterMutex (no longer
  needed). Fix Stop() to not panic when t is nil.

* delete

* parse

* default allow should work with anonymous

* fix port

* iceberg route

The failures came from the Iceberg REST API falling back to the default `warehouse` bucket when no prefix is provided. The tests create randomly named buckets, so /v1/namespaces was looking in `warehouse` and failing. The tests now use the prefixed Iceberg routes (/v1/{bucket}/...) via a small helper.

* test(s3tables): fix port conflicts and IAM ARN matching in integration tests

- Pass -master.dir explicitly to prevent filer store directory collision
  between shared cluster and per-test clusters running in the same process
- Pass -volume.port.public and -volume.publicUrl to prevent the global
  publicPort flag (mutated from 0 → concrete port by first cluster) from
  being reused by a second cluster, causing 'address already in use'
- Remove the flag-reset loop in Stop() that reset global flag values while
  other goroutines were reading them (race → panic)
- Fix IAM policy Resource ARN in TestS3TablesCreateBucketIAMPolicy to use
  wildcards (arn:aws:s3tables:*:*:bucket/<name>) because the handler
  generates ARNs with its own DefaultRegion (us-east-1) and principal name
  ('admin'), not the test constants testRegion/testAccountID
2026-02-19 22:52:05 -08:00

632 lines
19 KiB
Go

// Package catalog provides integration tests for the Iceberg REST Catalog API.
// The tests talk over HTTP to a single weed mini instance shared by the whole
// package; the DuckDB test additionally requires Docker.
package catalog
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
)
// sharedEnv is the single TestEnvironment shared across all tests in this package.
// It is assigned by TestMain once the environment has been created; in short
// mode TestMain runs the tests without assigning it, so it stays nil there.
var sharedEnv *TestEnvironment
// TestMain starts one weed mini instance for the whole package and tears it
// down after all tests have run.
//
// Short mode can be requested with -short or by setting the SHORT environment
// variable. In short mode no cluster is started and sharedEnv stays nil; the
// tests are expected to skip themselves via testing.Short().
//
// If the environment cannot be prepared or weed mini does not become ready,
// the whole package is reported as skipped (exit code 0) rather than failed.
func TestMain(m *testing.M) {
	flag.Parse()

	if os.Getenv("SHORT") != "" && !testing.Short() {
		// The tests only consult testing.Short() before touching sharedEnv, so
		// mirror the SHORT environment variable into the -short flag. Without
		// this they would run against a nil sharedEnv and panic.
		if err := flag.Set("test.short", "true"); err != nil {
			fmt.Fprintf(os.Stderr, "enable short mode from SHORT env: %v\n", err)
			os.Exit(1)
		}
	}
	if testing.Short() {
		// Let tests self-skip when run in short mode.
		os.Exit(m.Run())
	}

	env, err := newTestEnvironmentForMain()
	if err != nil {
		fmt.Fprintf(os.Stderr, "SKIP: setup failed: %v\n", err)
		os.Exit(0) // Skip all tests rather than fail
	}
	sharedEnv = env

	if startErr := sharedEnv.startSeaweedFSForMain(); startErr != nil {
		fmt.Fprintf(os.Stderr, "SKIP: weed mini failed to start: %v\n", startErr)
		sharedEnv.cleanupForMain()
		os.Exit(0)
	}

	// os.Exit skips deferred calls, so clean up explicitly before exiting.
	code := m.Run()
	sharedEnv.cleanupForMain()
	os.Exit(code)
}
// TestEnvironment describes one weed mini instance used by the tests: where
// the binary and its data live, the port of every service, and the handles
// needed to stop the process again.
type TestEnvironment struct {
	// seaweedDir is the directory where the upward search for go.mod ended.
	seaweedDir string
	// weedBinary is the path (or bare command name) of the weed executable.
	weedBinary string
	// dataDir is the temporary directory passed to weed mini as -dir.
	dataDir string

	// S3 HTTP port, S3 gRPC port and Iceberg REST catalog port.
	s3Port, s3GrpcPort, icebergPort int
	// Master HTTP and gRPC ports.
	masterPort, masterGrpcPort int
	// Filer HTTP and gRPC ports.
	filerPort, filerGrpcPort int
	// Volume server HTTP and gRPC ports.
	volumePort, volumeGrpcPort int

	// weedProcess is the started weed mini command, nil until it is running.
	weedProcess *exec.Cmd
	// weedCancel cancels the context the weed mini command was started with.
	weedCancel context.CancelFunc

	// dockerAvailable records the result of hasDocker at construction time.
	dockerAvailable bool
}
// hasDocker reports whether running "docker version" succeeds.
func hasDocker() bool {
	if err := exec.Command("docker", "version").Run(); err != nil {
		return false
	}
	return true
}
// getFreePort asks the OS for an unused TCP port on 127.0.0.1. It returns the
// port together with the still-open listener holding it; the caller is
// responsible for closing that listener.
func getFreePort() (int, net.Listener, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, nil, err
	}
	return ln.Addr().(*net.TCPAddr).Port, ln, nil
}
// newTestEnvironmentForMain builds a TestEnvironment and reports problems as
// errors instead of calling t.Fatalf, so it can be used from TestMain (which
// has no *testing.T).
//
// It locates the repository root by walking up from the working directory
// looking for go.mod, picks the weed binary (the in-tree weed/weed if present,
// otherwise "weed" from PATH), creates a temporary data directory, and chooses
// a free ephemeral port for every service.
//
// On error no TestEnvironment is returned and the temporary data directory, if
// it was already created, is removed again.
func newTestEnvironmentForMain() (*TestEnvironment, error) {
	// Find the SeaweedFS root directory.
	wd, err := os.Getwd()
	if err != nil {
		return nil, fmt.Errorf("get working directory: %w", err)
	}
	seaweedDir := wd
	// Look at the working directory and at most five ancestors.
	for i := 0; i < 5; i++ {
		if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil {
			break
		}
		seaweedDir = filepath.Dir(seaweedDir)
	}

	// Prefer the binary built inside the repository, fall back to PATH.
	weedBinary := filepath.Join(seaweedDir, "weed", "weed")
	if _, err := os.Stat(weedBinary); os.IsNotExist(err) {
		weedBinary = "weed"
		if _, err := exec.LookPath(weedBinary); err != nil {
			return nil, fmt.Errorf("weed binary not found: %w", err)
		}
	}

	// Create temporary data directory.
	dataDir, err := os.MkdirTemp("", "seaweed-iceberg-test-*")
	if err != nil {
		return nil, fmt.Errorf("create temp dir: %w", err)
	}

	env := &TestEnvironment{
		seaweedDir: seaweedDir,
		weedBinary: weedBinary,
		dataDir:    dataDir,
	}

	// One entry per service port; label is only used in error messages.
	portTargets := []struct {
		label string
		port  *int
	}{
		{"S3", &env.s3Port},
		{"Iceberg", &env.icebergPort},
		{"S3 gRPC", &env.s3GrpcPort},
		{"Master", &env.masterPort},
		{"Master gRPC", &env.masterGrpcPort},
		{"Filer", &env.filerPort},
		{"Filer gRPC", &env.filerGrpcPort},
		{"Volume", &env.volumePort},
		{"Volume gRPC", &env.volumeGrpcPort},
	}

	// Keep every listener open until all ports are chosen so the OS cannot
	// hand out the same port twice.
	listeners := make([]net.Listener, 0, len(portTargets))
	closeListeners := func() {
		for _, l := range listeners {
			l.Close()
		}
	}
	for _, target := range portTargets {
		port, l, err := getFreePort()
		if err != nil {
			closeListeners()
			// Best-effort removal: do not leak the temp dir on a failed setup.
			os.RemoveAll(dataDir)
			return nil, fmt.Errorf("get free port for %s: %w", target.label, err)
		}
		listeners = append(listeners, l)
		*target.port = port
	}

	// Release the port reservations so weed mini can bind to them.
	closeListeners()

	env.dockerAvailable = hasDocker()
	return env, nil
}
// startSeaweedFSForMain starts weed mini without a *testing.T (for use in
// TestMain) and blocks until the Iceberg REST API answers /v1/config with 200
// or 30 seconds have passed.
//
// The process is started under a cancellable context whose cancel function is
// stored in env.weedCancel; env.weedProcess is set once the process is running.
// If the API never becomes ready the process is cancelled and reaped here, and
// env.weedProcess is reset so cleanupForMain does not wait on it a second time.
func (env *TestEnvironment) startSeaweedFSForMain() error {
	ctx, cancel := context.WithCancel(context.Background())
	env.weedCancel = cancel

	masterDir := filepath.Join(env.dataDir, "master")
	filerDir := filepath.Join(env.dataDir, "filer")
	volumeDir := filepath.Join(env.dataDir, "volume")
	for _, dir := range []string{masterDir, filerDir, volumeDir} {
		if err := os.MkdirAll(dir, 0755); err != nil {
			cancel()
			return fmt.Errorf("create directory %s: %w", dir, err)
		}
	}

	portArg := func(port int) string { return fmt.Sprintf("%d", port) }
	cmd := exec.CommandContext(ctx, env.weedBinary, "mini",
		"-master.port", portArg(env.masterPort),
		"-master.port.grpc", portArg(env.masterGrpcPort),
		"-volume.port", portArg(env.volumePort),
		"-volume.port.grpc", portArg(env.volumeGrpcPort),
		"-filer.port", portArg(env.filerPort),
		"-filer.port.grpc", portArg(env.filerGrpcPort),
		"-s3.port", portArg(env.s3Port),
		"-s3.port.grpc", portArg(env.s3GrpcPort),
		"-s3.port.iceberg", portArg(env.icebergPort),
		"-ip.bind", "0.0.0.0",
		"-dir", env.dataDir,
	)
	// Forward the server's output so startup failures are visible in test logs.
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	if err := cmd.Start(); err != nil {
		cancel()
		return fmt.Errorf("start SeaweedFS: %w", err)
	}
	env.weedProcess = cmd

	if !env.waitForService(env.IcebergURL()+"/v1/config", 30*time.Second) {
		cancel()
		// The process was just cancelled, so a non-nil Wait error is expected
		// and carries no extra information.
		_ = cmd.Wait()
		// Already reaped: drop the handle so cleanup skips its own Wait.
		env.weedProcess = nil
		return fmt.Errorf("Iceberg REST API did not become ready")
	}
	return nil
}
// cleanupForMain tears the environment down without needing a *testing.T:
// it cancels the weed mini context, pauses two seconds and waits for the
// process if one was started, and finally deletes the data directory.
// Each step is skipped when the corresponding field is unset.
func (env *TestEnvironment) cleanupForMain() {
	if stop := env.weedCancel; stop != nil {
		stop()
	}

	if proc := env.weedProcess; proc != nil {
		time.Sleep(2 * time.Second)
		proc.Wait()
	}

	if dir := env.dataDir; dir != "" {
		os.RemoveAll(dir)
	}
}
// waitForService polls url with GET requests (2s per-request timeout) every
// 500ms until one returns HTTP 200 or the timeout has elapsed. It reports
// whether a 200 response was seen.
func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool {
	probe := &http.Client{Timeout: 2 * time.Second}
	const pollInterval = 500 * time.Millisecond

	for stopAt := time.Now().Add(timeout); time.Now().Before(stopAt); time.Sleep(pollInterval) {
		resp, err := probe.Get(url)
		if err != nil {
			continue
		}
		resp.Body.Close()
		if resp.StatusCode == http.StatusOK {
			return true
		}
	}
	return false
}
// IcebergURL returns the base URL (scheme, loopback host and port, no trailing
// slash) of the Iceberg REST Catalog for this environment.
func (env *TestEnvironment) IcebergURL() string {
	const host = "127.0.0.1"
	return fmt.Sprintf("http://%s:%d", host, env.icebergPort)
}
// TestIcebergConfig checks that GET /v1/config answers 200 and that the
// response body mentions both "defaults" and "overrides".
func TestIcebergConfig(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	configURL := sharedEnv.IcebergURL() + "/v1/config"
	resp, err := http.Get(configURL)
	if err != nil {
		t.Fatalf("Failed to get config: %v", err)
	}
	defer resp.Body.Close()

	if got := resp.StatusCode; got != http.StatusOK {
		t.Errorf("Expected status 200, got %d", got)
	}

	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		t.Fatalf("Failed to read response body: %v", err)
	}

	// Report at most once, on the first required field that is absent.
	text := string(raw)
	for _, field := range []string{"defaults", "overrides"} {
		if !strings.Contains(text, field) {
			t.Errorf("Config response missing required fields: %s", text)
			break
		}
	}
}
// TestIcebergNamespaces creates a fresh table bucket and checks that listing
// its namespaces through the bucket-prefixed route answers 200.
func TestIcebergNamespaces(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	env := sharedEnv

	// A uniquely named bucket keeps this test independent of the others.
	bucket := "warehouse-ns-" + randomSuffix()
	createTableBucket(t, env, bucket)

	listURL := env.IcebergURL() + icebergPath(bucket, "/v1/namespaces")
	resp, err := http.Get(listURL)
	if err != nil {
		t.Fatalf("Failed to list namespaces: %v", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		return
	}
	body, _ := io.ReadAll(resp.Body)
	t.Fatalf("Expected status 200, got %d: %s", resp.StatusCode, body)
}
// TestStageCreateAndFinalizeFlow walks through the staged-create lifecycle:
// a stage-create without a name is rejected with 400, a valid stage-create
// succeeds but the table stays invisible (404 on load), an assert-create
// commit finalizes it, and the table then loads with the committed
// metadata location.
func TestStageCreateAndFinalizeFlow(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	env := sharedEnv
	bucketName := "warehouse-stage-" + randomSuffix()
	createTableBucket(t, env, bucketName)

	const tableName = "orders"
	const metadataSuffix = "/metadata/v1.metadata.json"
	namespace := "stage_ns_" + randomSuffix()

	// All routes used below, prefixed with the bucket.
	namespacesPath := icebergPath(bucketName, "/v1/namespaces")
	tablesPath := icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables", namespace))
	tablePath := icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName))

	post := func(path string, payload map[string]any) (int, map[string]any, error) {
		return doIcebergJSONRequest(env, http.MethodPost, path, payload)
	}
	get := func(path string) (int, map[string]any, error) {
		return doIcebergJSONRequest(env, http.MethodGet, path, nil)
	}

	// Step 1: namespace exists (freshly created or already present).
	status, _, err := post(namespacesPath, map[string]any{
		"namespace": []string{namespace},
	})
	if err != nil {
		t.Fatalf("Create namespace request failed: %v", err)
	}
	switch status {
	case http.StatusOK, http.StatusConflict:
	default:
		t.Fatalf("Create namespace status = %d, want 200 or 409", status)
	}

	// Step 2: stage-create without a table name must be a 400.
	status, badReqResp, err := post(tablesPath, map[string]any{
		"stage-create": true,
	})
	if err != nil {
		t.Fatalf("Stage create missing-name request failed: %v", err)
	}
	if status != http.StatusBadRequest {
		t.Fatalf("Stage create missing-name status = %d, want 400", status)
	}
	errorObj, _ := badReqResp["error"].(map[string]any)
	if got := errorObj["type"]; got != "BadRequestException" {
		t.Fatalf("error.type = %v, want BadRequestException", got)
	}
	msg, _ := errorObj["message"].(string)
	if !strings.Contains(strings.ToLower(msg), "table name is required") {
		t.Fatalf("error.message = %v, want it to include %q", errorObj["message"], "table name is required")
	}

	// Step 3: a proper stage-create succeeds and reports a v1 metadata file.
	status, stageResp, err := post(tablesPath, map[string]any{
		"name":         tableName,
		"stage-create": true,
	})
	if err != nil {
		t.Fatalf("Stage create request failed: %v", err)
	}
	if status != http.StatusOK {
		t.Fatalf("Stage create status = %d, want 200", status)
	}
	stageLocation, _ := stageResp["metadata-location"].(string)
	if !strings.HasSuffix(stageLocation, metadataSuffix) {
		t.Fatalf("stage metadata-location = %q, want suffix /metadata/v1.metadata.json", stageLocation)
	}

	// Step 4: the staged table is not loadable yet.
	status, _, err = get(tablePath)
	if err != nil {
		t.Fatalf("Load staged table request failed: %v", err)
	}
	if status != http.StatusNotFound {
		t.Fatalf("Load staged table status = %d, want 404", status)
	}

	// Step 5: an assert-create commit finalizes the table.
	status, commitResp, err := post(tablePath, map[string]any{
		"requirements": []map[string]any{
			{"type": "assert-create"},
		},
		"updates": []any{},
	})
	if err != nil {
		t.Fatalf("Finalize commit request failed: %v", err)
	}
	if status != http.StatusOK {
		t.Fatalf("Finalize commit status = %d, want 200", status)
	}
	commitLocation, _ := commitResp["metadata-location"].(string)
	if !strings.HasSuffix(commitLocation, metadataSuffix) {
		t.Fatalf("final metadata-location = %q, want suffix /metadata/v1.metadata.json", commitLocation)
	}

	// Step 6: the table now loads and points at the committed metadata.
	status, loadResp, err := get(tablePath)
	if err != nil {
		t.Fatalf("Load finalized table request failed: %v", err)
	}
	if status != http.StatusOK {
		t.Fatalf("Load finalized table status = %d, want 200", status)
	}
	if loadLocation, _ := loadResp["metadata-location"].(string); loadLocation != commitLocation {
		t.Fatalf("loaded metadata-location = %q, want %q", loadLocation, commitLocation)
	}
}
// TestCommitMissingTableWithoutAssertCreate checks that committing to a table
// that does not exist, with an empty requirements list (no assert-create),
// is answered with 404.
func TestCommitMissingTableWithoutAssertCreate(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	env := sharedEnv
	bucketName := "warehouse-missing-" + randomSuffix()
	createTableBucket(t, env, bucketName)

	const tableName = "missing_table"
	namespace := "stage_missing_assert_ns_" + randomSuffix()
	namespacesPath := icebergPath(bucketName, "/v1/namespaces")
	tablePath := icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName))

	// The namespace must exist so the 404 below is about the table itself.
	status, _, err := doIcebergJSONRequest(env, http.MethodPost, namespacesPath, map[string]any{
		"namespace": []string{namespace},
	})
	if err != nil {
		t.Fatalf("Create namespace request failed: %v", err)
	}
	switch status {
	case http.StatusOK, http.StatusConflict:
	default:
		t.Fatalf("Create namespace status = %d, want 200 or 409", status)
	}

	commit := map[string]any{
		"requirements": []any{},
		"updates":      []any{},
	}
	status, _, err = doIcebergJSONRequest(env, http.MethodPost, tablePath, commit)
	if err != nil {
		t.Fatalf("Commit missing table request failed: %v", err)
	}
	if status != http.StatusNotFound {
		t.Fatalf("Commit missing table status = %d, want 404", status)
	}
}
// doIcebergJSONRequest sends one request to the Iceberg REST catalog and
// decodes a JSON object response.
//
// path is appended to env.IcebergURL(). When payload is non-nil it is
// JSON-encoded into the request body and sent with Content-Type
// application/json.
//
// It returns the HTTP status code and the decoded object. An empty response
// body yields a nil map and no error. The status is 0 when the request could
// not be built or sent; when reading or decoding the body fails, the real
// status code is returned together with the error.
//
// Each request is bounded by a 30 second deadline so that an unresponsive
// catalog fails the calling test instead of blocking until the global
// go test timeout.
func doIcebergJSONRequest(env *TestEnvironment, method, path string, payload any) (int, map[string]any, error) {
	target := env.IcebergURL() + path

	var bodyReader io.Reader
	if payload != nil {
		data, err := json.Marshal(payload)
		if err != nil {
			return 0, nil, err
		}
		bodyReader = bytes.NewReader(data)
	}

	// The deadline covers sending the request and reading the whole body,
	// both of which happen before this function returns.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, method, target, bodyReader)
	if err != nil {
		return 0, nil, err
	}
	if payload != nil {
		req.Header.Set("Content-Type", "application/json")
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, nil, err
	}
	defer resp.Body.Close()

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return resp.StatusCode, nil, err
	}
	if len(data) == 0 {
		return resp.StatusCode, nil, nil
	}

	var decoded map[string]any
	if err := json.Unmarshal(data, &decoded); err != nil {
		return resp.StatusCode, nil, fmt.Errorf("failed to decode %s %s response: %w body=%s", method, path, err, string(data))
	}
	return resp.StatusCode, decoded, nil
}
// icebergPath places the table bucket prefix right after the "/v1/" segment
// of an Iceberg REST API path, e.g. prefix "my-bucket" turns "/v1/namespaces"
// into "/v1/my-bucket/namespaces". The path is returned untouched when the
// prefix is empty or the path does not start with "/v1/".
func icebergPath(prefix, path string) string {
	const base = "/v1/"
	if prefix == "" || !strings.HasPrefix(path, base) {
		return path
	}
	return base + prefix + "/" + path[len(base):]
}
// createTableBucket issues PUT /buckets against the environment's S3 port to
// create the named table bucket. The response is always logged; any status
// other than 200 or 409 fails the calling test.
func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) {
	t.Helper()

	payload := strings.NewReader(fmt.Sprintf(`{"name":"%s"}`, bucketName))
	req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("http://localhost:%d/buckets", env.s3Port), payload)
	if err != nil {
		t.Fatalf("Failed to create request: %v", err)
	}
	req.Header.Set("Content-Type", "application/x-amz-json-1.1")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatalf("Failed to create table bucket %s: %v", bucketName, err)
	}
	defer resp.Body.Close()

	// A read error only shortens the logged body; the status still decides.
	body, _ := io.ReadAll(resp.Body)
	t.Logf("Create table bucket %s response: status=%d, body=%s", bucketName, resp.StatusCode, string(body))

	switch resp.StatusCode {
	case http.StatusOK, http.StatusConflict:
		t.Logf("Created table bucket %s", bucketName)
	default:
		t.Fatalf("Failed to create table bucket %s, status %d: %s", bucketName, resp.StatusCode, body)
	}
}
// randomSuffix returns a short lowercase hex string (at most eight digits) for
// unique resource naming. It is derived from the low 32 bits of the current
// Unix time in nanoseconds, not from a random source.
func randomSuffix() string {
	low32 := uint32(time.Now().UnixNano())
	return fmt.Sprintf("%x", low32)
}
// TestDuckDBIntegration runs DuckDB in a Docker container and checks that its
// Iceberg extension can be installed and loaded. It is skipped in short mode,
// when Docker is unavailable, or when the container output indicates the
// extension itself is not available.
//
// The SQL script is written into the environment's data directory, which is
// mounted into the container at /test. The script only loads the extension and
// prints a marker row; it does not connect to the catalog.
func TestDuckDBIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	env := sharedEnv
	if !env.dockerAvailable {
		t.Skip("Docker not available, skipping DuckDB integration test")
	}

	// Create a temporary SQL file for DuckDB to execute. The script is a plain
	// constant, so no formatting call is needed.
	sqlFile := filepath.Join(env.dataDir, "test.sql")
	sqlContent := `
-- Install and load Iceberg extension
INSTALL iceberg;
LOAD iceberg;
-- List namespaces via Iceberg REST catalog (basic connectivity test)
-- Note: Full operations require setting up S3 credentials which is beyond this test
SELECT 'Iceberg extension loaded successfully' as result;
`
	if err := os.WriteFile(sqlFile, []byte(sqlContent), 0644); err != nil {
		t.Fatalf("Failed to write SQL file: %v", err)
	}

	// Run DuckDB in Docker. host.docker.internal is mapped to the host gateway
	// so the container could reach services listening on the host.
	cmd := exec.Command("docker", "run", "--rm",
		"-v", fmt.Sprintf("%s:/test", env.dataDir),
		"--add-host", "host.docker.internal:host-gateway",
		"--entrypoint", "duckdb",
		"duckdb/duckdb:latest",
		"-init", "/test/test.sql",
		"-c", "SELECT 1",
	)
	output, err := cmd.CombinedOutput()
	outputStr := string(output)
	if err != nil {
		t.Logf("DuckDB output: %s", output)
		// Check for expected errors in certain CI environments.
		if strings.Contains(outputStr, "iceberg extension is not available") ||
			strings.Contains(outputStr, "Failed to load") {
			t.Skip("Skipping DuckDB test: Iceberg extension not available in Docker image")
		}
		// Any other error is unexpected.
		t.Fatalf("DuckDB command failed unexpectedly. Output: %s\nError: %v", output, err)
	}

	// Verify the script's marker row made it into the output.
	t.Logf("DuckDB output: %s", outputStr)
	if !strings.Contains(outputStr, "Iceberg extension loaded successfully") {
		t.Errorf("Expected success message in output, got: %s", outputStr)
	}
}