* Enhance volume.merge command with deduplication and disk-based backend * Fix copyVolume function call with correct argument order and missing bool parameter * Revert "Fix copyVolume function call with correct argument order and missing bool parameter" This reverts commit 7b4a190643576fec11f896b26bcad03dd02da2f7. * Fix critical issues: per-replica writable tracking, tail goroutine cancellation via done channel, and debug logging for allocation failures * Optimize memory usage with watermark approach for duplicate detection * Fix critical issues: swap copyVolume arguments, increase idle timeout, remove file double-close, use glog for logging * Replace temporary file with in-memory buffer for needle blob serialization * test(volume.merge): Add comprehensive unit and integration tests Add 7 unit tests covering: - Ordering by timestamp - Cross-stream duplicate deduplication - Empty stream handling - Complex multi-stream deduplication - Single stream passthrough - Large needle ID support - LastModified fallback when timestamp unavailable Add 2 integration validation tests: - TestMergeWorkflowValidation: Documents 9-stage merge workflow - TestMergeEdgeCaseHandling: Validates 10 edge case handling All tests passing (9/9) * fix(volume.merge): Use time window for deduplication to handle clock skew The same needle ID can have different timestamps on different servers due to clock skew and replication lag. Needles with the same ID within a 5-second time window are now treated as duplicates (same write with timestamp variance). Key changes: - Add mergeDeduplicationWindowNs constant (5 seconds) - Replace exact timestamp matching with time window comparison - Use windowInitialized flag to properly detect window transitions - Add TestMergeNeedleStreamsTimeWindowDeduplication test This ensures that replicated writes with slight timestamp differences are properly deduplicated during merge, while separate updates to the same file ID (outside the window) are preserved. All tests passing (10/10) * test: Add volume.merge integration tests with 5 comprehensive test cases * test: integration tests for volume.merge command * Fix integration tests: use TripleVolumeCluster for volume.merge testing - Created new TripleVolumeCluster framework (cluster_triple.go) with 3 volume servers - Rebuilt weed binary with volume.merge command compiled in - Updated all 5 integration tests to use TripleVolumeCluster instead of DualVolumeCluster - Tests now properly allocate volumes on 2 servers and let merge allocate on 3rd - All 5 integration tests now pass: - TestVolumeMergeBasic - TestVolumeMergeReadonly - TestVolumeMergeRestore - TestVolumeMergeTailNeedles - TestVolumeMergeDivergentReplicas * Refactor test framework: use parameterized server count instead of hardcoded - Renamed TripleVolumeCluster to MultiVolumeCluster with serverCount parameter - Replaced hardcoded volumePort0/1/2 with slices for flexible server count - Updated StartTripleVolumeCluster as backward-compatible wrapper calling StartMultiVolumeCluster(t, profile, 3) - Made directory creation, port allocation, and server startup loop-based - Updated accessor methods (VolumeAdminAddress, VolumeGRPCAddress, etc.) to support any server count - All 5 integration tests continue to pass with new parameterized cluster framework - Enables future testing with 2, 4, 5+ volume servers by calling StartMultiVolumeCluster directly * Consolidate cluster frameworks: StartDualVolumeCluster now uses MultiVolumeCluster - Made DualVolumeCluster a type alias for MultiVolumeCluster - Updated StartDualVolumeCluster to call StartMultiVolumeCluster(t, profile, 2) - Removed duplicate code from cluster_dual.go (now just 17 lines) - All existing tests using StartDualVolumeCluster continue to work without changes - Backward compatible: existing code continues to use the old function signatures - Added wrapper functions in cluster_multi.go for StartTripleVolumeCluster - Enables unified cluster management across all test suites * Address PR review comments: improve error handling and clean up code - Replace parse error swallow with proper error return - Log cleanup and restoration errors instead of silently discarding them - Remove unused offset field from memoryBackendFile struct - Fix WriteAt buffer truncation bug to preserve trailing bytes - All unit tests passing (10/10) - Code compiles successfully * Fix PR review findings: test improvements and code quality - Add timeout to runWeedShell to prevent hanging - Add server 1 readonly status verification in tests - Assert merge fails when replicas writable (not just log output) - Replace sleep with polling for writable restoration check - Fix WriteAt stale data snapshot bug in memoryBackendFile - Fix startVolume error logging to show current server log - Fix volumePubPorts double assignment in port allocation - Rename test to reflect behavior: DoesNotDeduplicateAcrossWindows - Fix misleading dedup window comment Unit tests: 10/10 passing Binary: Compiles successfully * Fix test assumption: merge command marks volumes readonly automatically TestVolumeMergeReadonly was expecting merge to fail on writable volumes, but the merge command is designed to mark volumes readonly as part of its operation. Fixed test to verify merge succeeds on writable volumes and properly restores writable state afterward. Removed redundant Test 2 code that duplicated the new behavior. * fmt * Fix deduplication logic to correctly handle same-stream vs cross-stream duplicates The dedup map previously used only NeedleId as key, causing same-stream overwrites to be incorrectly skipped as duplicates. Changed to track which stream first processed each needle ID in the current window: - Cross-stream duplicates (same ID from different streams, within window) are skipped - Same-stream duplicates (overwrites from same stream) are kept - Map now stores: needleId -> streamIndex of first occurrence in window Added TestMergeNeedleStreamsSameStreamDuplicates to verify same-stream overwrites are preserved while cross-stream duplicates are skipped. All unit tests passing (11/11) Binary compiles successfully
455 lines
14 KiB
Go
455 lines
14 KiB
Go
package volume_server_merge_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"os/exec"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
|
|
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
)
|
|
|
|
// runWeedShell executes a weed shell command by providing commands via stdin with lock/unlock.
|
|
// It uses a timeout to prevent hanging if the weed shell process becomes unresponsive.
|
|
func runWeedShell(t *testing.T, weedBinary, masterAddr, shellCommand string) (output string, err error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
cmd := exec.CommandContext(ctx, weedBinary, "shell", "-master="+masterAddr)
|
|
// Wrap command in lock/unlock for cluster-wide operations
|
|
shellCommands := "lock\n" + shellCommand + "\nunlock\nexit\n"
|
|
cmd.Stdin = strings.NewReader(shellCommands)
|
|
outputBytes, err := cmd.CombinedOutput()
|
|
output = string(outputBytes)
|
|
if err != nil {
|
|
if ctx.Err() == context.DeadlineExceeded {
|
|
t.Logf("weed shell command '%s' timed out after 30s", shellCommand)
|
|
} else {
|
|
t.Logf("weed shell command '%s' output: %s, error: %v", shellCommand, output, err)
|
|
}
|
|
}
|
|
return output, err
|
|
}
|
|
|
|
// TestVolumeMergeBasic verifies the basic volume.merge workflow using the weed shell command
|
|
func TestVolumeMergeBasic(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
// Start a triple cluster with 3 volume servers (needed for merge which allocates to a third location)
|
|
cluster := framework.StartTripleVolumeCluster(t, matrix.P1())
|
|
|
|
// Connect to volume servers to allocate volumes
|
|
conn0, volumeClient0 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
|
|
defer conn0.Close()
|
|
|
|
conn1, volumeClient1 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
|
|
defer conn1.Close()
|
|
|
|
const volumeID = uint32(100)
|
|
|
|
// Allocate volume on only 2 servers (replicas)
|
|
// The merge command will allocate on the 3rd server as a temporary location
|
|
framework.AllocateVolume(t, volumeClient0, volumeID, "")
|
|
framework.AllocateVolume(t, volumeClient1, volumeID, "")
|
|
|
|
t.Logf("Successfully allocated volume %d on servers 0 and 1 as replicas", volumeID)
|
|
|
|
// Get weed binary
|
|
weedBinary := os.Getenv("WEED_BINARY")
|
|
if weedBinary == "" {
|
|
var err error
|
|
weedBinary, err = framework.FindOrBuildWeedBinary()
|
|
if err != nil {
|
|
t.Fatalf("failed to find weed binary: %v", err)
|
|
}
|
|
}
|
|
|
|
// Execute volume.merge command via weed shell
|
|
output, err := runWeedShell(t, weedBinary, cluster.MasterAddress(), fmt.Sprintf("volume.merge -volumeId %d", volumeID))
|
|
|
|
t.Logf("volume.merge command output:\n%s", output)
|
|
|
|
if err != nil {
|
|
t.Fatalf("volume.merge command failed: %v\noutput: %s", err, output)
|
|
}
|
|
|
|
// Verify the success message in output
|
|
if !strings.Contains(output, fmt.Sprintf("merged volume %d", volumeID)) {
|
|
t.Fatalf("expected success message in output, got: %s", output)
|
|
}
|
|
|
|
t.Logf("Successfully executed volume.merge command for volume %d", volumeID)
|
|
}
|
|
|
|
// TestVolumeMergeReadonly verifies that volume.merge requires readonly state
|
|
func TestVolumeMergeReadonly(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
cluster := framework.StartTripleVolumeCluster(t, matrix.P1())
|
|
|
|
// Connect to volume servers
|
|
conn0, volumeClient0 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
|
|
defer conn0.Close()
|
|
|
|
conn1, volumeClient1 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
|
|
defer conn1.Close()
|
|
|
|
const volumeID = uint32(101)
|
|
|
|
// Allocate volumes on only 2 servers (the merge will allocate on the 3rd)
|
|
framework.AllocateVolume(t, volumeClient0, volumeID, "")
|
|
framework.AllocateVolume(t, volumeClient1, volumeID, "")
|
|
|
|
// Get weed binary
|
|
weedBinary := os.Getenv("WEED_BINARY")
|
|
if weedBinary == "" {
|
|
var err error
|
|
weedBinary, err = framework.FindOrBuildWeedBinary()
|
|
if err != nil {
|
|
t.Fatalf("failed to find weed binary: %v", err)
|
|
}
|
|
}
|
|
|
|
// Test 1: Merge while writable (merge command will mark volumes readonly as needed)
|
|
output, err := runWeedShell(t, weedBinary, cluster.MasterAddress(), fmt.Sprintf("volume.merge -volumeId %d", volumeID))
|
|
if err != nil {
|
|
t.Logf("merge on writable volumes failed: %v\noutput: %s", err, output)
|
|
t.Fatalf("volume.merge should work on writable volumes (marks them readonly internally)")
|
|
}
|
|
|
|
if !strings.Contains(output, fmt.Sprintf("merged volume %d", volumeID)) {
|
|
t.Fatalf("expected success message in output, got: %s", output)
|
|
}
|
|
|
|
// Verify volumes were marked readonly during merge and restored after
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
// Check that volumes are writable again after merge (were restored)
|
|
status0, err := volumeClient0.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get status after merge: %v", err)
|
|
}
|
|
|
|
status1, err := volumeClient1.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get status from server 1 after merge: %v", err)
|
|
}
|
|
|
|
if status0.GetIsReadOnly() {
|
|
t.Fatalf("expected volume to be writable again after merge on server 0")
|
|
}
|
|
|
|
if status1.GetIsReadOnly() {
|
|
t.Fatalf("expected volume to be writable again after merge on server 1")
|
|
}
|
|
|
|
t.Logf("Successfully tested merge on writable volumes and writable restoration")
|
|
}
|
|
|
|
// TestVolumeMergeRestore verifies that merge restores writable state for originally-writable replicas
|
|
func TestVolumeMergeRestore(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
cluster := framework.StartTripleVolumeCluster(t, matrix.P1())
|
|
|
|
conn0, volumeClient0 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
|
|
defer conn0.Close()
|
|
|
|
conn1, volumeClient1 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
|
|
defer conn1.Close()
|
|
|
|
const volumeID = uint32(102)
|
|
|
|
// Allocate volume on only 2 servers (the merge will allocate on the 3rd)
|
|
framework.AllocateVolume(t, volumeClient0, volumeID, "")
|
|
framework.AllocateVolume(t, volumeClient1, volumeID, "")
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
// Mark both as readonly
|
|
_, err := volumeClient0.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: volumeID,
|
|
Persist: false,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to mark readonly: %v", err)
|
|
}
|
|
|
|
_, err = volumeClient1.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: volumeID,
|
|
Persist: false,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to mark readonly on server 1: %v", err)
|
|
}
|
|
|
|
// Get weed binary
|
|
weedBinary := os.Getenv("WEED_BINARY")
|
|
if weedBinary == "" {
|
|
var err error
|
|
weedBinary, err = framework.FindOrBuildWeedBinary()
|
|
if err != nil {
|
|
t.Fatalf("failed to find weed binary: %v", err)
|
|
}
|
|
}
|
|
|
|
// Execute volume.merge via shell
|
|
output, err := runWeedShell(t, weedBinary, cluster.MasterAddress(), fmt.Sprintf("volume.merge -volumeId %d", volumeID))
|
|
|
|
t.Logf("volume.merge output: %s, error: %v", output, err)
|
|
|
|
if err != nil {
|
|
t.Fatalf("volume.merge failed: %v\noutput: %s", err, output)
|
|
}
|
|
|
|
if !strings.Contains(output, fmt.Sprintf("merged volume %d", volumeID)) {
|
|
t.Fatalf("expected success message in output, got: %s", output)
|
|
}
|
|
|
|
// After merge, verify that originally-writable replicas are writable again
|
|
// (The merge command should restore writable state for replicas that were writable before readonly)
|
|
// Actually both were writable initially, then marked readonly, so both should be restored
|
|
|
|
// Poll for writable state restoration instead of fixed sleep
|
|
maxRetries := 50 // ~5s total with 100ms sleeps
|
|
for retries := 0; retries < maxRetries; retries++ {
|
|
status0, err := volumeClient0.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err == nil && !status0.GetIsReadOnly() {
|
|
// Server 0 is writable, check server 1
|
|
status1, err := volumeClient1.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err == nil && !status1.GetIsReadOnly() {
|
|
// Both are writable, break out
|
|
break
|
|
}
|
|
}
|
|
if retries < maxRetries-1 {
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
status0Final, err := volumeClient0.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get final status for server 0: %v", err)
|
|
}
|
|
|
|
status1Final, err := volumeClient1.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get final status for server 1: %v", err)
|
|
}
|
|
|
|
if status0Final.GetIsReadOnly() {
|
|
t.Fatalf("expected volume %d to be writable on server 0 after merge, but it's still readonly", volumeID)
|
|
}
|
|
|
|
if status1Final.GetIsReadOnly() {
|
|
t.Fatalf("expected volume %d to be writable on server 1 after merge, but it's still readonly", volumeID)
|
|
}
|
|
|
|
t.Logf("After merge - volume %d on server 0: readonly=%v, server 1: readonly=%v", volumeID, status0Final.GetIsReadOnly(), status1Final.GetIsReadOnly())
|
|
|
|
t.Logf("Successfully tested merge and restore workflow for volume %d", volumeID)
|
|
}
|
|
|
|
// TestVolumeMergeTailNeedles verifies the volume.merge command with empty volumes
|
|
func TestVolumeMergeTailNeedles(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
cluster := framework.StartTripleVolumeCluster(t, matrix.P1())
|
|
|
|
conn0, volumeClient0 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
|
|
defer conn0.Close()
|
|
|
|
conn1, volumeClient1 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
|
|
defer conn1.Close()
|
|
|
|
const volumeID = uint32(200)
|
|
|
|
// Allocate empty volumes on only 2 servers (the merge will allocate on the 3rd)
|
|
framework.AllocateVolume(t, volumeClient0, volumeID, "")
|
|
framework.AllocateVolume(t, volumeClient1, volumeID, "")
|
|
|
|
// Mark as readonly
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
_, err := volumeClient0.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: volumeID,
|
|
Persist: false,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to mark readonly on server 0: %v", err)
|
|
}
|
|
|
|
_, err = volumeClient1.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: volumeID,
|
|
Persist: false,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to mark readonly on server 1: %v", err)
|
|
}
|
|
|
|
// Get weed binary
|
|
weedBinary := os.Getenv("WEED_BINARY")
|
|
if weedBinary == "" {
|
|
var err error
|
|
weedBinary, err = framework.FindOrBuildWeedBinary()
|
|
if err != nil {
|
|
t.Fatalf("failed to find weed binary: %v", err)
|
|
}
|
|
}
|
|
|
|
// Execute volume.merge command on empty volumes
|
|
output, err := runWeedShell(t, weedBinary, cluster.MasterAddress(), fmt.Sprintf("volume.merge -volumeId %d", volumeID))
|
|
|
|
t.Logf("merge empty volumes - output: %s, error: %v", output, err)
|
|
|
|
if err != nil {
|
|
t.Fatalf("volume.merge failed on empty volumes: %v\noutput: %s", err, output)
|
|
}
|
|
|
|
// Verify merge completed successfully
|
|
if !strings.Contains(output, fmt.Sprintf("merged volume %d", volumeID)) {
|
|
t.Fatalf("expected success message in output, got: %s", output)
|
|
}
|
|
|
|
t.Logf("Successfully merged empty volumes %d", volumeID)
|
|
}
|
|
|
|
// TestVolumeMergeDivergentReplicas simulates a realistic merge scenario using shell command
|
|
func TestVolumeMergeDivergentReplicas(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("skipping integration test in short mode")
|
|
}
|
|
|
|
cluster := framework.StartTripleVolumeCluster(t, matrix.P1())
|
|
|
|
// Connect to both servers
|
|
conn0, volumeClient0 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(0))
|
|
defer conn0.Close()
|
|
|
|
conn1, volumeClient1 := framework.DialVolumeServer(t, cluster.VolumeGRPCAddress(1))
|
|
defer conn1.Close()
|
|
|
|
const volumeID = uint32(201)
|
|
|
|
// Allocate the same volume on only 2 servers (the merge will allocate on the 3rd)
|
|
framework.AllocateVolume(t, volumeClient0, volumeID, "")
|
|
framework.AllocateVolume(t, volumeClient1, volumeID, "")
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
// Verify both volumes are initially writable
|
|
status0, err := volumeClient0.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get status for server 0: %v", err)
|
|
}
|
|
|
|
status1, err := volumeClient1.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get status for server 1: %v", err)
|
|
}
|
|
|
|
if status0.GetIsReadOnly() || status1.GetIsReadOnly() {
|
|
t.Fatalf("expected both volumes to be writable initially")
|
|
}
|
|
|
|
// Mark both as readonly to simulate merge precondition
|
|
_, err = volumeClient0.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: volumeID,
|
|
Persist: false,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to mark readonly on server 0: %v", err)
|
|
}
|
|
|
|
_, err = volumeClient1.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
|
|
VolumeId: volumeID,
|
|
Persist: false,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to mark readonly on server 1: %v", err)
|
|
}
|
|
|
|
// Verify both are readonly
|
|
status0Again, err := volumeClient0.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get status after readonly: %v", err)
|
|
}
|
|
|
|
if !status0Again.GetIsReadOnly() {
|
|
t.Fatalf("expected volume %d to be readonly", volumeID)
|
|
}
|
|
|
|
// Also verify server 1 is readonly
|
|
status1Again, err := volumeClient1.VolumeStatus(ctx, &volume_server_pb.VolumeStatusRequest{
|
|
VolumeId: volumeID,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("failed to get status on server 1 after readonly: %v", err)
|
|
}
|
|
|
|
if !status1Again.GetIsReadOnly() {
|
|
t.Fatalf("expected volume %d to be readonly on server 1", volumeID)
|
|
}
|
|
|
|
// Get weed binary
|
|
weedBinary := os.Getenv("WEED_BINARY")
|
|
if weedBinary == "" {
|
|
var err error
|
|
weedBinary, err = framework.FindOrBuildWeedBinary()
|
|
if err != nil {
|
|
t.Fatalf("failed to find weed binary: %v", err)
|
|
}
|
|
}
|
|
|
|
// Execute volume.merge command via shell
|
|
output, err := runWeedShell(t, weedBinary, cluster.MasterAddress(), fmt.Sprintf("volume.merge -volumeId %d", volumeID))
|
|
|
|
t.Logf("merge divergent replicas - output: %s, error: %v", output, err)
|
|
|
|
if err != nil {
|
|
t.Fatalf("volume.merge failed: %v\noutput: %s", err, output)
|
|
}
|
|
|
|
// Verify merge completed successfully
|
|
if !strings.Contains(output, fmt.Sprintf("merged volume %d", volumeID)) {
|
|
t.Fatalf("expected success message in output, got: %s", output)
|
|
}
|
|
|
|
t.Logf("Successfully merged divergent replicas for volume %d using shell command", volumeID)
|
|
}
|