seaweedFS/weed/shell/command_volume_server_evacuate.go
Chris Lu df4f2f7020 ec: add -diskType flag to EC commands for SSD support (#7607)
* ec: add diskType parameter to core EC functions

Add diskType parameter to:
- ecBalancer struct
- collectEcVolumeServersByDc()
- collectEcNodesForDC()
- collectEcNodes()
- EcBalance()

This allows EC operations to target specific disk types (hdd, ssd, etc.)
instead of being hardcoded to HardDriveType only.

For backward compatibility, all callers currently pass types.HardDriveType
as the default value. Subsequent commits will add -diskType flags to
the individual EC commands.

* ec: update helper functions to use configurable diskType

Update the following functions to accept/use diskType parameter:
- findEcVolumeShards()
- addEcVolumeShards()
- deleteEcVolumeShards()
- moveMountedShardToEcNode()
- countShardsByRack()
- pickNEcShardsToMoveFrom()

All ecBalancer methods now use ecb.diskType instead of hardcoded
types.HardDriveType. Non-ecBalancer callers (like volumeServer.evacuate
and ec.rebuild) use types.HardDriveType as the default.

Update all test files to pass diskType where needed.

* ec: add -diskType flag to ec.balance and ec.encode commands

Add -diskType flag to specify the target disk type for EC operations:
- ec.balance -diskType=ssd
- ec.encode -diskType=ssd

The disk type can be 'hdd', 'ssd', or empty for default (hdd).
This allows placing EC shards on SSD or other disk types instead of
only HDD.

Example usage:
  ec.balance -collection=mybucket -diskType=ssd -apply
  ec.encode -collection=mybucket -diskType=ssd -force
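
A minimal sketch of how a command could parse such a flag with the standard `flag` package and treat an empty value as the default. The names `parseDiskType` and `normalizeDiskType` are hypothetical and exist only for this illustration; the real commands convert the value through `types.ToDiskType`, which is not reproduced here.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// normalizeDiskType is a hypothetical helper: an empty value falls back to
// "hdd", and any other tag is trimmed and lowercased.
func normalizeDiskType(s string) string {
	s = strings.ToLower(strings.TrimSpace(s))
	if s == "" {
		return "hdd"
	}
	return s
}

// parseDiskType parses -diskType from args using a private FlagSet, so a bad
// flag is returned as an error instead of terminating the process.
func parseDiskType(args []string) (string, error) {
	fs := flag.NewFlagSet("ec.balance", flag.ContinueOnError)
	diskType := fs.String("diskType", "", "target disk type: hdd, ssd, or empty for hdd")
	fs.String("collection", "", "collection name")
	fs.Bool("apply", false, "actually apply the changes")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	return normalizeDiskType(*diskType), nil
}

func main() {
	dt, err := parseDiskType([]string{"-collection=mybucket", "-diskType=ssd", "-apply"})
	fmt.Println(dt, err) // prints "ssd <nil>"
}
```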

* test: add integration tests for EC disk type support

Add integration tests to verify the -diskType flag works correctly:
- TestECDiskTypeSupport: Tests EC encode and balance with SSD disk type
- TestECDiskTypeMixedCluster: Tests EC operations on a mixed HDD/SSD cluster

The tests verify:
- Volume servers can be configured with specific disk types
- ec.encode accepts -diskType flag and encodes to the correct disk type
- ec.balance accepts -diskType flag and balances on the correct disk type
- Mixed disk type clusters work correctly with separate collections

* ec: add -sourceDiskType to ec.encode and -diskType to ec.decode

ec.encode:
- Add -sourceDiskType flag to filter source volumes by disk type
- This enables tier migration scenarios (e.g., SSD volumes → HDD EC shards)
- -diskType specifies target disk type for EC shards

ec.decode:
- Add -diskType flag to specify source disk type where EC shards are stored
- Update collectEcShardIds() and collectEcNodeShardBits() to accept diskType

Examples:
  # Encode SSD volumes to HDD EC shards (tier migration)
  ec.encode -collection=mybucket -sourceDiskType=ssd -diskType=hdd

  # Decode EC shards from SSD
  ec.decode -collection=mybucket -diskType=ssd

Integration tests updated to cover new flags.

* ec: fix variable shadowing and add -diskType to ec.rebuild and volumeServer.evacuate

Address code review comments:

1. Fix variable shadowing in collectEcVolumeServersByDc():
   - Rename loop variable 'diskType' to 'diskTypeKey' and 'diskTypeStr'
     to avoid shadowing the function parameter

2. Fix hardcoded HardDriveType in ecBalancer methods:
   - balanceEcRack(): use ecb.diskType instead of types.HardDriveType
   - collectVolumeIdToEcNodes(): use ecb.diskType

3. Add -diskType flag to ec.rebuild command:
   - Add diskType field to ecRebuilder struct
   - Pass diskType to collectEcNodes() and addEcVolumeShards()

4. Add -diskType flag to volumeServer.evacuate command:
   - Add diskType field to commandVolumeServerEvacuate struct
   - Pass diskType to collectEcVolumeServersByDc() and moveMountedShardToEcNode()

* test: add diskType field to ecBalancer in TestPickEcNodeToBalanceShardsInto

Address nitpick comment: ensure test ecBalancer struct has diskType
field set for consistency with other tests.

* ec: filter disk selection by disk type in pickBestDiskOnNode

When evacuating or rebalancing EC shards, pickBestDiskOnNode now
filters disks by the target disk type. This ensures:

1. EC shards from SSD disks are moved to SSD disks on destination nodes
2. EC shards from HDD disks are moved to HDD disks on destination nodes
3. No cross-disk-type shard movement occurs

This maintains the storage tier isolation when moving EC shards
between nodes during evacuation or rebalancing operations.

* ec: allow disk type fallback during evacuation

Update pickBestDiskOnNode to accept a strictDiskType parameter:

- strictDiskType=true (balancing): Only use disks of matching type.
  This maintains storage tier isolation during normal rebalancing.

- strictDiskType=false (evacuation): Prefer same disk type, but
  fall back to other disk types if no matching disk is available.
  This ensures evacuation can complete even when same-type capacity
  is insufficient.

Priority order for evacuation:
1. Same disk type with lowest shard count (preferred)
2. Different disk type with lowest shard count (fallback)
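
The priority order above can be sketched as follows. This is an illustration under stated assumptions, not the actual pickBestDiskOnNode: the `disk` type and `pickDisk` function are hypothetical, and shard count is the only criterion considered.

```go
package main

import "fmt"

// disk is a hypothetical stand-in for one disk on a destination node.
type disk struct {
	id         uint32
	diskType   string
	shardCount int
}

// pickDisk returns the id of the chosen disk and true, or 0 and false when
// nothing qualifies. With strict=true only disks of the wanted type are
// eligible; with strict=false the least-loaded disk of another type is used
// as a fallback when no disk of the wanted type exists.
func pickDisk(disks []disk, want string, strict bool) (uint32, bool) {
	var same, other *disk
	for i := range disks {
		d := &disks[i]
		if d.diskType == want {
			if same == nil || d.shardCount < same.shardCount {
				same = d
			}
		} else if other == nil || d.shardCount < other.shardCount {
			other = d
		}
	}
	if same != nil {
		return same.id, true
	}
	if !strict && other != nil {
		return other.id, true
	}
	return 0, false
}

func main() {
	hddOnly := []disk{{1, "hdd", 3}, {2, "hdd", 1}}
	fmt.Println(pickDisk(hddOnly, "ssd", true))  // prints "0 false" (balancing: no match)
	fmt.Println(pickDisk(hddOnly, "ssd", false)) // prints "2 true" (evacuation: fallback)
}
```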

* test: use defer for lock/unlock to prevent lock leaks

Use defer to ensure locks are always released, even on early returns
or test failures. This prevents lock leaks that could cause subsequent
tests to hang or fail.

Changes:
- Return early if lock acquisition fails
- Immediately defer unlock after successful lock
- Remove redundant explicit unlock calls at end of tests
- Fix unused variable warning (err -> encodeErr/locErr)
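
A minimal sketch of the lock/defer pattern described above. The `locker` type is a hypothetical stand-in for the shell's cluster lock, used only to show that the lock is released on every return path.

```go
package main

import (
	"errors"
	"fmt"
)

// locker is a hypothetical stand-in for the cluster lock used by the tests.
type locker struct{ held bool }

func (l *locker) Lock() error {
	if l.held {
		return errors.New("already locked")
	}
	l.held = true
	return nil
}

func (l *locker) Unlock() { l.held = false }

// runStep returns early if the lock cannot be taken and defers the unlock
// immediately after a successful lock, so no explicit unlock is needed later.
func runStep(l *locker, fail bool) error {
	if err := l.Lock(); err != nil {
		return err
	}
	defer l.Unlock()

	if fail {
		return errors.New("step failed")
	}
	return nil
}

func main() {
	l := &locker{}
	err := runStep(l, true)
	fmt.Println(err, l.held) // prints "step failed false"
	err = runStep(l, false)
	fmt.Println(err, l.held) // prints "<nil> false"
}
```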

* ec: dynamically discover disk types from topology for evacuation

Disk types are free-form tags (e.g., 'ssd', 'nvme', 'archive') that come
from the topology, not a hardcoded set. Only 'hdd' (or empty) is the
default disk type.

Use collectVolumeDiskTypes() to discover all disk types present in the
cluster topology instead of hardcoding [HardDriveType, SsdType].
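
A minimal sketch of this kind of discovery, assuming each node exposes its disks in a map keyed by the disk type tag. The `node` type and `discoverDiskTypes` function are hypothetical; the real collectVolumeDiskTypes() walks the master topology and is not reproduced here.

```go
package main

import (
	"fmt"
	"sort"
)

// node is a hypothetical stand-in for a data node whose disks are keyed by
// their free-form disk type tag ("" means the default type).
type node struct {
	disks map[string]int // disk type -> volume count (the count is unused here)
}

// discoverDiskTypes returns every distinct disk type tag present on any node,
// sorted so the result is deterministic.
func discoverDiskTypes(nodes []node) []string {
	seen := map[string]struct{}{}
	for _, n := range nodes {
		for t := range n.disks {
			seen[t] = struct{}{}
		}
	}
	out := make([]string, 0, len(seen))
	for t := range seen {
		out = append(out, t)
	}
	sort.Strings(out)
	return out
}

func main() {
	nodes := []node{
		{disks: map[string]int{"": 4, "ssd": 2}},
		{disks: map[string]int{"nvme": 1, "ssd": 3}},
	}
	fmt.Printf("%q\n", discoverDiskTypes(nodes)) // prints ["" "nvme" "ssd"]
}
```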

* test: add evacuation fallback and cross-rack EC placement tests

Add two new integration tests:

1. TestEvacuationFallbackBehavior:
   - Tests that when same disk type has no capacity, shards fall back
     to other disk types during evacuation
   - Creates cluster with 1 SSD + 2 HDD servers (limited SSD capacity)
   - Verifies pickBestDiskOnNode behavior with strictDiskType=false

2. TestCrossRackECPlacement:
   - Tests EC shard distribution across different racks
   - Creates cluster with 4 servers in 4 different racks
   - Verifies shards are spread across multiple racks
   - Tests that ec.balance respects rack placement

Helper functions added:
- startLimitedSsdCluster: 1 SSD + 2 HDD servers
- startMultiRackCluster: 4 servers in 4 racks
- countShardsPerRack: counts EC shards per rack from disk

* test: fix collection mismatch in TestCrossRackECPlacement

The EC commands were using collection 'rack_test' but uploaded test data
uses collection 'test' (default). This caused ec.encode/ec.balance to not
find the uploaded volume.

Fix: Change EC commands to use '-collection test' to match the uploaded data.

Addresses review comment from PR #7607.

* test: close log files in MultiDiskCluster.Stop() to prevent FD leaks

Track log files in MultiDiskCluster.logFiles and close them in Stop()
to prevent file descriptor accumulation in long-running or many-test
scenarios.

Addresses review comment about logging resources cleanup.

* test: improve EC integration tests with proper assertions

- Add assertNoFlagError helper to detect flag parsing regressions
- Update diskType subtests to fail on flag errors (ec.encode, ec.balance, ec.decode)
- Update verify_disktype_flag_parsing to check help output contains diskType
- Remove verify_fallback_disk_selection (was documentation-only, not executable)
- Add assertion to verify_cross_rack_distribution for minimum 2 racks
- Consolidate uploadTestDataWithDiskType to accept collection parameter
- Remove duplicate uploadTestDataWithDiskTypeMixed function

* test: extract captureCommandOutput helper and fix error handling

- Add captureCommandOutput helper to reduce code duplication in diskType tests
- Create commandRunner interface to match shell command Do method
- Update ec_encode_with_ssd_disktype, ec_balance_with_ssd_disktype,
  ec_encode_with_source_disktype, ec_decode_with_disktype to use helper
- Fix filepath.Glob error handling in countShardsPerRack instead of ignoring it

* test: add flag validation to ec_balance_targets_correct_disk_type

Add assertNoFlagError calls after ec.balance commands to ensure
-diskType flag is properly recognized for both SSD and HDD disk types.

* test: add proper assertions for EC command results

- ec_encode_with_ssd_disktype: check for expected volume-related errors
- ec_balance_with_ssd_disktype: require success with require.NoError
- ec_encode_with_source_disktype: check for expected no-volume errors
- ec_decode_with_disktype: check for expected no-ec-volume errors
- upload_to_ssd_and_hdd: use require.NoError for setup validation

Tests now properly fail on unexpected errors rather than just logging.

* test: fix missing unlock in ec_encode_with_disk_awareness

Add defer unlock pattern to ensure lock is always released, matching
the pattern used in other subtests.

* test: improve helper robustness

- Make assertNoFlagError case-insensitive for pattern matching
- Use defer in captureCommandOutput to restore stdout/stderr and close
  pipe ends to avoid FD leaks even if cmd.Do panics
2025-12-10 22:42:52 -08:00


package shell

import (
	"flag"
	"fmt"
	"io"
	"slices"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

func init() {
	Commands = append(Commands, &commandVolumeServerEvacuate{})
}

type commandVolumeServerEvacuate struct {
	topologyInfo *master_pb.TopologyInfo
	targetServer *string
	volumeRack   *string
}

func (c *commandVolumeServerEvacuate) Name() string {
	return "volumeServer.evacuate"
}

func (c *commandVolumeServerEvacuate) Help() string {
	return `move out all data on a volume server

	volumeServer.evacuate -node <host:port>

	This command moves all data away from the volume server.
	The volumes on the volume servers will be redistributed.

	Usually this is used to prepare to shut down or upgrade the volume server.

	Sometimes a volume cannot be moved because there is no
	good destination that meets the replication requirement.
	E.g. a volume with replication 001 in a cluster with 2 volume servers cannot be moved.
	You can use "-skipNonMoveable" to move the remaining volumes.

`
}

func (c *commandVolumeServerEvacuate) HasTag(CommandTag) bool {
	return false
}

func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
	c.volumeRack = vsEvacuateCommand.String("rack", "", "source rack for the volume servers")
	c.targetServer = vsEvacuateCommand.String("target", "", "<host>:<port> of the target volume server")
	skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that cannot be moved")
	applyChange := vsEvacuateCommand.Bool("apply", false, "actually apply the changes")
	// TODO: remove this alias
	applyChangeAlias := vsEvacuateCommand.Bool("force", false, "actually apply the changes (alias for -apply)")
	retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
	if err = vsEvacuateCommand.Parse(args); err != nil {
		return nil
	}

	handleDeprecatedForceFlag(writer, vsEvacuateCommand, applyChangeAlias, applyChange)
	infoAboutSimulationMode(writer, *applyChange, "-apply")

	if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange {
		return
	}

	if *volumeServer == "" && *c.volumeRack == "" {
		return fmt.Errorf("need to specify volume server by -node=<host>:<port> or source rack")
	}

	// Try once, then retry up to retryCount more times until one attempt succeeds.
	for i := 0; i < *retryCount+1; i++ {
		if err = c.volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil {
			return nil
		}
	}

	return
}

func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) (err error) {
	// 1. confirm the volume server is part of the cluster
	// 2. collect all other volume servers, sort by empty slots
	// 3. move to any other volume server as long as it satisfies the replication requirements

	// list all the volumes
	// collect topology information
	c.topologyInfo, _, err = collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return err
	}
	defer func() {
		c.topologyInfo = nil
	}()

	if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
		return err
	}

	if err := c.evacuateEcVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
		return err
	}

	return nil
}

func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
	// find this volume server
	volumeServers := collectVolumeServersByDcRackNode(c.topologyInfo, "", "", "")
	thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer)
	if len(thisNodes) == 0 {
		return fmt.Errorf("%s is not found in this cluster", volumeServer)
	}

	// move away normal volumes
	for _, thisNode := range thisNodes {
		for _, diskInfo := range thisNode.info.DiskInfos {
			if applyChange {
				// refresh the topology so destinations reflect the moves made so far
				if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil {
					fmt.Fprintf(writer, "update topologyInfo %v\n", err)
				} else {
					_, otherNodesNew := c.nodesOtherThan(
						collectVolumeServersByDcRackNode(topologyInfo, "", "", ""), volumeServer)
					if len(otherNodesNew) > 0 {
						otherNodes = otherNodesNew
						c.topologyInfo = topologyInfo
						fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes))
					}
				}
			}
			volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo)
			for _, vol := range diskInfo.VolumeInfos {
				hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
				if err != nil {
					fmt.Fprintf(writer, "move away volume %d from %s: %v\n", vol.Id, volumeServer, err)
				}
				if !hasMoved {
					if skipNonMoveable {
						replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
						fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
					} else {
						return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
					}
				}
			}
		}
	}
	return nil
}

func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
	// Evacuate EC volumes for all disk types discovered from topology
	// Disk types are free-form tags (e.g., "", "hdd", "ssd", "nvme", etc.)
	// We need to handle each disk type separately because shards should be moved to nodes with the same disk type
	// We collect topology once at the start and track capacity changes ourselves
	// (via freeEcSlot decrement after each move) rather than repeatedly refreshing,
	// which would give a false sense of correctness since topology could be stale.
	diskTypes := collectVolumeDiskTypes(c.topologyInfo)

	for _, diskType := range diskTypes {
		ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "", diskType)
		thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
		if len(thisNodes) == 0 {
			// This server doesn't have EC shards for this disk type, skip
			continue
		}

		// move away ec volumes for this disk type
		for _, thisNode := range thisNodes {
			diskInfo, found := thisNode.info.DiskInfos[string(diskType)]
			if !found {
				continue
			}
			for _, ecShardInfo := range diskInfo.EcShardInfos {
				hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange, diskType, writer)
				if err != nil {
					fmt.Fprintf(writer, "move away volume %d from %s: %v\n", ecShardInfo.Id, volumeServer, err)
				}
				if !hasMoved {
					if skipNonMoveable {
						fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
					} else {
						return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
					}
				}
			}
		}
	}
	return nil
}

func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, diskType types.DiskType, writer io.Writer) (hasMoved bool, err error) {

	for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
		// Sort by: 1) fewest shards of this volume, 2) most free EC slots
		// This ensures we prefer nodes with capacity and balanced shard distribution
		slices.SortFunc(otherNodes, func(a, b *EcNode) int {
			aShards := a.localShardIdCount(ecShardInfo.Id)
			bShards := b.localShardIdCount(ecShardInfo.Id)
			if aShards != bShards {
				return aShards - bShards // Prefer fewer shards
			}
			return b.freeEcSlot - a.freeEcSlot // Then prefer more free slots
		})

		shardMoved := false
		skippedNodes := 0
		for i := 0; i < len(otherNodes); i++ {
			emptyNode := otherNodes[i]

			// Skip nodes with no free EC slots
			if emptyNode.freeEcSlot <= 0 {
				skippedNodes++
				continue
			}

			collectionPrefix := ""
			if ecShardInfo.Collection != "" {
				collectionPrefix = ecShardInfo.Collection + "_"
			}
			vid := needle.VolumeId(ecShardInfo.Id)
			// For evacuation, prefer same disk type but allow fallback to other types
			destDiskId := pickBestDiskOnNode(emptyNode, vid, diskType, false)
			if destDiskId > 0 {
				fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId)
			} else {
				fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
			}
			err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange, diskType)
			if err != nil {
				hasMoved = false
				return
			} else {
				hasMoved = true
				shardMoved = true
				// Update the node's free slot count after successful move
				emptyNode.freeEcSlot--
				break
			}
		}
		if !shardMoved {
			if skippedNodes > 0 {
				fmt.Fprintf(writer, "no available destination for ec shard %d.%d: %d nodes have no free slots\n",
					ecShardInfo.Id, shardId, skippedNodes)
			}
			// Ensure partial moves are reported as failures to prevent data loss
			hasMoved = false
			return
		}
	}
	return
}

func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
	freeVolumeCountfn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType))
	maxVolumeCountFn := capacityByMaxVolumeCount(types.ToDiskType(vol.DiskType))
	// only consider volumes on the same disk type when ranking destinations
	for _, n := range otherNodes {
		n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
			return v.DiskType == vol.DiskType
		})
	}
	// most empty one is in the front
	slices.SortFunc(otherNodes, func(a, b *Node) int {
		// Compare the ratios directly. Assuming localVolumeRatio returns a
		// fractional value, truncating the difference to int would round any
		// gap smaller than 1 to 0 and make all nodes compare as equal.
		ratioA, ratioB := a.localVolumeRatio(maxVolumeCountFn), b.localVolumeRatio(maxVolumeCountFn)
		switch {
		case ratioA < ratioB:
			return -1
		case ratioA > ratioB:
			return 1
		default:
			return 0
		}
	})
	for i := 0; i < len(otherNodes); i++ {
		emptyNode := otherNodes[i]
		if freeVolumeCountfn(emptyNode.info) <= 0 {
			continue
		}
		hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
		if err != nil {
			return
		}
		if hasMoved {
			break
		}
	}
	return
}

func (c *commandVolumeServerEvacuate) nodesOtherThan(volumeServers []*Node, thisServer string) (thisNodes []*Node, otherNodes []*Node) {
	for _, node := range volumeServers {
		if node.info.Id == thisServer || (*c.volumeRack != "" && node.rack == *c.volumeRack) {
			thisNodes = append(thisNodes, node)
			continue
		}
		if *c.volumeRack != "" && *c.volumeRack == node.rack {
			continue
		}
		if *c.targetServer != "" && *c.targetServer != node.info.Id {
			continue
		}
		otherNodes = append(otherNodes, node)
	}
	return
}

func (c *commandVolumeServerEvacuate) ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNodes []*EcNode, otherNodes []*EcNode) {
	for _, node := range volumeServers {
		if node.info.Id == thisServer || (*c.volumeRack != "" && string(node.rack) == *c.volumeRack) {
			thisNodes = append(thisNodes, node)
			continue
		}
		if *c.volumeRack != "" && *c.volumeRack == string(node.rack) {
			continue
		}
		if *c.targetServer != "" && *c.targetServer != node.info.Id {
			continue
		}
		otherNodes = append(otherNodes, node)
	}
	return
}