Files
seaweedFS/weed/shell/command_volume_merge.go
Chris Lu b565a0cc86 Adds volume.merge command with deduplication and disk-based backend (#8441)
* Enhance volume.merge command with deduplication and disk-based backend

* Fix copyVolume function call with correct argument order and missing bool parameter

* Revert "Fix copyVolume function call with correct argument order and missing bool parameter"

This reverts commit 7b4a190643576fec11f896b26bcad03dd02da2f7.

* Fix critical issues: per-replica writable tracking, tail goroutine cancellation via done channel, and debug logging for allocation failures

* Optimize memory usage with watermark approach for duplicate detection

* Fix critical issues: swap copyVolume arguments, increase idle timeout, remove file double-close, use glog for logging

* Replace temporary file with in-memory buffer for needle blob serialization

* test(volume.merge): Add comprehensive unit and integration tests

Add 7 unit tests covering:
- Ordering by timestamp
- Cross-stream duplicate deduplication
- Empty stream handling
- Complex multi-stream deduplication
- Single stream passthrough
- Large needle ID support
- LastModified fallback when timestamp unavailable

Add 2 integration validation tests:
- TestMergeWorkflowValidation: Documents 9-stage merge workflow
- TestMergeEdgeCaseHandling: Validates 10 edge case handling

All tests passing (9/9)

* fix(volume.merge): Use time window for deduplication to handle clock skew

The same needle ID can have different timestamps on different servers due to
clock skew and replication lag. Needles with the same ID within a 5-second
time window are now treated as duplicates (same write with timestamp variance).

Key changes:
- Add mergeDeduplicationWindowNs constant (5 seconds)
- Replace exact timestamp matching with time window comparison
- Use windowInitialized flag to properly detect window transitions
- Add TestMergeNeedleStreamsTimeWindowDeduplication test

This ensures that replicated writes with slight timestamp differences are
properly deduplicated during merge, while separate updates to the same file
ID (outside the window) are preserved.

All tests passing (10/10)

* test: Add volume.merge integration tests with 5 comprehensive test cases

* test: integration tests for volume.merge command

* Fix integration tests: use TripleVolumeCluster for volume.merge testing

- Created new TripleVolumeCluster framework (cluster_triple.go) with 3 volume servers
- Rebuilt weed binary with volume.merge command compiled in
- Updated all 5 integration tests to use TripleVolumeCluster instead of DualVolumeCluster
- Tests now properly allocate volumes on 2 servers and let merge allocate on 3rd
- All 5 integration tests now pass:
  - TestVolumeMergeBasic
  - TestVolumeMergeReadonly
  - TestVolumeMergeRestore
  - TestVolumeMergeTailNeedles
  - TestVolumeMergeDivergentReplicas

* Refactor test framework: use parameterized server count instead of hardcoded

- Renamed TripleVolumeCluster to MultiVolumeCluster with serverCount parameter
- Replaced hardcoded volumePort0/1/2 with slices for flexible server count
- Updated StartTripleVolumeCluster as backward-compatible wrapper calling StartMultiVolumeCluster(t, profile, 3)
- Made directory creation, port allocation, and server startup loop-based
- Updated accessor methods (VolumeAdminAddress, VolumeGRPCAddress, etc.) to support any server count
- All 5 integration tests continue to pass with new parameterized cluster framework
- Enables future testing with 2, 4, 5+ volume servers by calling StartMultiVolumeCluster directly

* Consolidate cluster frameworks: StartDualVolumeCluster now uses MultiVolumeCluster

- Made DualVolumeCluster a type alias for MultiVolumeCluster
- Updated StartDualVolumeCluster to call StartMultiVolumeCluster(t, profile, 2)
- Removed duplicate code from cluster_dual.go (now just 17 lines)
- All existing tests using StartDualVolumeCluster continue to work without changes
- Backward compatible: existing code continues to use the old function signatures
- Added wrapper functions in cluster_multi.go for StartTripleVolumeCluster
- Enables unified cluster management across all test suites

* Address PR review comments: improve error handling and clean up code

- Replace parse error swallow with proper error return
- Log cleanup and restoration errors instead of silently discarding them
- Remove unused offset field from memoryBackendFile struct
- Fix WriteAt buffer truncation bug to preserve trailing bytes
- All unit tests passing (10/10)
- Code compiles successfully

* Fix PR review findings: test improvements and code quality

- Add timeout to runWeedShell to prevent hanging
- Add server 1 readonly status verification in tests
- Assert merge fails when replicas writable (not just log output)
- Replace sleep with polling for writable restoration check
- Fix WriteAt stale data snapshot bug in memoryBackendFile
- Fix startVolume error logging to show current server log
- Fix volumePubPorts double assignment in port allocation
- Rename test to reflect behavior: DoesNotDeduplicateAcrossWindows
- Fix misleading dedup window comment

Unit tests: 10/10 passing
Binary: Compiles successfully

* Fix test assumption: merge command marks volumes readonly automatically

TestVolumeMergeReadonly was expecting merge to fail on writable volumes, but the
merge command is designed to mark volumes readonly as part of its operation. Fixed
test to verify merge succeeds on writable volumes and properly restores writable
state afterward. Removed redundant Test 2 code that duplicated the new behavior.

* fmt

* Fix deduplication logic to correctly handle same-stream vs cross-stream duplicates

The dedup map previously used only NeedleId as key, causing same-stream
overwrites to be incorrectly skipped as duplicates. Changed to track which
stream first processed each needle ID in the current window:

- Cross-stream duplicates (same ID from different streams, within window) are skipped
- Same-stream duplicates (overwrites from same stream) are kept
- Map now stores: needleId -> streamIndex of first occurrence in window

Added TestMergeNeedleStreamsSameStreamDuplicates to verify same-stream
overwrites are preserved while cross-stream duplicates are skipped.

All unit tests passing (11/11)
Binary compiles successfully
2026-02-25 10:12:09 -08:00

529 lines
16 KiB
Go

package shell
import (
"bytes"
"container/heap"
"context"
"flag"
"fmt"
"io"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"google.golang.org/grpc"
)
// mergeIdleTimeoutSeconds is the timeout for idle streams during needle tailing.
// This ensures that slow or stalled streams don't block the merge indefinitely.
// Set to 5 seconds to handle network congestion and avoid premature stream termination.
// Can be made configurable in the future if needed for different deployment scenarios.
const mergeIdleTimeoutSeconds = 5
// mergeDeduplicationWindowNs defines the time window for deduplication across replicas.
// Since the same needle ID can have different timestamps on different servers due to
// clock skew and replication lag, we deduplicate needles with the same ID within this window.
// Set to 5 seconds in nanoseconds to handle typical server clock differences.
const mergeDeduplicationWindowNs = 5 * time.Second
func init() {
Commands = append(Commands, &commandVolumeMerge{})
}
type commandVolumeMerge struct{}
func (c *commandVolumeMerge) Name() string {
return "volume.merge"
}
func (c *commandVolumeMerge) Help() string {
return `merge replicas for a volume id in timestamp order into a fresh copy
volume.merge -volumeId <volume id>
This command:
1) marks the volume readonly on replicas (if not already)
2) allocates a temporary copy on a third location
3) merges replicas in append timestamp order, skipping duplicates
4) replaces the original replicas with the merged volume
5) restores writable state if it was writable before
`
}
func (c *commandVolumeMerge) HasTag(CommandTag) bool {
return false
}
func (c *commandVolumeMerge) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
mergeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeIdInt := mergeCommand.Int("volumeId", 0, "the volume id")
targetNodeStr := mergeCommand.String("target", "", "optional target volume server <host>:<port> for temporary merge output")
noLock := mergeCommand.Bool("noLock", false, "do not lock the admin shell at one's own risk")
if err = mergeCommand.Parse(args); err != nil {
return err
}
if *volumeIdInt == 0 {
return fmt.Errorf("volumeId is required")
}
if *noLock {
commandEnv.noLock = true
} else if err = commandEnv.confirmIsLocked(args); err != nil {
return err
}
volumeId := needle.VolumeId(*volumeIdInt)
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return err
}
volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
replicas := volumeReplicas[uint32(volumeId)]
if len(replicas) < 2 {
return fmt.Errorf("volume %d has %d replica(s); merge requires at least two", volumeId, len(replicas))
}
volumeInfo := replicas[0].info
replicaPlacement, err := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
if err != nil {
return fmt.Errorf("parse replica placement for volume %d: %w", volumeId, err)
}
var targetServer pb.ServerAddress
if *targetNodeStr != "" {
targetServer = pb.ServerAddress(*targetNodeStr)
if isReplicaServer(targetServer, replicas) {
return fmt.Errorf("target %s already hosts volume %d", *targetNodeStr, volumeId)
}
if err = allocateMergeVolume(commandEnv.option.GrpcDialOption, targetServer, volumeInfo, replicaPlacement); err != nil {
return err
}
} else {
targetServer, err = allocateMergeVolumeOnThirdLocation(commandEnv.option.GrpcDialOption, allLocations, replicas, volumeInfo, replicaPlacement)
if err != nil {
return err
}
}
cleanupTarget := true
defer func() {
if !cleanupTarget {
return
}
if delErr := deleteVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, false); delErr != nil {
glog.Warningf("failed to clean up temporary merge volume %d on %s: %v", volumeId, targetServer, delErr)
}
}()
writableReplicaIndices, err := ensureVolumeReadonly(commandEnv, replicas)
if err != nil {
return err
}
if len(writableReplicaIndices) > 0 {
defer func() {
// Only restore writable state for replicas that were originally writable
writableReplicas := make([]*VolumeReplica, 0, len(writableReplicaIndices))
for _, idx := range writableReplicaIndices {
writableReplicas = append(writableReplicas, replicas[idx])
}
if restoreErr := markReplicasWritable(commandEnv.option.GrpcDialOption, writableReplicas, true, false); restoreErr != nil {
glog.Warningf("failed to restore writable state for volume %d: %v", volumeId, restoreErr)
}
}()
}
done := make(chan struct{})
defer close(done)
sources := make([]needleStream, 0, len(replicas))
for _, replica := range replicas {
server := pb.NewServerAddressFromDataNode(replica.location.dataNode)
sources = append(sources, startTailNeedleStream(commandEnv.option.GrpcDialOption, volumeId, server, done))
}
mergeErr := operation.WithVolumeServerClient(false, targetServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
version := needle.Version(volumeInfo.Version)
if version == 0 {
version = needle.GetCurrentVersion()
}
return mergeNeedleStreams(sources, func(streamIndex int, n *needle.Needle) error {
blob, size, err := needleBlobFromNeedle(n, version)
if err != nil {
return err
}
_, err = client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
VolumeId: uint32(volumeId),
NeedleId: uint64(n.Id),
Size: int32(size),
NeedleBlob: blob,
})
return err
})
})
if mergeErr != nil {
return mergeErr
}
for _, replica := range replicas {
sourceServer := pb.NewServerAddressFromDataNode(replica.location.dataNode)
if _, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, targetServer, sourceServer, "", 0, false); err != nil {
return err
}
}
if err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, false); err != nil {
return err
}
cleanupTarget = false
fmt.Fprintf(writer, "merged volume %d from %d replicas via %s\n", volumeId, len(replicas), targetServer)
return nil
}
type needleStream interface {
Next() (*needle.Needle, bool)
Err() error
}
type tailNeedleStream struct {
ch <-chan *needle.Needle
errMu sync.Mutex
err error
}
func (s *tailNeedleStream) Next() (*needle.Needle, bool) {
n, ok := <-s.ch
return n, ok
}
func (s *tailNeedleStream) Err() error {
s.errMu.Lock()
defer s.errMu.Unlock()
return s.err
}
func (s *tailNeedleStream) setErr(err error) {
s.errMu.Lock()
s.err = err
s.errMu.Unlock()
}
func startTailNeedleStream(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, server pb.ServerAddress, done <-chan struct{}) *tailNeedleStream {
ch := make(chan *needle.Needle, 32)
stream := &tailNeedleStream{ch: ch}
go func() {
err := operation.TailVolumeFromSource(server, grpcDialOption, volumeId, 0, mergeIdleTimeoutSeconds, func(n *needle.Needle) error {
select {
case ch <- n:
case <-done:
return fmt.Errorf("merge cancelled")
}
return nil
})
close(ch)
stream.setErr(err)
}()
return stream
}
type needleMergeItem struct {
streamIndex int
needle *needle.Needle
timestamp uint64
}
type needleMergeHeap []needleMergeItem
func (h needleMergeHeap) Len() int { return len(h) }
func (h needleMergeHeap) Less(i, j int) bool {
if h[i].timestamp == h[j].timestamp {
return h[i].needle.Id < h[j].needle.Id
}
return h[i].timestamp < h[j].timestamp
}
func (h needleMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *needleMergeHeap) Push(x any) {
*h = append(*h, x.(needleMergeItem))
}
func (h *needleMergeHeap) Pop() any {
old := *h
n := len(old)
item := old[n-1]
*h = old[:n-1]
return item
}
func mergeNeedleStreams(streams []needleStream, consume func(int, *needle.Needle) error) error {
h := &needleMergeHeap{}
heap.Init(h)
for i, stream := range streams {
if n, ok := stream.Next(); ok {
heap.Push(h, needleMergeItem{streamIndex: i, needle: n, timestamp: needleTimestamp(n)})
}
}
// Track seen needle IDs (by stream) within a time window to skip cross-stream duplicates only.
// Needles with the same ID from different streams within mergeDeduplicationWindowNs are considered
// cross-stream duplicates and skipped. Same-stream duplicates (overwrites) are kept.
// Map: needleId -> streamIndex that first processed it in this window
seenAtTimestamp := make(map[types.NeedleId]int)
var windowStartTimestamp uint64
windowInitialized := false
for h.Len() > 0 {
item := heap.Pop(h).(needleMergeItem)
ts := item.timestamp
n := item.needle
// Initialize window on first timestamp, or move to new window when outside current window
if !windowInitialized {
windowStartTimestamp = ts
windowInitialized = true
} else if ts > windowStartTimestamp+uint64(mergeDeduplicationWindowNs) {
// Moving to a new window: clear the watermark to reduce memory usage.
// This is safe because we only skip duplicates within the same time window.
seenAtTimestamp = make(map[types.NeedleId]int)
windowStartTimestamp = ts
}
// Skip cross-stream duplicates: if we've already seen this needle ID from a DIFFERENT stream
// within this time window, skip it. Same-stream duplicates (overwrites) are kept.
if seenStreamIdx, exists := seenAtTimestamp[n.Id]; exists && seenStreamIdx != item.streamIndex {
// Cross-stream duplicate from different stream - skip this occurrence
if nextN, ok := streams[item.streamIndex].Next(); ok {
heap.Push(h, needleMergeItem{streamIndex: item.streamIndex, needle: nextN, timestamp: needleTimestamp(nextN)})
}
continue
}
// Record this stream's occurrence of this needle ID in this window
// (overwrite if from same stream, since we process in timestamp order)
seenAtTimestamp[n.Id] = item.streamIndex
if err := consume(item.streamIndex, n); err != nil {
return err
}
if nextN, ok := streams[item.streamIndex].Next(); ok {
heap.Push(h, needleMergeItem{streamIndex: item.streamIndex, needle: nextN, timestamp: needleTimestamp(nextN)})
}
}
for _, stream := range streams {
if err := stream.Err(); err != nil {
return err
}
}
return nil
}
func needleTimestamp(n *needle.Needle) uint64 {
if n.AppendAtNs != 0 {
return n.AppendAtNs
}
if n.LastModified != 0 {
return uint64(time.Unix(int64(n.LastModified), 0).UnixNano())
}
return 0
}
// memoryBackendFile implements backend.BackendStorageFile using an in-memory buffer
type memoryBackendFile struct {
buf *bytes.Buffer
}
func (m *memoryBackendFile) ReadAt(p []byte, off int64) (n int, err error) {
data := m.buf.Bytes()
if off >= int64(len(data)) {
return 0, io.EOF
}
n = copy(p, data[off:])
if off+int64(n) < int64(len(data)) {
return n, nil
}
return n, io.EOF
}
func (m *memoryBackendFile) WriteAt(p []byte, off int64) (n int, err error) {
data := m.buf.Bytes()
if off > int64(len(data)) {
// Pad with zeros
m.buf.Write(make([]byte, off-int64(len(data))))
// Refresh data snapshot after padding to see the padded length
data = m.buf.Bytes()
}
if off == int64(len(data)) {
return m.buf.Write(p)
}
// Overwrite existing data: preserve any trailing bytes beyond the write range
newLen := off + int64(len(p))
if newLen < int64(len(data)) {
newLen = int64(len(data))
}
newData := make([]byte, newLen)
copy(newData, data)
copy(newData[off:], p)
m.buf = bytes.NewBuffer(newData)
return len(p), nil
}
func (m *memoryBackendFile) Truncate(off int64) error {
data := m.buf.Bytes()
if off > int64(len(data)) {
m.buf.Write(make([]byte, off-int64(len(data))))
} else {
m.buf = bytes.NewBuffer(data[:off])
}
return nil
}
func (m *memoryBackendFile) Close() error {
return nil
}
func (m *memoryBackendFile) GetStat() (datSize int64, modTime time.Time, err error) {
return int64(m.buf.Len()), time.Now(), nil
}
func (m *memoryBackendFile) Name() string {
return "memory"
}
func (m *memoryBackendFile) Sync() error {
return nil
}
func newMemoryBackendFile() *memoryBackendFile {
return &memoryBackendFile{
buf: &bytes.Buffer{},
}
}
func needleBlobFromNeedle(n *needle.Needle, version needle.Version) ([]byte, types.Size, error) {
// Use in-memory buffer for serialization to avoid expensive temporary file I/O
memFile := newMemoryBackendFile()
defer memFile.Close()
_, size, actualSize, err := n.Append(memFile, version)
if err != nil {
return nil, 0, err
}
buf := make([]byte, actualSize)
read, err := memFile.ReadAt(buf, 0)
if err != nil && err != io.EOF {
return nil, 0, err
}
return buf[:read], size, nil
}
func allocateMergeVolumeOnThirdLocation(grpcDialOption grpc.DialOption, allLocations []location, replicas []*VolumeReplica, info *master_pb.VolumeInformationMessage, replicaPlacement *super_block.ReplicaPlacement) (pb.ServerAddress, error) {
replicaNodes := map[string]struct{}{}
for _, replica := range replicas {
replicaNodes[replica.location.dataNode.Id] = struct{}{}
}
for _, loc := range allLocations {
if _, exists := replicaNodes[loc.dataNode.Id]; exists {
continue
}
if !locationHasDiskType(loc, info.DiskType) {
continue
}
server := pb.NewServerAddressFromDataNode(loc.dataNode)
if err := allocateMergeVolume(grpcDialOption, server, info, replicaPlacement); err != nil {
glog.V(1).Infof("failed to allocate merge volume on %s with replication %s: %v", server, replicaPlacement.String(), err)
continue
}
return server, nil
}
return "", fmt.Errorf("no third location available to merge volume %d", info.Id)
}
func allocateMergeVolume(grpcDialOption grpc.DialOption, server pb.ServerAddress, info *master_pb.VolumeInformationMessage, replicaPlacement *super_block.ReplicaPlacement) error {
return operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
VolumeId: info.Id,
Collection: info.Collection,
Preallocate: 0,
Replication: replicaPlacement.String(),
Ttl: needle.LoadTTLFromUint32(info.Ttl).String(),
DiskType: info.DiskType,
Version: info.Version,
})
return err
})
}
// ensureVolumeReadonly marks all replicas as readonly and returns the indices of replicas that were writable
func ensureVolumeReadonly(commandEnv *CommandEnv, replicas []*VolumeReplica) ([]int, error) {
var writableReplicaIndices []int
for i, replica := range replicas {
server := pb.NewServerAddressFromDataNode(replica.location.dataNode)
err := operation.WithVolumeServerClient(false, server, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{VolumeId: replica.info.Id})
if err != nil {
return err
}
if !resp.IsReadOnly {
writableReplicaIndices = append(writableReplicaIndices, i)
}
return nil
})
if err != nil {
return nil, err
}
}
if len(writableReplicaIndices) > 0 {
if err := markReplicasWritable(commandEnv.option.GrpcDialOption, replicas, false, false); err != nil {
return nil, err
}
}
return writableReplicaIndices, nil
}
func isReplicaServer(target pb.ServerAddress, replicas []*VolumeReplica) bool {
for _, replica := range replicas {
if pb.NewServerAddressFromDataNode(replica.location.dataNode) == target {
return true
}
}
return false
}
func locationHasDiskType(loc location, diskType string) bool {
for _, diskInfo := range loc.dataNode.DiskInfos {
if diskInfo.Type == diskType {
return true
}
}
return false
}
func markReplicasWritable(grpcDialOption grpc.DialOption, replicas []*VolumeReplica, writable bool, persist bool) error {
for _, replica := range replicas {
server := pb.NewServerAddressFromDataNode(replica.location.dataNode)
err := operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
if writable {
_, err := client.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{VolumeId: replica.info.Id})
return err
}
_, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{VolumeId: replica.info.Id, Persist: persist})
return err
})
if err != nil {
return err
}
}
return nil
}