Fix imbalance detection disk type grouping and volume grow errors (#8097)
* Fix imbalance detection disk type grouping and volume grow errors

This PR addresses two issues:

1. Imbalance Detection: Previously, balance detection did not verify disk types, leading to false positives when comparing heterogeneous nodes (e.g. SSD vs HDD). The logic now groups volumes by DiskType before calculating imbalance.

2. Volume Grow Errors: Fixed a variable scope issue in master_grpc_server_volume.go and added a pre-check for available space to prevent 'only 0 volumes left' error logs when a disk type is full or abandoned.

Unit tests for the detection logic are included.

* Refactor balance detection loop into detectForDiskType

* Fix potential panic in volume grow logic by checking the replica placement parse error
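To make the first fix concrete before the diff: below is a minimal, self-contained sketch of the grouping idea, using simplified stand-ins rather than the actual SeaweedFS types (`Metric` here is hypothetical). Imbalance is computed per disk type, so two internally balanced tiers of very different sizes no longer look imbalanced when pooled together.

```go
package main

import "fmt"

// Metric is a simplified stand-in for types.VolumeHealthMetrics.
type Metric struct {
	Server   string
	DiskType string
}

// imbalancedDiskTypes mirrors the per-disk-type split: group first,
// then apply the (max-min)/avg ratio within each group only.
func imbalancedDiskTypes(metrics []Metric, threshold float64) []string {
	byDiskType := make(map[string]map[string]int) // diskType -> server -> volume count
	for _, m := range metrics {
		if byDiskType[m.DiskType] == nil {
			byDiskType[m.DiskType] = make(map[string]int)
		}
		byDiskType[m.DiskType][m.Server]++
	}

	var result []string
	for diskType, counts := range byDiskType {
		maxN, minN, total := 0, int(^uint(0)>>1), 0
		for _, n := range counts {
			if n > maxN {
				maxN = n
			}
			if n < minN {
				minN = n
			}
			total += n
		}
		avg := float64(total) / float64(len(counts))
		if float64(maxN-minN)/avg > threshold {
			result = append(result, diskType)
		}
	}
	return result
}

func main() {
	var metrics []Metric
	for i := 0; i < 10; i++ { // two balanced SSD servers, 10 volumes each
		metrics = append(metrics, Metric{"ssd-1", "ssd"}, Metric{"ssd-2", "ssd"})
	}
	for i := 0; i < 100; i++ { // two balanced HDD servers, 100 volumes each
		metrics = append(metrics, Metric{"hdd-1", "hdd"}, Metric{"hdd-2", "hdd"})
	}
	// Pooled, max=100 vs min=10 would falsely trip a 20% threshold;
	// grouped per disk type, both tiers are balanced.
	fmt.Println(imbalancedDiskTypes(metrics, 0.2)) // []
}
```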
weed/server/master_grpc_server_volume.go

@@ -59,8 +59,8 @@ func (ms *MasterServer) ProcessGrowRequest() {
             continue
         }
         dcs := ms.Topo.ListDCAndRacks()
-        var err error
         for _, vlc := range ms.Topo.ListVolumeLayoutCollections() {
+            var err error
             vl := vlc.VolumeLayout
             lastGrowCount := vl.GetLastGrowCount()
             if vl.HasGrowRequest() {
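The `var err error` move above is the variable scope fix from the commit message: with a single `err` shared across iterations, a failure from one volume layout collection could leak into the handling of the next. A standalone illustration of the pattern under that assumption (the `process` helper is hypothetical, not SeaweedFS code):

```go
package main

import (
	"errors"
	"fmt"
)

// process stands in for a per-collection operation such as VolumeGrow.
func process(name string) error {
	if name == "bad" {
		return errors.New("grow failed")
	}
	return nil
}

func main() {
	items := []string{"bad", "good"}

	// Buggy: one err shared by all iterations. The "good" iteration
	// still observes the stale failure from the "bad" iteration.
	var err error
	for _, it := range items {
		if it == "bad" {
			err = process(it)
		}
		fmt.Printf("shared err after %q: %v\n", it, err)
	}

	// Fixed: declare err inside the loop so each iteration starts clean.
	for _, it := range items {
		var err error
		if it == "bad" {
			err = process(it)
		}
		fmt.Printf("scoped err after %q: %v\n", it, err)
	}
}
```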
@@ -74,8 +74,14 @@ func (ms *MasterServer) ProcessGrowRequest() {
 
             switch {
             case mustGrow > 0:
-                vgr.WritableVolumeCount = uint32(mustGrow)
-                _, err = ms.VolumeGrow(ctx, vgr)
+                if rp, rpErr := super_block.NewReplicaPlacementFromString(vgr.Replication); rpErr != nil {
+                    glog.V(0).Infof("failed to parse replica placement %s: %v", vgr.Replication, rpErr)
+                } else {
+                    vgr.WritableVolumeCount = uint32(mustGrow)
+                    if ms.Topo.AvailableSpaceFor(&topology.VolumeGrowOption{DiskType: types.ToDiskType(vgr.DiskType)}) >= int64(vgr.WritableVolumeCount*uint32(rp.GetCopyCount())) {
+                        _, err = ms.VolumeGrow(ctx, vgr)
+                    }
+                }
             case lastGrowCount > 0 && writable < int(lastGrowCount*2) && float64(crowded+volumeGrowStepCount) > float64(writable)*topology.VolumeGrowStrategy.Threshold:
                 vgr.WritableVolumeCount = volumeGrowStepCount
                 _, err = ms.VolumeGrow(ctx, vgr)
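The `AvailableSpaceFor` guard above compares free volume slots against `WritableVolumeCount * GetCopyCount()`. A rough, runnable illustration of that arithmetic; note the digit-sum `copyCount` below is a hand-rolled stand-in for `super_block.ReplicaPlacement.GetCopyCount`, so treat its exact semantics as an assumption:

```go
package main

import "fmt"

// copyCount approximates ReplicaPlacement.GetCopyCount: one copy plus the
// three replication digits, e.g. "001" -> 2 copies, "110" -> 3 copies.
func copyCount(replication string) int {
	n := 1
	for _, c := range replication {
		n += int(c - '0')
	}
	return n
}

func main() {
	writable := int64(4)                         // volumes the master wants to grow
	needed := writable * int64(copyCount("001")) // 4 * 2 = 8 volume slots
	available := int64(5)                        // free slots for this disk type

	if available >= needed {
		fmt.Println("grow volumes")
	} else {
		// Skipping here is what prevents the "only 0 volumes left" log spam
		// when a disk type is full or has no servers.
		fmt.Printf("skip grow: need %d slots, have %d\n", needed, available)
	}
}
```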
weed/worker/tasks/balance/detection.go

@@ -20,26 +20,46 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 
     balanceConfig := config.(*Config)
 
-    // Skip if cluster is too small
+    // Group volumes by disk type to ensure we compare apples to apples
+    volumesByDiskType := make(map[string][]*types.VolumeHealthMetrics)
+    for _, metric := range metrics {
+        volumesByDiskType[metric.DiskType] = append(volumesByDiskType[metric.DiskType], metric)
+    }
+
+    var allParams []*types.TaskDetectionResult
+
+    for diskType, diskMetrics := range volumesByDiskType {
+        if task := detectForDiskType(diskType, diskMetrics, balanceConfig, clusterInfo); task != nil {
+            allParams = append(allParams, task)
+        }
+    }
+
+    return allParams, nil
+}
+
+// detectForDiskType performs balance detection for a specific disk type
+func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo) *types.TaskDetectionResult {
+    // Skip if cluster segment is too small
     minVolumeCount := 2 // More reasonable for small clusters
-    if len(metrics) < minVolumeCount {
-        glog.Infof("BALANCE: No tasks created - cluster too small (%d volumes, need ≥%d)", len(metrics), minVolumeCount)
-        return nil, nil
+    if len(diskMetrics) < minVolumeCount {
+        // Only log at verbose level to avoid spamming for small/empty disk types
+        glog.V(1).Infof("BALANCE [%s]: No tasks created - cluster too small (%d volumes, need ≥%d)", diskType, len(diskMetrics), minVolumeCount)
+        return nil
     }
 
     // Analyze volume distribution across servers
     serverVolumeCounts := make(map[string]int)
-    for _, metric := range metrics {
+    for _, metric := range diskMetrics {
         serverVolumeCounts[metric.Server]++
     }
 
     if len(serverVolumeCounts) < balanceConfig.MinServerCount {
-        glog.Infof("BALANCE: No tasks created - too few servers (%d servers, need ≥%d)", len(serverVolumeCounts), balanceConfig.MinServerCount)
-        return nil, nil
+        glog.V(1).Infof("BALANCE [%s]: No tasks created - too few servers (%d servers, need ≥%d)", diskType, len(serverVolumeCounts), balanceConfig.MinServerCount)
+        return nil
     }
 
     // Calculate balance metrics
-    totalVolumes := len(metrics)
+    totalVolumes := len(diskMetrics)
     avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))
 
     maxVolumes := 0
@@ -61,14 +81,14 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
     // Check if imbalance exceeds threshold
     imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
     if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
-        glog.Infof("BALANCE: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f",
-            imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
-        return nil, nil
+        glog.Infof("BALANCE [%s]: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f",
+            diskType, imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+        return nil
     }
 
     // Select a volume from the overloaded server for balance
     var selectedVolume *types.VolumeHealthMetrics
-    for _, metric := range metrics {
+    for _, metric := range diskMetrics {
         if metric.Server == maxServer {
             selectedVolume = metric
             break
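As a quick sanity check of the threshold math in this hunk, here is the ratio computed with the numbers from the imbalanced-SSD test further down (100 volumes on one server, 10 on the other):

```go
package main

import "fmt"

func main() {
	maxVolumes, minVolumes := 100, 10
	avg := float64(maxVolumes+minVolumes) / 2.0   // 55.0 average across the 2 servers
	ratio := float64(maxVolumes-minVolumes) / avg // (100-10)/55 ≈ 1.636
	fmt.Printf("imbalance = %.1f%%\n", ratio*100) // 163.6%, far above the 20% threshold
}
```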
@@ -76,13 +96,13 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
     }
 
     if selectedVolume == nil {
-        glog.Warningf("BALANCE: Could not find volume on overloaded server %s", maxServer)
-        return nil, nil
+        glog.Warningf("BALANCE [%s]: Could not find volume on overloaded server %s", diskType, maxServer)
+        return nil
     }
 
     // Create balance task with volume and destination planning info
-    reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
-        imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+    reason := fmt.Sprintf("Cluster imbalance detected for %s: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
+        diskType, imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
 
     // Generate task ID for ActiveTopology integration
     taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix())
@@ -102,21 +122,22 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
     if clusterInfo.ActiveTopology != nil {
         // Check if ANY task already exists in ActiveTopology for this volume
         if clusterInfo.ActiveTopology.HasAnyTask(selectedVolume.VolumeID) {
-            glog.V(2).Infof("BALANCE: Skipping volume %d, task already exists in ActiveTopology", selectedVolume.VolumeID)
-            return nil, nil
+            glog.V(2).Infof("BALANCE [%s]: Skipping volume %d, task already exists in ActiveTopology", diskType, selectedVolume.VolumeID)
+            return nil
         }
 
         destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume)
         if err != nil {
             glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err)
-            return nil, nil // Skip this task if destination planning fails
+            return nil
         }
 
         // Find the actual disk containing the volume on the source server
         sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
         if !found {
-            return nil, fmt.Errorf("BALANCE: Could not find volume %d (collection: %s) on source server %s - unable to create balance task",
-                selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
+            glog.Warningf("BALANCE [%s]: Could not find volume %d (collection: %s) on source server %s - unable to create balance task",
+                diskType, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
+            return nil
         }
 
         // Create typed parameters with unified source and target information
@@ -175,17 +196,18 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
             },
         })
         if err != nil {
-            return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err)
+            glog.Warningf("BALANCE [%s]: Failed to add pending task for volume %d: %v", diskType, selectedVolume.VolumeID, err)
+            return nil
         }
 
         glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d",
             taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk)
     } else {
         glog.Warningf("No ActiveTopology available for destination planning in balance detection")
-        return nil, nil
+        return nil
     }
 
-    return []*types.TaskDetectionResult{task}, nil
+    return task
 }
 
 // planBalanceDestination plans the destination for a balance operation
weed/worker/tasks/balance/detection_test.go (new file, 255 lines)

@@ -0,0 +1,255 @@
package balance

import (
    "testing"

    "github.com/seaweedfs/seaweedfs/weed/admin/topology"
    "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
    "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
    "github.com/seaweedfs/seaweedfs/weed/worker/types"
)

func createMockTopology(volumes ...*types.VolumeHealthMetrics) *topology.ActiveTopology {
    at := topology.NewActiveTopology(0)

    // Group volumes by server for easier topology construction
    volumesByServer := make(map[string][]*master_pb.VolumeInformationMessage)
    for _, v := range volumes {
        if _, ok := volumesByServer[v.Server]; !ok {
            volumesByServer[v.Server] = []*master_pb.VolumeInformationMessage{}
        }
        volumesByServer[v.Server] = append(volumesByServer[v.Server], &master_pb.VolumeInformationMessage{
            Id:               v.VolumeID,
            Size:             v.Size,
            Collection:       v.Collection,
            ReplicaPlacement: 0,
            Ttl:              0,
            Version:          1,
        })
    }

    topoInfo := &master_pb.TopologyInfo{
        DataCenterInfos: []*master_pb.DataCenterInfo{
            {
                Id: "dc1",
                RackInfos: []*master_pb.RackInfo{
                    {
                        Id: "rack1",
                        DataNodeInfos: []*master_pb.DataNodeInfo{
                            // SSD Nodes
                            {
                                Id:      "ssd-server-1",
                                Address: "ssd-server-1:8080",
                                DiskInfos: map[string]*master_pb.DiskInfo{
                                    "ssd": {
                                        Type:           "ssd",
                                        DiskId:         1,
                                        VolumeInfos:    volumesByServer["ssd-server-1"],
                                        MaxVolumeCount: 1000,
                                    },
                                },
                            },
                            {
                                Id:      "ssd-server-2",
                                Address: "ssd-server-2:8080",
                                DiskInfos: map[string]*master_pb.DiskInfo{
                                    "ssd": {
                                        Type:           "ssd",
                                        DiskId:         2,
                                        VolumeInfos:    volumesByServer["ssd-server-2"],
                                        MaxVolumeCount: 1000,
                                    },
                                },
                            },
                            // HDD Nodes
                            {
                                Id:      "hdd-server-1",
                                Address: "hdd-server-1:8080",
                                DiskInfos: map[string]*master_pb.DiskInfo{
                                    "hdd": {
                                        Type:           "hdd",
                                        DiskId:         3, // Changed index to avoid conflict
                                        VolumeInfos:    volumesByServer["hdd-server-1"],
                                        MaxVolumeCount: 1000,
                                    },
                                },
                            },
                            {
                                Id:      "hdd-server-2",
                                Address: "hdd-server-2:8080",
                                DiskInfos: map[string]*master_pb.DiskInfo{
                                    "hdd": {
                                        Type:           "hdd",
                                        DiskId:         4,
                                        VolumeInfos:    volumesByServer["hdd-server-2"],
                                        MaxVolumeCount: 1000,
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
    }

    at.UpdateTopology(topoInfo)
    return at
}

func TestDetection_MixedDiskTypes(t *testing.T) {
    // Setup metrics:
    // 2 SSD servers with 10 volumes each (Balanced)
    // 2 HDD servers with 100 volumes each (Balanced)

    metrics := []*types.VolumeHealthMetrics{}

    // SSD Servers
    for i := 0; i < 10; i++ {
        metrics = append(metrics, &types.VolumeHealthMetrics{
            VolumeID:      uint32(i + 1),
            Server:        "ssd-server-1",
            ServerAddress: "ssd-server-1:8080",
            DiskType:      "ssd",
            Collection:    "c1",
            Size:          1024,
            DataCenter:    "dc1",
            Rack:          "rack1",
        })
    }
    for i := 0; i < 10; i++ {
        metrics = append(metrics, &types.VolumeHealthMetrics{
            VolumeID:      uint32(20 + i + 1),
            Server:        "ssd-server-2",
            ServerAddress: "ssd-server-2:8080",
            DiskType:      "ssd",
            Collection:    "c1",
            Size:          1024,
            DataCenter:    "dc1",
            Rack:          "rack1",
        })
    }

    // HDD Servers
    for i := 0; i < 100; i++ {
        metrics = append(metrics, &types.VolumeHealthMetrics{
            VolumeID:      uint32(100 + i + 1),
            Server:        "hdd-server-1",
            ServerAddress: "hdd-server-1:8080",
            DiskType:      "hdd",
            Collection:    "c1",
            Size:          1024,
            DataCenter:    "dc1",
            Rack:          "rack1",
        })
    }
    for i := 0; i < 100; i++ {
        metrics = append(metrics, &types.VolumeHealthMetrics{
            VolumeID:      uint32(200 + i + 1),
            Server:        "hdd-server-2",
            ServerAddress: "hdd-server-2:8080",
            DiskType:      "hdd",
            Collection:    "c1",
            Size:          1024,
            DataCenter:    "dc1",
            Rack:          "rack1",
        })
    }

    conf := &Config{
        BaseConfig: base.BaseConfig{
            Enabled:             true,
            ScanIntervalSeconds: 30,
            MaxConcurrent:       1,
        },
        MinServerCount:     2,
        ImbalanceThreshold: 0.2, // 20%
    }

    at := createMockTopology(metrics...)
    clusterInfo := &types.ClusterInfo{
        ActiveTopology: at,
    }

    tasks, err := Detection(metrics, clusterInfo, conf)
    if err != nil {
        t.Fatalf("Detection failed: %v", err)
    }

    if len(tasks) != 0 {
        t.Errorf("Expected 0 tasks for balanced mixed types, got %d", len(tasks))
        for _, task := range tasks {
            t.Logf("Computed Task: %+v", task.Reason)
        }
    }
}

func TestDetection_ImbalancedDiskType(t *testing.T) {
    // Setup metrics:
    // 2 SSD servers: One with 100, One with 10. Imbalance!
    metrics := []*types.VolumeHealthMetrics{}

    // Server 1 (Overloaded SSD)
    for i := 0; i < 100; i++ {
        metrics = append(metrics, &types.VolumeHealthMetrics{
            VolumeID:      uint32(i + 1),
            Server:        "ssd-server-1",
            ServerAddress: "ssd-server-1:8080",
            DiskType:      "ssd",
            Collection:    "c1",
            Size:          1024,
            DataCenter:    "dc1",
            Rack:          "rack1",
        })
    }
    // Server 2 (Underloaded SSD)
    for i := 0; i < 10; i++ {
        metrics = append(metrics, &types.VolumeHealthMetrics{
            VolumeID:      uint32(100 + i + 1),
            Server:        "ssd-server-2",
            ServerAddress: "ssd-server-2:8080",
            DiskType:      "ssd",
            Collection:    "c1",
            Size:          1024,
            DataCenter:    "dc1",
            Rack:          "rack1",
        })
    }

    conf := &Config{
        BaseConfig: base.BaseConfig{
            Enabled:             true,
            ScanIntervalSeconds: 30,
            MaxConcurrent:       1,
        },
        MinServerCount:     2,
        ImbalanceThreshold: 0.2,
    }

    at := createMockTopology(metrics...)
    clusterInfo := &types.ClusterInfo{
        ActiveTopology: at,
    }

    tasks, err := Detection(metrics, clusterInfo, conf)
    if err != nil {
        t.Fatalf("Detection failed: %v", err)
    }

    if len(tasks) == 0 {
        t.Error("Expected tasks for imbalanced SSD cluster, got 0")
    } else {
        // Verify task details
        task := tasks[0]
        if task.VolumeID == 0 {
            t.Error("Task has invalid VolumeID")
        }
        // Expect volume to be moving from ssd-server-1 to ssd-server-2
        if task.TypedParams.Sources[0].Node != "ssd-server-1:8080" {
            t.Errorf("Expected source ssd-server-1:8080, got %s", task.TypedParams.Sources[0].Node)
        }
        if task.TypedParams.Targets[0].Node != "ssd-server-2:8080" {
            t.Errorf("Expected target ssd-server-2:8080, got %s", task.TypedParams.Targets[0].Node)
        }
    }
}