* fix: volume balance detection now returns multiple tasks per run (#8551)

  Previously, detectForDiskType() returned at most 1 balance task per disk type, making the MaxJobsPerDetection setting ineffective. The detection loop now iterates within each disk type, planning multiple moves until the imbalance drops below threshold or maxResults is reached. Effective volume counts are adjusted after each planned move so the algorithm correctly re-evaluates which server is overloaded.

* fix: factor pending tasks into destination scoring and use UnixNano for task IDs

  - Use UnixNano instead of Unix for task IDs to avoid collisions when multiple tasks are created within the same second
  - Adjust calculateBalanceScore to include LoadCount (pending + assigned tasks) in the utilization estimate, so the destination picker avoids stacking multiple planned moves onto the same target disk

* test: add comprehensive balance detection tests for complex scenarios

  Cover multi-server convergence, max-server shifting, destination spreading, pre-existing pending task skipping, no-duplicate-volume invariant, and parameterized convergence verification across different cluster shapes and thresholds.
* fix: address PR review findings in balance detection

  - hasMore flag: compute from len(results) >= maxResults so the scheduler knows more pages may exist, matching vacuum/EC handler pattern
  - Exhausted server fallthrough: when no eligible volumes remain on the current maxServer (all have pending tasks) or destination planning fails, mark the server as exhausted and continue to the next overloaded server instead of stopping the entire detection loop
  - Return canonical destination server ID directly from createBalanceTask instead of resolving via findServerIDByAddress, eliminating the fragile address→ID lookup for adjustment tracking
  - Fix bestScore sentinel: use math.Inf(-1) instead of -1.0 so disks with negative scores (high pending load, same rack/DC) are still selected as the best available destination
  - Add TestDetection_ExhaustedServerFallsThrough covering the scenario where the top server's volumes are all blocked by pre-existing tasks

* test: fix computeEffectiveCounts and add len guard in no-duplicate test

  - computeEffectiveCounts now takes a servers slice to seed counts for all known servers (including empty ones) and uses an address→ID map from the topology spec instead of scanning metrics, so destination servers with zero initial volumes are tracked correctly
  - TestDetection_NoDuplicateVolumesAcrossIterations now asserts len > 1 before checking duplicates, so the test actually fails if Detection regresses to returning a single task

* fix: remove redundant HasAnyTask check in createBalanceTask

  The HasAnyTask check in createBalanceTask duplicated the same check already performed in detectForDiskType's volume selection loop. Since detection runs single-threaded (MaxDetectionConcurrency: 1), no race can occur between the two points.
* fix: consistent hasMore pattern and remove double-counted LoadCount in scoring

  - Adopt vacuum_handler's hasMore pattern: over-fetch by 1, check len > maxResults, and truncate — consistent truncation semantics
  - Remove direct LoadCount penalty in calculateBalanceScore since LoadCount is already factored into effectiveVolumeCount for utilization scoring; bump utilization weight from 40 to 50 to compensate for the removed 10-point load penalty

* fix: handle zero maxResults as no-cap, emit trace after trim, seed empty servers

  - When MaxResults is 0 (omitted), treat as no explicit cap instead of defaulting to 1; only apply the +1 over-fetch probe when caller supplies a positive limit
  - Move decision trace emission after hasMore/trim so the trace accurately reflects the returned proposals
  - Seed serverVolumeCounts from ActiveTopology so servers that have a matching disk type but zero volumes are included in the imbalance calculation and MinServerCount check

* fix: nil-guard clusterInfo, uncap legacy DetectionFunc, deterministic disk type order

  - Add early nil guard for clusterInfo in Detection to prevent panics in downstream helpers (detectForDiskType, createBalanceTask)
  - Change register.go DetectionFunc wrapper from maxResults=1 to 0 (no cap) so the legacy code path returns all detected tasks
  - Sort disk type keys before iteration so results are deterministic when maxResults spans multiple disk types (HDD/SSD)

* fix: don't over-fetch in stateful detection to avoid orphaned pending tasks

  Detection registers planned moves in ActiveTopology via AddPendingTask, so requesting maxResults+1 would create an extra pending task that gets discarded during trim. Use len(results) >= maxResults as the hasMore signal instead, which is correct since Detection already caps internally.
* fix: return explicit truncated flag from Detection instead of approximating

  Detection now returns (results, truncated, error) where truncated is true only when the loop stopped because it hit maxResults, not when it ran out of work naturally. This eliminates false hasMore signals when detection happens to produce exactly maxResults results by resolving the imbalance.

* cleanup: simplify detection logic and remove redundancies

  - Remove redundant clusterInfo nil check in detectForDiskType since Detection already guards against nil clusterInfo
  - Remove adjustments loop for destination servers not in serverVolumeCounts — topology seeding ensures all servers with matching disk type are already present
  - Merge two-loop min/max calculation into a single loop: min across all servers, max only among non-exhausted servers
  - Replace magic number 100 with len(metrics) for minC initialization in convergence test

* fix: accurate truncation flag, deterministic server order, indexed volume lookup

  - Track balanced flag to distinguish "hit maxResults cap" from "cluster balanced at exactly maxResults" — truncated is only true when there's genuinely more work to do
  - Sort servers for deterministic iteration and tie-breaking when multiple servers have equal volume counts
  - Pre-index volumes by server with per-server cursors to avoid O(maxResults * volumes) rescanning on each iteration
  - Add truncation flag assertions to RespectsMaxResults test: true when capped, false when detection finishes naturally

* fix: seed trace server counts from ActiveTopology to match detection logic

  The decision trace was building serverVolumeCounts only from metrics, missing zero-volume servers seeded from ActiveTopology by Detection. This could cause the trace to report wrong server counts, incorrect imbalance ratios, or spurious "too few servers" messages. Pass activeTopology into the trace function and seed server counts the same way Detection does.
* fix: don't exhaust server on per-volume planning failure, sort volumes by ID

  - When createBalanceTask returns nil, continue to the next volume on the same server instead of marking the entire server as exhausted. The failure may be volume-specific (not found in topology, pending task registration failed) and other volumes on the server may still be viable candidates.
  - Sort each server's volume slice by VolumeID after pre-indexing so volume selection is fully deterministic regardless of input order.

* fix: use require instead of assert to prevent nil dereference panic in CORS test

  The test used assert.NoError (non-fatal) for GetBucketCors, then immediately accessed getResp.CORSRules. When the API returns an error, getResp is nil causing a panic. Switch to require.NoError/NotNil/Len so the test stops before dereferencing a nil response.

* fix: deterministic disk tie-breaking and stronger pre-existing task test

  - Sort available disks by NodeID then DiskID before scoring so destination selection is deterministic when two disks score equally
  - Add task count bounds assertion to SkipsPreExistingPendingTasks test: with 15 of 20 volumes already having pending tasks, at most 5 new tasks should be created and at least 1 (imbalance still exists)

* fix: seed adjustments from existing pending/assigned tasks to prevent over-scheduling

  Detection now calls ActiveTopology.GetTaskServerAdjustments() to initialize the adjustments map with source/destination deltas from existing pending and assigned balance tasks. This ensures effectiveCounts reflects in-flight moves, preventing the algorithm from planning additional moves in the same direction when prior moves already address the imbalance.

  Added GetTaskServerAdjustments(taskType) to ActiveTopology which iterates pending and assigned tasks, decrementing source servers and incrementing destination servers for the given task type.
365 lines
12 KiB
Go
package topology

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
)

