diff --git a/.github/workflows/rust-volume-server-tests.yml b/.github/workflows/rust-volume-server-tests.yml index b6d01d17c..6e724b0d4 100644 --- a/.github/workflows/rust-volume-server-tests.yml +++ b/.github/workflows/rust-volume-server-tests.yml @@ -93,7 +93,7 @@ jobs: - name: Build Go weed binary run: | cd weed - go build -o weed . + go build -tags 5BytesOffset -o weed . chmod +x weed ./weed version @@ -169,7 +169,7 @@ jobs: - name: Build Go weed binary run: | cd weed - go build -o weed . + go build -tags 5BytesOffset -o weed . chmod +x weed ./weed version diff --git a/seaweed-volume/Cargo.toml b/seaweed-volume/Cargo.toml index 0d7b9f5b7..e2c360bb5 100644 --- a/seaweed-volume/Cargo.toml +++ b/seaweed-volume/Cargo.toml @@ -20,7 +20,7 @@ default = ["5bytes"] [dependencies] # Async runtime tokio = { version = "1", features = ["full"] } -tokio-stream = "0.1" +tokio-stream = { version = "0.1", features = ["net"] } tokio-io-timeout = "1" # gRPC + protobuf diff --git a/seaweed-volume/src/main.rs b/seaweed-volume/src/main.rs index 0b4d24360..e2aaeb8aa 100644 --- a/seaweed-volume/src/main.rs +++ b/seaweed-volume/src/main.rs @@ -600,35 +600,35 @@ async fn run( }) }; + // Bind the gRPC listener before spawning to propagate bind errors at startup. + let grpc_listener = tokio::net::TcpListener::bind(&grpc_addr) + .await + .unwrap_or_else(|e| panic!("Failed to bind gRPC to {}: {}", grpc_addr, e)); + let grpc_local_addr = grpc_listener + .local_addr() + .unwrap_or_else(|e| panic!("Failed to get gRPC local addr: {}", e)); + let grpc_handle = { let grpc_state = state.clone(); - let grpc_addr = grpc_addr.clone(); let grpc_tls_acceptor = grpc_tls_acceptor.clone(); let mut shutdown_rx = shutdown_tx.subscribe(); + let shutdown_tx_grpc = shutdown_tx.clone(); tokio::spawn(async move { - let addr = tokio::net::lookup_host(&grpc_addr) - .await - .expect("Failed to resolve gRPC address") - .next() - .expect("No addresses found for gRPC bind address"); let grpc_service = VolumeGrpcService { state: grpc_state.clone(), }; - if let Some(tls_acceptor) = grpc_tls_acceptor { - let listener = tokio::net::TcpListener::bind(&grpc_addr) - .await - .unwrap_or_else(|e| panic!("Failed to bind gRPC to {}: {}", grpc_addr, e)); - let incoming = grpc_tls_incoming(listener, tls_acceptor); - let reflection_v1 = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(seaweed_volume::pb::FILE_DESCRIPTOR_SET) - .build_v1() - .expect("Failed to build gRPC reflection v1 service"); - let reflection_v1alpha = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(seaweed_volume::pb::FILE_DESCRIPTOR_SET) - .build_v1alpha() - .expect("Failed to build gRPC reflection v1alpha service"); - info!("gRPC server listening on {} (TLS enabled)", addr); - if let Err(e) = build_grpc_server_builder() + let reflection_v1 = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(seaweed_volume::pb::FILE_DESCRIPTOR_SET) + .build_v1() + .expect("Failed to build gRPC reflection v1 service"); + let reflection_v1alpha = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(seaweed_volume::pb::FILE_DESCRIPTOR_SET) + .build_v1alpha() + .expect("Failed to build gRPC reflection v1alpha service"); + let result = if let Some(tls_acceptor) = grpc_tls_acceptor { + let incoming = grpc_tls_incoming(grpc_listener, tls_acceptor); + info!("gRPC server listening on {} (TLS enabled)", grpc_local_addr); + build_grpc_server_builder() 
                    .layer(GrpcRequestIdLayer)
                    .add_service(reflection_v1)
                    .add_service(reflection_v1alpha)
@@ -637,32 +637,25 @@ async fn run(
                        let _ = shutdown_rx.recv().await;
                    })
                    .await
-                {
-                    error!("gRPC server error: {}", e);
-                }
            } else {
-                let reflection_v1 = tonic_reflection::server::Builder::configure()
-                    .register_encoded_file_descriptor_set(seaweed_volume::pb::FILE_DESCRIPTOR_SET)
-                    .build_v1()
-                    .expect("Failed to build gRPC reflection v1 service");
-                let reflection_v1alpha = tonic_reflection::server::Builder::configure()
-                    .register_encoded_file_descriptor_set(seaweed_volume::pb::FILE_DESCRIPTOR_SET)
-                    .build_v1alpha()
-                    .expect("Failed to build gRPC reflection v1alpha service");
-                info!("gRPC server listening on {}", addr);
-                if let Err(e) = build_grpc_server_builder()
+                let incoming =
+                    tokio_stream::wrappers::TcpListenerStream::new(grpc_listener);
+                info!("gRPC server listening on {}", grpc_local_addr);
+                build_grpc_server_builder()
                    .layer(GrpcRequestIdLayer)
                    .add_service(reflection_v1)
                    .add_service(reflection_v1alpha)
                    .add_service(build_volume_grpc_service(grpc_service))
-                    .serve_with_shutdown(addr, async move {
+                    .serve_with_incoming_shutdown(incoming, async move {
                        let _ = shutdown_rx.recv().await;
                    })
                    .await
-                {
-                    error!("gRPC server error: {}", e);
-                }
+            };
+            if let Err(ref e) = result {
+                error!("gRPC server error: {}", e);
+                let _ = shutdown_tx_grpc.send(());
            }
+            result
        })
    };
@@ -771,9 +764,40 @@ async fn run(
        }))
    };
 
-    // Wait for all servers
+    // Wait for the servers. Use select! with &mut so the losing handle is
+    // not dropped, and track which side finished so each JoinHandle is
+    // awaited exactly once (polling a completed JoinHandle panics).
+    let mut server_err: Option<String> = None;
+    let mut http_handle = http_handle;
+    let mut grpc_handle = grpc_handle;
+    let mut http_finished = false;
+    let grpc_result = tokio::select! {
+        _ = &mut http_handle => {
+            // HTTP finished first; gRPC is still running. Await it.
+            http_finished = true;
+            grpc_handle.await
+        }
+        res = &mut grpc_handle => res,
+    };
+    match grpc_result {
+        Ok(Ok(())) => {}
+        Ok(Err(e)) => {
+            let msg = format!("gRPC server exited with error: {}", e);
+            error!("{}", msg);
+            server_err = Some(msg);
+            // serve error already sent shutdown inside the task
+        }
+        Err(e) => {
+            let msg = format!("gRPC task panicked: {}", e);
+            error!("{}", msg);
+            server_err = Some(msg);
+            let _ = shutdown_tx.send(());
+        }
+    }
+    // Await the HTTP handle exactly once; it may have completed in select!.
+    if !http_finished {
+        let _ = http_handle.await;
+    }
-    let _ = http_handle.await;
-    let _ = grpc_handle.await;
     if let Some(h) = public_handle {
         let _ = h.await;
     }
@@ -798,6 +822,10 @@ async fn run(
         cpu_profile.finish().map_err(std::io::Error::other)?;
     }
 
+    if let Some(err_msg) = server_err {
+        return Err(std::io::Error::other(err_msg).into());
+    }
+
     info!("Volume server stopped.");
     Ok(())
 }
diff --git a/seaweed-volume/src/server/grpc_server.rs b/seaweed-volume/src/server/grpc_server.rs
index 295583b0a..a4158e98c 100644
--- a/seaweed-volume/src/server/grpc_server.rs
+++ b/seaweed-volume/src/server/grpc_server.rs
@@ -1867,6 +1867,7 @@ impl VolumeServer for VolumeGrpcService {
            {
                let needle_header = resp.needle_header;
                let mut needle_body = resp.needle_body;
+                let resp_version = resp.version;
                if needle_header.is_empty() {
                    continue;
                }
@@ -1891,8 +1892,36 @@ impl VolumeServer for VolumeGrpcService {
                // Parse needle from header + body
                let mut n = Needle::default();
                n.read_header(&needle_header);
-                n.read_body_v2(&needle_body)
-                    .map_err(|e| Status::internal(format!("parse needle body: {}", e)))?;
+
+                if n.size.0 < 0 {
+                    return Err(Status::invalid_argument(format!(
+                        "unexpected negative needle size {} for needle {}",
+                        n.size.0, n.id.0
+                    )));
+                } else if n.size.0 > 0 {
+                    // Normal needle: parse the body fields (DataSize, Data, flags, etc.)
+                    n.read_body_v2(&needle_body)
+                        .map_err(|e| Status::internal(format!("parse needle body: {}", e)))?;
+                } else {
+                    // Delete tombstone (size == 0): body is checksum + timestamp
+                    // (V3) or checksum only (V2) + padding. Validate minimum
+                    // footer length for the protocol version.
+                    use crate::storage::types::{
+                        NEEDLE_CHECKSUM_SIZE, TIMESTAMP_SIZE, VERSION_3, Version,
+                    };
+                    let version = Version(resp_version as u8);
+                    let min_footer = if version >= VERSION_3 {
+                        NEEDLE_CHECKSUM_SIZE + TIMESTAMP_SIZE
+                    } else {
+                        NEEDLE_CHECKSUM_SIZE
+                    };
+                    if needle_body.len() < min_footer {
+                        return Err(Status::invalid_argument(format!(
+                            "tombstone needle {} body too short: got {} bytes, need >= {} for version {}",
+                            n.id.0, needle_body.len(), min_footer, resp_version
+                        )));
+                    }
+                }
 
                // Write needle to local volume
                let mut store = state.store.write().unwrap();
@@ -4047,11 +4076,12 @@ fn find_last_append_at_ns(idx_path: &str, dat_path: &str, version: u32) -> Optio
    let mut header = [0u8; 16];
    dat_file.read_exact(&mut header).ok()?;
    let needle_size = i32::from_be_bytes([header[12], header[13], header[14], header[15]]);
-    if needle_size <= 0 {
+    if needle_size < 0 {
        return None;
    }
 
    // Seek to tail: offset + 16 (header) + size -> checksum (4) + timestamp (8)
+    // For delete needles (size == 0), the tail is right after the header.
    let tail_offset = actual_offset as u64 + 16 + needle_size as u64;
    dat_file.seek(SeekFrom::Start(tail_offset)).ok()?;
diff --git a/test/volume_server/framework/cluster_mixed.go b/test/volume_server/framework/cluster_mixed.go
new file mode 100644
index 000000000..17376a842
--- /dev/null
+++ b/test/volume_server/framework/cluster_mixed.go
@@ -0,0 +1,368 @@
+package framework
+
+import (
+	"fmt"
+	"net"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strconv"
+	"sync"
+	"testing"
+
+	"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
+)
+
+// MixedVolumeCluster wraps a Go master + a mix of Go and Rust volume servers
+// for integration testing. Go servers occupy indices [0, goCount) and Rust
+// servers occupy indices [goCount, goCount+rustCount).
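+//
+// Typical usage, as in the tests that accompany this framework (index 0 is
+// the Go server and index 1 the Rust server for goCount=1, rustCount=1):
+//
+//	cluster := framework.StartMixedVolumeCluster(t, matrix.P1(), 1, 1)
+//	conn, goClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
+//	defer conn.Close()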
+type MixedVolumeCluster struct { + testingTB testing.TB + profile matrix.Profile + + weedBinary string // Go weed binary (master + Go volume servers) + rustVolumeBinary string // Rust volume binary + + baseDir string + configDir string + logsDir string + keepLogs bool + + masterPort int + masterGrpcPort int + + volumePorts []int + volumeGrpcPorts []int + volumePubPorts []int + isRust []bool // which servers are Rust + + masterCmd *exec.Cmd + volumeCmds []*exec.Cmd + + cleanupOnce sync.Once +} + +// StartMixedVolumeCluster starts a cluster with 1 Go master, goCount Go volume +// servers, and rustCount Rust volume servers. Go servers come first in the index. +func StartMixedVolumeCluster(t testing.TB, profile matrix.Profile, goCount, rustCount int) *MixedVolumeCluster { + t.Helper() + + if goCount < 0 || rustCount < 0 { + t.Fatalf("goCount and rustCount must be non-negative, got go=%d rust=%d", goCount, rustCount) + } + total := goCount + rustCount + if total < 2 { + t.Fatalf("need at least 2 volume servers, got %d", total) + } + + weedBinary, err := FindOrBuildWeedBinary() + if err != nil { + t.Fatalf("resolve weed binary: %v", err) + } + + // Only build the Rust binary when Rust servers are requested. + var rustBinary string + if rustCount > 0 { + rustBinary, err = FindOrBuildRustBinary() + if err != nil { + t.Skipf("skipping mixed cluster test: rust binary unavailable: %v", err) + } + } + + baseDir, keepLogs, err := newWorkDir() + if err != nil { + t.Fatalf("create temp test directory: %v", err) + } + + configDir := filepath.Join(baseDir, "config") + logsDir := filepath.Join(baseDir, "logs") + masterDataDir := filepath.Join(baseDir, "master") + + dirs := []string{configDir, logsDir, masterDataDir} + for i := 0; i < total; i++ { + dirs = append(dirs, filepath.Join(baseDir, fmt.Sprintf("volume%d", i))) + } + for _, dir := range dirs { + if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil { + t.Fatalf("create %s: %v", dir, mkErr) + } + } + + if err = writeSecurityConfig(configDir, profile); err != nil { + t.Fatalf("write security config: %v", err) + } + + masterPort, masterGrpcPort, err := allocateMasterPortPair() + if err != nil { + t.Fatalf("allocate master port pair: %v", err) + } + + // 2 ports per server (admin, grpc); add 1 more when public port is split out. 
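+	// Resulting layout of the flat ports slice for server index i with
+	// n = portsPerServer: ports[i*n] = admin/HTTP, ports[i*n+1] = gRPC,
+	// ports[i*n+2] = public (only present when SplitPublicPort is set).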
+ portsPerServer := 2 + if profile.SplitPublicPort { + portsPerServer = 3 + } + ports, err := allocatePorts(total * portsPerServer) + if err != nil { + t.Fatalf("allocate volume ports: %v", err) + } + + isRust := make([]bool, total) + for i := goCount; i < total; i++ { + isRust[i] = true + } + + c := &MixedVolumeCluster{ + testingTB: t, + profile: profile, + weedBinary: weedBinary, + rustVolumeBinary: rustBinary, + baseDir: baseDir, + configDir: configDir, + logsDir: logsDir, + keepLogs: keepLogs, + masterPort: masterPort, + masterGrpcPort: masterGrpcPort, + volumePorts: make([]int, total), + volumeGrpcPorts: make([]int, total), + volumePubPorts: make([]int, total), + isRust: isRust, + volumeCmds: make([]*exec.Cmd, total), + } + + for i := 0; i < total; i++ { + baseIdx := i * portsPerServer + c.volumePorts[i] = ports[baseIdx] + c.volumeGrpcPorts[i] = ports[baseIdx+1] + if profile.SplitPublicPort { + c.volumePubPorts[i] = ports[baseIdx+2] + } else { + c.volumePubPorts[i] = c.volumePorts[i] // reuse admin port + } + } + + // Start master + if err = c.startMaster(masterDataDir); err != nil { + c.Stop() + t.Fatalf("start master: %v", err) + } + helper := &Cluster{logsDir: logsDir} + if err = helper.waitForHTTP(c.MasterURL() + "/dir/status"); err != nil { + masterLog := helper.tailLog("master.log") + c.Stop() + t.Fatalf("wait for master readiness: %v\nmaster log tail:\n%s", err, masterLog) + } + + // Start volume servers + for i := 0; i < total; i++ { + volumeDataDir := filepath.Join(baseDir, fmt.Sprintf("volume%d", i)) + if isRust[i] { + err = c.startRustVolume(i, volumeDataDir) + } else { + err = c.startGoVolume(i, volumeDataDir) + } + if err != nil { + logTail := helper.tailLog(fmt.Sprintf("volume%d.log", i)) + c.Stop() + t.Fatalf("start volume server %d (rust=%v): %v\nlog tail:\n%s", i, isRust[i], err, logTail) + } + + // Rust uses /healthz, Go uses /status + healthURL := c.VolumeAdminURL(i) + "/status" + if isRust[i] { + healthURL = c.VolumeAdminURL(i) + "/healthz" + } + if err = helper.waitForHTTP(healthURL); err != nil { + logTail := helper.tailLog(fmt.Sprintf("volume%d.log", i)) + c.Stop() + t.Fatalf("wait for volume server %d readiness: %v\nlog tail:\n%s", i, err, logTail) + } + if err = helper.waitForTCP(c.VolumeGRPCAddress(i)); err != nil { + logTail := helper.tailLog(fmt.Sprintf("volume%d.log", i)) + c.Stop() + t.Fatalf("wait for volume server %d grpc readiness: %v\nlog tail:\n%s", i, err, logTail) + } + } + + t.Cleanup(func() { + c.Stop() + }) + + return c +} + +func (c *MixedVolumeCluster) Stop() { + if c == nil { + return + } + c.cleanupOnce.Do(func() { + for i := len(c.volumeCmds) - 1; i >= 0; i-- { + stopProcess(c.volumeCmds[i]) + } + stopProcess(c.masterCmd) + if !c.keepLogs && !c.testingTB.Failed() { + _ = os.RemoveAll(c.baseDir) + } else if c.baseDir != "" { + c.testingTB.Logf("mixed volume server integration logs kept at %s", c.baseDir) + } + }) +} + +func (c *MixedVolumeCluster) startMaster(dataDir string) error { + logFile, err := os.Create(filepath.Join(c.logsDir, "master.log")) + if err != nil { + return err + } + + args := []string{ + "-config_dir=" + c.configDir, + "master", + "-ip=127.0.0.1", + "-port=" + strconv.Itoa(c.masterPort), + "-port.grpc=" + strconv.Itoa(c.masterGrpcPort), + "-mdir=" + dataDir, + "-peers=none", + "-volumeSizeLimitMB=" + strconv.Itoa(testVolumeSizeLimitMB), + "-defaultReplication=000", + } + + c.masterCmd = exec.Command(c.weedBinary, args...) 
+ c.masterCmd.Dir = c.baseDir + c.masterCmd.Stdout = logFile + c.masterCmd.Stderr = logFile + if err = c.masterCmd.Start(); err != nil { + logFile.Close() + return err + } + logFile.Close() // child inherited the fd + return nil +} + +func (c *MixedVolumeCluster) startGoVolume(index int, dataDir string) error { + logName := fmt.Sprintf("volume%d.log", index) + logFile, err := os.Create(filepath.Join(c.logsDir, logName)) + if err != nil { + return err + } + + args := []string{ + "-config_dir=" + c.configDir, + "volume", + "-ip=127.0.0.1", + "-port=" + strconv.Itoa(c.volumePorts[index]), + "-port.grpc=" + strconv.Itoa(c.volumeGrpcPorts[index]), + "-port.public=" + strconv.Itoa(c.volumePubPorts[index]), + "-dir=" + dataDir, + "-max=16", + "-master=127.0.0.1:" + strconv.Itoa(c.masterPort), + "-readMode=" + c.profile.ReadMode, + "-concurrentUploadLimitMB=" + strconv.Itoa(c.profile.ConcurrentUploadLimitMB), + "-concurrentDownloadLimitMB=" + strconv.Itoa(c.profile.ConcurrentDownloadLimitMB), + } + if c.profile.InflightUploadTimeout > 0 { + args = append(args, "-inflightUploadDataTimeout="+c.profile.InflightUploadTimeout.String()) + } + if c.profile.InflightDownloadTimeout > 0 { + args = append(args, "-inflightDownloadDataTimeout="+c.profile.InflightDownloadTimeout.String()) + } + + cmd := exec.Command(c.weedBinary, args...) + cmd.Dir = c.baseDir + cmd.Stdout = logFile + cmd.Stderr = logFile + if err = cmd.Start(); err != nil { + logFile.Close() + return err + } + logFile.Close() // child inherited the fd + c.volumeCmds[index] = cmd + return nil +} + +func (c *MixedVolumeCluster) startRustVolume(index int, dataDir string) error { + logName := fmt.Sprintf("volume%d.log", index) + logFile, err := os.Create(filepath.Join(c.logsDir, logName)) + if err != nil { + return err + } + + args := rustVolumeArgs( + c.profile, + c.configDir, + c.masterPort, + c.volumePorts[index], + c.volumeGrpcPorts[index], + c.volumePubPorts[index], + dataDir, + ) + + cmd := exec.Command(c.rustVolumeBinary, args...) 
+ cmd.Dir = c.baseDir + cmd.Stdout = logFile + cmd.Stderr = logFile + if err = cmd.Start(); err != nil { + logFile.Close() + return err + } + logFile.Close() // child inherited the fd + c.volumeCmds[index] = cmd + return nil +} + +// --- accessor methods (mirror MultiVolumeCluster) --- + +func (c *MixedVolumeCluster) MasterAddress() string { + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.masterPort)) +} + +func (c *MixedVolumeCluster) MasterURL() string { + return "http://" + c.MasterAddress() +} + +func (c *MixedVolumeCluster) VolumeAdminAddress(index int) string { + if index < 0 || index >= len(c.volumePorts) { + return "" + } + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePorts[index])) +} + +func (c *MixedVolumeCluster) VolumePublicAddress(index int) string { + if index < 0 || index >= len(c.volumePubPorts) { + return "" + } + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePubPorts[index])) +} + +func (c *MixedVolumeCluster) VolumeGRPCAddress(index int) string { + if index < 0 || index >= len(c.volumeGrpcPorts) { + return "" + } + return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumeGrpcPorts[index])) +} + +func (c *MixedVolumeCluster) VolumeAdminURL(index int) string { + return "http://" + c.VolumeAdminAddress(index) +} + +func (c *MixedVolumeCluster) VolumePublicURL(index int) string { + return "http://" + c.VolumePublicAddress(index) +} + +func (c *MixedVolumeCluster) BaseDir() string { + return c.baseDir +} + +// VolumeServerAddress returns SeaweedFS server address format: ip:httpPort.grpcPort +func (c *MixedVolumeCluster) VolumeServerAddress(index int) string { + if index < 0 || index >= len(c.volumePorts) { + return "" + } + return fmt.Sprintf("%s.%d", c.VolumeAdminAddress(index), c.volumeGrpcPorts[index]) +} + +func (c *MixedVolumeCluster) IsRust(index int) bool { + if index < 0 || index >= len(c.isRust) { + return false + } + return c.isRust[index] +} diff --git a/test/volume_server/grpc/mixed_balance_test.go b/test/volume_server/grpc/mixed_balance_test.go new file mode 100644 index 000000000..1ef836de0 --- /dev/null +++ b/test/volume_server/grpc/mixed_balance_test.go @@ -0,0 +1,567 @@ +package volume_server_grpc_test + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/test/volume_server/framework" + "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" +) + +// deleteAndWaitForTombstone issues an HTTP DELETE for the given fid on +// volumeURL, asserts a successful response, then polls with GET until +// the file returns 404 (tombstone visible) or the timeout elapses. +func deleteAndWaitForTombstone(t *testing.T, httpClient *http.Client, volumeURL, fid string) { + t.Helper() + req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/%s", volumeURL, fid), nil) + if err != nil { + t.Fatalf("build delete request for %s: %v", fid, err) + } + resp := framework.DoRequest(t, httpClient, req) + framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusAccepted && resp.StatusCode != http.StatusOK { + t.Fatalf("delete %s: expected 200 or 202, got %d", fid, resp.StatusCode) + } + // Poll until GET returns 404 (tombstone flushed to disk). 
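+	// The DELETE may be acknowledged before the tombstone is readable, so a
+	// bounded poll keeps this helper race-free instead of asserting 404 on
+	// the first GET.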
+ deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + getResp := framework.ReadBytes(t, httpClient, volumeURL, fid) + status := getResp.StatusCode + framework.ReadAllAndClose(t, getResp) + if status == http.StatusNotFound { + return + } + time.Sleep(50 * time.Millisecond) + } + t.Fatalf("delete %s: tombstone not visible after 5s", fid) +} + +// TestMixedBalanceCopyGoToRust verifies that VolumeCopy works from a Go volume +// server to a Rust volume server. This is the core operation behind volume +// balancing in a mixed Go+Rust cluster. +func TestMixedBalanceCopyGoToRust(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartMixedVolumeCluster(t, matrix.P1(), 1, 1) + + // server 0 = Go, server 1 = Rust + conn0, goClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0)) + defer conn0.Close() + conn1, rustClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1)) + defer conn1.Close() + + httpClient := framework.NewHTTPClient() + + const volumeID = uint32(50) + + // Allocate volume on Go server and upload test data + framework.AllocateVolume(t, goClient, volumeID, "") + + testFiles := []struct { + key uint64 + cookie uint32 + data []byte + }{ + {1, 0xAABBCCDD, []byte("hello from Go server")}, + {2, 0x11223344, []byte("second file for balance test")}, + {3, 0xDEADBEEF, make([]byte, 4096)}, // larger file + } + + for _, f := range testFiles { + fid := framework.NewFileID(volumeID, f.key, f.cookie) + resp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fid, f.data) + body := framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("upload %s: expected 201, got %d: %s", fid, resp.StatusCode, body) + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Read source volume status before copy + sourceStatus, err := goClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: volumeID, + }) + if err != nil { + t.Fatalf("ReadVolumeFileStatus on Go server: %v", err) + } + t.Logf("Source: dat=%d idx=%d files=%d version=%d", + sourceStatus.GetDatFileSize(), sourceStatus.GetIdxFileSize(), + sourceStatus.GetFileCount(), sourceStatus.GetVersion()) + + // Copy volume from Go (server 0) to Rust (server 1) + copyStream, err := rustClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ + VolumeId: volumeID, + SourceDataNode: cluster.VolumeServerAddress(0), + }) + if err != nil { + t.Fatalf("VolumeCopy call failed: %v", err) + } + + var lastAppendAtNs uint64 + for { + resp, recvErr := copyStream.Recv() + if recvErr != nil { + if recvErr != io.EOF { + t.Fatalf("VolumeCopy recv error: %v", recvErr) + } + break + } + if resp.GetLastAppendAtNs() != 0 { + lastAppendAtNs = resp.GetLastAppendAtNs() + } + } + t.Logf("VolumeCopy completed, lastAppendAtNs=%d", lastAppendAtNs) + + // Verify: read volume status from Rust server + targetStatus, err := rustClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: volumeID, + }) + if err != nil { + t.Fatalf("ReadVolumeFileStatus on Rust server: %v", err) + } + t.Logf("Target: dat=%d idx=%d files=%d version=%d", + targetStatus.GetDatFileSize(), targetStatus.GetIdxFileSize(), + targetStatus.GetFileCount(), targetStatus.GetVersion()) + + if sourceStatus.GetDatFileSize() != targetStatus.GetDatFileSize() { + t.Fatalf("dat file size mismatch: source=%d target=%d", + 
sourceStatus.GetDatFileSize(), targetStatus.GetDatFileSize()) + } + if sourceStatus.GetIdxFileSize() != targetStatus.GetIdxFileSize() { + t.Fatalf("idx file size mismatch: source=%d target=%d", + sourceStatus.GetIdxFileSize(), targetStatus.GetIdxFileSize()) + } + if sourceStatus.GetFileCount() != targetStatus.GetFileCount() { + t.Fatalf("file count mismatch: source=%d target=%d", + sourceStatus.GetFileCount(), targetStatus.GetFileCount()) + } + + // Verify data can be read from Rust server + for _, f := range testFiles { + fid := framework.NewFileID(volumeID, f.key, f.cookie) + resp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(1), fid) + body := framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusOK { + t.Fatalf("read %s from Rust server: expected 200, got %d", fid, resp.StatusCode) + } + if !bytes.Equal(body, f.data) { + t.Fatalf("read %s from Rust server: content mismatch (got %d bytes, want %d)", fid, len(body), len(f.data)) + } + } +} + +// TestMixedBalanceCopyRustToGo verifies that VolumeCopy works from a Rust +// volume server to a Go volume server. +func TestMixedBalanceCopyRustToGo(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartMixedVolumeCluster(t, matrix.P1(), 1, 1) + + conn0, goClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0)) + defer conn0.Close() + conn1, rustClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1)) + defer conn1.Close() + + httpClient := framework.NewHTTPClient() + + const volumeID = uint32(51) + + // Allocate volume on Rust server and upload test data + framework.AllocateVolume(t, rustClient, volumeID, "") + + testFiles := []struct { + key uint64 + cookie uint32 + data []byte + }{ + {1, 0xAABBCCDD, []byte("hello from Rust server")}, + {2, 0x11223344, []byte("second file for reverse balance")}, + } + + for _, f := range testFiles { + fid := framework.NewFileID(volumeID, f.key, f.cookie) + resp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(1), fid, f.data) + body := framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("upload %s: expected 201, got %d: %s", fid, resp.StatusCode, body) + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + sourceStatus, err := rustClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: volumeID, + }) + if err != nil { + t.Fatalf("ReadVolumeFileStatus on Rust server: %v", err) + } + + // Copy volume from Rust (server 1) to Go (server 0) + copyStream, err := goClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ + VolumeId: volumeID, + SourceDataNode: cluster.VolumeServerAddress(1), + }) + if err != nil { + t.Fatalf("VolumeCopy call failed: %v", err) + } + for { + _, recvErr := copyStream.Recv() + if recvErr != nil { + if recvErr != io.EOF { + t.Fatalf("VolumeCopy recv error: %v", recvErr) + } + break + } + } + + targetStatus, err := goClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: volumeID, + }) + if err != nil { + t.Fatalf("ReadVolumeFileStatus on Go server: %v", err) + } + + if sourceStatus.GetDatFileSize() != targetStatus.GetDatFileSize() { + t.Fatalf("dat file size mismatch: source=%d target=%d", + sourceStatus.GetDatFileSize(), targetStatus.GetDatFileSize()) + } + if sourceStatus.GetIdxFileSize() != targetStatus.GetIdxFileSize() { + t.Fatalf("idx file size mismatch: source=%d target=%d", + 
sourceStatus.GetIdxFileSize(), targetStatus.GetIdxFileSize()) + } + if sourceStatus.GetFileCount() != targetStatus.GetFileCount() { + t.Fatalf("file count mismatch: source=%d target=%d", + sourceStatus.GetFileCount(), targetStatus.GetFileCount()) + } + + // Verify data can be read from Go server + for _, f := range testFiles { + fid := framework.NewFileID(volumeID, f.key, f.cookie) + resp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(0), fid) + body := framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusOK { + t.Fatalf("read %s from Go server: expected 200, got %d", fid, resp.StatusCode) + } + if !bytes.Equal(body, f.data) { + t.Fatalf("read %s from Go server: content mismatch (got %d bytes, want %d)", fid, len(body), len(f.data)) + } + } +} + +// TestMixedBalanceCopyWithDeletes verifies that VolumeCopy correctly handles +// volumes that have both active and deleted needles. +func TestMixedBalanceCopyWithDeletes(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartMixedVolumeCluster(t, matrix.P1(), 1, 1) + + conn0, goClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0)) + defer conn0.Close() + conn1, rustClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1)) + defer conn1.Close() + + httpClient := framework.NewHTTPClient() + + const volumeID = uint32(52) + framework.AllocateVolume(t, goClient, volumeID, "") + + // Upload files + for i := uint64(1); i <= 5; i++ { + fid := framework.NewFileID(volumeID, i, 0x12340000+uint32(i)) + resp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fid, []byte(fmt.Sprintf("file-%d", i))) + framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("upload file %d: expected 201, got %d", i, resp.StatusCode) + } + } + + // Delete some files and wait for tombstones to be visible + for _, key := range []uint64{2, 4} { + fid := framework.NewFileID(volumeID, key, 0x12340000+uint32(key)) + deleteAndWaitForTombstone(t, httpClient, cluster.VolumeAdminURL(0), fid) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + sourceStatus, err := goClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: volumeID, + }) + if err != nil { + t.Fatalf("ReadVolumeFileStatus: %v", err) + } + t.Logf("Source after deletes: dat=%d idx=%d files=%d", + sourceStatus.GetDatFileSize(), sourceStatus.GetIdxFileSize(), + sourceStatus.GetFileCount()) + + // Copy volume from Go to Rust + copyStream, err := rustClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ + VolumeId: volumeID, + SourceDataNode: cluster.VolumeServerAddress(0), + }) + if err != nil { + t.Fatalf("VolumeCopy: %v", err) + } + var lastAppendAtNs uint64 + for { + resp, recvErr := copyStream.Recv() + if recvErr != nil { + if recvErr != io.EOF { + t.Fatalf("VolumeCopy recv error: %v", recvErr) + } + break + } + if resp.GetLastAppendAtNs() != 0 { + lastAppendAtNs = resp.GetLastAppendAtNs() + } + } + if lastAppendAtNs == 0 { + t.Fatalf("VolumeCopy did not return a lastAppendAtNs timestamp") + } + t.Logf("VolumeCopy completed, lastAppendAtNs=%d", lastAppendAtNs) + + targetStatusAfterCopy, err := rustClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: volumeID, + }) + if err != nil { + t.Fatalf("ReadVolumeFileStatus on Rust: %v", err) + } + t.Logf("Target after copy: dat=%d idx=%d files=%d", + 
+		targetStatusAfterCopy.GetDatFileSize(), targetStatusAfterCopy.GetIdxFileSize(),
+		targetStatusAfterCopy.GetFileCount())
+
+	if sourceStatus.GetDatFileSize() != targetStatusAfterCopy.GetDatFileSize() {
+		t.Fatalf("dat file size mismatch: source=%d target=%d",
+			sourceStatus.GetDatFileSize(), targetStatusAfterCopy.GetDatFileSize())
+	}
+	if sourceStatus.GetIdxFileSize() != targetStatusAfterCopy.GetIdxFileSize() {
+		t.Fatalf("idx file size mismatch: source=%d target=%d",
+			sourceStatus.GetIdxFileSize(), targetStatusAfterCopy.GetIdxFileSize())
+	}
+
+	// Tail from the copy checkpoint — source is unchanged (deletes happened
+	// before copy), so tailing should not append any data.
+	_, err = rustClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{
+		VolumeId:           volumeID,
+		SinceNs:            lastAppendAtNs,
+		IdleTimeoutSeconds: 3,
+		SourceVolumeServer: cluster.VolumeServerAddress(0),
+	})
+	if err != nil {
+		t.Fatalf("VolumeTailReceiver: %v", err)
+	}
+
+	targetStatusAfterTail, err := rustClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
+		VolumeId: volumeID,
+	})
+	if err != nil {
+		t.Fatalf("ReadVolumeFileStatus after tail: %v", err)
+	}
+	if targetStatusAfterTail.GetDatFileSize() != targetStatusAfterCopy.GetDatFileSize() {
+		t.Fatalf("dat grew after tail: before=%d after=%d",
+			targetStatusAfterCopy.GetDatFileSize(), targetStatusAfterTail.GetDatFileSize())
+	}
+	if targetStatusAfterTail.GetIdxFileSize() != targetStatusAfterCopy.GetIdxFileSize() {
+		t.Fatalf("idx grew after tail: before=%d after=%d",
+			targetStatusAfterCopy.GetIdxFileSize(), targetStatusAfterTail.GetIdxFileSize())
+	}
+
+	// Verify surviving files are readable from Rust
+	for _, key := range []uint64{1, 3, 5} {
+		fid := framework.NewFileID(volumeID, key, 0x12340000+uint32(key))
+		resp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(1), fid)
+		body := framework.ReadAllAndClose(t, resp)
+		if resp.StatusCode != http.StatusOK {
+			t.Fatalf("read surviving file %d from Rust: expected 200, got %d", key, resp.StatusCode)
+		}
+		expected := fmt.Sprintf("file-%d", key)
+		if string(body) != expected {
+			t.Fatalf("data mismatch for file %d: got %q want %q", key, body, expected)
+		}
+	}
+
+	// Verify deleted files return 404
+	for _, key := range []uint64{2, 4} {
+		fid := framework.NewFileID(volumeID, key, 0x12340000+uint32(key))
+		resp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(1), fid)
+		framework.ReadAllAndClose(t, resp)
+		if resp.StatusCode != http.StatusNotFound {
+			t.Fatalf("read deleted file %d from Rust: expected 404, got %d", key, resp.StatusCode)
+		}
+	}
+}
+
+// TestMixedBalanceFullMoveGoToRust exercises the complete volume balance move
+// flow: copy → write a tombstone on the still-writable source → mark readonly →
+// tail → verify sizes → delete source. This covers the same steps as
+// balance_task.go Execute(), which marks the volume readonly before copying;
+// keeping the source writable here gives the tail step a tombstone to propagate.
+func TestMixedBalanceFullMoveGoToRust(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + + cluster := framework.StartMixedVolumeCluster(t, matrix.P1(), 1, 1) + + conn0, goClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0)) + defer conn0.Close() + conn1, rustClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1)) + defer conn1.Close() + + httpClient := framework.NewHTTPClient() + + const volumeID = uint32(53) + framework.AllocateVolume(t, goClient, volumeID, "") + + // Upload test data + for i := uint64(1); i <= 5; i++ { + fid := framework.NewFileID(volumeID, i, 0xABCD0000+uint32(i)) + resp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fid, []byte(fmt.Sprintf("balance-move-file-%d", i))) + framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusCreated { + t.Fatalf("upload file %d: expected 201, got %d", i, resp.StatusCode) + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // Step 1: Copy volume to Rust server (source stays writable so we can + // delete after copy to exercise the tail tombstone path) + copyStream, err := rustClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ + VolumeId: volumeID, + SourceDataNode: cluster.VolumeServerAddress(0), + }) + if err != nil { + t.Fatalf("VolumeCopy: %v", err) + } + var lastAppendAtNs uint64 + for { + resp, recvErr := copyStream.Recv() + if recvErr != nil { + if recvErr != io.EOF { + t.Fatalf("VolumeCopy recv: %v", recvErr) + } + break + } + if resp.GetLastAppendAtNs() != 0 { + lastAppendAtNs = resp.GetLastAppendAtNs() + } + } + t.Logf("Copy done, lastAppendAtNs=%d", lastAppendAtNs) + + // Step 2: Delete file 3 on the source AFTER copy. This creates a + // tombstone needle that the tail step must propagate to the Rust server. + deleteAndWaitForTombstone(t, httpClient, cluster.VolumeAdminURL(0), + framework.NewFileID(volumeID, 3, 0xABCD0003)) + + // Step 3: Mark source readonly so no further writes arrive during tail + _, err = goClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ + VolumeId: volumeID, + }) + if err != nil { + t.Fatalf("mark readonly: %v", err) + } + + // Read source status (the reference for post-tail verification) + sourceStatus, err := goClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: volumeID, + }) + if err != nil { + t.Fatalf("read source status: %v", err) + } + t.Logf("Source: dat=%d idx=%d files=%d", + sourceStatus.GetDatFileSize(), sourceStatus.GetIdxFileSize(), + sourceStatus.GetFileCount()) + + // Step 4: Tail for updates — this must pick up the delete tombstone + _, err = rustClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{ + VolumeId: volumeID, + SinceNs: lastAppendAtNs, + IdleTimeoutSeconds: 5, + SourceVolumeServer: cluster.VolumeServerAddress(0), + }) + if err != nil { + t.Fatalf("VolumeTailReceiver: %v", err) + } + + // Step 5: Verify dat/idx sizes match after tail. + // We compare file sizes (byte-level correctness) rather than file_count + // because the tail writes the tombstone via the write path (which + // increments file_count) while the source used the delete path (which + // only increments deletion_count). This is consistent with Go behavior. 
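+	// Byte-identical dat/idx files still prove the tombstone reached the
+	// target, independent of how each implementation counts it.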
+ targetStatus, err := rustClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: volumeID, + }) + if err != nil { + t.Fatalf("read target status: %v", err) + } + t.Logf("Target: dat=%d idx=%d files=%d", + targetStatus.GetDatFileSize(), targetStatus.GetIdxFileSize(), + targetStatus.GetFileCount()) + + if sourceStatus.GetDatFileSize() != targetStatus.GetDatFileSize() { + t.Fatalf("dat size mismatch after tail: source=%d target=%d", + sourceStatus.GetDatFileSize(), targetStatus.GetDatFileSize()) + } + if sourceStatus.GetIdxFileSize() != targetStatus.GetIdxFileSize() { + t.Fatalf("idx size mismatch after tail: source=%d target=%d", + sourceStatus.GetIdxFileSize(), targetStatus.GetIdxFileSize()) + } + + // Step 6: Delete volume from source + _, err = goClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ + VolumeId: volumeID, + }) + if err != nil { + t.Fatalf("delete source volume: %v", err) + } + + // Verify source volume is gone + _, err = goClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: volumeID, + }) + if err == nil { + t.Fatalf("expected error reading deleted source volume, got nil") + } + t.Logf("Source volume deleted successfully (error as expected: %v)", err) + + // Verify all surviving data is readable from Rust (the new home) + for _, key := range []uint64{1, 2, 4, 5} { + fid := framework.NewFileID(volumeID, key, 0xABCD0000+uint32(key)) + resp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(1), fid) + body := framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusOK { + t.Fatalf("read file %d from Rust after move: expected 200, got %d", key, resp.StatusCode) + } + expected := fmt.Sprintf("balance-move-file-%d", key) + if string(body) != expected { + t.Fatalf("data mismatch for file %d after move: got %q want %q", key, body, expected) + } + } + + // Verify deleted file is 404 on Rust (tombstone propagated via tail) + { + fid := framework.NewFileID(volumeID, 3, 0xABCD0003) + resp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(1), fid) + framework.ReadAllAndClose(t, resp) + if resp.StatusCode != http.StatusNotFound { + t.Fatalf("deleted file 3 on Rust: expected 404, got %d", resp.StatusCode) + } + } + + t.Logf("Full balance move completed: volume %d moved from Go to Rust, source purged", volumeID) +}
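Note for reviewers: the two grpc_server.rs hunks above lean on the same on-disk needle layout (a 16-byte header, then `size` body bytes, then a 4-byte checksum plus, on version 3, an 8-byte append timestamp). Below is a minimal Go sketch of that arithmetic; the constant and function names are illustrative only and belong to neither codebase.

```go
package main

import "fmt"

const (
	needleHeaderSize   = 16 // cookie + id + size, per the find_last_append_at_ns header read
	needleChecksumSize = 4  // NEEDLE_CHECKSUM_SIZE
	timestampSize      = 8  // TIMESTAMP_SIZE, present on version 3 only
)

// tailOffset returns where the checksum/timestamp footer starts for a needle
// whose header begins at offset. A tombstone has size == 0, so its footer
// begins immediately after the header.
func tailOffset(offset, size uint64) uint64 {
	return offset + needleHeaderSize + size
}

// minTombstoneFooter is the minimum body length a receiver should accept for
// a delete tombstone, by protocol version.
func minTombstoneFooter(version uint32) int {
	if version >= 3 {
		return needleChecksumSize + timestampSize
	}
	return needleChecksumSize
}

func main() {
	fmt.Println(tailOffset(0, 0))      // 16: tombstone footer right after the header
	fmt.Println(minTombstoneFooter(3)) // 12: checksum + timestamp
	fmt.Println(minTombstoneFooter(2)) // 4: checksum only
}
```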