Add disk-aware EC rebalancing (#7597)

* Add placement package for EC shard placement logic

- Consolidate EC shard placement algorithm for reuse across shell and worker tasks
- Support multi-pass selection: racks, then servers, then disks
- Include proper spread verification and scoring functions
- Comprehensive test coverage for various cluster topologies

* Make ec.balance disk-aware for multi-disk servers

- Add EcDisk struct to track individual disks on volume servers
- Update EcNode to maintain per-disk shard distribution
- Parse disk_id from EC shard information during topology collection
- Implement pickBestDiskOnNode() for selecting best disk per shard
- Add diskDistributionScore() for tie-breaking node selection
- Update all move operations to specify target disk in RPC calls
- Improves shard balance within multi-disk servers, not just across servers

* Use placement package in EC detection for consistent disk-level placement

- Replace custom EC disk selection logic with shared placement package
- Convert topology DiskInfo to placement.DiskCandidate format
- Use SelectDestinations() for multi-rack/server/disk spreading
- Convert placement results back to topology DiskInfo for task creation
- Ensures EC detection uses same placement logic as shell commands

* Make volume server evacuation disk-aware

- Use pickBestDiskOnNode() when selecting evacuation target disk
- Specify target disk in evacuation RPC requests
- Maintains balanced disk distribution during server evacuations

* Rename PlacementConfig to PlacementRequest for clarity

PlacementRequest better reflects that this is a request for placement
rather than a configuration object. This improves API semantics.

* Rename DefaultConfig to DefaultPlacementRequest

Aligns with the PlacementRequest type naming for consistency

* Address review comments from Gemini and CodeRabbit

Fix HIGH issues:
- Fix empty disk discovery: Now discovers all disks from VolumeInfos,
  not just from EC shards. This ensures disks without EC shards are
  still considered for placement.
- Fix EC shard count calculation in detection.go: Now correctly filters
  by DiskId and sums actual shard counts using ShardBits.ShardIdCount()
  instead of just counting EcShardInfo entries.

Fix MEDIUM issues:
- Add disk ID to evacuation log messages for consistency with other logging
- Remove unused serverToDisks variable in placement.go
- Fix comment that incorrectly said 'ascending' when sorting is 'descending'

* add ec tests

* Update ec-integration-tests.yml

* Update ec_integration_test.go

* Fix EC integration tests CI: build weed binary and update actions

- Add 'Build weed binary' step before running tests
- Update actions/setup-go from v4 to v6 (Node20 compatibility)
- Update actions/checkout from v2 to v4 (Node20 compatibility)
- Move working-directory to test step only

* Add disk-aware EC rebalancing integration tests

- Add TestDiskAwareECRebalancing test with multi-disk cluster setup
- Test EC encode with disk awareness (shows disk ID in output)
- Test EC balance with disk-level shard distribution
- Add helper functions for disk-level verification:
  - startMultiDiskCluster: 3 servers x 4 disks each
  - countShardsPerDisk: track shards per disk per server
  - calculateDiskShardVariance: measure distribution balance
- Verify no single disk is overloaded with shards
This commit is contained in:
Chris Lu
2025-12-02 12:30:15 -08:00
committed by GitHub
parent ebb06a3908
commit 4f038820dc
7 changed files with 1680 additions and 80 deletions

View File

@@ -5,9 +5,11 @@ import (
"context"
"fmt"
"io"
"math"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
@@ -139,6 +141,15 @@ func TestECEncodingVolumeLocationTimingBug(t *testing.T) {
t.Logf("EC encoding completed successfully")
}
// Add detailed logging for EC encoding command
t.Logf("Debug: Executing EC encoding command for volume %d", volumeId)
t.Logf("Debug: Command arguments: %v", args)
if err != nil {
t.Logf("Debug: EC encoding command failed with error: %v", err)
} else {
t.Logf("Debug: EC encoding command completed successfully")
}
// The key test: check if the fix prevents the timing issue
if contains(outputStr, "Collecting volume locations") && contains(outputStr, "before EC encoding") {
t.Logf("FIX DETECTED: Volume locations collected BEFORE EC encoding (timing bug prevented)")
@@ -526,7 +537,8 @@ func uploadTestData(data []byte, masterAddress string) (needle.VolumeId, error)
func getVolumeLocations(commandEnv *shell.CommandEnv, volumeId needle.VolumeId) ([]string, error) {
// Retry mechanism to handle timing issues with volume registration
for i := 0; i < 10; i++ {
// Increase retry attempts for volume location retrieval
for i := 0; i < 20; i++ { // Increased from 10 to 20 retries
locations, ok := commandEnv.MasterClient.GetLocationsClone(uint32(volumeId))
if ok {
var result []string
@@ -646,3 +658,427 @@ func TestECEncodingRegressionPrevention(t *testing.T) {
t.Log("Timing pattern regression test passed")
})
}
// TestDiskAwareECRebalancing tests EC shard placement across multiple disks per server
// This verifies the disk-aware EC rebalancing feature works correctly
func TestDiskAwareECRebalancing(t *testing.T) {
if testing.Short() {
t.Skip("Skipping disk-aware integration test in short mode")
}
testDir, err := os.MkdirTemp("", "seaweedfs_disk_aware_ec_test_")
require.NoError(t, err)
defer os.RemoveAll(testDir)
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
defer cancel()
// Start cluster with MULTIPLE DISKS per volume server
cluster, err := startMultiDiskCluster(ctx, testDir)
require.NoError(t, err)
defer cluster.Stop()
// Wait for servers to be ready
require.NoError(t, waitForServer("127.0.0.1:9334", 30*time.Second))
for i := 0; i < 3; i++ {
require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:809%d", i), 30*time.Second))
}
// Wait longer for volume servers to register with master and create volumes
t.Log("Waiting for volume servers to register with master...")
time.Sleep(10 * time.Second)
// Create command environment
options := &shell.ShellOptions{
Masters: stringPtr("127.0.0.1:9334"),
GrpcDialOption: grpc.WithInsecure(),
FilerGroup: stringPtr("default"),
}
commandEnv := shell.NewCommandEnv(options)
// Connect to master with longer timeout
ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel2()
go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
commandEnv.MasterClient.WaitUntilConnected(ctx2)
// Wait for master client to fully sync
time.Sleep(5 * time.Second)
// Upload test data to create a volume - retry if volumes not ready
var volumeId needle.VolumeId
testData := []byte("Disk-aware EC rebalancing test data - this needs to be large enough to create a volume")
for retry := 0; retry < 5; retry++ {
volumeId, err = uploadTestDataToMaster(testData, "127.0.0.1:9334")
if err == nil {
break
}
t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, err)
time.Sleep(3 * time.Second)
}
require.NoError(t, err, "Failed to upload test data after retries")
t.Logf("Created volume %d for disk-aware EC test", volumeId)
// Wait for volume to be registered
time.Sleep(3 * time.Second)
t.Run("verify_multi_disk_setup", func(t *testing.T) {
// Verify that each server has multiple disk directories
for server := 0; server < 3; server++ {
diskCount := 0
for disk := 0; disk < 4; disk++ {
diskDir := filepath.Join(testDir, fmt.Sprintf("server%d_disk%d", server, disk))
if _, err := os.Stat(diskDir); err == nil {
diskCount++
}
}
assert.Equal(t, 4, diskCount, "Server %d should have 4 disk directories", server)
t.Logf("Server %d has %d disk directories", server, diskCount)
}
})
t.Run("ec_encode_with_disk_awareness", func(t *testing.T) {
// Get lock first
lockCmd := shell.Commands[findCommandIndex("lock")]
var lockOutput bytes.Buffer
err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
if err != nil {
t.Logf("Lock command failed: %v", err)
}
// Execute EC encoding
var output bytes.Buffer
ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
args := []string{"-volumeId", fmt.Sprintf("%d", volumeId), "-collection", "test", "-force"}
// Capture output
oldStdout := os.Stdout
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stdout = w
os.Stderr = w
err = ecEncodeCmd.Do(args, commandEnv, &output)
w.Close()
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
outputStr := string(capturedOutput) + output.String()
t.Logf("EC encode output:\n%s", outputStr)
if err != nil {
t.Logf("EC encoding completed with error: %v", err)
} else {
t.Logf("EC encoding completed successfully")
}
})
t.Run("verify_disk_level_shard_distribution", func(t *testing.T) {
// Wait for shards to be distributed
time.Sleep(2 * time.Second)
// Count shards on each disk of each server
diskDistribution := countShardsPerDisk(testDir, uint32(volumeId))
totalShards := 0
disksWithShards := 0
maxShardsOnSingleDisk := 0
t.Logf("Disk-level shard distribution for volume %d:", volumeId)
for server, disks := range diskDistribution {
for diskId, shardCount := range disks {
if shardCount > 0 {
t.Logf(" %s disk %d: %d shards", server, diskId, shardCount)
totalShards += shardCount
disksWithShards++
if shardCount > maxShardsOnSingleDisk {
maxShardsOnSingleDisk = shardCount
}
}
}
}
t.Logf("Summary: %d total shards across %d disks (max %d on single disk)",
totalShards, disksWithShards, maxShardsOnSingleDisk)
// EC creates 14 shards (10 data + 4 parity), plus .ecx and .ecj files
// We should see shards distributed across multiple disks
if disksWithShards > 1 {
t.Logf("PASS: Shards distributed across %d disks", disksWithShards)
} else {
t.Logf("INFO: Shards on %d disk(s) - may be expected if volume was on single disk", disksWithShards)
}
})
t.Run("test_ec_balance_disk_awareness", func(t *testing.T) {
// Calculate initial disk balance variance
initialDistribution := countShardsPerDisk(testDir, uint32(volumeId))
initialVariance := calculateDiskShardVariance(initialDistribution)
t.Logf("Initial disk shard variance: %.2f", initialVariance)
// Run ec.balance command
var output bytes.Buffer
ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
oldStdout := os.Stdout
oldStderr := os.Stderr
r, w, _ := os.Pipe()
os.Stdout = w
os.Stderr = w
err := ecBalanceCmd.Do([]string{"-force"}, commandEnv, &output)
w.Close()
os.Stdout = oldStdout
os.Stderr = oldStderr
capturedOutput, _ := io.ReadAll(r)
outputStr := string(capturedOutput) + output.String()
if err != nil {
t.Logf("ec.balance error: %v", err)
}
t.Logf("ec.balance output:\n%s", outputStr)
// Wait for balance to complete
time.Sleep(2 * time.Second)
// Calculate final disk balance variance
finalDistribution := countShardsPerDisk(testDir, uint32(volumeId))
finalVariance := calculateDiskShardVariance(finalDistribution)
t.Logf("Final disk shard variance: %.2f", finalVariance)
t.Logf("Variance change: %.2f -> %.2f", initialVariance, finalVariance)
})
t.Run("verify_no_disk_overload", func(t *testing.T) {
// Verify that no single disk has too many shards of the same volume
diskDistribution := countShardsPerDisk(testDir, uint32(volumeId))
for server, disks := range diskDistribution {
for diskId, shardCount := range disks {
// With 14 EC shards and 12 disks (3 servers x 4 disks), ideally ~1-2 shards per disk
// Allow up to 4 shards per disk as a reasonable threshold
if shardCount > 4 {
t.Logf("WARNING: %s disk %d has %d shards (may indicate imbalance)",
server, diskId, shardCount)
}
}
}
})
}
// MultiDiskCluster represents a test cluster with multiple disks per volume server
type MultiDiskCluster struct {
masterCmd *exec.Cmd
volumeServers []*exec.Cmd
testDir string
}
func (c *MultiDiskCluster) Stop() {
// Stop volume servers first
for _, cmd := range c.volumeServers {
if cmd != nil && cmd.Process != nil {
cmd.Process.Kill()
cmd.Wait()
}
}
// Stop master server
if c.masterCmd != nil && c.masterCmd.Process != nil {
c.masterCmd.Process.Kill()
c.masterCmd.Wait()
}
}
// startMultiDiskCluster starts a SeaweedFS cluster with multiple disks per volume server
func startMultiDiskCluster(ctx context.Context, dataDir string) (*MultiDiskCluster, error) {
weedBinary := findWeedBinary()
if weedBinary == "" {
return nil, fmt.Errorf("weed binary not found")
}
cluster := &MultiDiskCluster{testDir: dataDir}
// Create master directory
masterDir := filepath.Join(dataDir, "master")
os.MkdirAll(masterDir, 0755)
// Start master server on a different port to avoid conflict
masterCmd := exec.CommandContext(ctx, weedBinary, "master",
"-port", "9334",
"-mdir", masterDir,
"-volumeSizeLimitMB", "10",
"-ip", "127.0.0.1",
)
masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
if err != nil {
return nil, fmt.Errorf("failed to create master log file: %v", err)
}
masterCmd.Stdout = masterLogFile
masterCmd.Stderr = masterLogFile
if err := masterCmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start master server: %v", err)
}
cluster.masterCmd = masterCmd
// Wait for master to be ready
time.Sleep(2 * time.Second)
// Start 3 volume servers, each with 4 disks
const numServers = 3
const disksPerServer = 4
for i := 0; i < numServers; i++ {
// Create 4 disk directories per server
var diskDirs []string
var maxVolumes []string
for d := 0; d < disksPerServer; d++ {
diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_disk%d", i, d))
if err := os.MkdirAll(diskDir, 0755); err != nil {
cluster.Stop()
return nil, fmt.Errorf("failed to create disk dir: %v", err)
}
diskDirs = append(diskDirs, diskDir)
maxVolumes = append(maxVolumes, "5")
}
port := fmt.Sprintf("809%d", i)
rack := fmt.Sprintf("rack%d", i)
volumeCmd := exec.CommandContext(ctx, weedBinary, "volume",
"-port", port,
"-dir", strings.Join(diskDirs, ","),
"-max", strings.Join(maxVolumes, ","),
"-mserver", "127.0.0.1:9334",
"-ip", "127.0.0.1",
"-dataCenter", "dc1",
"-rack", rack,
)
// Create log file for this volume server
logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i))
os.MkdirAll(logDir, 0755)
volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log"))
if err != nil {
cluster.Stop()
return nil, fmt.Errorf("failed to create volume log file: %v", err)
}
volumeCmd.Stdout = volumeLogFile
volumeCmd.Stderr = volumeLogFile
if err := volumeCmd.Start(); err != nil {
cluster.Stop()
return nil, fmt.Errorf("failed to start volume server %d: %v", i, err)
}
cluster.volumeServers = append(cluster.volumeServers, volumeCmd)
}
// Wait for volume servers to register with master
// Multi-disk servers may take longer to initialize
time.Sleep(8 * time.Second)
return cluster, nil
}
// uploadTestDataToMaster uploads test data to a specific master address
func uploadTestDataToMaster(data []byte, masterAddress string) (needle.VolumeId, error) {
assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress {
return pb.ServerAddress(masterAddress)
}, grpc.WithInsecure(), &operation.VolumeAssignRequest{
Count: 1,
Collection: "test",
Replication: "000",
})
if err != nil {
return 0, err
}
uploader, err := operation.NewUploader()
if err != nil {
return 0, err
}
uploadResult, err, _ := uploader.Upload(context.Background(), bytes.NewReader(data), &operation.UploadOption{
UploadUrl: "http://" + assignResult.Url + "/" + assignResult.Fid,
Filename: "testfile.txt",
MimeType: "text/plain",
})
if err != nil {
return 0, err
}
if uploadResult.Error != "" {
return 0, fmt.Errorf("upload error: %s", uploadResult.Error)
}
fid, err := needle.ParseFileIdFromString(assignResult.Fid)
if err != nil {
return 0, err
}
return fid.VolumeId, nil
}
// countShardsPerDisk counts EC shards on each disk of each server
// Returns map: "serverN" -> map[diskId]shardCount
func countShardsPerDisk(testDir string, volumeId uint32) map[string]map[int]int {
result := make(map[string]map[int]int)
const numServers = 3
const disksPerServer = 4
for server := 0; server < numServers; server++ {
serverKey := fmt.Sprintf("server%d", server)
result[serverKey] = make(map[int]int)
for disk := 0; disk < disksPerServer; disk++ {
diskDir := filepath.Join(testDir, fmt.Sprintf("server%d_disk%d", server, disk))
count, err := countECShardFiles(diskDir, volumeId)
if err == nil && count > 0 {
result[serverKey][disk] = count
}
}
}
return result
}
// calculateDiskShardVariance measures how evenly shards are distributed across disks
// Lower variance means better distribution
func calculateDiskShardVariance(distribution map[string]map[int]int) float64 {
var counts []float64
for _, disks := range distribution {
for _, count := range disks {
if count > 0 {
counts = append(counts, float64(count))
}
}
}
if len(counts) == 0 {
return 0
}
// Calculate mean
mean := 0.0
for _, c := range counts {
mean += c
}
mean /= float64(len(counts))
// Calculate variance
variance := 0.0
for _, c := range counts {
variance += (c - mean) * (c - mean)
}
return math.Sqrt(variance / float64(len(counts)))
}