// AssignTask moves a task from pending to assigned and reserves capacity
func (at *ActiveTopology) AssignTask(taskID string) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	task, exists := at.pendingTasks[taskID]
	if !exists {
		return fmt.Errorf("pending task %s not found", taskID)
	}

	// Skip capacity check if topology hasn't been populated yet
	if len(at.disks) == 0 {
		glog.Warningf("AssignTask %s: topology has no disks yet, skipping capacity check", taskID)
	} else {
		// Check if all destination disks have sufficient capacity to reserve
		for _, dest := range task.Destinations {
			targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk)
			if targetDisk, exists := at.disks[targetKey]; exists {
				availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk)

				// Check if we have enough total capacity using the improved unified comparison
				if !availableCapacity.CanAccommodate(dest.StorageChange) {
					return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v",
						dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange)
				}
			} else if dest.TargetServer != "" {
				// Fail fast if destination disk is not found in topology
				var existingKeys []string
				for k := range at.disks {
					existingKeys = append(existingKeys, k)
				}
				glog.Warningf("destination disk %s not found in topology. Existing disk keys: %v", targetKey, existingKeys)
				return fmt.Errorf("destination disk %s not found in topology", targetKey)
			}
		}
	}

	// Move task to assigned and reserve capacity
	delete(at.pendingTasks, taskID)
	task.Status = TaskStatusInProgress
	at.assignedTasks[taskID] = task
	at.reassignTaskStates()

	// Log capacity reservation information for all sources and destinations
	totalSourceImpact := StorageSlotChange{}
	totalDestImpact := StorageSlotChange{}
	for _, source := range task.Sources {
		totalSourceImpact.AddInPlace(source.StorageChange)
	}
	for _, dest := range task.Destinations {
		totalDestImpact.AddInPlace(dest.StorageChange)
	}

	glog.V(2).Infof("Task %s assigned and capacity reserved: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
		taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
		len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)

	return nil
}

