test: harden weed mini readiness checks

This commit is contained in:
Chris Lu
2026-03-30 16:21:36 -07:00
parent 7d426d2a56
commit d5068b3ee6
5 changed files with 81 additions and 153 deletions

View File

@@ -4,8 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"math/rand"
"net"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
@@ -96,9 +94,9 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
t.Fatalf("failed to create temp directory: %v", err) t.Fatalf("failed to create temp directory: %v", err)
} }
env.masterPort = mustFreePort(t, "Master") env.masterPort = testutil.MustFreeMiniPort(t, "Master")
env.filerPort = mustFreePort(t, "Filer") env.filerPort = testutil.MustFreeMiniPort(t, "Filer")
env.s3Port = mustFreePort(t, "S3") env.s3Port = testutil.MustFreeMiniPort(t, "S3")
bindIP := testutil.FindBindIP() bindIP := testutil.FindBindIP()
iamConfigPath, err := testutil.WriteIAMConfig(env.seaweedfsDataDir, env.accessKey, env.secretKey) iamConfigPath, err := testutil.WriteIAMConfig(env.seaweedfsDataDir, env.accessKey, env.secretKey)
@@ -135,14 +133,14 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
} }
registerMiniProcess(env.masterProcess) registerMiniProcess(env.masterProcess)
if !waitForPort(env.masterPort, 15*time.Second) { if !testutil.WaitForPort(env.masterPort, testutil.SeaweedMiniStartupTimeout) {
t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort) t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort)
} }
if !waitForPort(env.filerPort, 15*time.Second) { if !testutil.WaitForPort(env.filerPort, testutil.SeaweedMiniStartupTimeout) {
t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort) t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort)
} }
if !waitForPort(env.s3Port, 15*time.Second) { if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/status", env.s3Port), testutil.SeaweedMiniStartupTimeout) {
t.Fatalf("weed mini failed to start - s3 port %d not listening", env.s3Port) t.Fatalf("weed mini failed to start - s3 endpoint http://127.0.0.1:%d/status not responding", env.s3Port)
} }
} }
@@ -208,46 +206,6 @@ func (env *TestEnvironment) Cleanup(t *testing.T) {
} }
} }
func mustFreePort(t *testing.T, name string) int {
t.Helper()
for i := 0; i < 200; i++ {
port := 20000 + rand.Intn(30000)
listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
if err != nil {
continue
}
_ = listener.Close()
grpcPort := port + 10000
if grpcPort > 65535 {
continue
}
grpcListener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", grpcPort))
if err != nil {
continue
}
_ = grpcListener.Close()
return port
}
t.Fatalf("failed to get free port for %s", name)
return 0
}
func waitForPort(port int, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 500*time.Millisecond)
if err == nil {
_ = conn.Close()
return true
}
time.Sleep(100 * time.Millisecond)
}
return false
}
func runSparkPyScript(t *testing.T, container testcontainers.Container, script string, s3Port int) (int, string) { func runSparkPyScript(t *testing.T, container testcontainers.Container, script string, s3Port int) (int, string) {
t.Helper() t.Helper()

View File

@@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"io" "io"
"math/rand" "math/rand"
"net"
"net/http" "net/http"
"os" "os"
"os/exec" "os/exec"
@@ -115,11 +114,11 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
t.Fatalf("failed to create temp directory: %v", err) t.Fatalf("failed to create temp directory: %v", err)
} }
env.masterPort = mustFreePort(t, "Master") env.masterPort = testutil.MustFreeMiniPort(t, "Master")
env.filerPort = mustFreePort(t, "Filer") env.filerPort = testutil.MustFreeMiniPort(t, "Filer")
env.s3Port = mustFreePort(t, "S3") env.s3Port = testutil.MustFreeMiniPort(t, "S3")
env.icebergRestPort = mustFreePort(t, "Iceberg") env.icebergRestPort = testutil.MustFreeMiniPort(t, "Iceberg")
env.risingwavePort = mustFreePort(t, "RisingWave") env.risingwavePort = testutil.MustFreeMiniPort(t, "RisingWave")
env.bindIP = testutil.FindBindIP() env.bindIP = testutil.FindBindIP()
@@ -161,62 +160,20 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
registerMiniProcess(env.masterProcess) registerMiniProcess(env.masterProcess)
// Wait for all services to be ready // Wait for all services to be ready
if !waitForPort(env.masterPort, 15*time.Second) { if !testutil.WaitForPort(env.masterPort, testutil.SeaweedMiniStartupTimeout) {
t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort) t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort)
} }
if !waitForPort(env.filerPort, 15*time.Second) { if !testutil.WaitForPort(env.filerPort, testutil.SeaweedMiniStartupTimeout) {
t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort) t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort)
} }
if !waitForPort(env.s3Port, 15*time.Second) { if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/status", env.s3Port), testutil.SeaweedMiniStartupTimeout) {
t.Fatalf("weed mini failed to start - s3 port %d not listening", env.s3Port) t.Fatalf("weed mini failed to start - s3 endpoint http://127.0.0.1:%d/status not responding", env.s3Port)
} }
if !waitForPort(env.icebergRestPort, 15*time.Second) { if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/v1/config", env.icebergRestPort), testutil.SeaweedMiniStartupTimeout) {
t.Fatalf("weed mini failed to start - iceberg rest port %d not listening", env.icebergRestPort) t.Fatalf("weed mini failed to start - iceberg rest endpoint http://127.0.0.1:%d/v1/config not responding", env.icebergRestPort)
} }
} }
func mustFreePort(t *testing.T, name string) int {
t.Helper()
minPort := 10000
maxPort := 55000 // Ensure port+10000 < 65535
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < 1000; i++ {
port := minPort + r.Intn(maxPort-minPort)
// Check http port
ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
if err != nil {
continue
}
ln.Close()
// Check grpc port (weed mini uses port+10000)
ln2, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port+10000))
if err != nil {
continue
}
ln2.Close()
return port
}
t.Fatalf("failed to find a free port < %d for %s after 1000 attempts", maxPort, name)
return 0
}
func waitForPort(port int, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 500*time.Millisecond)
if err == nil {
conn.Close()
return true
}
time.Sleep(100 * time.Millisecond)
}
return false
}
func (env *TestEnvironment) StartRisingWave(t *testing.T) { func (env *TestEnvironment) StartRisingWave(t *testing.T) {
t.Helper() t.Helper()
@@ -253,7 +210,7 @@ func (env *TestEnvironment) StartRisingWave(t *testing.T) {
} }
// Wait for RisingWave port to be open on host // Wait for RisingWave port to be open on host
if !waitForPort(env.risingwavePort, 120*time.Second) { if !testutil.WaitForPort(env.risingwavePort, 120*time.Second) {
t.Fatalf("timed out waiting for RisingWave port %d to be open", env.risingwavePort) t.Fatalf("timed out waiting for RisingWave port %d to be open", env.risingwavePort)
} }

View File

@@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"io" "io"
"math/rand" "math/rand"
"net"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
@@ -91,10 +90,10 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
t.Fatalf("failed to create temp directory: %v", err) t.Fatalf("failed to create temp directory: %v", err)
} }
env.masterPort = mustFreePort(t, "Master") env.masterPort = testutil.MustFreeMiniPort(t, "Master")
env.filerPort = mustFreePort(t, "Filer") env.filerPort = testutil.MustFreeMiniPort(t, "Filer")
env.s3Port = mustFreePort(t, "S3") env.s3Port = testutil.MustFreeMiniPort(t, "S3")
env.icebergRestPort = mustFreePort(t, "Iceberg") env.icebergRestPort = testutil.MustFreeMiniPort(t, "Iceberg")
bindIP := testutil.FindBindIP() bindIP := testutil.FindBindIP()
@@ -127,58 +126,20 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
registerMiniProcess(env.masterProcess) registerMiniProcess(env.masterProcess)
// Wait for all services to be ready // Wait for all services to be ready
if !waitForPort(env.masterPort, 15*time.Second) { if !testutil.WaitForPort(env.masterPort, testutil.SeaweedMiniStartupTimeout) {
t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort) t.Fatalf("weed mini failed to start - master port %d not listening", env.masterPort)
} }
if !waitForPort(env.filerPort, 15*time.Second) { if !testutil.WaitForPort(env.filerPort, testutil.SeaweedMiniStartupTimeout) {
t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort) t.Fatalf("weed mini failed to start - filer port %d not listening", env.filerPort)
} }
if !waitForPort(env.s3Port, 15*time.Second) { if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/status", env.s3Port), testutil.SeaweedMiniStartupTimeout) {
t.Fatalf("weed mini failed to start - s3 port %d not listening", env.s3Port) t.Fatalf("weed mini failed to start - s3 endpoint http://127.0.0.1:%d/status not responding", env.s3Port)
} }
if !waitForPort(env.icebergRestPort, 15*time.Second) { if !testutil.WaitForService(fmt.Sprintf("http://127.0.0.1:%d/v1/config", env.icebergRestPort), testutil.SeaweedMiniStartupTimeout) {
t.Fatalf("weed mini failed to start - iceberg rest port %d not listening", env.icebergRestPort) t.Fatalf("weed mini failed to start - iceberg rest endpoint http://127.0.0.1:%d/v1/config not responding", env.icebergRestPort)
} }
} }
func mustFreePort(t *testing.T, name string) int {
t.Helper()
for i := 0; i < 200; i++ {
port := 20000 + rand.Intn(30000)
listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port))
if err != nil {
continue
}
listener.Close()
grpcPort := port + 10000
if grpcPort > 65535 {
continue
}
grpcListener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", grpcPort))
if err != nil {
continue
}
grpcListener.Close()
return port
}
t.Fatalf("failed to get free port for %s", name)
return 0
}
func waitForPort(port int, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 500*time.Millisecond)
if err == nil {
conn.Close()
return true
}
time.Sleep(100 * time.Millisecond)
}
return false
}
func (env *TestEnvironment) writeSparkConfig(t *testing.T, catalogBucket string) string { func (env *TestEnvironment) writeSparkConfig(t *testing.T, catalogBucket string) string {
t.Helper() t.Helper()

View File

@@ -2,6 +2,7 @@ package testutil
import ( import (
"context" "context"
"fmt"
"net" "net"
"net/http" "net/http"
"os/exec" "os/exec"
@@ -64,3 +65,17 @@ func WaitForService(url string, timeout time.Duration) bool {
} }
} }
} }
func WaitForPort(port int, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
address := fmt.Sprintf("127.0.0.1:%d", port)
for time.Now().Before(deadline) {
conn, err := net.DialTimeout("tcp", address, 500*time.Millisecond)
if err == nil {
_ = conn.Close()
return true
}
time.Sleep(100 * time.Millisecond)
}
return false
}

View File

@@ -2,11 +2,16 @@ package testutil
import ( import (
"fmt" "fmt"
"math/rand"
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
"testing"
"time"
) )
const SeaweedMiniStartupTimeout = 45 * time.Second
func FindBindIP() string { func FindBindIP() string {
addrs, err := net.InterfaceAddrs() addrs, err := net.InterfaceAddrs()
if err != nil { if err != nil {
@@ -54,3 +59,35 @@ func WriteIAMConfig(dir, accessKey, secretKey string) (string, error) {
} }
return iamConfigPath, nil return iamConfigPath, nil
} }
func MustFreeMiniPort(t *testing.T, name string) int {
t.Helper()
const (
minPort = 10000
maxPort = 55000
)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < 1000; i++ {
port := minPort + r.Intn(maxPort-minPort)
listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port))
if err != nil {
continue
}
_ = listener.Close()
grpcPort := port + 10000
grpcListener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", grpcPort))
if err != nil {
continue
}
_ = grpcListener.Close()
return port
}
t.Fatalf("failed to get free weed mini port for %s", name)
return 0
}