fix(volume-rust): fix volume balance between Go and Rust servers (#8915)

Two bugs prevented reliable volume balancing when a Rust volume server
is the copy target:

1. find_last_append_at_ns returned None when the last record was a delete
   tombstone (Size==0 in the dat header), so the caller fell back to the
   file mtime truncated to seconds. This caused the tail step to re-send
   needles from the last sub-second window. Fix: change `needle_size <= 0`
   to `< 0`, since Size==0 delete needles still carry a valid timestamp in
   their tail (see the layout sketch below).

2. VolumeTailReceiver called read_body_v2 on delete needles, which have
   no DataSize/Data/flags — only checksum+timestamp+padding after the
   header. Fix: skip read_body_v2 when size == 0, reject negative sizes.
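
Both fixes come down to the on-disk needle record layout the diffs below work
against: a 16-byte header (cookie 4B, id 8B, size 4B, big-endian), `size` bytes
of body, then a tail of CRC (4 bytes) plus, on V3 volumes, the append timestamp
in nanoseconds (8 bytes). A tombstone has Size==0, so its tail sits immediately
after the header. A minimal Go sketch of reading that tail timestamp (a
hypothetical lastAppendAtNs helper, assuming a V3 volume; not code from this
commit):

package main

import (
    "encoding/binary"
    "fmt"
    "os"
)

// lastAppendAtNs reads the append timestamp of the needle record that
// starts at offset in the .dat file. It works for normal needles and for
// delete tombstones (size == 0), whose tail follows the header directly.
func lastAppendAtNs(datPath string, offset int64) (uint64, error) {
    f, err := os.Open(datPath)
    if err != nil {
        return 0, err
    }
    defer f.Close()

    header := make([]byte, 16) // cookie(4) + id(8) + size(4)
    if _, err := f.ReadAt(header, offset); err != nil {
        return 0, err
    }
    size := int32(binary.BigEndian.Uint32(header[12:16]))
    if size < 0 { // only genuinely negative sizes are invalid
        return 0, fmt.Errorf("negative needle size %d", size)
    }
    // Tail = checksum(4) + timestamp(8); for a tombstone size is 0 and
    // the tail begins right after the header.
    tail := make([]byte, 12)
    if _, err := f.ReadAt(tail, offset+16+int64(size)); err != nil {
        return 0, err
    }
    return binary.BigEndian.Uint64(tail[4:]), nil
}

With the old `size <= 0` guard, the tombstone case above was rejected even
though offset+16 points at a perfectly valid tail.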

Also:
- Unify gRPC server bind: use TcpListener::bind before spawn for both
  TLS and non-TLS paths, propagating bind errors at startup (the same
  idiom in Go is sketched after this list).
- Add mixed Go+Rust cluster test harness and integration tests covering
  VolumeCopy in both directions, copy with deletes, and full balance
  move with tail tombstone propagation and source deletion.
- Make FindOrBuildRustBinary configurable for default vs no-default
  features (5-byte vs 4-byte offsets).
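
The bind unification in the first bullet follows the usual Go server idiom:
listen synchronously so a bad address fails startup, then serve on a goroutine.
A minimal sketch of that pattern (hypothetical startGRPC helper, for
illustration only):

package main

import (
    "fmt"
    "log"
    "net"

    "google.golang.org/grpc"
)

// startGRPC binds synchronously so bind errors surface at startup, then
// serves on a goroutine; the Rust server now follows the same pattern.
func startGRPC(grpcAddr string, srv *grpc.Server) (net.Addr, error) {
    ln, err := net.Listen("tcp", grpcAddr)
    if err != nil {
        return nil, fmt.Errorf("bind gRPC %s: %w", grpcAddr, err)
    }
    go func() {
        if serveErr := srv.Serve(ln); serveErr != nil {
            log.Printf("gRPC server error: %v", serveErr)
        }
    }()
    return ln.Addr(), nil
}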
Chris Lu
2026-04-04 09:13:23 -07:00
committed by GitHub
parent d1823d3784
commit 9add18e169
6 changed files with 1039 additions and 46 deletions

View File

@@ -93,7 +93,7 @@ jobs:
       - name: Build Go weed binary
         run: |
           cd weed
-          go build -o weed .
+          go build -tags 5BytesOffset -o weed .
           chmod +x weed
           ./weed version
@@ -169,7 +169,7 @@ jobs:
       - name: Build Go weed binary
         run: |
           cd weed
-          go build -o weed .
+          go build -tags 5BytesOffset -o weed .
           chmod +x weed
           ./weed version
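
Both CI jobs now build the Go binary with `-tags 5BytesOffset` so its needle
offset width matches a Rust server built with default features (which, per the
Cargo.toml hunk below, default to 5-byte offsets). A quick calculation of why
the width matters, assuming SeaweedFS's 8-byte needle alignment (stored offsets
are in units of 8 bytes):

package main

import "fmt"

func main() {
    const needleAlignment = 8 // stored offsets count units of this many bytes
    fmt.Println((uint64(1) << 32) * needleAlignment) // 4-byte offsets: 32 GiB max .dat size
    fmt.Println((uint64(1) << 40) * needleAlignment) // 5-byte offsets: 8 TiB max .dat size
}

Mixing the two widths in one cluster would make copied idx entries unreadable,
hence the matching build tags.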

View File

@@ -20,7 +20,7 @@ default = ["5bytes"]
 [dependencies]
 # Async runtime
 tokio = { version = "1", features = ["full"] }
-tokio-stream = "0.1"
+tokio-stream = { version = "0.1", features = ["net"] }
 tokio-io-timeout = "1"
 # gRPC + protobuf

View File

@@ -600,35 +600,35 @@ async fn run(
         })
     };
+    // Bind the gRPC listener before spawning to propagate bind errors at startup.
+    let grpc_listener = tokio::net::TcpListener::bind(&grpc_addr)
+        .await
+        .unwrap_or_else(|e| panic!("Failed to bind gRPC to {}: {}", grpc_addr, e));
+    let grpc_local_addr = grpc_listener
+        .local_addr()
+        .unwrap_or_else(|e| panic!("Failed to get gRPC local addr: {}", e));
     let grpc_handle = {
         let grpc_state = state.clone();
-        let grpc_addr = grpc_addr.clone();
         let grpc_tls_acceptor = grpc_tls_acceptor.clone();
         let mut shutdown_rx = shutdown_tx.subscribe();
+        let shutdown_tx_grpc = shutdown_tx.clone();
         tokio::spawn(async move {
-            let addr = tokio::net::lookup_host(&grpc_addr)
-                .await
-                .expect("Failed to resolve gRPC address")
-                .next()
-                .expect("No addresses found for gRPC bind address");
             let grpc_service = VolumeGrpcService {
                 state: grpc_state.clone(),
             };
-            if let Some(tls_acceptor) = grpc_tls_acceptor {
-                let listener = tokio::net::TcpListener::bind(&grpc_addr)
-                    .await
-                    .unwrap_or_else(|e| panic!("Failed to bind gRPC to {}: {}", grpc_addr, e));
-                let incoming = grpc_tls_incoming(listener, tls_acceptor);
-                let reflection_v1 = tonic_reflection::server::Builder::configure()
-                    .register_encoded_file_descriptor_set(seaweed_volume::pb::FILE_DESCRIPTOR_SET)
-                    .build_v1()
-                    .expect("Failed to build gRPC reflection v1 service");
-                let reflection_v1alpha = tonic_reflection::server::Builder::configure()
-                    .register_encoded_file_descriptor_set(seaweed_volume::pb::FILE_DESCRIPTOR_SET)
-                    .build_v1alpha()
-                    .expect("Failed to build gRPC reflection v1alpha service");
-                info!("gRPC server listening on {} (TLS enabled)", addr);
-                if let Err(e) = build_grpc_server_builder()
+            let reflection_v1 = tonic_reflection::server::Builder::configure()
+                .register_encoded_file_descriptor_set(seaweed_volume::pb::FILE_DESCRIPTOR_SET)
+                .build_v1()
+                .expect("Failed to build gRPC reflection v1 service");
+            let reflection_v1alpha = tonic_reflection::server::Builder::configure()
+                .register_encoded_file_descriptor_set(seaweed_volume::pb::FILE_DESCRIPTOR_SET)
+                .build_v1alpha()
+                .expect("Failed to build gRPC reflection v1alpha service");
+            let result = if let Some(tls_acceptor) = grpc_tls_acceptor {
+                let incoming = grpc_tls_incoming(grpc_listener, tls_acceptor);
+                info!("gRPC server listening on {} (TLS enabled)", grpc_local_addr);
+                build_grpc_server_builder()
                     .layer(GrpcRequestIdLayer)
                     .add_service(reflection_v1)
                     .add_service(reflection_v1alpha)
@@ -637,32 +637,25 @@ async fn run(
                         let _ = shutdown_rx.recv().await;
                     })
                     .await
-                {
-                    error!("gRPC server error: {}", e);
-                }
             } else {
-                let reflection_v1 = tonic_reflection::server::Builder::configure()
-                    .register_encoded_file_descriptor_set(seaweed_volume::pb::FILE_DESCRIPTOR_SET)
-                    .build_v1()
-                    .expect("Failed to build gRPC reflection v1 service");
-                let reflection_v1alpha = tonic_reflection::server::Builder::configure()
-                    .register_encoded_file_descriptor_set(seaweed_volume::pb::FILE_DESCRIPTOR_SET)
-                    .build_v1alpha()
-                    .expect("Failed to build gRPC reflection v1alpha service");
-                info!("gRPC server listening on {}", addr);
-                if let Err(e) = build_grpc_server_builder()
+                let incoming =
+                    tokio_stream::wrappers::TcpListenerStream::new(grpc_listener);
+                info!("gRPC server listening on {}", grpc_local_addr);
+                build_grpc_server_builder()
                     .layer(GrpcRequestIdLayer)
                     .add_service(reflection_v1)
                     .add_service(reflection_v1alpha)
                     .add_service(build_volume_grpc_service(grpc_service))
-                    .serve_with_shutdown(addr, async move {
+                    .serve_with_incoming_shutdown(incoming, async move {
                         let _ = shutdown_rx.recv().await;
                     })
                     .await
-                {
-                    error!("gRPC server error: {}", e);
-                }
+            };
+            if let Err(ref e) = result {
+                error!("gRPC server error: {}", e);
+                let _ = shutdown_tx_grpc.send(());
             }
+            result
         })
     };
@@ -771,9 +764,40 @@ async fn run(
         }))
     };
-    // Wait for all servers
+    // Wait for servers. Use select! with &mut so the losing handle is not
+    // dropped, then await it explicitly afterward.
+    let mut server_err: Option<String> = None;
+    let mut http_handle = http_handle;
+    let mut grpc_handle = grpc_handle;
+    let grpc_finished_first = tokio::select! {
+        _ = &mut http_handle => false,
+        _ = &mut grpc_handle => true,
+    };
+    // Inspect the gRPC result (already resolved if it finished first,
+    // otherwise await it now).
+    let grpc_result = if grpc_finished_first {
+        grpc_handle.await
+    } else {
+        // HTTP finished first; gRPC is still running. Await it.
+        grpc_handle.await
+    };
+    match grpc_result {
+        Ok(Ok(())) => {}
+        Ok(Err(e)) => {
+            let msg = format!("gRPC server exited with error: {}", e);
+            error!("{}", msg);
+            server_err = Some(msg);
+            // serve error already sent shutdown inside the task
+        }
+        Err(e) => {
+            let msg = format!("gRPC task panicked: {}", e);
+            error!("{}", msg);
+            server_err = Some(msg);
+            let _ = shutdown_tx.send(());
+        }
+    }
+    // Ensure the HTTP handle completes too.
     let _ = http_handle.await;
-    let _ = grpc_handle.await;
     if let Some(h) = public_handle {
         let _ = h.await;
     }
@@ -798,6 +822,10 @@ async fn run(
         cpu_profile.finish().map_err(std::io::Error::other)?;
     }
+    if let Some(err_msg) = server_err {
+        return Err(std::io::Error::other(err_msg).into());
+    }
     info!("Volume server stopped.");
     Ok(())
 }

View File

@@ -1867,6 +1867,7 @@ impl VolumeServer for VolumeGrpcService {
             {
                 let needle_header = resp.needle_header;
                 let mut needle_body = resp.needle_body;
+                let resp_version = resp.version;
                 if needle_header.is_empty() {
                     continue;
@@ -1891,8 +1892,36 @@ impl VolumeServer for VolumeGrpcService {
                 // Parse needle from header + body
                 let mut n = Needle::default();
                 n.read_header(&needle_header);
-                n.read_body_v2(&needle_body)
-                    .map_err(|e| Status::internal(format!("parse needle body: {}", e)))?;
+                if n.size.0 < 0 {
+                    return Err(Status::invalid_argument(format!(
+                        "unexpected negative needle size {} for needle {}",
+                        n.size.0, n.id.0
+                    )));
+                } else if n.size.0 > 0 {
+                    // Normal needle: parse the body fields (DataSize, Data, flags, etc.)
+                    n.read_body_v2(&needle_body)
+                        .map_err(|e| Status::internal(format!("parse needle body: {}", e)))?;
+                } else {
+                    // Delete tombstone (size == 0): body is checksum + timestamp
+                    // (V3) or checksum only (V2) + padding. Validate minimum
+                    // footer length for the protocol version.
+                    use crate::storage::types::{
+                        NEEDLE_CHECKSUM_SIZE, TIMESTAMP_SIZE, VERSION_3, Version,
+                    };
+                    let version = Version(resp_version as u8);
+                    let min_footer = if version >= VERSION_3 {
+                        NEEDLE_CHECKSUM_SIZE + TIMESTAMP_SIZE
+                    } else {
+                        NEEDLE_CHECKSUM_SIZE
+                    };
+                    if needle_body.len() < min_footer {
+                        return Err(Status::invalid_argument(format!(
+                            "tombstone needle {} body too short: got {} bytes, need >= {} for version {}",
+                            n.id.0, needle_body.len(), min_footer, resp_version
+                        )));
+                    }
+                }
                 // Write needle to local volume
                 let mut store = state.store.write().unwrap();
@@ -4047,11 +4076,12 @@ fn find_last_append_at_ns(idx_path: &str, dat_path: &str, version: u32) -> Optio
     let mut header = [0u8; 16];
     dat_file.read_exact(&mut header).ok()?;
     let needle_size = i32::from_be_bytes([header[12], header[13], header[14], header[15]]);
-    if needle_size <= 0 {
+    if needle_size < 0 {
         return None;
     }
     // Seek to tail: offset + 16 (header) + size -> checksum (4) + timestamp (8)
+    // For delete needles (size == 0), the tail is right after the header.
     let tail_offset = actual_offset as u64 + 16 + needle_size as u64;
     dat_file.seek(SeekFrom::Start(tail_offset)).ok()?;
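
Concretely, under that layout a V3 tombstone record is 16 header bytes + 4 CRC
+ 8 timestamp = 28 bytes, padded out to the next 8-byte boundary (32 bytes on
disk). A hedged sketch of what such a record looks like (illustrative only;
assumes big-endian fields, a zero CRC for the empty body, and 8-byte record
alignment; not the shipped serializer):

package main

import (
    "encoding/binary"
    "fmt"
)

// buildTombstoneV3 assembles the assumed on-disk bytes of a V3 delete
// tombstone: header (cookie, id, size=0), then CRC(4) + appendAtNs(8),
// padded so the next record starts 8-byte aligned.
func buildTombstoneV3(cookie uint32, id uint64, appendAtNs uint64) []byte {
    rec := make([]byte, 0, 32)
    rec = binary.BigEndian.AppendUint32(rec, cookie)
    rec = binary.BigEndian.AppendUint64(rec, id)
    rec = binary.BigEndian.AppendUint32(rec, 0) // size == 0 marks a delete
    rec = binary.BigEndian.AppendUint32(rec, 0) // CRC of the empty body (assumed 0)
    rec = binary.BigEndian.AppendUint64(rec, appendAtNs)
    for len(rec)%8 != 0 { // pad record to 8-byte alignment
        rec = append(rec, 0)
    }
    return rec
}

func main() {
    fmt.Println(len(buildTombstoneV3(0xAABBCCDD, 3, 1234567890))) // 32
}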

View File

@@ -0,0 +1,368 @@
package framework

import (
    "fmt"
    "net"
    "os"
    "os/exec"
    "path/filepath"
    "strconv"
    "sync"
    "testing"

    "github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
)

// MixedVolumeCluster wraps a Go master + a mix of Go and Rust volume servers
// for integration testing. Go servers occupy indices [0, goCount) and Rust
// servers occupy indices [goCount, goCount+rustCount).
type MixedVolumeCluster struct {
    testingTB        testing.TB
    profile          matrix.Profile
    weedBinary       string // Go weed binary (master + Go volume servers)
    rustVolumeBinary string // Rust volume binary
    baseDir          string
    configDir        string
    logsDir          string
    keepLogs         bool
    masterPort       int
    masterGrpcPort   int
    volumePorts      []int
    volumeGrpcPorts  []int
    volumePubPorts   []int
    isRust           []bool // which servers are Rust
    masterCmd        *exec.Cmd
    volumeCmds       []*exec.Cmd
    cleanupOnce      sync.Once
}

// StartMixedVolumeCluster starts a cluster with 1 Go master, goCount Go volume
// servers, and rustCount Rust volume servers. Go servers come first in the index.
func StartMixedVolumeCluster(t testing.TB, profile matrix.Profile, goCount, rustCount int) *MixedVolumeCluster {
    t.Helper()
    if goCount < 0 || rustCount < 0 {
        t.Fatalf("goCount and rustCount must be non-negative, got go=%d rust=%d", goCount, rustCount)
    }
    total := goCount + rustCount
    if total < 2 {
        t.Fatalf("need at least 2 volume servers, got %d", total)
    }
    weedBinary, err := FindOrBuildWeedBinary()
    if err != nil {
        t.Fatalf("resolve weed binary: %v", err)
    }
    // Only build the Rust binary when Rust servers are requested.
    var rustBinary string
    if rustCount > 0 {
        rustBinary, err = FindOrBuildRustBinary()
        if err != nil {
            t.Skipf("skipping mixed cluster test: rust binary unavailable: %v", err)
        }
    }
    baseDir, keepLogs, err := newWorkDir()
    if err != nil {
        t.Fatalf("create temp test directory: %v", err)
    }
    configDir := filepath.Join(baseDir, "config")
    logsDir := filepath.Join(baseDir, "logs")
    masterDataDir := filepath.Join(baseDir, "master")
    dirs := []string{configDir, logsDir, masterDataDir}
    for i := 0; i < total; i++ {
        dirs = append(dirs, filepath.Join(baseDir, fmt.Sprintf("volume%d", i)))
    }
    for _, dir := range dirs {
        if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil {
            t.Fatalf("create %s: %v", dir, mkErr)
        }
    }
    if err = writeSecurityConfig(configDir, profile); err != nil {
        t.Fatalf("write security config: %v", err)
    }
    masterPort, masterGrpcPort, err := allocateMasterPortPair()
    if err != nil {
        t.Fatalf("allocate master port pair: %v", err)
    }
    // 2 ports per server (admin, grpc); add 1 more when public port is split out.
    portsPerServer := 2
    if profile.SplitPublicPort {
        portsPerServer = 3
    }
    ports, err := allocatePorts(total * portsPerServer)
    if err != nil {
        t.Fatalf("allocate volume ports: %v", err)
    }
    isRust := make([]bool, total)
    for i := goCount; i < total; i++ {
        isRust[i] = true
    }
    c := &MixedVolumeCluster{
        testingTB:        t,
        profile:          profile,
        weedBinary:       weedBinary,
        rustVolumeBinary: rustBinary,
        baseDir:          baseDir,
        configDir:        configDir,
        logsDir:          logsDir,
        keepLogs:         keepLogs,
        masterPort:       masterPort,
        masterGrpcPort:   masterGrpcPort,
        volumePorts:      make([]int, total),
        volumeGrpcPorts:  make([]int, total),
        volumePubPorts:   make([]int, total),
        isRust:           isRust,
        volumeCmds:       make([]*exec.Cmd, total),
    }
    for i := 0; i < total; i++ {
        baseIdx := i * portsPerServer
        c.volumePorts[i] = ports[baseIdx]
        c.volumeGrpcPorts[i] = ports[baseIdx+1]
        if profile.SplitPublicPort {
            c.volumePubPorts[i] = ports[baseIdx+2]
        } else {
            c.volumePubPorts[i] = c.volumePorts[i] // reuse admin port
        }
    }
    // Start master
    if err = c.startMaster(masterDataDir); err != nil {
        c.Stop()
        t.Fatalf("start master: %v", err)
    }
    helper := &Cluster{logsDir: logsDir}
    if err = helper.waitForHTTP(c.MasterURL() + "/dir/status"); err != nil {
        masterLog := helper.tailLog("master.log")
        c.Stop()
        t.Fatalf("wait for master readiness: %v\nmaster log tail:\n%s", err, masterLog)
    }
    // Start volume servers
    for i := 0; i < total; i++ {
        volumeDataDir := filepath.Join(baseDir, fmt.Sprintf("volume%d", i))
        if isRust[i] {
            err = c.startRustVolume(i, volumeDataDir)
        } else {
            err = c.startGoVolume(i, volumeDataDir)
        }
        if err != nil {
            logTail := helper.tailLog(fmt.Sprintf("volume%d.log", i))
            c.Stop()
            t.Fatalf("start volume server %d (rust=%v): %v\nlog tail:\n%s", i, isRust[i], err, logTail)
        }
        // Rust uses /healthz, Go uses /status
        healthURL := c.VolumeAdminURL(i) + "/status"
        if isRust[i] {
            healthURL = c.VolumeAdminURL(i) + "/healthz"
        }
        if err = helper.waitForHTTP(healthURL); err != nil {
            logTail := helper.tailLog(fmt.Sprintf("volume%d.log", i))
            c.Stop()
            t.Fatalf("wait for volume server %d readiness: %v\nlog tail:\n%s", i, err, logTail)
        }
        if err = helper.waitForTCP(c.VolumeGRPCAddress(i)); err != nil {
            logTail := helper.tailLog(fmt.Sprintf("volume%d.log", i))
            c.Stop()
            t.Fatalf("wait for volume server %d grpc readiness: %v\nlog tail:\n%s", i, err, logTail)
        }
    }
    t.Cleanup(func() {
        c.Stop()
    })
    return c
}

func (c *MixedVolumeCluster) Stop() {
    if c == nil {
        return
    }
    c.cleanupOnce.Do(func() {
        for i := len(c.volumeCmds) - 1; i >= 0; i-- {
            stopProcess(c.volumeCmds[i])
        }
        stopProcess(c.masterCmd)
        if !c.keepLogs && !c.testingTB.Failed() {
            _ = os.RemoveAll(c.baseDir)
        } else if c.baseDir != "" {
            c.testingTB.Logf("mixed volume server integration logs kept at %s", c.baseDir)
        }
    })
}

func (c *MixedVolumeCluster) startMaster(dataDir string) error {
    logFile, err := os.Create(filepath.Join(c.logsDir, "master.log"))
    if err != nil {
        return err
    }
    args := []string{
        "-config_dir=" + c.configDir,
        "master",
        "-ip=127.0.0.1",
        "-port=" + strconv.Itoa(c.masterPort),
        "-port.grpc=" + strconv.Itoa(c.masterGrpcPort),
        "-mdir=" + dataDir,
        "-peers=none",
        "-volumeSizeLimitMB=" + strconv.Itoa(testVolumeSizeLimitMB),
        "-defaultReplication=000",
    }
    c.masterCmd = exec.Command(c.weedBinary, args...)
    c.masterCmd.Dir = c.baseDir
    c.masterCmd.Stdout = logFile
    c.masterCmd.Stderr = logFile
    if err = c.masterCmd.Start(); err != nil {
        logFile.Close()
        return err
    }
    logFile.Close() // child inherited the fd
    return nil
}

func (c *MixedVolumeCluster) startGoVolume(index int, dataDir string) error {
    logName := fmt.Sprintf("volume%d.log", index)
    logFile, err := os.Create(filepath.Join(c.logsDir, logName))
    if err != nil {
        return err
    }
    args := []string{
        "-config_dir=" + c.configDir,
        "volume",
        "-ip=127.0.0.1",
        "-port=" + strconv.Itoa(c.volumePorts[index]),
        "-port.grpc=" + strconv.Itoa(c.volumeGrpcPorts[index]),
        "-port.public=" + strconv.Itoa(c.volumePubPorts[index]),
        "-dir=" + dataDir,
        "-max=16",
        "-master=127.0.0.1:" + strconv.Itoa(c.masterPort),
        "-readMode=" + c.profile.ReadMode,
        "-concurrentUploadLimitMB=" + strconv.Itoa(c.profile.ConcurrentUploadLimitMB),
        "-concurrentDownloadLimitMB=" + strconv.Itoa(c.profile.ConcurrentDownloadLimitMB),
    }
    if c.profile.InflightUploadTimeout > 0 {
        args = append(args, "-inflightUploadDataTimeout="+c.profile.InflightUploadTimeout.String())
    }
    if c.profile.InflightDownloadTimeout > 0 {
        args = append(args, "-inflightDownloadDataTimeout="+c.profile.InflightDownloadTimeout.String())
    }
    cmd := exec.Command(c.weedBinary, args...)
    cmd.Dir = c.baseDir
    cmd.Stdout = logFile
    cmd.Stderr = logFile
    if err = cmd.Start(); err != nil {
        logFile.Close()
        return err
    }
    logFile.Close() // child inherited the fd
    c.volumeCmds[index] = cmd
    return nil
}

func (c *MixedVolumeCluster) startRustVolume(index int, dataDir string) error {
    logName := fmt.Sprintf("volume%d.log", index)
    logFile, err := os.Create(filepath.Join(c.logsDir, logName))
    if err != nil {
        return err
    }
    args := rustVolumeArgs(
        c.profile,
        c.configDir,
        c.masterPort,
        c.volumePorts[index],
        c.volumeGrpcPorts[index],
        c.volumePubPorts[index],
        dataDir,
    )
    cmd := exec.Command(c.rustVolumeBinary, args...)
    cmd.Dir = c.baseDir
    cmd.Stdout = logFile
    cmd.Stderr = logFile
    if err = cmd.Start(); err != nil {
        logFile.Close()
        return err
    }
    logFile.Close() // child inherited the fd
    c.volumeCmds[index] = cmd
    return nil
}

// --- accessor methods (mirror MultiVolumeCluster) ---

func (c *MixedVolumeCluster) MasterAddress() string {
    return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.masterPort))
}

func (c *MixedVolumeCluster) MasterURL() string {
    return "http://" + c.MasterAddress()
}

func (c *MixedVolumeCluster) VolumeAdminAddress(index int) string {
    if index < 0 || index >= len(c.volumePorts) {
        return ""
    }
    return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePorts[index]))
}

func (c *MixedVolumeCluster) VolumePublicAddress(index int) string {
    if index < 0 || index >= len(c.volumePubPorts) {
        return ""
    }
    return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePubPorts[index]))
}

func (c *MixedVolumeCluster) VolumeGRPCAddress(index int) string {
    if index < 0 || index >= len(c.volumeGrpcPorts) {
        return ""
    }
    return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumeGrpcPorts[index]))
}

func (c *MixedVolumeCluster) VolumeAdminURL(index int) string {
    return "http://" + c.VolumeAdminAddress(index)
}

func (c *MixedVolumeCluster) VolumePublicURL(index int) string {
    return "http://" + c.VolumePublicAddress(index)
}

func (c *MixedVolumeCluster) BaseDir() string {
    return c.baseDir
}

// VolumeServerAddress returns SeaweedFS server address format: ip:httpPort.grpcPort
func (c *MixedVolumeCluster) VolumeServerAddress(index int) string {
    if index < 0 || index >= len(c.volumePorts) {
        return ""
    }
    return fmt.Sprintf("%s.%d", c.VolumeAdminAddress(index), c.volumeGrpcPorts[index])
}

func (c *MixedVolumeCluster) IsRust(index int) bool {
    if index < 0 || index >= len(c.isRust) {
        return false
    }
    return c.isRust[index]
}

View File

@@ -0,0 +1,567 @@
package volume_server_grpc_test

import (
    "bytes"
    "context"
    "fmt"
    "io"
    "net/http"
    "testing"
    "time"

    "github.com/seaweedfs/seaweedfs/test/volume_server/framework"
    "github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
    "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)

// deleteAndWaitForTombstone issues an HTTP DELETE for the given fid on
// volumeURL, asserts a successful response, then polls with GET until
// the file returns 404 (tombstone visible) or the timeout elapses.
func deleteAndWaitForTombstone(t *testing.T, httpClient *http.Client, volumeURL, fid string) {
    t.Helper()
    req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/%s", volumeURL, fid), nil)
    if err != nil {
        t.Fatalf("build delete request for %s: %v", fid, err)
    }
    resp := framework.DoRequest(t, httpClient, req)
    framework.ReadAllAndClose(t, resp)
    if resp.StatusCode != http.StatusAccepted && resp.StatusCode != http.StatusOK {
        t.Fatalf("delete %s: expected 200 or 202, got %d", fid, resp.StatusCode)
    }
    // Poll until GET returns 404 (tombstone flushed to disk).
    deadline := time.Now().Add(5 * time.Second)
    for time.Now().Before(deadline) {
        getResp := framework.ReadBytes(t, httpClient, volumeURL, fid)
        status := getResp.StatusCode
        framework.ReadAllAndClose(t, getResp)
        if status == http.StatusNotFound {
            return
        }
        time.Sleep(50 * time.Millisecond)
    }
    t.Fatalf("delete %s: tombstone not visible after 5s", fid)
}

// TestMixedBalanceCopyGoToRust verifies that VolumeCopy works from a Go volume
// server to a Rust volume server. This is the core operation behind volume
// balancing in a mixed Go+Rust cluster.
func TestMixedBalanceCopyGoToRust(t *testing.T) {
    if testing.Short() {
        t.Skip("skipping integration test in short mode")
    }
    cluster := framework.StartMixedVolumeCluster(t, matrix.P1(), 1, 1)
    // server 0 = Go, server 1 = Rust
    conn0, goClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
    defer conn0.Close()
    conn1, rustClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
    defer conn1.Close()
    httpClient := framework.NewHTTPClient()
    const volumeID = uint32(50)
    // Allocate volume on Go server and upload test data
    framework.AllocateVolume(t, goClient, volumeID, "")
    testFiles := []struct {
        key    uint64
        cookie uint32
        data   []byte
    }{
        {1, 0xAABBCCDD, []byte("hello from Go server")},
        {2, 0x11223344, []byte("second file for balance test")},
        {3, 0xDEADBEEF, make([]byte, 4096)}, // larger file
    }
    for _, f := range testFiles {
        fid := framework.NewFileID(volumeID, f.key, f.cookie)
        resp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fid, f.data)
        body := framework.ReadAllAndClose(t, resp)
        if resp.StatusCode != http.StatusCreated {
            t.Fatalf("upload %s: expected 201, got %d: %s", fid, resp.StatusCode, body)
        }
    }
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    // Read source volume status before copy
    sourceStatus, err := goClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
        VolumeId: volumeID,
    })
    if err != nil {
        t.Fatalf("ReadVolumeFileStatus on Go server: %v", err)
    }
    t.Logf("Source: dat=%d idx=%d files=%d version=%d",
        sourceStatus.GetDatFileSize(), sourceStatus.GetIdxFileSize(),
        sourceStatus.GetFileCount(), sourceStatus.GetVersion())
    // Copy volume from Go (server 0) to Rust (server 1)
    copyStream, err := rustClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
        VolumeId:       volumeID,
        SourceDataNode: cluster.VolumeServerAddress(0),
    })
    if err != nil {
        t.Fatalf("VolumeCopy call failed: %v", err)
    }
    var lastAppendAtNs uint64
    for {
        resp, recvErr := copyStream.Recv()
        if recvErr != nil {
            if recvErr != io.EOF {
                t.Fatalf("VolumeCopy recv error: %v", recvErr)
            }
            break
        }
        if resp.GetLastAppendAtNs() != 0 {
            lastAppendAtNs = resp.GetLastAppendAtNs()
        }
    }
    t.Logf("VolumeCopy completed, lastAppendAtNs=%d", lastAppendAtNs)
    // Verify: read volume status from Rust server
    targetStatus, err := rustClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
        VolumeId: volumeID,
    })
    if err != nil {
        t.Fatalf("ReadVolumeFileStatus on Rust server: %v", err)
    }
    t.Logf("Target: dat=%d idx=%d files=%d version=%d",
        targetStatus.GetDatFileSize(), targetStatus.GetIdxFileSize(),
        targetStatus.GetFileCount(), targetStatus.GetVersion())
    if sourceStatus.GetDatFileSize() != targetStatus.GetDatFileSize() {
        t.Fatalf("dat file size mismatch: source=%d target=%d",
            sourceStatus.GetDatFileSize(), targetStatus.GetDatFileSize())
    }
    if sourceStatus.GetIdxFileSize() != targetStatus.GetIdxFileSize() {
        t.Fatalf("idx file size mismatch: source=%d target=%d",
            sourceStatus.GetIdxFileSize(), targetStatus.GetIdxFileSize())
    }
    if sourceStatus.GetFileCount() != targetStatus.GetFileCount() {
        t.Fatalf("file count mismatch: source=%d target=%d",
            sourceStatus.GetFileCount(), targetStatus.GetFileCount())
    }
    // Verify data can be read from Rust server
    for _, f := range testFiles {
        fid := framework.NewFileID(volumeID, f.key, f.cookie)
        resp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(1), fid)
        body := framework.ReadAllAndClose(t, resp)
        if resp.StatusCode != http.StatusOK {
            t.Fatalf("read %s from Rust server: expected 200, got %d", fid, resp.StatusCode)
        }
        if !bytes.Equal(body, f.data) {
            t.Fatalf("read %s from Rust server: content mismatch (got %d bytes, want %d)", fid, len(body), len(f.data))
        }
    }
}

// TestMixedBalanceCopyRustToGo verifies that VolumeCopy works from a Rust
// volume server to a Go volume server.
func TestMixedBalanceCopyRustToGo(t *testing.T) {
    if testing.Short() {
        t.Skip("skipping integration test in short mode")
    }
    cluster := framework.StartMixedVolumeCluster(t, matrix.P1(), 1, 1)
    conn0, goClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
    defer conn0.Close()
    conn1, rustClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
    defer conn1.Close()
    httpClient := framework.NewHTTPClient()
    const volumeID = uint32(51)
    // Allocate volume on Rust server and upload test data
    framework.AllocateVolume(t, rustClient, volumeID, "")
    testFiles := []struct {
        key    uint64
        cookie uint32
        data   []byte
    }{
        {1, 0xAABBCCDD, []byte("hello from Rust server")},
        {2, 0x11223344, []byte("second file for reverse balance")},
    }
    for _, f := range testFiles {
        fid := framework.NewFileID(volumeID, f.key, f.cookie)
        resp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(1), fid, f.data)
        body := framework.ReadAllAndClose(t, resp)
        if resp.StatusCode != http.StatusCreated {
            t.Fatalf("upload %s: expected 201, got %d: %s", fid, resp.StatusCode, body)
        }
    }
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    sourceStatus, err := rustClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
        VolumeId: volumeID,
    })
    if err != nil {
        t.Fatalf("ReadVolumeFileStatus on Rust server: %v", err)
    }
    // Copy volume from Rust (server 1) to Go (server 0)
    copyStream, err := goClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
        VolumeId:       volumeID,
        SourceDataNode: cluster.VolumeServerAddress(1),
    })
    if err != nil {
        t.Fatalf("VolumeCopy call failed: %v", err)
    }
    for {
        _, recvErr := copyStream.Recv()
        if recvErr != nil {
            if recvErr != io.EOF {
                t.Fatalf("VolumeCopy recv error: %v", recvErr)
            }
            break
        }
    }
    targetStatus, err := goClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
        VolumeId: volumeID,
    })
    if err != nil {
        t.Fatalf("ReadVolumeFileStatus on Go server: %v", err)
    }
    if sourceStatus.GetDatFileSize() != targetStatus.GetDatFileSize() {
        t.Fatalf("dat file size mismatch: source=%d target=%d",
            sourceStatus.GetDatFileSize(), targetStatus.GetDatFileSize())
    }
    if sourceStatus.GetIdxFileSize() != targetStatus.GetIdxFileSize() {
        t.Fatalf("idx file size mismatch: source=%d target=%d",
            sourceStatus.GetIdxFileSize(), targetStatus.GetIdxFileSize())
    }
    if sourceStatus.GetFileCount() != targetStatus.GetFileCount() {
        t.Fatalf("file count mismatch: source=%d target=%d",
            sourceStatus.GetFileCount(), targetStatus.GetFileCount())
    }
    // Verify data can be read from Go server
    for _, f := range testFiles {
        fid := framework.NewFileID(volumeID, f.key, f.cookie)
        resp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(0), fid)
        body := framework.ReadAllAndClose(t, resp)
        if resp.StatusCode != http.StatusOK {
            t.Fatalf("read %s from Go server: expected 200, got %d", fid, resp.StatusCode)
        }
        if !bytes.Equal(body, f.data) {
            t.Fatalf("read %s from Go server: content mismatch (got %d bytes, want %d)", fid, len(body), len(f.data))
        }
    }
}

// TestMixedBalanceCopyWithDeletes verifies that VolumeCopy correctly handles
// volumes that have both active and deleted needles.
func TestMixedBalanceCopyWithDeletes(t *testing.T) {
    if testing.Short() {
        t.Skip("skipping integration test in short mode")
    }
    cluster := framework.StartMixedVolumeCluster(t, matrix.P1(), 1, 1)
    conn0, goClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
    defer conn0.Close()
    conn1, rustClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
    defer conn1.Close()
    httpClient := framework.NewHTTPClient()
    const volumeID = uint32(52)
    framework.AllocateVolume(t, goClient, volumeID, "")
    // Upload files
    for i := uint64(1); i <= 5; i++ {
        fid := framework.NewFileID(volumeID, i, 0x12340000+uint32(i))
        resp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fid, []byte(fmt.Sprintf("file-%d", i)))
        framework.ReadAllAndClose(t, resp)
        if resp.StatusCode != http.StatusCreated {
            t.Fatalf("upload file %d: expected 201, got %d", i, resp.StatusCode)
        }
    }
    // Delete some files and wait for tombstones to be visible
    for _, key := range []uint64{2, 4} {
        fid := framework.NewFileID(volumeID, key, 0x12340000+uint32(key))
        deleteAndWaitForTombstone(t, httpClient, cluster.VolumeAdminURL(0), fid)
    }
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    sourceStatus, err := goClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
        VolumeId: volumeID,
    })
    if err != nil {
        t.Fatalf("ReadVolumeFileStatus: %v", err)
    }
    t.Logf("Source after deletes: dat=%d idx=%d files=%d",
        sourceStatus.GetDatFileSize(), sourceStatus.GetIdxFileSize(),
        sourceStatus.GetFileCount())
    // Copy volume from Go to Rust
    copyStream, err := rustClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
        VolumeId:       volumeID,
        SourceDataNode: cluster.VolumeServerAddress(0),
    })
    if err != nil {
        t.Fatalf("VolumeCopy: %v", err)
    }
    var lastAppendAtNs uint64
    for {
        resp, recvErr := copyStream.Recv()
        if recvErr != nil {
            if recvErr != io.EOF {
                t.Fatalf("VolumeCopy recv error: %v", recvErr)
            }
            break
        }
        if resp.GetLastAppendAtNs() != 0 {
            lastAppendAtNs = resp.GetLastAppendAtNs()
        }
    }
    if lastAppendAtNs == 0 {
        t.Fatalf("VolumeCopy did not return a lastAppendAtNs timestamp")
    }
    t.Logf("VolumeCopy completed, lastAppendAtNs=%d", lastAppendAtNs)
    targetStatusAfterCopy, err := rustClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
        VolumeId: volumeID,
    })
    if err != nil {
        t.Fatalf("ReadVolumeFileStatus on Rust: %v", err)
    }
    t.Logf("Target after copy: dat=%d idx=%d files=%d",
        targetStatusAfterCopy.GetDatFileSize(), targetStatusAfterCopy.GetIdxFileSize(),
        targetStatusAfterCopy.GetFileCount())
    if sourceStatus.GetDatFileSize() != targetStatusAfterCopy.GetDatFileSize() {
        t.Fatalf("dat file size mismatch: source=%d target=%d",
            sourceStatus.GetDatFileSize(), targetStatusAfterCopy.GetDatFileSize())
    }
    if sourceStatus.GetIdxFileSize() != targetStatusAfterCopy.GetIdxFileSize() {
        t.Fatalf("idx file size mismatch: source=%d target=%d",
            sourceStatus.GetIdxFileSize(), targetStatusAfterCopy.GetIdxFileSize())
    }
    // Tail from the copy checkpoint — source is unchanged (deletes happened
    // before copy), so tailing should not append any data.
    _, err = rustClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{
        VolumeId:           volumeID,
        SinceNs:            lastAppendAtNs,
        IdleTimeoutSeconds: 3,
        SourceVolumeServer: cluster.VolumeServerAddress(0),
    })
    if err != nil {
        t.Fatalf("VolumeTailReceiver: %v", err)
    }
    targetStatusAfterTail, err := rustClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
        VolumeId: volumeID,
    })
    if err != nil {
        t.Fatalf("ReadVolumeFileStatus after tail: %v", err)
    }
    if targetStatusAfterTail.GetDatFileSize() != targetStatusAfterCopy.GetDatFileSize() {
        t.Fatalf("dat grew after tail: before=%d after=%d",
            targetStatusAfterCopy.GetDatFileSize(), targetStatusAfterTail.GetDatFileSize())
    }
    if targetStatusAfterTail.GetIdxFileSize() != targetStatusAfterCopy.GetIdxFileSize() {
        t.Fatalf("idx grew after tail: before=%d after=%d",
            targetStatusAfterCopy.GetIdxFileSize(), targetStatusAfterTail.GetIdxFileSize())
    }
    // Verify surviving files are readable from Rust
    for _, key := range []uint64{1, 3, 5} {
        fid := framework.NewFileID(volumeID, key, 0x12340000+uint32(key))
        resp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(1), fid)
        body := framework.ReadAllAndClose(t, resp)
        if resp.StatusCode != http.StatusOK {
            t.Fatalf("read surviving file %d from Rust: expected 200, got %d", key, resp.StatusCode)
        }
        expected := fmt.Sprintf("file-%d", key)
        if string(body) != expected {
            t.Fatalf("data mismatch for file %d: got %q want %q", key, body, expected)
        }
    }
    // Verify deleted files return 404
    for _, key := range []uint64{2, 4} {
        fid := framework.NewFileID(volumeID, key, 0x12340000+uint32(key))
        resp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(1), fid)
        framework.ReadAllAndClose(t, resp)
        if resp.StatusCode != http.StatusNotFound {
            t.Fatalf("read deleted file %d from Rust: expected 404, got %d", key, resp.StatusCode)
        }
    }
}

// TestMixedBalanceFullMoveGoToRust exercises the complete volume balance move
// flow: mark readonly → copy → tail → verify sizes → delete source.
// This mirrors the steps in balance_task.go Execute().
func TestMixedBalanceFullMoveGoToRust(t *testing.T) {
    if testing.Short() {
        t.Skip("skipping integration test in short mode")
    }
    cluster := framework.StartMixedVolumeCluster(t, matrix.P1(), 1, 1)
    conn0, goClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
    defer conn0.Close()
    conn1, rustClient := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
    defer conn1.Close()
    httpClient := framework.NewHTTPClient()
    const volumeID = uint32(53)
    framework.AllocateVolume(t, goClient, volumeID, "")
    // Upload test data
    for i := uint64(1); i <= 5; i++ {
        fid := framework.NewFileID(volumeID, i, 0xABCD0000+uint32(i))
        resp := framework.UploadBytes(t, httpClient, cluster.VolumeAdminURL(0), fid, []byte(fmt.Sprintf("balance-move-file-%d", i)))
        framework.ReadAllAndClose(t, resp)
        if resp.StatusCode != http.StatusCreated {
            t.Fatalf("upload file %d: expected 201, got %d", i, resp.StatusCode)
        }
    }
    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()
    // Step 1: Copy volume to Rust server (source stays writable so we can
    // delete after copy to exercise the tail tombstone path)
    copyStream, err := rustClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
        VolumeId:       volumeID,
        SourceDataNode: cluster.VolumeServerAddress(0),
    })
    if err != nil {
        t.Fatalf("VolumeCopy: %v", err)
    }
    var lastAppendAtNs uint64
    for {
        resp, recvErr := copyStream.Recv()
        if recvErr != nil {
            if recvErr != io.EOF {
                t.Fatalf("VolumeCopy recv: %v", recvErr)
            }
            break
        }
        if resp.GetLastAppendAtNs() != 0 {
            lastAppendAtNs = resp.GetLastAppendAtNs()
        }
    }
    t.Logf("Copy done, lastAppendAtNs=%d", lastAppendAtNs)
    // Step 2: Delete file 3 on the source AFTER copy. This creates a
    // tombstone needle that the tail step must propagate to the Rust server.
    deleteAndWaitForTombstone(t, httpClient, cluster.VolumeAdminURL(0),
        framework.NewFileID(volumeID, 3, 0xABCD0003))
    // Step 3: Mark source readonly so no further writes arrive during tail
    _, err = goClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
        VolumeId: volumeID,
    })
    if err != nil {
        t.Fatalf("mark readonly: %v", err)
    }
    // Read source status (the reference for post-tail verification)
    sourceStatus, err := goClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
        VolumeId: volumeID,
    })
    if err != nil {
        t.Fatalf("read source status: %v", err)
    }
    t.Logf("Source: dat=%d idx=%d files=%d",
        sourceStatus.GetDatFileSize(), sourceStatus.GetIdxFileSize(),
        sourceStatus.GetFileCount())
    // Step 4: Tail for updates — this must pick up the delete tombstone
    _, err = rustClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{
        VolumeId:           volumeID,
        SinceNs:            lastAppendAtNs,
        IdleTimeoutSeconds: 5,
        SourceVolumeServer: cluster.VolumeServerAddress(0),
    })
    if err != nil {
        t.Fatalf("VolumeTailReceiver: %v", err)
    }
    // Step 5: Verify dat/idx sizes match after tail.
    // We compare file sizes (byte-level correctness) rather than file_count
    // because the tail writes the tombstone via the write path (which
    // increments file_count) while the source used the delete path (which
    // only increments deletion_count). This is consistent with Go behavior.
    targetStatus, err := rustClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
        VolumeId: volumeID,
    })
    if err != nil {
        t.Fatalf("read target status: %v", err)
    }
    t.Logf("Target: dat=%d idx=%d files=%d",
        targetStatus.GetDatFileSize(), targetStatus.GetIdxFileSize(),
        targetStatus.GetFileCount())
    if sourceStatus.GetDatFileSize() != targetStatus.GetDatFileSize() {
        t.Fatalf("dat size mismatch after tail: source=%d target=%d",
            sourceStatus.GetDatFileSize(), targetStatus.GetDatFileSize())
    }
    if sourceStatus.GetIdxFileSize() != targetStatus.GetIdxFileSize() {
        t.Fatalf("idx size mismatch after tail: source=%d target=%d",
            sourceStatus.GetIdxFileSize(), targetStatus.GetIdxFileSize())
    }
    // Step 6: Delete volume from source
    _, err = goClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
        VolumeId: volumeID,
    })
    if err != nil {
        t.Fatalf("delete source volume: %v", err)
    }
    // Verify source volume is gone
    _, err = goClient.ReadVolumeFileStatus(ctx, &volume_server_pb.ReadVolumeFileStatusRequest{
        VolumeId: volumeID,
    })
    if err == nil {
        t.Fatalf("expected error reading deleted source volume, got nil")
    }
    t.Logf("Source volume deleted successfully (error as expected: %v)", err)
    // Verify all surviving data is readable from Rust (the new home)
    for _, key := range []uint64{1, 2, 4, 5} {
        fid := framework.NewFileID(volumeID, key, 0xABCD0000+uint32(key))
        resp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(1), fid)
        body := framework.ReadAllAndClose(t, resp)
        if resp.StatusCode != http.StatusOK {
            t.Fatalf("read file %d from Rust after move: expected 200, got %d", key, resp.StatusCode)
        }
        expected := fmt.Sprintf("balance-move-file-%d", key)
        if string(body) != expected {
            t.Fatalf("data mismatch for file %d after move: got %q want %q", key, body, expected)
        }
    }
    // Verify deleted file is 404 on Rust (tombstone propagated via tail)
    {
        fid := framework.NewFileID(volumeID, 3, 0xABCD0003)
        resp := framework.ReadBytes(t, httpClient, cluster.VolumeAdminURL(1), fid)
        framework.ReadAllAndClose(t, resp)
        if resp.StatusCode != http.StatusNotFound {
            t.Fatalf("deleted file 3 on Rust: expected 404, got %d", resp.StatusCode)
        }
    }
    t.Logf("Full balance move completed: volume %d moved from Go to Rust, source purged", volumeID)
}