* Enforce IAM for s3tables bucket creation
* Prefer IAM path when policies exist
* Ensure IAM enforcement honors default allow
* address comments
* Reused the precomputed principal when setting tableBucketMetadata.OwnerAccountID, avoiding the redundant getAccountID call.
* get identity
* fix
* dedup
* fix
* comments
* fix tests
* update iam config
* go fmt
* fix ports
* fix flags
* mini clean shutdown
* Revert "update iam config"
This reverts commit ca48fdbb0afa45657823d98657556c0bbf24f239.
Revert "mini clean shutdown"
This reverts commit 9e17f6baffd5dd7cc404d831d18dd618b9fe5049.
Revert "fix flags"
This reverts commit e9e7b29d2f77ee5cb82147d50621255410695ee3.
Revert "go fmt"
This reverts commit bd3241960b1d9484b7900190773b0ecb3f762c9a.
* test/s3tables: share single weed mini per test package via TestMain
Previously each top-level test function in the catalog and s3tables
package started and stopped its own weed mini instance. This caused
failures when a prior instance wasn't cleanly stopped before the next
one started (port conflicts, leaked global state).
Changes:
- catalog/iceberg_catalog_test.go: introduce TestMain that starts one
shared TestEnvironment (external weed binary) before all tests and
tears it down after. All individual test functions now use sharedEnv.
Added randomSuffix() for unique resource names across tests.
- catalog/pyiceberg_test.go: updated to use sharedEnv instead of
per-test environments.
- catalog/pyiceberg_test_helpers.go -> pyiceberg_test_helpers_test.go:
renamed to a _test.go file so it can access TestEnvironment which is
defined in a test file.
- table-buckets/setup.go: add package-level sharedCluster variable.
- table-buckets/s3tables_integration_test.go: introduce TestMain that
starts one shared TestCluster before all tests. TestS3TablesIntegration
now uses sharedCluster. Extract startMiniClusterInDir (no *testing.T)
for TestMain use. TestS3TablesCreateBucketIAMPolicy keeps its own
cluster (different IAM config). Remove miniClusterMutex (no longer
needed). Fix Stop() to not panic when t is nil.
* delete
* parse
* default allow should work with anonymous
* fix port
* iceberg route
The failures are from Iceberg REST using the default bucket warehouse when no prefix is provided. Your tests create random buckets, so /v1/namespaces was looking in warehouse and failing. I updated the tests to use the prefixed Iceberg routes (/v1/{bucket}/...) via a small helper.
* test(s3tables): fix port conflicts and IAM ARN matching in integration tests
- Pass -master.dir explicitly to prevent filer store directory collision
between shared cluster and per-test clusters running in the same process
- Pass -volume.port.public and -volume.publicUrl to prevent the global
publicPort flag (mutated from 0 → concrete port by first cluster) from
being reused by a second cluster, causing 'address already in use'
- Remove the flag-reset loop in Stop() that reset global flag values while
other goroutines were reading them (race → panic)
- Fix IAM policy Resource ARN in TestS3TablesCreateBucketIAMPolicy to use
wildcards (arn:aws:s3tables:*:*:bucket/<name>) because the handler
generates ARNs with its own DefaultRegion (us-east-1) and principal name
('admin'), not the test constants testRegion/testAccountID
632 lines
19 KiB
Go
632 lines
19 KiB
Go
// Package catalog provides integration tests for the Iceberg REST Catalog API.
|
|
// These tests use DuckDB running in Docker to verify catalog operations.
|
|
package catalog
|
|
|
|
import (
	"bytes"
	"context"
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"testing"
	"time"
)
|
|
|
|
// sharedEnv is the single TestEnvironment shared across all tests in this
// package. It is assigned by TestMain after successful setup and remains nil
// when setup was skipped or failed.
var sharedEnv *TestEnvironment
|
|
|
|
// TestMain starts one weed mini instance for the whole package and tears it down
|
|
// after all tests have run.
|
|
func TestMain(m *testing.M) {
|
|
flag.Parse()
|
|
if os.Getenv("SHORT") != "" || testing.Short() {
|
|
// Let tests self-skip when run with -short.
|
|
os.Exit(m.Run())
|
|
}
|
|
|
|
env, err := newTestEnvironmentForMain()
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "SKIP: setup failed: %v\n", err)
|
|
os.Exit(0) // Skip all tests rather than fail
|
|
}
|
|
sharedEnv = env
|
|
|
|
if startErr := sharedEnv.startSeaweedFSForMain(); startErr != nil {
|
|
fmt.Fprintf(os.Stderr, "SKIP: weed mini failed to start: %v\n", startErr)
|
|
sharedEnv.cleanupForMain()
|
|
os.Exit(0)
|
|
}
|
|
|
|
code := m.Run()
|
|
|
|
sharedEnv.cleanupForMain()
|
|
os.Exit(code)
|
|
}
|
|
|
|
// TestEnvironment contains the test environment configuration for one shared
// weed mini instance: the resolved binary location, per-service ports, and the
// handles needed to stop the process during cleanup.
type TestEnvironment struct {
	seaweedDir string // repository root (directory containing go.mod)
	weedBinary string // path to the weed binary, or "weed" resolved from PATH
	dataDir    string // temporary data directory, removed on cleanup

	// Ephemeral ports allocated for each weed mini service.
	s3Port         int
	s3GrpcPort     int
	icebergPort    int
	masterPort     int
	masterGrpcPort int
	filerPort      int
	filerGrpcPort  int
	volumePort     int
	volumeGrpcPort int

	weedProcess *exec.Cmd          // running weed mini process; nil until started
	weedCancel  context.CancelFunc // cancels the process context to stop weed

	dockerAvailable bool // whether "docker version" succeeded at setup time
}
|
|
|
|
// hasDocker reports whether a working Docker CLI is reachable, determined by
// running "docker version".
func hasDocker() bool {
	err := exec.Command("docker", "version").Run()
	return err == nil
}
|
|
|
|
// getFreePort asks the kernel for an available ephemeral port on loopback and
// returns it together with the still-open listener. The caller must close the
// listener before anything else can bind the port.
func getFreePort() (int, net.Listener, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, nil, err
	}
	port := l.Addr().(*net.TCPAddr).Port
	return port, l, nil
}
|
|
|
|
// newTestEnvironmentForMain creates a TestEnvironment without calling t.Fatalf so it
|
|
// can be used from TestMain (which has no *testing.T).
|
|
func newTestEnvironmentForMain() (*TestEnvironment, error) {
|
|
// Find the SeaweedFS root directory
|
|
wd, err := os.Getwd()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get working directory: %w", err)
|
|
}
|
|
|
|
seaweedDir := wd
|
|
for i := 0; i < 5; i++ {
|
|
if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil {
|
|
break
|
|
}
|
|
seaweedDir = filepath.Dir(seaweedDir)
|
|
}
|
|
|
|
// Check for weed binary
|
|
weedBinary := filepath.Join(seaweedDir, "weed", "weed")
|
|
if _, err := os.Stat(weedBinary); os.IsNotExist(err) {
|
|
weedBinary = "weed"
|
|
if _, err := exec.LookPath(weedBinary); err != nil {
|
|
return nil, fmt.Errorf("weed binary not found")
|
|
}
|
|
}
|
|
|
|
// Create temporary data directory
|
|
dataDir, err := os.MkdirTemp("", "seaweed-iceberg-test-*")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create temp dir: %w", err)
|
|
}
|
|
|
|
// Allocate free ephemeral ports for each service
|
|
var listeners []net.Listener
|
|
closeListeners := func() {
|
|
for _, l := range listeners {
|
|
l.Close()
|
|
}
|
|
}
|
|
|
|
var l net.Listener
|
|
s3Port, l, err := getFreePort()
|
|
if err != nil {
|
|
closeListeners()
|
|
return nil, fmt.Errorf("get free port for S3: %w", err)
|
|
}
|
|
listeners = append(listeners, l)
|
|
|
|
icebergPort, l, err := getFreePort()
|
|
if err != nil {
|
|
closeListeners()
|
|
return nil, fmt.Errorf("get free port for Iceberg: %w", err)
|
|
}
|
|
listeners = append(listeners, l)
|
|
|
|
s3GrpcPort, l, err := getFreePort()
|
|
if err != nil {
|
|
closeListeners()
|
|
return nil, fmt.Errorf("get free port for S3 gRPC: %w", err)
|
|
}
|
|
listeners = append(listeners, l)
|
|
|
|
masterPort, l, err := getFreePort()
|
|
if err != nil {
|
|
closeListeners()
|
|
return nil, fmt.Errorf("get free port for Master: %w", err)
|
|
}
|
|
listeners = append(listeners, l)
|
|
|
|
masterGrpcPort, l, err := getFreePort()
|
|
if err != nil {
|
|
closeListeners()
|
|
return nil, fmt.Errorf("get free port for Master gRPC: %w", err)
|
|
}
|
|
listeners = append(listeners, l)
|
|
|
|
filerPort, l, err := getFreePort()
|
|
if err != nil {
|
|
closeListeners()
|
|
return nil, fmt.Errorf("get free port for Filer: %w", err)
|
|
}
|
|
listeners = append(listeners, l)
|
|
|
|
filerGrpcPort, l, err := getFreePort()
|
|
if err != nil {
|
|
closeListeners()
|
|
return nil, fmt.Errorf("get free port for Filer gRPC: %w", err)
|
|
}
|
|
listeners = append(listeners, l)
|
|
|
|
volumePort, l, err := getFreePort()
|
|
if err != nil {
|
|
closeListeners()
|
|
return nil, fmt.Errorf("get free port for Volume: %w", err)
|
|
}
|
|
listeners = append(listeners, l)
|
|
|
|
volumeGrpcPort, l, err := getFreePort()
|
|
if err != nil {
|
|
closeListeners()
|
|
return nil, fmt.Errorf("get free port for Volume gRPC: %w", err)
|
|
}
|
|
listeners = append(listeners, l)
|
|
|
|
// Release the port reservations so weed mini can bind to them
|
|
closeListeners()
|
|
|
|
return &TestEnvironment{
|
|
seaweedDir: seaweedDir,
|
|
weedBinary: weedBinary,
|
|
dataDir: dataDir,
|
|
s3Port: s3Port,
|
|
s3GrpcPort: s3GrpcPort,
|
|
icebergPort: icebergPort,
|
|
masterPort: masterPort,
|
|
masterGrpcPort: masterGrpcPort,
|
|
filerPort: filerPort,
|
|
filerGrpcPort: filerGrpcPort,
|
|
volumePort: volumePort,
|
|
volumeGrpcPort: volumeGrpcPort,
|
|
dockerAvailable: hasDocker(),
|
|
}, nil
|
|
}
|
|
|
|
// startSeaweedFSForMain starts weed mini without a *testing.T (for use in TestMain).
|
|
func (env *TestEnvironment) startSeaweedFSForMain() error {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
env.weedCancel = cancel
|
|
|
|
masterDir := filepath.Join(env.dataDir, "master")
|
|
filerDir := filepath.Join(env.dataDir, "filer")
|
|
volumeDir := filepath.Join(env.dataDir, "volume")
|
|
|
|
for _, dir := range []string{masterDir, filerDir, volumeDir} {
|
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
|
cancel()
|
|
return fmt.Errorf("create directory %s: %w", dir, err)
|
|
}
|
|
}
|
|
|
|
cmd := exec.CommandContext(ctx, env.weedBinary, "mini",
|
|
"-master.port", fmt.Sprintf("%d", env.masterPort),
|
|
"-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort),
|
|
"-volume.port", fmt.Sprintf("%d", env.volumePort),
|
|
"-volume.port.grpc", fmt.Sprintf("%d", env.volumeGrpcPort),
|
|
"-filer.port", fmt.Sprintf("%d", env.filerPort),
|
|
"-filer.port.grpc", fmt.Sprintf("%d", env.filerGrpcPort),
|
|
"-s3.port", fmt.Sprintf("%d", env.s3Port),
|
|
"-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort),
|
|
"-s3.port.iceberg", fmt.Sprintf("%d", env.icebergPort),
|
|
"-ip.bind", "0.0.0.0",
|
|
"-dir", env.dataDir,
|
|
)
|
|
cmd.Stdout = os.Stdout
|
|
cmd.Stderr = os.Stderr
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
cancel()
|
|
return fmt.Errorf("start SeaweedFS: %w", err)
|
|
}
|
|
env.weedProcess = cmd
|
|
|
|
if !env.waitForService(fmt.Sprintf("http://127.0.0.1:%d/v1/config", env.icebergPort), 30*time.Second) {
|
|
cancel()
|
|
cmd.Wait()
|
|
return fmt.Errorf("Iceberg REST API did not become ready")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// cleanupForMain stops SeaweedFS and cleans up resources (no *testing.T needed).
|
|
func (env *TestEnvironment) cleanupForMain() {
|
|
if env.weedCancel != nil {
|
|
env.weedCancel()
|
|
}
|
|
if env.weedProcess != nil {
|
|
time.Sleep(2 * time.Second)
|
|
env.weedProcess.Wait()
|
|
}
|
|
if env.dataDir != "" {
|
|
os.RemoveAll(env.dataDir)
|
|
}
|
|
}
|
|
|
|
// waitForService waits for a service to become available
|
|
func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bool {
|
|
client := &http.Client{Timeout: 2 * time.Second}
|
|
deadline := time.Now().Add(timeout)
|
|
for time.Now().Before(deadline) {
|
|
resp, err := client.Get(url)
|
|
if err == nil {
|
|
resp.Body.Close()
|
|
if resp.StatusCode == http.StatusOK {
|
|
return true
|
|
}
|
|
}
|
|
time.Sleep(500 * time.Millisecond)
|
|
}
|
|
return false
|
|
}
|
|
|
|
// IcebergURL returns the Iceberg REST Catalog URL
|
|
func (env *TestEnvironment) IcebergURL() string {
|
|
return fmt.Sprintf("http://127.0.0.1:%d", env.icebergPort)
|
|
}
|
|
|
|
// TestIcebergConfig tests the /v1/config endpoint
|
|
func TestIcebergConfig(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
env := sharedEnv
|
|
|
|
// Test GET /v1/config
|
|
resp, err := http.Get(env.IcebergURL() + "/v1/config")
|
|
if err != nil {
|
|
t.Fatalf("Failed to get config: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
t.Errorf("Expected status 200, got %d", resp.StatusCode)
|
|
}
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
t.Fatalf("Failed to read response body: %v", err)
|
|
}
|
|
|
|
// Verify response contains required fields
|
|
bodyStr := string(body)
|
|
if !strings.Contains(bodyStr, "defaults") || !strings.Contains(bodyStr, "overrides") {
|
|
t.Errorf("Config response missing required fields: %s", bodyStr)
|
|
}
|
|
}
|
|
|
|
// TestIcebergNamespaces tests namespace operations
|
|
func TestIcebergNamespaces(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
env := sharedEnv
|
|
|
|
// Create the default table bucket first via S3
|
|
bucketName := "warehouse-ns-" + randomSuffix()
|
|
createTableBucket(t, env, bucketName)
|
|
|
|
// Test GET /v1/namespaces (should return empty list initially)
|
|
resp, err := http.Get(env.IcebergURL() + icebergPath(bucketName, "/v1/namespaces"))
|
|
if err != nil {
|
|
t.Fatalf("Failed to list namespaces: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
t.Fatalf("Expected status 200, got %d: %s", resp.StatusCode, body)
|
|
}
|
|
}
|
|
|
|
// TestStageCreateAndFinalizeFlow verifies staged create remains invisible until assert-create commit finalizes table creation.
|
|
func TestStageCreateAndFinalizeFlow(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
env := sharedEnv
|
|
bucketName := "warehouse-stage-" + randomSuffix()
|
|
createTableBucket(t, env, bucketName)
|
|
|
|
namespace := "stage_ns_" + randomSuffix()
|
|
tableName := "orders"
|
|
|
|
status, _, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, "/v1/namespaces"), map[string]any{
|
|
"namespace": []string{namespace},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Create namespace request failed: %v", err)
|
|
}
|
|
if status != http.StatusOK && status != http.StatusConflict {
|
|
t.Fatalf("Create namespace status = %d, want 200 or 409", status)
|
|
}
|
|
|
|
status, badReqResp, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables", namespace)), map[string]any{
|
|
"stage-create": true,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Stage create missing-name request failed: %v", err)
|
|
}
|
|
if status != http.StatusBadRequest {
|
|
t.Fatalf("Stage create missing-name status = %d, want 400", status)
|
|
}
|
|
errorObj, _ := badReqResp["error"].(map[string]any)
|
|
if got := errorObj["type"]; got != "BadRequestException" {
|
|
t.Fatalf("error.type = %v, want BadRequestException", got)
|
|
}
|
|
msg, _ := errorObj["message"].(string)
|
|
if !strings.Contains(strings.ToLower(msg), "table name is required") {
|
|
t.Fatalf("error.message = %v, want it to include %q", errorObj["message"], "table name is required")
|
|
}
|
|
|
|
status, stageResp, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables", namespace)), map[string]any{
|
|
"name": tableName,
|
|
"stage-create": true,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Stage create request failed: %v", err)
|
|
}
|
|
if status != http.StatusOK {
|
|
t.Fatalf("Stage create status = %d, want 200", status)
|
|
}
|
|
stageLocation, _ := stageResp["metadata-location"].(string)
|
|
if !strings.HasSuffix(stageLocation, "/metadata/v1.metadata.json") {
|
|
t.Fatalf("stage metadata-location = %q, want suffix /metadata/v1.metadata.json", stageLocation)
|
|
}
|
|
|
|
status, _, err = doIcebergJSONRequest(env, http.MethodGet, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), nil)
|
|
if err != nil {
|
|
t.Fatalf("Load staged table request failed: %v", err)
|
|
}
|
|
if status != http.StatusNotFound {
|
|
t.Fatalf("Load staged table status = %d, want 404", status)
|
|
}
|
|
|
|
status, commitResp, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), map[string]any{
|
|
"requirements": []map[string]any{
|
|
{"type": "assert-create"},
|
|
},
|
|
"updates": []any{},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Finalize commit request failed: %v", err)
|
|
}
|
|
if status != http.StatusOK {
|
|
t.Fatalf("Finalize commit status = %d, want 200", status)
|
|
}
|
|
commitLocation, _ := commitResp["metadata-location"].(string)
|
|
if !strings.HasSuffix(commitLocation, "/metadata/v1.metadata.json") {
|
|
t.Fatalf("final metadata-location = %q, want suffix /metadata/v1.metadata.json", commitLocation)
|
|
}
|
|
|
|
status, loadResp, err := doIcebergJSONRequest(env, http.MethodGet, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), nil)
|
|
if err != nil {
|
|
t.Fatalf("Load finalized table request failed: %v", err)
|
|
}
|
|
if status != http.StatusOK {
|
|
t.Fatalf("Load finalized table status = %d, want 200", status)
|
|
}
|
|
loadLocation, _ := loadResp["metadata-location"].(string)
|
|
if loadLocation != commitLocation {
|
|
t.Fatalf("loaded metadata-location = %q, want %q", loadLocation, commitLocation)
|
|
}
|
|
}
|
|
|
|
// TestCommitMissingTableWithoutAssertCreate ensures missing-table commits still require assert-create for creation.
|
|
func TestCommitMissingTableWithoutAssertCreate(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
env := sharedEnv
|
|
bucketName := "warehouse-missing-" + randomSuffix()
|
|
createTableBucket(t, env, bucketName)
|
|
|
|
namespace := "stage_missing_assert_ns_" + randomSuffix()
|
|
tableName := "missing_table"
|
|
|
|
status, _, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, "/v1/namespaces"), map[string]any{
|
|
"namespace": []string{namespace},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Create namespace request failed: %v", err)
|
|
}
|
|
if status != http.StatusOK && status != http.StatusConflict {
|
|
t.Fatalf("Create namespace status = %d, want 200 or 409", status)
|
|
}
|
|
|
|
status, _, err = doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), map[string]any{
|
|
"requirements": []any{},
|
|
"updates": []any{},
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Commit missing table request failed: %v", err)
|
|
}
|
|
if status != http.StatusNotFound {
|
|
t.Fatalf("Commit missing table status = %d, want 404", status)
|
|
}
|
|
}
|
|
|
|
// doIcebergJSONRequest decodes JSON object responses used by catalog tests.
|
|
func doIcebergJSONRequest(env *TestEnvironment, method, path string, payload any) (int, map[string]any, error) {
|
|
url := env.IcebergURL() + path
|
|
|
|
var bodyReader io.Reader
|
|
if payload != nil {
|
|
data, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
bodyReader = bytes.NewReader(data)
|
|
}
|
|
|
|
req, err := http.NewRequest(method, url, bodyReader)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
if payload != nil {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
data, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return resp.StatusCode, nil, err
|
|
}
|
|
|
|
if len(data) == 0 {
|
|
return resp.StatusCode, nil, nil
|
|
}
|
|
|
|
var decoded map[string]any
|
|
if err := json.Unmarshal(data, &decoded); err != nil {
|
|
return resp.StatusCode, nil, fmt.Errorf("failed to decode %s %s response: %w body=%s", method, path, err, string(data))
|
|
}
|
|
return resp.StatusCode, decoded, nil
|
|
}
|
|
|
|
// icebergPath inserts the table bucket prefix into Iceberg REST API paths.
// For example, "/v1/namespaces" with prefix "my-bucket" becomes
// "/v1/my-bucket/namespaces". An empty prefix or a path that does not start
// with "/v1/" is returned unchanged.
func icebergPath(prefix, path string) string {
	const base = "/v1/"
	if prefix == "" || !strings.HasPrefix(path, base) {
		return path
	}
	return base + prefix + "/" + strings.TrimPrefix(path, base)
}
|
|
|
|
// createTableBucket creates a table bucket via the S3Tables REST API
|
|
func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) {
|
|
t.Helper()
|
|
|
|
// Use S3Tables REST API to create the bucket
|
|
endpoint := fmt.Sprintf("http://localhost:%d/buckets", env.s3Port)
|
|
|
|
reqBody := fmt.Sprintf(`{"name":"%s"}`, bucketName)
|
|
req, err := http.NewRequest(http.MethodPut, endpoint, strings.NewReader(reqBody))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create request: %v", err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/x-amz-json-1.1")
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create table bucket %s: %v", bucketName, err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
body, _ := io.ReadAll(resp.Body)
|
|
t.Logf("Create table bucket %s response: status=%d, body=%s", bucketName, resp.StatusCode, string(body))
|
|
|
|
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusConflict {
|
|
t.Fatalf("Failed to create table bucket %s, status %d: %s", bucketName, resp.StatusCode, body)
|
|
}
|
|
t.Logf("Created table bucket %s", bucketName)
|
|
}
|
|
|
|
// randomSuffix returns a short random hex suffix for unique resource naming.
// It draws from crypto/rand so two calls in the same nanosecond window (or in
// parallel tests) cannot produce the same suffix; if the system randomness
// source fails it falls back to the previous timestamp-derived value.
func randomSuffix() string {
	var buf [4]byte
	if _, err := rand.Read(buf[:]); err != nil {
		// Fallback: low 32 bits of the current time, as before.
		return fmt.Sprintf("%x", time.Now().UnixNano()&0xffffffff)
	}
	return hex.EncodeToString(buf[:])
}
|
|
|
|
// TestDuckDBIntegration tests Iceberg catalog operations using DuckDB
|
|
// This test requires Docker to be available
|
|
func TestDuckDBIntegration(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
env := sharedEnv
|
|
|
|
if !env.dockerAvailable {
|
|
t.Skip("Docker not available, skipping DuckDB integration test")
|
|
}
|
|
|
|
// Create a temporary SQL file for DuckDB to execute
|
|
sqlFile := filepath.Join(env.dataDir, "test.sql")
|
|
sqlContent := fmt.Sprintf(`
|
|
-- Install and load Iceberg extension
|
|
INSTALL iceberg;
|
|
LOAD iceberg;
|
|
|
|
-- List namespaces via Iceberg REST catalog (basic connectivity test)
|
|
-- Note: Full operations require setting up S3 credentials which is beyond this test
|
|
SELECT 'Iceberg extension loaded successfully' as result;
|
|
`)
|
|
|
|
if err := os.WriteFile(sqlFile, []byte(sqlContent), 0644); err != nil {
|
|
t.Fatalf("Failed to write SQL file: %v", err)
|
|
}
|
|
|
|
// Run DuckDB in Docker to test Iceberg connectivity
|
|
// Use host.docker.internal to connect to the host's Iceberg port
|
|
cmd := exec.Command("docker", "run", "--rm",
|
|
"-v", fmt.Sprintf("%s:/test", env.dataDir),
|
|
"--add-host", "host.docker.internal:host-gateway",
|
|
"--entrypoint", "duckdb",
|
|
"duckdb/duckdb:latest",
|
|
"-init", "/test/test.sql",
|
|
"-c", "SELECT 1",
|
|
)
|
|
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
t.Logf("DuckDB output: %s", output)
|
|
// Check for expected errors in certain CI environments
|
|
outputStr := string(output)
|
|
if strings.Contains(outputStr, "iceberg extension is not available") ||
|
|
strings.Contains(outputStr, "Failed to load") {
|
|
t.Skip("Skipping DuckDB test: Iceberg extension not available in Docker image")
|
|
}
|
|
// Any other error is unexpected
|
|
t.Fatalf("DuckDB command failed unexpectedly. Output: %s\nError: %v", output, err)
|
|
}
|
|
|
|
// Verify the test completed successfully
|
|
outputStr := string(output)
|
|
t.Logf("DuckDB output: %s", outputStr)
|
|
if !strings.Contains(outputStr, "Iceberg extension loaded successfully") {
|
|
t.Errorf("Expected success message in output, got: %s", outputStr)
|
|
}
|
|
}
|