diff --git a/test/plugin_workers/volume_balance/detection_test.go b/test/plugin_workers/volume_balance/detection_test.go index 069c6c4c9..0aa7e3a37 100644 --- a/test/plugin_workers/volume_balance/detection_test.go +++ b/test/plugin_workers/volume_balance/detection_test.go @@ -37,17 +37,20 @@ func TestVolumeBalanceDetectionIntegration(t *testing.T) { MasterGrpcAddresses: []string{master.Address()}, }, 10) require.NoError(t, err) - require.Len(t, proposals, 1) + // With 10 volumes on one server and 1 on the other (avg=5.5), + // multiple balance moves should be detected until imbalance is within threshold. + require.Greater(t, len(proposals), 1, "expected multiple balance proposals") - proposal := proposals[0] - require.Equal(t, "volume_balance", proposal.JobType) - paramsValue := proposal.Parameters["task_params_pb"] - require.NotNil(t, paramsValue) + for _, proposal := range proposals { + require.Equal(t, "volume_balance", proposal.JobType) + paramsValue := proposal.Parameters["task_params_pb"] + require.NotNil(t, paramsValue) - params := &worker_pb.TaskParams{} - require.NoError(t, proto.Unmarshal(paramsValue.GetBytesValue(), params)) - require.NotEmpty(t, params.Sources) - require.NotEmpty(t, params.Targets) + params := &worker_pb.TaskParams{} + require.NoError(t, proto.Unmarshal(paramsValue.GetBytesValue(), params)) + require.NotEmpty(t, params.Sources) + require.NotEmpty(t, params.Targets) + } } func buildBalanceVolumeListResponse(t *testing.T) *master_pb.VolumeListResponse { diff --git a/test/s3/cors/s3_cors_test.go b/test/s3/cors/s3_cors_test.go index 4d3d4555e..18a113d99 100644 --- a/test/s3/cors/s3_cors_test.go +++ b/test/s3/cors/s3_cors_test.go @@ -140,7 +140,7 @@ func TestCORSConfigurationManagement(t *testing.T) { Bucket: aws.String(bucketName), CORSConfiguration: corsConfig, }) - assert.NoError(t, err, "Should be able to put CORS configuration") + require.NoError(t, err, "Should be able to put CORS configuration") // Wait for metadata subscription to update cache time.Sleep(50 * time.Millisecond) @@ -149,9 +149,9 @@ func TestCORSConfigurationManagement(t *testing.T) { getResp, err := client.GetBucketCors(context.TODO(), &s3.GetBucketCorsInput{ Bucket: aws.String(bucketName), }) - assert.NoError(t, err, "Should be able to get CORS configuration") - assert.NotNil(t, getResp.CORSRules, "CORS configuration should not be nil") - assert.Len(t, getResp.CORSRules, 1, "Should have one CORS rule") + require.NoError(t, err, "Should be able to get CORS configuration") + require.NotNil(t, getResp.CORSRules, "CORS configuration should not be nil") + require.Len(t, getResp.CORSRules, 1, "Should have one CORS rule") rule := getResp.CORSRules[0] assert.Equal(t, []string{"*"}, rule.AllowedHeaders, "Allowed headers should match") diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go index 3710b06ab..5fad4eda6 100644 --- a/weed/admin/topology/task_management.go +++ b/weed/admin/topology/task_management.go @@ -272,6 +272,40 @@ func (at *ActiveTopology) HasAnyTask(volumeID uint32) bool { return at.HasTask(volumeID, TaskTypeNone) } +// GetTaskServerAdjustments returns per-server volume count adjustments for +// pending and assigned tasks of the given type. For each task, source servers +// are decremented and destination servers are incremented, reflecting the +// projected volume distribution once in-flight tasks complete. +func (at *ActiveTopology) GetTaskServerAdjustments(taskType TaskType) map[string]int { + at.mutex.RLock() + defer at.mutex.RUnlock() + + adjustments := make(map[string]int) + for _, task := range at.pendingTasks { + if task.TaskType != taskType { + continue + } + for _, src := range task.Sources { + adjustments[src.SourceServer]-- + } + for _, dst := range task.Destinations { + adjustments[dst.TargetServer]++ + } + } + for _, task := range at.assignedTasks { + if task.TaskType != taskType { + continue + } + for _, src := range task.Sources { + adjustments[src.SourceServer]-- + } + for _, dst := range task.Destinations { + adjustments[dst.TargetServer]++ + } + } + return adjustments +} + // calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange { switch taskType { diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index d15fc6de0..1a3af2b0d 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -224,19 +224,14 @@ func (h *VolumeBalanceHandler) Detect( } clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology} - results, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig) + maxResults := int(request.MaxResults) + results, hasMore, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults) if err != nil { return err } - if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results); traceErr != nil { - glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) - } - maxResults := int(request.MaxResults) - hasMore := false - if maxResults > 0 && len(results) > maxResults { - hasMore = true - results = results[:maxResults] + if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, activeTopology, workerConfig.TaskConfig, results); traceErr != nil { + glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) } proposals := make([]*plugin_pb.JobProposal, 0, len(results)) @@ -267,6 +262,7 @@ func (h *VolumeBalanceHandler) Detect( func emitVolumeBalanceDetectionDecisionTrace( sender DetectionSender, metrics []*workertypes.VolumeHealthMetrics, + activeTopology *topology.ActiveTopology, taskConfig *balancetask.Config, results []*workertypes.TaskDetectionResult, ) error { @@ -362,7 +358,25 @@ func emitVolumeBalanceDetectionDecisionTrace( continue } + // Seed server counts from topology so zero-volume servers are included, + // matching the same logic used in balancetask.Detection. serverVolumeCounts := make(map[string]int) + if activeTopology != nil { + topologyInfo := activeTopology.GetTopologyInfo() + if topologyInfo != nil { + for _, dc := range topologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for diskTypeName := range node.DiskInfos { + if diskTypeName == diskType { + serverVolumeCounts[node.Id] = 0 + } + } + } + } + } + } + } for _, metric := range diskMetrics { serverVolumeCounts[metric.Server]++ } diff --git a/weed/plugin/worker/volume_balance_handler_test.go b/weed/plugin/worker/volume_balance_handler_test.go index 86c4453c1..ace71ae3a 100644 --- a/weed/plugin/worker/volume_balance_handler_test.go +++ b/weed/plugin/worker/volume_balance_handler_test.go @@ -228,7 +228,7 @@ func TestEmitVolumeBalanceDetectionDecisionTraceNoTasks(t *testing.T) { {VolumeID: 4, Server: "server-b", DiskType: "hdd"}, } - if err := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, config, nil); err != nil { + if err := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, nil, config, nil); err != nil { t.Fatalf("emitVolumeBalanceDetectionDecisionTrace error: %v", err) } if len(sender.events) < 2 { diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 4a52ea943..313ff1ca9 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -2,6 +2,8 @@ package balance import ( "fmt" + "math" + "sort" "time" "github.com/seaweedfs/seaweedfs/weed/admin/topology" @@ -12,202 +14,340 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker/types" ) -// Detection implements the detection logic for balance tasks -func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { +// Detection implements the detection logic for balance tasks. +// maxResults limits how many balance operations are returned per invocation. +// A non-positive maxResults means no explicit limit (uses a large default). +// The returned truncated flag is true when detection stopped because it hit +// maxResults rather than running out of work. +func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig, maxResults int) ([]*types.TaskDetectionResult, bool, error) { if !config.IsEnabled() { - return nil, nil + return nil, false, nil + } + if clusterInfo == nil { + return nil, false, nil } balanceConfig := config.(*Config) + if maxResults <= 0 { + maxResults = math.MaxInt32 + } + // Group volumes by disk type to ensure we compare apples to apples volumesByDiskType := make(map[string][]*types.VolumeHealthMetrics) for _, metric := range metrics { volumesByDiskType[metric.DiskType] = append(volumesByDiskType[metric.DiskType], metric) } - var allParams []*types.TaskDetectionResult + // Sort disk types for deterministic iteration order when maxResults + // spans multiple disk types. + diskTypes := make([]string, 0, len(volumesByDiskType)) + for dt := range volumesByDiskType { + diskTypes = append(diskTypes, dt) + } + sort.Strings(diskTypes) - for diskType, diskMetrics := range volumesByDiskType { - if task := detectForDiskType(diskType, diskMetrics, balanceConfig, clusterInfo); task != nil { - allParams = append(allParams, task) + var allParams []*types.TaskDetectionResult + truncated := false + + for _, diskType := range diskTypes { + remaining := maxResults - len(allParams) + if remaining <= 0 { + truncated = true + break + } + tasks, diskTruncated := detectForDiskType(diskType, volumesByDiskType[diskType], balanceConfig, clusterInfo, remaining) + allParams = append(allParams, tasks...) + if diskTruncated { + truncated = true } } - return allParams, nil + return allParams, truncated, nil } -// detectForDiskType performs balance detection for a specific disk type -func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo) *types.TaskDetectionResult { +// detectForDiskType performs balance detection for a specific disk type, +// returning up to maxResults balance tasks and whether it was truncated by the limit. +func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics, balanceConfig *Config, clusterInfo *types.ClusterInfo, maxResults int) ([]*types.TaskDetectionResult, bool) { // Skip if cluster segment is too small minVolumeCount := 2 // More reasonable for small clusters if len(diskMetrics) < minVolumeCount { // Only log at verbose level to avoid spamming for small/empty disk types glog.V(1).Infof("BALANCE [%s]: No tasks created - cluster too small (%d volumes, need ≥%d)", diskType, len(diskMetrics), minVolumeCount) - return nil + return nil, false } - // Analyze volume distribution across servers + // Analyze volume distribution across servers. + // Seed from ActiveTopology so servers with matching disk type but zero + // volumes are included in the count and imbalance calculation. serverVolumeCounts := make(map[string]int) + if clusterInfo.ActiveTopology != nil { + topologyInfo := clusterInfo.ActiveTopology.GetTopologyInfo() + if topologyInfo != nil { + for _, dc := range topologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for diskTypeName := range node.DiskInfos { + if diskTypeName == diskType { + serverVolumeCounts[node.Id] = 0 + } + } + } + } + } + } + } for _, metric := range diskMetrics { serverVolumeCounts[metric.Server]++ } if len(serverVolumeCounts) < balanceConfig.MinServerCount { glog.V(1).Infof("BALANCE [%s]: No tasks created - too few servers (%d servers, need ≥%d)", diskType, len(serverVolumeCounts), balanceConfig.MinServerCount) - return nil + return nil, false } - // Calculate balance metrics - totalVolumes := len(diskMetrics) - avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts)) - - maxVolumes := 0 - minVolumes := totalVolumes - maxServer := "" - minServer := "" - - for server, count := range serverVolumeCounts { - if count > maxVolumes { - maxVolumes = count - maxServer = server - } - if count < minVolumes { - minVolumes = count - minServer = server - } + // Seed adjustments from existing pending/assigned balance tasks so that + // effectiveCounts reflects in-flight moves and prevents over-scheduling. + var adjustments map[string]int + if clusterInfo.ActiveTopology != nil { + adjustments = clusterInfo.ActiveTopology.GetTaskServerAdjustments(topology.TaskTypeBalance) } - - // Check if imbalance exceeds threshold - imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer - if imbalanceRatio <= balanceConfig.ImbalanceThreshold { - glog.Infof("BALANCE [%s]: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f", - diskType, imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) - return nil + if adjustments == nil { + adjustments = make(map[string]int) } + // Servers where we can no longer find eligible volumes or plan destinations + exhaustedServers := make(map[string]bool) - // Select a volume from the overloaded server for balance - var selectedVolume *types.VolumeHealthMetrics + // Sort servers for deterministic iteration and tie-breaking + sortedServers := make([]string, 0, len(serverVolumeCounts)) + for server := range serverVolumeCounts { + sortedServers = append(sortedServers, server) + } + sort.Strings(sortedServers) + + // Pre-index volumes by server with cursors to avoid O(maxResults * volumes) scanning. + // Sort each server's volumes by VolumeID for deterministic selection. + volumesByServer := make(map[string][]*types.VolumeHealthMetrics, len(serverVolumeCounts)) for _, metric := range diskMetrics { - if metric.Server == maxServer { + volumesByServer[metric.Server] = append(volumesByServer[metric.Server], metric) + } + for _, vols := range volumesByServer { + sort.Slice(vols, func(i, j int) bool { + return vols[i].VolumeID < vols[j].VolumeID + }) + } + serverCursors := make(map[string]int, len(serverVolumeCounts)) + + var results []*types.TaskDetectionResult + balanced := false + + for len(results) < maxResults { + // Compute effective volume counts with adjustments from planned moves + effectiveCounts := make(map[string]int, len(serverVolumeCounts)) + totalVolumes := 0 + for server, count := range serverVolumeCounts { + effective := count + adjustments[server] + if effective < 0 { + effective = 0 + } + effectiveCounts[server] = effective + totalVolumes += effective + } + avgVolumesPerServer := float64(totalVolumes) / float64(len(effectiveCounts)) + + maxVolumes := 0 + minVolumes := totalVolumes + maxServer := "" + minServer := "" + + for _, server := range sortedServers { + count := effectiveCounts[server] + // Min is calculated across all servers for an accurate imbalance ratio + if count < minVolumes { + minVolumes = count + minServer = server + } + // Max is only among non-exhausted servers since we can only move from them + if exhaustedServers[server] { + continue + } + if count > maxVolumes { + maxVolumes = count + maxServer = server + } + } + + if maxServer == "" { + // All servers exhausted + glog.V(1).Infof("BALANCE [%s]: All overloaded servers exhausted after %d task(s)", diskType, len(results)) + break + } + + // Check if imbalance exceeds threshold + imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer + if imbalanceRatio <= balanceConfig.ImbalanceThreshold { + if len(results) == 0 { + glog.Infof("BALANCE [%s]: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f", + diskType, imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) + } else { + glog.Infof("BALANCE [%s]: Created %d task(s), cluster now balanced. Imbalance=%.1f%% (threshold=%.1f%%)", + diskType, len(results), imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100) + } + balanced = true + break + } + + // Select a volume from the overloaded server using per-server cursor + var selectedVolume *types.VolumeHealthMetrics + serverVols := volumesByServer[maxServer] + cursor := serverCursors[maxServer] + for cursor < len(serverVols) { + metric := serverVols[cursor] + cursor++ + // Skip volumes that already have a task in ActiveTopology + if clusterInfo.ActiveTopology != nil && clusterInfo.ActiveTopology.HasAnyTask(metric.VolumeID) { + continue + } selectedVolume = metric break } + serverCursors[maxServer] = cursor + + if selectedVolume == nil { + glog.V(1).Infof("BALANCE [%s]: No more eligible volumes on overloaded server %s, trying other servers", diskType, maxServer) + exhaustedServers[maxServer] = true + continue + } + + // Plan destination and create task. + // On failure, continue to the next volume on the same server rather + // than exhausting the entire server — the failure may be per-volume + // (e.g., volume not found in topology, AddPendingTask failed). + task, destServerID := createBalanceTask(diskType, selectedVolume, clusterInfo) + if task == nil { + glog.V(1).Infof("BALANCE [%s]: Cannot plan task for volume %d on server %s, trying next volume", diskType, selectedVolume.VolumeID, maxServer) + continue + } + + results = append(results, task) + + // Adjust effective counts for the next iteration + adjustments[maxServer]-- + if destServerID != "" { + adjustments[destServerID]++ + } } - if selectedVolume == nil { - glog.Warningf("BALANCE [%s]: Could not find volume on overloaded server %s", diskType, maxServer) - return nil - } + // Truncated only if we hit maxResults and detection didn't naturally finish + truncated := len(results) >= maxResults && !balanced + return results, truncated +} - // Create balance task with volume and destination planning info - reason := fmt.Sprintf("Cluster imbalance detected for %s: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", - diskType, imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) - - // Generate task ID for ActiveTopology integration - taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix()) +// createBalanceTask creates a single balance task for the selected volume. +// Returns (nil, "") if destination planning fails. +// On success, returns the task result and the canonical destination server ID. +func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) (*types.TaskDetectionResult, string) { + taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().UnixNano()) task := &types.TaskDetectionResult{ - TaskID: taskID, // Link to ActiveTopology pending task + TaskID: taskID, TaskType: types.TaskTypeBalance, VolumeID: selectedVolume.VolumeID, Server: selectedVolume.Server, Collection: selectedVolume.Collection, Priority: types.TaskPriorityNormal, - Reason: reason, + Reason: fmt.Sprintf("Cluster imbalance detected for %s disk type", + diskType), ScheduleAt: time.Now(), } // Plan destination if ActiveTopology is available - if clusterInfo.ActiveTopology != nil { - // Check if ANY task already exists in ActiveTopology for this volume - if clusterInfo.ActiveTopology.HasAnyTask(selectedVolume.VolumeID) { - glog.V(2).Infof("BALANCE [%s]: Skipping volume %d, task already exists in ActiveTopology", diskType, selectedVolume.VolumeID) - return nil - } - - destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) - if err != nil { - glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) - return nil - } - - // Find the actual disk containing the volume on the source server - sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) - if !found { - glog.Warningf("BALANCE [%s]: Could not find volume %d (collection: %s) on source server %s - unable to create balance task", - diskType, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) - return nil - } - - // Create typed parameters with unified source and target information - task.TypedParams = &worker_pb.TaskParams{ - TaskId: taskID, // Link to ActiveTopology pending task - VolumeId: selectedVolume.VolumeID, - Collection: selectedVolume.Collection, - VolumeSize: selectedVolume.Size, // Store original volume size for tracking changes - - // Unified sources and targets - the only way to specify locations - Sources: []*worker_pb.TaskSource{ - { - Node: selectedVolume.ServerAddress, - DiskId: sourceDisk, - VolumeId: selectedVolume.VolumeID, - EstimatedSize: selectedVolume.Size, - DataCenter: selectedVolume.DataCenter, - Rack: selectedVolume.Rack, - }, - }, - Targets: []*worker_pb.TaskTarget{ - { - Node: destinationPlan.TargetAddress, - DiskId: destinationPlan.TargetDisk, - VolumeId: selectedVolume.VolumeID, - EstimatedSize: destinationPlan.ExpectedSize, - DataCenter: destinationPlan.TargetDC, - Rack: destinationPlan.TargetRack, - }, - }, - - TaskParams: &worker_pb.TaskParams_BalanceParams{ - BalanceParams: &worker_pb.BalanceTaskParams{ - ForceMove: false, - TimeoutSeconds: 600, // 10 minutes default - }, - }, - } - - glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s", - selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode) - - // Add pending balance task to ActiveTopology for capacity management - targetDisk := destinationPlan.TargetDisk - - err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{ - TaskID: taskID, - TaskType: topology.TaskTypeBalance, - VolumeID: selectedVolume.VolumeID, - VolumeSize: int64(selectedVolume.Size), - Sources: []topology.TaskSourceSpec{ - {ServerID: selectedVolume.Server, DiskID: sourceDisk}, - }, - Destinations: []topology.TaskDestinationSpec{ - {ServerID: destinationPlan.TargetNode, DiskID: targetDisk}, - }, - }) - if err != nil { - glog.Warningf("BALANCE [%s]: Failed to add pending task for volume %d: %v", diskType, selectedVolume.VolumeID, err) - return nil - } - - glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d", - taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk) - } else { + if clusterInfo.ActiveTopology == nil { glog.Warningf("No ActiveTopology available for destination planning in balance detection") - return nil + return nil, "" } - return task + destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) + if err != nil { + glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) + return nil, "" + } + + // Find the actual disk containing the volume on the source server + sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) + if !found { + glog.Warningf("BALANCE [%s]: Could not find volume %d (collection: %s) on source server %s - unable to create balance task", + diskType, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) + return nil, "" + } + + // Update reason with full details now that we have destination info + task.Reason = fmt.Sprintf("Cluster imbalance detected for %s: move volume %d from %s to %s", + diskType, selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode) + + // Create typed parameters with unified source and target information + task.TypedParams = &worker_pb.TaskParams{ + TaskId: taskID, + VolumeId: selectedVolume.VolumeID, + Collection: selectedVolume.Collection, + VolumeSize: selectedVolume.Size, + + Sources: []*worker_pb.TaskSource{ + { + Node: selectedVolume.ServerAddress, + DiskId: sourceDisk, + VolumeId: selectedVolume.VolumeID, + EstimatedSize: selectedVolume.Size, + DataCenter: selectedVolume.DataCenter, + Rack: selectedVolume.Rack, + }, + }, + Targets: []*worker_pb.TaskTarget{ + { + Node: destinationPlan.TargetAddress, + DiskId: destinationPlan.TargetDisk, + VolumeId: selectedVolume.VolumeID, + EstimatedSize: destinationPlan.ExpectedSize, + DataCenter: destinationPlan.TargetDC, + Rack: destinationPlan.TargetRack, + }, + }, + + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{ + ForceMove: false, + TimeoutSeconds: 600, // 10 minutes default + }, + }, + } + + glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s", + selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode) + + // Add pending balance task to ActiveTopology for capacity management + targetDisk := destinationPlan.TargetDisk + + err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{ + TaskID: taskID, + TaskType: topology.TaskTypeBalance, + VolumeID: selectedVolume.VolumeID, + VolumeSize: int64(selectedVolume.Size), + Sources: []topology.TaskSourceSpec{ + {ServerID: selectedVolume.Server, DiskID: sourceDisk}, + }, + Destinations: []topology.TaskDestinationSpec{ + {ServerID: destinationPlan.TargetNode, DiskID: targetDisk}, + }, + }) + if err != nil { + glog.Warningf("BALANCE [%s]: Failed to add pending task for volume %d: %v", diskType, selectedVolume.VolumeID, err) + return nil, "" + } + + glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d", + taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk) + + return task, destinationPlan.TargetNode } // planBalanceDestination plans the destination for a balance operation @@ -244,9 +384,17 @@ func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVol return nil, fmt.Errorf("no available disks for balance operation") } + // Sort available disks by NodeID then DiskID for deterministic tie-breaking + sort.Slice(availableDisks, func(i, j int) bool { + if availableDisks[i].NodeID != availableDisks[j].NodeID { + return availableDisks[i].NodeID < availableDisks[j].NodeID + } + return availableDisks[i].DiskID < availableDisks[j].DiskID + }) + // Find the best destination disk based on balance criteria var bestDisk *topology.DiskInfo - bestScore := -1.0 + bestScore := math.Inf(-1) for _, disk := range availableDisks { // Ensure disk type matches @@ -282,7 +430,9 @@ func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVol }, nil } -// calculateBalanceScore calculates placement score for balance operations +// calculateBalanceScore calculates placement score for balance operations. +// LoadCount reflects pending+assigned tasks on the disk, so we factor it into +// the utilization estimate to avoid stacking multiple moves onto the same target. func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, volumeSize uint64) float64 { if disk.DiskInfo == nil { return 0.0 @@ -290,10 +440,13 @@ func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, score := 0.0 - // Prefer disks with lower current volume count (better for balance) + // Prefer disks with lower effective volume count (current + pending moves). + // LoadCount is included so that disks already targeted by planned moves + // appear more utilized, naturally spreading work across targets. if disk.DiskInfo.MaxVolumeCount > 0 { - utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount) - score += (1.0 - utilization) * 40.0 // Up to 40 points for low utilization + effectiveVolumeCount := float64(disk.DiskInfo.VolumeCount) + float64(disk.LoadCount) + utilization := effectiveVolumeCount / float64(disk.DiskInfo.MaxVolumeCount) + score += (1.0 - utilization) * 50.0 // Up to 50 points for low utilization } // Prefer different racks for better distribution @@ -306,8 +459,5 @@ func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, score += 20.0 } - // Prefer disks with lower current load - score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load - return score } diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go index 3fff3720b..344c63f04 100644 --- a/weed/worker/tasks/balance/detection_test.go +++ b/weed/worker/tasks/balance/detection_test.go @@ -1,6 +1,7 @@ package balance import ( + "fmt" "testing" "github.com/seaweedfs/seaweedfs/weed/admin/topology" @@ -9,6 +10,145 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker/types" ) +// serverSpec describes a server for the topology builder. +type serverSpec struct { + id string // e.g. "node-1" + diskType string // e.g. "ssd", "hdd" + diskID uint32 + dc string + rack string + maxVolumes int64 +} + +// buildTopology constructs an ActiveTopology from server specs and volume metrics. +func buildTopology(servers []serverSpec, metrics []*types.VolumeHealthMetrics) *topology.ActiveTopology { + at := topology.NewActiveTopology(0) + + volumesByServer := make(map[string][]*master_pb.VolumeInformationMessage) + for _, m := range metrics { + volumesByServer[m.Server] = append(volumesByServer[m.Server], &master_pb.VolumeInformationMessage{ + Id: m.VolumeID, + Size: m.Size, + Collection: m.Collection, + Version: 1, + }) + } + + // Group servers by dc → rack for topology construction + type rackKey struct{ dc, rack string } + rackNodes := make(map[rackKey][]*master_pb.DataNodeInfo) + + for _, s := range servers { + maxVol := s.maxVolumes + if maxVol == 0 { + maxVol = 1000 + } + node := &master_pb.DataNodeInfo{ + Id: s.id, + Address: s.id + ":8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + s.diskType: { + Type: s.diskType, + DiskId: s.diskID, + VolumeInfos: volumesByServer[s.id], + VolumeCount: int64(len(volumesByServer[s.id])), + MaxVolumeCount: maxVol, + }, + }, + } + key := rackKey{s.dc, s.rack} + rackNodes[key] = append(rackNodes[key], node) + } + + // Build DC → Rack tree + dcRacks := make(map[string][]*master_pb.RackInfo) + for key, nodes := range rackNodes { + dcRacks[key.dc] = append(dcRacks[key.dc], &master_pb.RackInfo{ + Id: key.rack, + DataNodeInfos: nodes, + }) + } + + var dcInfos []*master_pb.DataCenterInfo + for dcID, racks := range dcRacks { + dcInfos = append(dcInfos, &master_pb.DataCenterInfo{ + Id: dcID, + RackInfos: racks, + }) + } + + at.UpdateTopology(&master_pb.TopologyInfo{DataCenterInfos: dcInfos}) + return at +} + +// makeVolumes generates n VolumeHealthMetrics for a server starting at volumeIDBase. +func makeVolumes(server, diskType, dc, rack, collection string, volumeIDBase uint32, n int) []*types.VolumeHealthMetrics { + out := make([]*types.VolumeHealthMetrics, n) + for i := range out { + out[i] = &types.VolumeHealthMetrics{ + VolumeID: volumeIDBase + uint32(i), + Server: server, + ServerAddress: server + ":8080", + DiskType: diskType, + Collection: collection, + Size: 1024, + DataCenter: dc, + Rack: rack, + } + } + return out +} + +func defaultConf() *Config { + return &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 30, + MaxConcurrent: 1, + }, + MinServerCount: 2, + ImbalanceThreshold: 0.2, + } +} + +// assertNoDuplicateVolumes verifies every task moves a distinct volume. +func assertNoDuplicateVolumes(t *testing.T, tasks []*types.TaskDetectionResult) { + t.Helper() + seen := make(map[uint32]bool) + for i, task := range tasks { + if seen[task.VolumeID] { + t.Errorf("duplicate volume %d in task %d", task.VolumeID, i) + } + seen[task.VolumeID] = true + } +} + +// computeEffectiveCounts returns per-server volume counts after applying all planned moves. +// servers seeds the map so that empty destination servers (no volumes in metrics) are tracked. +func computeEffectiveCounts(servers []serverSpec, metrics []*types.VolumeHealthMetrics, tasks []*types.TaskDetectionResult) map[string]int { + // Build address → server ID mapping from the topology spec + addrToServer := make(map[string]string, len(servers)) + counts := make(map[string]int, len(servers)) + for _, s := range servers { + counts[s.id] = 0 + addrToServer[s.id+":8080"] = s.id + addrToServer[s.id] = s.id + } + for _, m := range metrics { + counts[m.Server]++ + } + for _, task := range tasks { + counts[task.Server]-- // source loses one + if task.TypedParams != nil && len(task.TypedParams.Targets) > 0 { + addr := task.TypedParams.Targets[0].Node + if serverID, ok := addrToServer[addr]; ok { + counts[serverID]++ + } + } + } + return counts +} + func createMockTopology(volumes ...*types.VolumeHealthMetrics) *topology.ActiveTopology { at := topology.NewActiveTopology(0) @@ -171,7 +311,7 @@ func TestDetection_MixedDiskTypes(t *testing.T) { ActiveTopology: at, } - tasks, err := Detection(metrics, clusterInfo, conf) + tasks, _, err := Detection(metrics, clusterInfo, conf, 100) if err != nil { t.Fatalf("Detection failed: %v", err) } @@ -231,25 +371,484 @@ func TestDetection_ImbalancedDiskType(t *testing.T) { ActiveTopology: at, } - tasks, err := Detection(metrics, clusterInfo, conf) + tasks, _, err := Detection(metrics, clusterInfo, conf, 100) if err != nil { t.Fatalf("Detection failed: %v", err) } if len(tasks) == 0 { t.Error("Expected tasks for imbalanced SSD cluster, got 0") - } else { - // Verify task details - task := tasks[0] + } + + // With 100 volumes on server-1 and 10 on server-2, avg=55, detection should + // propose multiple moves until imbalance drops below 20% threshold. + // All tasks should move volumes from ssd-server-1 to ssd-server-2. + if len(tasks) < 2 { + t.Errorf("Expected multiple balance tasks, got %d", len(tasks)) + } + + for i, task := range tasks { if task.VolumeID == 0 { - t.Error("Task has invalid VolumeID") + t.Errorf("Task %d has invalid VolumeID", i) } - // Expect volume to be moving from ssd-server-1 to ssd-server-2 if task.TypedParams.Sources[0].Node != "ssd-server-1:8080" { - t.Errorf("Expected source ssd-server-1:8080, got %s", task.TypedParams.Sources[0].Node) + t.Errorf("Task %d: expected source ssd-server-1:8080, got %s", i, task.TypedParams.Sources[0].Node) } if task.TypedParams.Targets[0].Node != "ssd-server-2:8080" { - t.Errorf("Expected target ssd-server-2:8080, got %s", task.TypedParams.Targets[0].Node) + t.Errorf("Task %d: expected target ssd-server-2:8080, got %s", i, task.TypedParams.Targets[0].Node) } } } + +func TestDetection_RespectsMaxResults(t *testing.T) { + // Setup: 2 SSD servers with big imbalance (100 vs 10) + metrics := []*types.VolumeHealthMetrics{} + + for i := 0; i < 100; i++ { + metrics = append(metrics, &types.VolumeHealthMetrics{ + VolumeID: uint32(i + 1), + Server: "ssd-server-1", + ServerAddress: "ssd-server-1:8080", + DiskType: "ssd", + Collection: "c1", + Size: 1024, + DataCenter: "dc1", + Rack: "rack1", + }) + } + for i := 0; i < 10; i++ { + metrics = append(metrics, &types.VolumeHealthMetrics{ + VolumeID: uint32(100 + i + 1), + Server: "ssd-server-2", + ServerAddress: "ssd-server-2:8080", + DiskType: "ssd", + Collection: "c1", + Size: 1024, + DataCenter: "dc1", + Rack: "rack1", + }) + } + + conf := &Config{ + BaseConfig: base.BaseConfig{ + Enabled: true, + ScanIntervalSeconds: 30, + MaxConcurrent: 1, + }, + MinServerCount: 2, + ImbalanceThreshold: 0.2, + } + + at := createMockTopology(metrics...) + clusterInfo := &types.ClusterInfo{ + ActiveTopology: at, + } + + // Request only 3 results — there are enough volumes to produce more, + // so truncated should be true. + tasks, truncated, err := Detection(metrics, clusterInfo, conf, 3) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + if len(tasks) != 3 { + t.Errorf("Expected exactly 3 tasks (maxResults=3), got %d", len(tasks)) + } + if !truncated { + t.Errorf("Expected truncated=true when maxResults caps results") + } + + // Verify truncated=false when detection finishes naturally (no cap) + at2 := createMockTopology(metrics...) + clusterInfo2 := &types.ClusterInfo{ActiveTopology: at2} + tasks2, truncated2, err := Detection(metrics, clusterInfo2, conf, 500) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + if truncated2 { + t.Errorf("Expected truncated=false when detection finishes naturally, got true (len=%d)", len(tasks2)) + } +} + +// --- Complicated scenario tests --- + +// TestDetection_ThreeServers_ConvergesToBalance verifies that with 3 servers +// (60/30/10 volumes) the algorithm moves volumes from the heaviest server first, +// then re-evaluates, potentially shifting from the second-heaviest too. +func TestDetection_ThreeServers_ConvergesToBalance(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"}, + } + + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 60)...) + metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 30)...) + metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 10)...) + + at := buildTopology(servers, metrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + if len(tasks) < 2 { + t.Fatalf("Expected multiple tasks for 60/30/10 imbalance, got %d", len(tasks)) + } + + assertNoDuplicateVolumes(t, tasks) + + // Verify convergence: effective counts should be within 20% imbalance. + effective := computeEffectiveCounts(servers, metrics, tasks) + total := 0 + maxC, minC := 0, len(metrics) + for _, c := range effective { + total += c + if c > maxC { + maxC = c + } + if c < minC { + minC = c + } + } + avg := float64(total) / float64(len(effective)) + imbalance := float64(maxC-minC) / avg + if imbalance > 0.2 { + t.Errorf("After %d moves, cluster still imbalanced: effective=%v, imbalance=%.1f%%", + len(tasks), effective, imbalance*100) + } + + // All sources should be from the overloaded nodes, never node-c + for i, task := range tasks { + src := task.TypedParams.Sources[0].Node + if src == "node-c:8080" { + t.Errorf("Task %d: should not move FROM the underloaded server node-c", i) + } + } +} + +// TestDetection_SkipsPreExistingPendingTasks verifies that volumes with +// already-registered pending tasks in ActiveTopology are skipped. +func TestDetection_SkipsPreExistingPendingTasks(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + } + + // node-a has 20, node-b has 5 + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 20)...) + metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 5)...) + + at := buildTopology(servers, metrics) + + // Pre-register pending tasks for the first 15 volumes on node-a. + // This simulates a previous detection run that already planned moves. + for i := 0; i < 15; i++ { + volID := uint32(1 + i) + err := at.AddPendingTask(topology.TaskSpec{ + TaskID: fmt.Sprintf("existing-%d", volID), + TaskType: topology.TaskTypeBalance, + VolumeID: volID, + VolumeSize: 1024, + Sources: []topology.TaskSourceSpec{{ServerID: "node-a", DiskID: 1}}, + Destinations: []topology.TaskDestinationSpec{{ServerID: "node-b", DiskID: 2}}, + }) + if err != nil { + t.Fatalf("AddPendingTask failed: %v", err) + } + } + + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + // None of the results should reference a volume with an existing task (IDs 1-15). + for i, task := range tasks { + if task.VolumeID >= 1 && task.VolumeID <= 15 { + t.Errorf("Task %d: volume %d already has a pending task, should have been skipped", + i, task.VolumeID) + } + } + + // With 15 pending A→B moves, effective counts are A=5, B=20. + // Detection sees B as overloaded and may plan moves from B (5 volumes). + // Should produce a reasonable number of tasks without over-scheduling. + if len(tasks) > 5 { + t.Errorf("Expected at most 5 new tasks, got %d", len(tasks)) + } + if len(tasks) == 0 { + t.Errorf("Expected at least 1 new task since projected imbalance still exists") + } + + assertNoDuplicateVolumes(t, tasks) +} + +// TestDetection_NoDuplicateVolumesAcrossIterations verifies that the loop +// never selects the same volume twice, even under high maxResults. +func TestDetection_NoDuplicateVolumesAcrossIterations(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "ssd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "ssd", diskID: 2, dc: "dc1", rack: "rack1"}, + } + + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "ssd", "dc1", "rack1", "c1", 1, 50)...) + metrics = append(metrics, makeVolumes("node-b", "ssd", "dc1", "rack1", "c1", 100, 10)...) + + at := buildTopology(servers, metrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 200) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + if len(tasks) <= 1 { + t.Fatalf("Expected multiple tasks to verify no-duplicate invariant across iterations, got %d", len(tasks)) + } + + assertNoDuplicateVolumes(t, tasks) +} + +// TestDetection_ThreeServers_MaxServerShifts verifies that after enough moves +// from the top server, the algorithm detects a new max server and moves from it. +func TestDetection_ThreeServers_MaxServerShifts(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"}, + } + + // node-a: 40, node-b: 38, node-c: 10. avg ≈ 29.3 + // Initial imbalance = (40-10)/29.3 ≈ 1.02 → move from node-a. + // After a few moves from node-a, node-b becomes the new max and should be + // picked as the source. + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 40)...) + metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 38)...) + metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 10)...) + + at := buildTopology(servers, metrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + if len(tasks) < 3 { + t.Fatalf("Expected several tasks for 40/38/10 imbalance, got %d", len(tasks)) + } + + // Collect source servers + sourceServers := make(map[string]int) + for _, task := range tasks { + sourceServers[task.Server]++ + } + + // Both node-a and node-b should appear as sources (max server shifts) + if sourceServers["node-a"] == 0 { + t.Error("Expected node-a to be a source for some moves") + } + if sourceServers["node-b"] == 0 { + t.Error("Expected node-b to be a source after node-a is drained enough") + } + if sourceServers["node-c"] > 0 { + t.Error("node-c (underloaded) should never be a source") + } + + assertNoDuplicateVolumes(t, tasks) +} + +// TestDetection_FourServers_DestinationSpreading verifies that with 4 servers +// (1 heavy, 3 light) the algorithm spreads moves across multiple destinations. +func TestDetection_FourServers_DestinationSpreading(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "ssd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "ssd", diskID: 2, dc: "dc1", rack: "rack2"}, + {id: "node-c", diskType: "ssd", diskID: 3, dc: "dc1", rack: "rack3"}, + {id: "node-d", diskType: "ssd", diskID: 4, dc: "dc1", rack: "rack4"}, + } + + // node-a: 80, b/c/d: 5 each. avg=23.75 + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "ssd", "dc1", "rack1", "c1", 1, 80)...) + metrics = append(metrics, makeVolumes("node-b", "ssd", "dc1", "rack2", "c1", 100, 5)...) + metrics = append(metrics, makeVolumes("node-c", "ssd", "dc1", "rack3", "c1", 200, 5)...) + metrics = append(metrics, makeVolumes("node-d", "ssd", "dc1", "rack4", "c1", 300, 5)...) + + at := buildTopology(servers, metrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + if len(tasks) < 5 { + t.Fatalf("Expected many tasks, got %d", len(tasks)) + } + + // Count destination servers + destServers := make(map[string]int) + for _, task := range tasks { + if task.TypedParams != nil && len(task.TypedParams.Targets) > 0 { + destServers[task.TypedParams.Targets[0].Node]++ + } + } + + // With 3 eligible destinations (b, c, d) and pending-task-aware scoring, + // moves should go to more than just one destination. + if len(destServers) < 2 { + t.Errorf("Expected moves to spread across destinations, but only got: %v", destServers) + } + + assertNoDuplicateVolumes(t, tasks) +} + +// TestDetection_ConvergenceVerification verifies that after all planned moves, +// the effective volume distribution is within the configured threshold. +func TestDetection_ConvergenceVerification(t *testing.T) { + tests := []struct { + name string + counts []int // volumes per server + threshold float64 + }{ + {"2-server-big-gap", []int{100, 10}, 0.2}, + {"3-server-staircase", []int{90, 50, 10}, 0.2}, + {"4-server-one-hot", []int{200, 20, 20, 20}, 0.2}, + {"3-server-tight-threshold", []int{30, 20, 10}, 0.1}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var servers []serverSpec + var metrics []*types.VolumeHealthMetrics + volBase := uint32(1) + + for i, count := range tt.counts { + id := fmt.Sprintf("node-%d", i) + servers = append(servers, serverSpec{ + id: id, diskType: "hdd", diskID: uint32(i + 1), + dc: "dc1", rack: "rack1", + }) + metrics = append(metrics, makeVolumes(id, "hdd", "dc1", "rack1", "c1", volBase, count)...) + volBase += uint32(count) + } + + at := buildTopology(servers, metrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + conf := defaultConf() + conf.ImbalanceThreshold = tt.threshold + + tasks, _, err := Detection(metrics, clusterInfo, conf, 500) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + if len(tasks) == 0 { + t.Fatal("Expected balance tasks, got 0") + } + + assertNoDuplicateVolumes(t, tasks) + + // Verify convergence + effective := computeEffectiveCounts(servers, metrics, tasks) + total := 0 + maxC, minC := 0, len(metrics) + for _, c := range effective { + total += c + if c > maxC { + maxC = c + } + if c < minC { + minC = c + } + } + avg := float64(total) / float64(len(effective)) + imbalance := float64(maxC-minC) / avg + if imbalance > tt.threshold { + t.Errorf("After %d moves, still imbalanced: effective=%v, imbalance=%.1f%% (threshold=%.1f%%)", + len(tasks), effective, imbalance*100, tt.threshold*100) + } + t.Logf("%s: %d moves, effective=%v, imbalance=%.1f%%", + tt.name, len(tasks), effective, imbalance*100) + }) + } +} + +// TestDetection_ExhaustedServerFallsThrough verifies that when the most +// overloaded server has all its volumes blocked by pre-existing tasks, +// the algorithm falls through to the next overloaded server instead of stopping. +func TestDetection_ExhaustedServerFallsThrough(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"}, + } + + // node-a: 50 volumes, node-b: 40 volumes, node-c: 10 volumes + // avg = 33.3, imbalance = (50-10)/33.3 = 1.2 > 0.2 + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 50)...) + metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 40)...) + metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 10)...) + + at := buildTopology(servers, metrics) + + // Block ALL of node-a's volumes with pre-existing tasks + for i := 0; i < 50; i++ { + volID := uint32(1 + i) + err := at.AddPendingTask(topology.TaskSpec{ + TaskID: fmt.Sprintf("existing-%d", volID), + TaskType: topology.TaskTypeBalance, + VolumeID: volID, + VolumeSize: 1024, + Sources: []topology.TaskSourceSpec{{ServerID: "node-a", DiskID: 1}}, + Destinations: []topology.TaskDestinationSpec{{ServerID: "node-c", DiskID: 3}}, + }) + if err != nil { + t.Fatalf("AddPendingTask failed: %v", err) + } + } + + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + // node-a is exhausted, but node-b (40 vols) vs node-c (10 vols) is still + // imbalanced. The algorithm should fall through and move from node-b. + if len(tasks) == 0 { + t.Fatal("Expected tasks from node-b after node-a was exhausted, got 0") + } + + for i, task := range tasks { + if task.Server == "node-a" { + t.Errorf("Task %d: should not move FROM node-a (all volumes blocked)", i) + } + } + + // Verify node-b is the source + hasNodeBSource := false + for _, task := range tasks { + if task.Server == "node-b" { + hasNodeBSource = true + break + } + } + if !hasNodeBSource { + t.Error("Expected node-b to be a source after node-a was exhausted") + } + + assertNoDuplicateVolumes(t, tasks) + t.Logf("Created %d tasks from node-b after node-a exhausted", len(tasks)) +} diff --git a/weed/worker/tasks/balance/register.go b/weed/worker/tasks/balance/register.go index 68e3458e9..ecd0216eb 100644 --- a/weed/worker/tasks/balance/register.go +++ b/weed/worker/tasks/balance/register.go @@ -58,7 +58,10 @@ func RegisterBalanceTask() { dialOpt, ), nil }, - DetectionFunc: Detection, + DetectionFunc: func(metrics []*types.VolumeHealthMetrics, info *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) { + results, _, err := Detection(metrics, info, config, 0) + return results, err + }, ScanInterval: 30 * time.Minute, SchedulingFunc: Scheduling, MaxConcurrent: 1,