// CompleteTask moves a task from assigned to recent and releases reserved capacity
// NOTE: This only releases the reserved capacity. The actual topology update (VolumeCount changes)
// should be handled by the master when it receives the task completion notification.
func (at *ActiveTopology) CompleteTask(taskID string) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	task, exists := at.assignedTasks[taskID]
	if !exists {
		// If not in assigned tasks, check pending tasks
		if task, exists = at.pendingTasks[taskID]; exists {
			delete(at.pendingTasks, taskID)
		} else {
			return fmt.Errorf("task %s not found in assigned or pending tasks", taskID)
		}
	} else {
		delete(at.assignedTasks, taskID)
	}

	// Release reserved capacity by moving task to completed state
	task.Status = TaskStatusCompleted
	task.CompletedAt = time.Now()
	at.recentTasks[taskID] = task
	at.reassignTaskStates()

	// Log capacity release information for all sources and destinations
	totalSourceImpact := StorageSlotChange{}
	totalDestImpact := StorageSlotChange{}
	for _, source := range task.Sources {
		totalSourceImpact.AddInPlace(source.StorageChange)
	}
	for _, dest := range task.Destinations {
		totalDestImpact.AddInPlace(dest.StorageChange)
	}

	glog.V(2).Infof("Task %s completed and capacity released: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
		taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
		len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)

	// Clean up old recent tasks
	at.cleanupRecentTasks()

	return nil
}

// ApplyActualStorageChange updates the topology to reflect actual storage changes after task completion
// This should be called when the master updates the topology with new VolumeCount information
func (at *ActiveTopology) ApplyActualStorageChange(nodeID string, diskID uint32, volumeCountChange int64) {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	if disk, exists := at.disks[diskKey]; exists && disk.DiskInfo != nil && disk.DiskInfo.DiskInfo != nil {
		oldCount := disk.DiskInfo.DiskInfo.VolumeCount
		disk.DiskInfo.DiskInfo.VolumeCount += volumeCountChange

		glog.V(2).Infof("Applied actual storage change on disk %s: volume_count %d -> %d (change: %+d)",
			diskKey, oldCount, disk.DiskInfo.DiskInfo.VolumeCount, volumeCountChange)
	}
}

// AddPendingTask is the unified function that handles both simple and complex task creation
func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error {
	// Validation
	if len(spec.Sources) == 0 {
		return fmt.Errorf("at least one source is required")
	}
	if len(spec.Destinations) == 0 {
		return fmt.Errorf("at least one destination is required")
	}

	at.mutex.Lock()
	defer at.mutex.Unlock()

	// Build sources array
	sources := make([]TaskSource, len(spec.Sources))
	for i, sourceSpec := range spec.Sources {
		var storageImpact StorageSlotChange
		var estimatedSize int64

		if sourceSpec.StorageImpact != nil {
			// Use manually specified impact
			storageImpact = *sourceSpec.StorageImpact
		} else {
			// Auto-calculate based on task type and cleanup type
			storageImpact = at.calculateSourceStorageImpact(spec.TaskType, sourceSpec.CleanupType, spec.VolumeSize)
		}

		if sourceSpec.EstimatedSize != nil {
			estimatedSize = *sourceSpec.EstimatedSize
		} else {
			estimatedSize = spec.VolumeSize // Default to volume size
		}

		sources[i] = TaskSource{
			SourceServer:  sourceSpec.ServerID,
			SourceDisk:    sourceSpec.DiskID,
			StorageChange: storageImpact,
			EstimatedSize: estimatedSize,
		}
	}

	// Build destinations array
	destinations := make([]TaskDestination, len(spec.Destinations))
	for i, destSpec := range spec.Destinations {
		var storageImpact StorageSlotChange
		var estimatedSize int64

		if destSpec.StorageImpact != nil {
			// Use manually specified impact
			storageImpact = *destSpec.StorageImpact
		} else {
			// Auto-calculate based on task type
			_, storageImpact = CalculateTaskStorageImpact(spec.TaskType, spec.VolumeSize)
		}

		if destSpec.EstimatedSize != nil {
			estimatedSize = *destSpec.EstimatedSize
		} else {
			estimatedSize = spec.VolumeSize // Default to volume size
		}

		destinations[i] = TaskDestination{
			TargetServer:  destSpec.ServerID,
			TargetDisk:    destSpec.DiskID,
			StorageChange: storageImpact,
			EstimatedSize: estimatedSize,
		}
	}

	// Create the task
	task := &taskState{
		VolumeID:      spec.VolumeID,
		TaskType:      spec.TaskType,
		Status:        TaskStatusPending,
		StartedAt:     time.Now(),
		EstimatedSize: spec.VolumeSize,
		Sources:       sources,
		Destinations:  destinations,
	}

	at.pendingTasks[spec.TaskID] = task
	at.assignTaskToDisk(task)

	return nil
}

