fix flaky test

This commit is contained in:
Chris Lu
2026-02-04 23:16:31 -08:00
parent c2bfd7b524
commit e39a4c2041
2 changed files with 38 additions and 14 deletions

View File

@@ -188,16 +188,30 @@ def main():
print(f"Prefix: {args.prefix}") print(f"Prefix: {args.prefix}")
print() print()
# Load the REST catalog # Load the REST catalog with retries to handle possible delay in catalog server readiness
catalog = load_catalog( import time
"rest", max_retries = 10
**{ catalog = None
"type": "rest", for attempt in range(max_retries):
"uri": args.catalog_url, try:
"warehouse": args.warehouse, catalog = load_catalog(
"prefix": args.prefix, "rest",
} **{
) "type": "rest",
"uri": args.catalog_url,
"warehouse": args.warehouse,
"prefix": args.prefix,
}
)
print(f"Successfully connected to catalog on attempt {attempt + 1}")
break
except Exception as e:
if attempt < max_retries - 1:
print(f" Attempt {attempt + 1} failed, retrying in 2s... ({e})")
time.sleep(2)
else:
print(f" All {max_retries} attempts failed.")
raise e
# Run tests # Run tests
tests = [ tests = [

View File

@@ -888,9 +888,12 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) {
}, *miniWebDavOptions.port) }, *miniWebDavOptions.port)
} }
// Wait for both S3 and WebDAV to be ready // Wait for services to be ready
if *miniEnableS3 { if *miniEnableS3 {
waitForServiceReady("S3", *miniS3Options.port, bindIp) waitForServiceReady("S3", *miniS3Options.port, bindIp)
if miniS3Options.portIceberg != nil && *miniS3Options.portIceberg > 0 {
waitForServiceReady("Iceberg", *miniS3Options.portIceberg, bindIp)
}
} }
if *miniEnableWebDAV { if *miniEnableWebDAV {
waitForServiceReady("WebDAV", *miniWebDavOptions.port, bindIp) waitForServiceReady("WebDAV", *miniWebDavOptions.port, bindIp)
@@ -909,6 +912,7 @@ func startMiniService(name string, fn func(), port int) {
// waitForServiceReady pings the service HTTP endpoint to check if it's ready to accept connections // waitForServiceReady pings the service HTTP endpoint to check if it's ready to accept connections
func waitForServiceReady(name string, port int, bindIp string) { func waitForServiceReady(name string, port int, bindIp string) {
address := fmt.Sprintf("http://%s:%d", bindIp, port) address := fmt.Sprintf("http://%s:%d", bindIp, port)
healthAddr := getHealthCheckAddr(address)
maxAttempts := 30 // 30 * 200ms = 6 seconds max wait maxAttempts := 30 // 30 * 200ms = 6 seconds max wait
attempt := 0 attempt := 0
client := &http.Client{ client := &http.Client{
@@ -916,7 +920,7 @@ func waitForServiceReady(name string, port int, bindIp string) {
} }
for attempt < maxAttempts { for attempt < maxAttempts {
resp, err := client.Get(address) resp, err := client.Get(healthAddr)
if err == nil { if err == nil {
resp.Body.Close() resp.Body.Close()
glog.Infof("%s service is ready at %s", name, address) glog.Infof("%s service is ready at %s", name, address)
@@ -1022,7 +1026,7 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) {
// waitForAdminServerReady pings the admin server HTTP endpoint to check if it's ready // waitForAdminServerReady pings the admin server HTTP endpoint to check if it's ready
func waitForAdminServerReady(adminAddr string) error { func waitForAdminServerReady(adminAddr string) error {
healthAddr := fmt.Sprintf("%s/health", adminAddr) healthAddr := getHealthCheckAddr(fmt.Sprintf("%s/health", adminAddr))
maxAttempts := 60 // 60 * 500ms = 30 seconds max wait maxAttempts := 60 // 60 * 500ms = 30 seconds max wait
attempt := 0 attempt := 0
client := &http.Client{ client := &http.Client{
@@ -1033,7 +1037,7 @@ func waitForAdminServerReady(adminAddr string) error {
resp, err := client.Get(healthAddr) resp, err := client.Get(healthAddr)
if err == nil { if err == nil {
resp.Body.Close() resp.Body.Close()
glog.V(1).Infof("Admin server is ready at %s", adminAddr) glog.Infof("Admin server is ready at %s", adminAddr)
return nil return nil
} }
attempt++ attempt++
@@ -1042,6 +1046,12 @@ func waitForAdminServerReady(adminAddr string) error {
return fmt.Errorf("admin server did not become ready at %s after %d attempts", adminAddr, maxAttempts) return fmt.Errorf("admin server did not become ready at %s after %d attempts", adminAddr, maxAttempts)
} }
// getHealthCheckAddr returns addr with the first occurrence of the wildcard
// host "://0.0.0.0:" rewritten to the loopback host "://127.0.0.1:".
// Addresses that do not contain the wildcard host are returned unchanged.
//
// The readiness pollers in this file pass the result to an HTTP GET, so the
// rewrite gives them a concrete host to dial when a service was bound to all
// interfaces.
func getHealthCheckAddr(addr string) string {
	const (
		wildcardHost = "://0.0.0.0:"
		loopbackHost = "://127.0.0.1:"
	)

	// Locate only the first match; anything after it (e.g. a URL embedded in
	// the path or query) is left untouched.
	idx := strings.Index(addr, wildcardHost)
	if idx < 0 {
		return addr
	}
	return addr[:idx] + loopbackHost + addr[idx+len(wildcardHost):]
}
// waitForWorkerReady polls the worker's gRPC port to ensure the worker has fully initialized // waitForWorkerReady polls the worker's gRPC port to ensure the worker has fully initialized
func waitForWorkerReady(workerGrpcAddr string) { func waitForWorkerReady(workerGrpcAddr string) {