seaweedFS/weed/shell/ec_proportional_rebalance_test.go
Chris Lu 4aa50bfa6a fix: EC rebalance fails with replica placement 000 (#7812)
* fix: EC rebalance fails with replica placement 000

This PR fixes several issues with EC shard distribution:

1. Pre-flight check before EC encoding
   - Verify target disk type has capacity before encoding starts
   - Prevents encoding shards only to fail during rebalance
   - Shows helpful error when wrong diskType is specified (e.g., ssd when volumes are on hdd)

2. Fix EC rebalance with replica placement 000
   - When DiffRackCount=0, shards should be distributed freely across racks
   - The '000' placement means 'no volume replication needed' because EC provides redundancy
   - Previously all racks were skipped with error 'shards X > replica placement limit (0)'

3. Add unit tests for EC rebalance slot calculation
   - TestECRebalanceWithLimitedSlots: documents the limited slots scenario
   - TestECRebalanceZeroFreeSlots: reproduces the 0 free slots error

4. Add Makefile for manual EC testing
   - make setup: start cluster and populate data
   - make shell: open weed shell for EC commands
   - make clean: stop cluster and cleanup
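The rack-limit fix in point 2 can be sketched as follows. This is an illustrative reduction, not the actual SeaweedFS check; the function name is hypothetical:

```go
package main

import "fmt"

// exceedsRackLimit is a hypothetical reduction of the fixed check.
// With replica placement "000", DiffRackCount is 0; before the fix
// that zero acted as a hard cap ("shards X > replica placement
// limit (0)") and disqualified every rack. The fix treats 0 as
// "no rack constraint", since EC parity already provides redundancy.
func exceedsRackLimit(shardsOnRack, diffRackCount int) bool {
	if diffRackCount == 0 {
		return false // "000": distribute freely across racks
	}
	return shardsOnRack > diffRackCount
}

func main() {
	fmt.Println(exceedsRackLimit(5, 0)) // false with the fix
	fmt.Println(exceedsRackLimit(5, 1)) // true: over the limit
}
```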

* fix: default -rebalance to true for ec.encode

The -rebalance flag was defaulting to false, which meant ec.encode would
only print shard moves but not actually execute them. This is a poor
default since the whole point of EC encoding is to distribute shards
across servers for fault tolerance.

Now -rebalance defaults to true, so shards are actually distributed
after encoding. Users can use -rebalance=false if they only want to
see what would happen without making changes.

* test/erasure_coding: improve Makefile safety and docs

- Narrow pkill pattern for volume servers to use TEST_DIR instead of
  port pattern, avoiding accidental kills of unrelated SeaweedFS processes
- Document external dependencies (curl, jq) in header comments

* shell: refactor buildRackWithEcShards to reuse buildEcShards

Extract common shard bit construction logic to avoid duplication
between buildEcShards and buildRackWithEcShards helper functions.

* shell: update test for EC replication 000 behavior

When DiffRackCount=0 (replication "000"), EC shards should be
distributed freely across racks since erasure coding provides its
own redundancy. Update test expectation to reflect this behavior.

* erasure_coding: add distribution package for proportional EC shard placement

Add a new reusable package for EC shard distribution that:
- Supports configurable EC ratios (not hard-coded 10+4)
- Distributes shards proportionally based on replication policy
- Provides fault tolerance analysis
- Prefers moving parity shards to keep data shards spread out

Key components:
- ECConfig: Configurable data/parity shard counts
- ReplicationConfig: Parsed XYZ replication policy
- ECDistribution: Target shard counts per DC/rack/node
- Rebalancer: Plans shard moves with parity-first strategy

This enables seaweed-enterprise custom EC ratios and weed worker
integration while maintaining a clean, testable architecture.
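The proportional math behind ECDistribution can be sketched like this (an assumption-laden illustration, not the package's actual code): an XYZ policy such as "110" means X other data centers, so the minimum DC count is X+1, and the target per DC is the ceiling of totalShards over that count. For 14 shards and "110" this yields 2 DCs and 7 shards per DC, matching the values asserted in TestCalculateECDistributionShell below.

```go
package main

import "fmt"

// targetShardsPerDC is a hypothetical sketch of the proportional
// distribution: digit X of the XYZ policy counts *other* DCs, so
// minDCs = X+1, and shards are split evenly with ceiling division.
func targetShardsPerDC(totalShards int, policy string) (minDCs, perDC int) {
	minDCs = int(policy[0]-'0') + 1
	perDC = (totalShards + minDCs - 1) / minDCs // ceil division
	return
}

func main() {
	dcs, per := targetShardsPerDC(14, "110")
	fmt.Println(dcs, per) // 2 7
}
```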

* shell: integrate distribution package for EC rebalancing

Add shell wrappers around the distribution package:
- ProportionalECRebalancer: Plans moves using distribution.Rebalancer
- NewProportionalECRebalancerWithConfig: Supports custom EC configs
- GetDistributionSummary/GetFaultToleranceAnalysis: Helper functions

The shell layer converts between EcNode types and the generic
TopologyNode types used by the distribution package.

* test setup

* ec: improve data and parity shard distribution across racks

- Add shardsByTypePerRack helper to track data vs parity shards
- Rewrite doBalanceEcShardsAcrossRacks for two-pass balancing:
  1. Balance data shards (0-9) evenly, max ceil(10/6)=2 per rack
  2. Balance parity shards (10-13) evenly, max ceil(4/6)=1 per rack
- Add balanceShardTypeAcrossRacks for generic shard type balancing
- Add pickRackForShardType to select destination with room for type
- Add unit tests for even data/parity distribution verification

This ensures even read load during normal operation by spreading
both data and parity shards across all available racks.
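The two-pass scheme above can be sketched as: partition shard ids by type, then cap each type at ceil(count/racks) per rack. Helper names here are hypothetical, not the actual SeaweedFS functions:

```go
package main

import "fmt"

// splitByType partitions shard ids: ids below dataShards are data
// shards, the rest are parity (sketch of the two-pass balancer).
func splitByType(shardIds []int, dataShards int) (data, parity []int) {
	for _, id := range shardIds {
		if id < dataShards {
			data = append(data, id)
		} else {
			parity = append(parity, id)
		}
	}
	return
}

// perRackCap is the per-rack ceiling for one shard type.
func perRackCap(shardCount, rackCount int) int {
	return (shardCount + rackCount - 1) / rackCount // ceil division
}

func main() {
	all := make([]int, 14)
	for i := range all {
		all[i] = i
	}
	data, parity := splitByType(all, 10)
	fmt.Println(perRackCap(len(data), 6))   // ceil(10/6) = 2
	fmt.Println(perRackCap(len(parity), 6)) // ceil(4/6) = 1
}
```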

* ec: make data/parity shard counts configurable in ecBalancer

- Add dataShardCount and parityShardCount fields to ecBalancer struct
- Add getDataShardCount() and getParityShardCount() methods with defaults
- Replace direct constant usage with configurable methods
- Fix unused variable warning for parityPerRack

This allows seaweed-enterprise to use custom EC ratios while
defaulting to standard 10+4 scheme.
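The zero-value-with-default pattern described here can be sketched as below. Field and method names follow the commit message; the struct is simplified and the literal defaults assume the standard 10+4 scheme:

```go
package main

import "fmt"

// ecBalancer sketch: zero-valued counts fall back to the standard
// 10+4 scheme, while non-zero values allow custom EC ratios.
type ecBalancer struct {
	dataShardCount   int
	parityShardCount int
}

func (b *ecBalancer) getDataShardCount() int {
	if b.dataShardCount > 0 {
		return b.dataShardCount
	}
	return 10 // default data shard count
}

func (b *ecBalancer) getParityShardCount() int {
	if b.parityShardCount > 0 {
		return b.parityShardCount
	}
	return 4 // default parity shard count
}

func main() {
	std := &ecBalancer{}
	custom := &ecBalancer{dataShardCount: 8, parityShardCount: 4}
	fmt.Println(std.getDataShardCount(), std.getParityShardCount())       // 10 4
	fmt.Println(custom.getDataShardCount(), custom.getParityShardCount()) // 8 4
}
```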

* Address PR 7812 review comments

Makefile improvements:
- Save PIDs for each volume server for precise termination
- Use PID-based killing in stop target with pkill fallback
- Use more specific pkill patterns with TEST_DIR paths

Documentation:
- Document jq dependency in README.md

Rebalancer fix:
- Fix duplicate shard count updates in applyMovesToAnalysis
- All planners (DC/rack/node) update counts inline during planning
- Remove duplicate updates from applyMovesToAnalysis to avoid double-counting

* test/erasure_coding: use mktemp for test file template

Use mktemp instead of hardcoded /tmp/testfile_template.bin path
to provide better isolation for concurrent test runs.
2025-12-19 13:29:12 -08:00


package shell

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/distribution"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
func TestCalculateECDistributionShell(t *testing.T) {
	// Test the shell wrapper function
	rp, _ := super_block.NewReplicaPlacementFromString("110")
	dist := CalculateECDistribution(
		erasure_coding.TotalShardsCount,
		erasure_coding.ParityShardsCount,
		rp,
	)
	if dist.ReplicationConfig.MinDataCenters != 2 {
		t.Errorf("Expected 2 DCs, got %d", dist.ReplicationConfig.MinDataCenters)
	}
	if dist.TargetShardsPerDC != 7 {
		t.Errorf("Expected 7 shards per DC, got %d", dist.TargetShardsPerDC)
	}
	t.Log(dist.Summary())
}
func TestAnalyzeVolumeDistributionShell(t *testing.T) {
	diskType := types.HardDriveType
	diskTypeKey := string(diskType)
	// Build a topology with unbalanced distribution
	node1 := &EcNode{
		info: &master_pb.DataNodeInfo{
			Id: "127.0.0.1:8080",
			DiskInfos: map[string]*master_pb.DiskInfo{
				diskTypeKey: {
					Type:           diskTypeKey,
					MaxVolumeCount: 10,
					EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
						{
							Id:          1,
							Collection:  "test",
							EcIndexBits: 0x3FFF, // All 14 shards
						},
					},
				},
			},
		},
		dc:         "dc1",
		rack:       "rack1",
		freeEcSlot: 5,
	}
	node2 := &EcNode{
		info: &master_pb.DataNodeInfo{
			Id: "127.0.0.1:8081",
			DiskInfos: map[string]*master_pb.DiskInfo{
				diskTypeKey: {
					Type:           diskTypeKey,
					MaxVolumeCount: 10,
					EcShardInfos:   []*master_pb.VolumeEcShardInformationMessage{},
				},
			},
		},
		dc:         "dc2",
		rack:       "rack2",
		freeEcSlot: 10,
	}
	locations := []*EcNode{node1, node2}
	volumeId := needle.VolumeId(1)
	analysis := AnalyzeVolumeDistribution(volumeId, locations, diskType)
	shardsByDC := analysis.GetShardsByDC()
	if shardsByDC["dc1"] != 14 {
		t.Errorf("Expected 14 shards in dc1, got %d", shardsByDC["dc1"])
	}
	t.Log(analysis.DetailedString())
}
func TestProportionalRebalancerShell(t *testing.T) {
	diskType := types.HardDriveType
	diskTypeKey := string(diskType)
	// Build topology: 2 DCs, 2 racks each, all shards on one node
	nodes := []*EcNode{
		{
			info: &master_pb.DataNodeInfo{
				Id: "dc1-rack1-node1",
				DiskInfos: map[string]*master_pb.DiskInfo{
					diskTypeKey: {
						Type:           diskTypeKey,
						MaxVolumeCount: 10,
						EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
							{Id: 1, Collection: "test", EcIndexBits: 0x3FFF},
						},
					},
				},
			},
			dc: "dc1", rack: "dc1-rack1", freeEcSlot: 0,
		},
		{
			info: &master_pb.DataNodeInfo{
				Id: "dc1-rack2-node1",
				DiskInfos: map[string]*master_pb.DiskInfo{
					diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10},
				},
			},
			dc: "dc1", rack: "dc1-rack2", freeEcSlot: 10,
		},
		{
			info: &master_pb.DataNodeInfo{
				Id: "dc2-rack1-node1",
				DiskInfos: map[string]*master_pb.DiskInfo{
					diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10},
				},
			},
			dc: "dc2", rack: "dc2-rack1", freeEcSlot: 10,
		},
		{
			info: &master_pb.DataNodeInfo{
				Id: "dc2-rack2-node1",
				DiskInfos: map[string]*master_pb.DiskInfo{
					diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10},
				},
			},
			dc: "dc2", rack: "dc2-rack2", freeEcSlot: 10,
		},
	}
	rp, _ := super_block.NewReplicaPlacementFromString("110")
	rebalancer := NewProportionalECRebalancer(nodes, rp, diskType)
	volumeId := needle.VolumeId(1)
	moves, err := rebalancer.PlanMoves(volumeId, []*EcNode{nodes[0]})
	if err != nil {
		t.Fatalf("PlanMoves failed: %v", err)
	}
	t.Logf("Planned %d moves", len(moves))
	for i, move := range moves {
		t.Logf("  %d. %s", i+1, move.String())
	}
	// Verify moves to dc2
	movedToDC2 := 0
	for _, move := range moves {
		if move.DestNode.dc == "dc2" {
			movedToDC2++
		}
	}
	if movedToDC2 == 0 {
		t.Error("Expected some moves to dc2")
	}
}
func TestCustomECConfigRebalancer(t *testing.T) {
	diskType := types.HardDriveType
	diskTypeKey := string(diskType)
	// Test with custom 8+4 EC configuration
	ecConfig, err := distribution.NewECConfig(8, 4)
	if err != nil {
		t.Fatalf("Failed to create EC config: %v", err)
	}
	// Build topology for 12 shards (8+4)
	nodes := []*EcNode{
		{
			info: &master_pb.DataNodeInfo{
				Id: "dc1-node1",
				DiskInfos: map[string]*master_pb.DiskInfo{
					diskTypeKey: {
						Type:           diskTypeKey,
						MaxVolumeCount: 10,
						EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{
							{Id: 1, Collection: "test", EcIndexBits: 0x0FFF}, // 12 shards (bits 0-11)
						},
					},
				},
			},
			dc: "dc1", rack: "dc1-rack1", freeEcSlot: 0,
		},
		{
			info: &master_pb.DataNodeInfo{
				Id: "dc2-node1",
				DiskInfos: map[string]*master_pb.DiskInfo{
					diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10},
				},
			},
			dc: "dc2", rack: "dc2-rack1", freeEcSlot: 10,
		},
		{
			info: &master_pb.DataNodeInfo{
				Id: "dc3-node1",
				DiskInfos: map[string]*master_pb.DiskInfo{
					diskTypeKey: {Type: diskTypeKey, MaxVolumeCount: 10},
				},
			},
			dc: "dc3", rack: "dc3-rack1", freeEcSlot: 10,
		},
	}
	rp, _ := super_block.NewReplicaPlacementFromString("200") // 3 DCs
	rebalancer := NewProportionalECRebalancerWithConfig(nodes, rp, diskType, ecConfig)
	volumeId := needle.VolumeId(1)
	moves, err := rebalancer.PlanMoves(volumeId, []*EcNode{nodes[0]})
	if err != nil {
		t.Fatalf("PlanMoves failed: %v", err)
	}
	t.Logf("Custom 8+4 EC with 200 replication: planned %d moves", len(moves))
	// Get the distribution summary
	summary := GetDistributionSummaryWithConfig(rp, ecConfig)
	t.Log(summary)
	analysis := GetFaultToleranceAnalysisWithConfig(rp, ecConfig)
	t.Log(analysis)
}
func TestGetDistributionSummaryShell(t *testing.T) {
	rp, _ := super_block.NewReplicaPlacementFromString("110")
	summary := GetDistributionSummary(rp)
	t.Log(summary)
	if len(summary) == 0 {
		t.Error("Summary should not be empty")
	}
	analysis := GetFaultToleranceAnalysis(rp)
	t.Log(analysis)
	if len(analysis) == 0 {
		t.Error("Analysis should not be empty")
	}
}
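The EcIndexBits literals in these fixtures (0x3FFF, 0x0FFF) are presence bitmasks: bit i set means shard i exists on the node. A minimal standalone sketch of the decoding:

```go
package main

import (
	"fmt"
	"math/bits"
)

// shardCount decodes an EcIndexBits bitmask: each set bit marks one
// present shard, so 0x3FFF covers all 14 shards of the default 10+4
// scheme and 0x0FFF the 12 shards of an 8+4 scheme.
func shardCount(ecIndexBits uint32) int {
	return bits.OnesCount32(ecIndexBits)
}

func main() {
	fmt.Println(shardCount(0x3FFF)) // 14
	fmt.Println(shardCount(0x0FFF)) // 12
}
```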