// RestoreMaintenanceTask restores a task from persistent storage into the active topology
func (at *ActiveTopology) RestoreMaintenanceTask(taskID string, volumeID uint32, taskType TaskType, status TaskStatus, sources []TaskSource, destinations []TaskDestination, estimatedSize int64) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	task := &taskState{
		VolumeID:      volumeID,
		TaskType:      taskType,
		Status:        status,
		StartedAt:     time.Now(), // Fallback if not provided, will be updated by heartbeats
		EstimatedSize: estimatedSize,
		Sources:       sources,
		Destinations:  destinations,
	}

	if status == TaskStatusInProgress {
		at.assignedTasks[taskID] = task
	} else if status == TaskStatusPending {
		at.pendingTasks[taskID] = task
	} else {
		return nil // Ignore other statuses for topology tracking
	}

	// Re-register task with disks for capacity tracking
	at.assignTaskToDisk(task)

	glog.V(1).Infof("Restored %s task %s in topology: volume %d, %d sources, %d destinations",
		taskType, taskID, volumeID, len(sources), len(destinations))

	return nil
}

// HasTask checks if there is any pending or assigned task for the given volume and task type.
// If taskType is TaskTypeNone, it checks for ANY task type.
func (at *ActiveTopology) HasTask(volumeID uint32, taskType TaskType) bool {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	for _, task := range at.pendingTasks {
		if task.VolumeID == volumeID && (taskType == TaskTypeNone || task.TaskType == taskType) {
			return true
		}
	}

	for _, task := range at.assignedTasks {
		if task.VolumeID == volumeID && (taskType == TaskTypeNone || task.TaskType == taskType) {
			return true
		}
	}

	return false
}

// HasAnyTask checks if there is any pending or assigned task for the given volume across all types.
func (at *ActiveTopology) HasAnyTask(volumeID uint32) bool {
	return at.HasTask(volumeID, TaskTypeNone)
}

// GetTaskServerAdjustments returns per-server volume count adjustments for
// pending and assigned tasks of the given type. For each task, source servers
// are decremented and destination servers are incremented, reflecting the
// projected volume distribution once in-flight tasks complete.
func (at *ActiveTopology) GetTaskServerAdjustments(taskType TaskType) map[string]int {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	adjustments := make(map[string]int)
	for _, task := range at.pendingTasks {
		if task.TaskType != taskType {
			continue
		}
		for _, src := range task.Sources {
			adjustments[src.SourceServer]--
		}
		for _, dst := range task.Destinations {
			adjustments[dst.TargetServer]++
		}
	}
	for _, task := range at.assignedTasks {
		if task.TaskType != taskType {
			continue
		}
		for _, src := range task.Sources {
			adjustments[src.SourceServer]--
		}
		for _, dst := range task.Destinations {
			adjustments[dst.TargetServer]++
		}
	}
	return adjustments
}

// calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
	switch taskType {
	case TaskTypeErasureCoding:
		switch cleanupType {
		case CleanupVolumeReplica:
			impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
			return impact
		case CleanupECShards:
			return CalculateECShardCleanupImpact(volumeSize)
		default:
			impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
			return impact
		}
	default:
		impact, _ := CalculateTaskStorageImpact(taskType, volumeSize)
		return impact
	}
}

// SourceCleanupType indicates what type of data needs to be cleaned up from a source
type SourceCleanupType int

const (
	CleanupVolumeReplica SourceCleanupType = iota // Clean up volume replica (frees volume slots)
	CleanupECShards                               // Clean up existing EC shards (frees shard slots)
)

// TaskSourceSpec represents a source specification for task creation
type TaskSourceSpec struct {
	ServerID      string
	DiskID        uint32
	DataCenter    string             // Data center of the source server
	Rack          string             // Rack of the source server
	CleanupType   SourceCleanupType  // For EC: volume replica vs existing shards
	StorageImpact *StorageSlotChange // Optional: manual override
	EstimatedSize *int64             // Optional: manual override
}

// TaskDestinationSpec represents a destination specification for task creation
type TaskDestinationSpec struct {
	ServerID      string
	DiskID        uint32
	StorageImpact *StorageSlotChange // Optional: manual override
	EstimatedSize *int64             // Optional: manual override
}

// TaskSpec represents a complete task specification
type TaskSpec struct {
	TaskID       string
	TaskType     TaskType
	VolumeID     uint32
	VolumeSize   int64                 // Used for auto-calculation when manual impacts not provided
	Sources      []TaskSourceSpec      // Can be single or multiple
	Destinations []TaskDestinationSpec // Can be single or multiple
}