* refactoring * add ec shard size * address comments * passing task id There seems to be a disconnect between the pending tasks created in ActiveTopology and the TaskDetectionResult returned by this function. A taskID is generated locally and used to create pending tasks via AddPendingECShardTask, but this taskID is not stored in the TaskDetectionResult or passed along in any way. This makes it impossible for the worker that eventually executes the task to know which pending task in ActiveTopology it corresponds to. Without the correct taskID, the worker cannot call AssignTask or CompleteTask on the master, breaking the entire task lifecycle and capacity management feature. A potential solution is to add a TaskID field to TaskDetectionResult and worker_pb.TaskParams, ensuring the ID is propagated from detection to execution. * 1 source multiple destinations * task supports multi source and destination * ec needs to clean up previous shards * use erasure coding constants * getPlanningCapacityUnsafe getEffectiveAvailableCapacityUnsafe should return StorageSlotChange for calculation * use CanAccommodate to calculate * remove dead code * address comments * fix Mutex Copying in Protobuf Structs * use constants * fix estimatedSize The calculation for estimatedSize only considers source.EstimatedSize and dest.StorageChange, but omits dest.EstimatedSize. The TaskDestination struct has an EstimatedSize field, which seems to be ignored here. This could lead to an incorrect estimation of the total size of data involved in tasks on a disk. The loop should probably also include estimatedSize += dest.EstimatedSize. 
* at.assignTaskToDisk(task) * refactoring * Update weed/admin/topology/internal.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * fail fast * fix compilation * Update weed/worker/tasks/erasure_coding/detection.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * indexes for volume and shard locations * dedup with ToVolumeSlots * return an additional boolean to indicate success, or an error * Update abstract_sql_store.go * fix * Update weed/worker/tasks/erasure_coding/detection.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/admin/topology/task_management.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * faster findVolumeDisk * Update weed/worker/tasks/erasure_coding/detection.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/admin/topology/storage_slot_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * refactor * simplify * remove unused GetDiskStorageImpact function * refactor * add comments * Update weed/admin/topology/storage_impact.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/admin/topology/storage_slot_test.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update storage_impact.go * AddPendingTask The unified AddPendingTask function now serves as the single entry point for all task creation, successfully consolidating the previously separate functions while maintaining full functionality and improving code organization. --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
265 lines
8.9 KiB
Go
265 lines
8.9 KiB
Go
package topology
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
)
|
|
|
|
// AssignTask moves a task from pending to assigned and reserves capacity
|
|
func (at *ActiveTopology) AssignTask(taskID string) error {
|
|
at.mutex.Lock()
|
|
defer at.mutex.Unlock()
|
|
|
|
task, exists := at.pendingTasks[taskID]
|
|
if !exists {
|
|
return fmt.Errorf("pending task %s not found", taskID)
|
|
}
|
|
|
|
// Check if all destination disks have sufficient capacity to reserve
|
|
for _, dest := range task.Destinations {
|
|
targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk)
|
|
if targetDisk, exists := at.disks[targetKey]; exists {
|
|
availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk)
|
|
|
|
// Check if we have enough total capacity using the improved unified comparison
|
|
if !availableCapacity.CanAccommodate(dest.StorageChange) {
|
|
return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v",
|
|
dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange)
|
|
}
|
|
} else if dest.TargetServer != "" {
|
|
// Fail fast if destination disk is not found in topology
|
|
return fmt.Errorf("destination disk %s not found in topology", targetKey)
|
|
}
|
|
}
|
|
|
|
// Move task to assigned and reserve capacity
|
|
delete(at.pendingTasks, taskID)
|
|
task.Status = TaskStatusInProgress
|
|
at.assignedTasks[taskID] = task
|
|
at.reassignTaskStates()
|
|
|
|
// Log capacity reservation information for all sources and destinations
|
|
totalSourceImpact := StorageSlotChange{}
|
|
totalDestImpact := StorageSlotChange{}
|
|
for _, source := range task.Sources {
|
|
totalSourceImpact.AddInPlace(source.StorageChange)
|
|
}
|
|
for _, dest := range task.Destinations {
|
|
totalDestImpact.AddInPlace(dest.StorageChange)
|
|
}
|
|
|
|
glog.V(2).Infof("Task %s assigned and capacity reserved: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
|
|
taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
|
|
len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)
|
|
|
|
return nil
|
|
}
|
|
|
|
// CompleteTask moves a task from assigned to recent and releases reserved capacity
|
|
// NOTE: This only releases the reserved capacity. The actual topology update (VolumeCount changes)
|
|
// should be handled by the master when it receives the task completion notification.
|
|
func (at *ActiveTopology) CompleteTask(taskID string) error {
|
|
at.mutex.Lock()
|
|
defer at.mutex.Unlock()
|
|
|
|
task, exists := at.assignedTasks[taskID]
|
|
if !exists {
|
|
return fmt.Errorf("assigned task %s not found", taskID)
|
|
}
|
|
|
|
// Release reserved capacity by moving task to completed state
|
|
delete(at.assignedTasks, taskID)
|
|
task.Status = TaskStatusCompleted
|
|
task.CompletedAt = time.Now()
|
|
at.recentTasks[taskID] = task
|
|
at.reassignTaskStates()
|
|
|
|
// Log capacity release information for all sources and destinations
|
|
totalSourceImpact := StorageSlotChange{}
|
|
totalDestImpact := StorageSlotChange{}
|
|
for _, source := range task.Sources {
|
|
totalSourceImpact.AddInPlace(source.StorageChange)
|
|
}
|
|
for _, dest := range task.Destinations {
|
|
totalDestImpact.AddInPlace(dest.StorageChange)
|
|
}
|
|
|
|
glog.V(2).Infof("Task %s completed and capacity released: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
|
|
taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
|
|
len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)
|
|
|
|
// Clean up old recent tasks
|
|
at.cleanupRecentTasks()
|
|
|
|
return nil
|
|
}
|
|
|
|
// ApplyActualStorageChange updates the topology to reflect actual storage changes after task completion
|
|
// This should be called when the master updates the topology with new VolumeCount information
|
|
func (at *ActiveTopology) ApplyActualStorageChange(nodeID string, diskID uint32, volumeCountChange int64) {
|
|
at.mutex.Lock()
|
|
defer at.mutex.Unlock()
|
|
|
|
diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
|
|
if disk, exists := at.disks[diskKey]; exists && disk.DiskInfo != nil && disk.DiskInfo.DiskInfo != nil {
|
|
oldCount := disk.DiskInfo.DiskInfo.VolumeCount
|
|
disk.DiskInfo.DiskInfo.VolumeCount += volumeCountChange
|
|
|
|
glog.V(2).Infof("Applied actual storage change on disk %s: volume_count %d -> %d (change: %+d)",
|
|
diskKey, oldCount, disk.DiskInfo.DiskInfo.VolumeCount, volumeCountChange)
|
|
}
|
|
}
|
|
|
|
// AddPendingTask is the unified function that handles both simple and complex task creation
|
|
func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error {
|
|
// Validation
|
|
if len(spec.Sources) == 0 {
|
|
return fmt.Errorf("at least one source is required")
|
|
}
|
|
if len(spec.Destinations) == 0 {
|
|
return fmt.Errorf("at least one destination is required")
|
|
}
|
|
|
|
at.mutex.Lock()
|
|
defer at.mutex.Unlock()
|
|
|
|
// Build sources array
|
|
sources := make([]TaskSource, len(spec.Sources))
|
|
for i, sourceSpec := range spec.Sources {
|
|
var storageImpact StorageSlotChange
|
|
var estimatedSize int64
|
|
|
|
if sourceSpec.StorageImpact != nil {
|
|
// Use manually specified impact
|
|
storageImpact = *sourceSpec.StorageImpact
|
|
} else {
|
|
// Auto-calculate based on task type and cleanup type
|
|
storageImpact = at.calculateSourceStorageImpact(spec.TaskType, sourceSpec.CleanupType, spec.VolumeSize)
|
|
}
|
|
|
|
if sourceSpec.EstimatedSize != nil {
|
|
estimatedSize = *sourceSpec.EstimatedSize
|
|
} else {
|
|
estimatedSize = spec.VolumeSize // Default to volume size
|
|
}
|
|
|
|
sources[i] = TaskSource{
|
|
SourceServer: sourceSpec.ServerID,
|
|
SourceDisk: sourceSpec.DiskID,
|
|
StorageChange: storageImpact,
|
|
EstimatedSize: estimatedSize,
|
|
}
|
|
}
|
|
|
|
// Build destinations array
|
|
destinations := make([]TaskDestination, len(spec.Destinations))
|
|
for i, destSpec := range spec.Destinations {
|
|
var storageImpact StorageSlotChange
|
|
var estimatedSize int64
|
|
|
|
if destSpec.StorageImpact != nil {
|
|
// Use manually specified impact
|
|
storageImpact = *destSpec.StorageImpact
|
|
} else {
|
|
// Auto-calculate based on task type
|
|
_, storageImpact = CalculateTaskStorageImpact(spec.TaskType, spec.VolumeSize)
|
|
}
|
|
|
|
if destSpec.EstimatedSize != nil {
|
|
estimatedSize = *destSpec.EstimatedSize
|
|
} else {
|
|
estimatedSize = spec.VolumeSize // Default to volume size
|
|
}
|
|
|
|
destinations[i] = TaskDestination{
|
|
TargetServer: destSpec.ServerID,
|
|
TargetDisk: destSpec.DiskID,
|
|
StorageChange: storageImpact,
|
|
EstimatedSize: estimatedSize,
|
|
}
|
|
}
|
|
|
|
// Create the task
|
|
task := &taskState{
|
|
VolumeID: spec.VolumeID,
|
|
TaskType: spec.TaskType,
|
|
Status: TaskStatusPending,
|
|
StartedAt: time.Now(),
|
|
EstimatedSize: spec.VolumeSize,
|
|
Sources: sources,
|
|
Destinations: destinations,
|
|
}
|
|
|
|
at.pendingTasks[spec.TaskID] = task
|
|
at.assignTaskToDisk(task)
|
|
|
|
glog.V(2).Infof("Added pending %s task %s: volume %d, %d sources, %d destinations",
|
|
spec.TaskType, spec.TaskID, spec.VolumeID, len(sources), len(destinations))
|
|
|
|
return nil
|
|
}
|
|
|
|
// calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
|
|
func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
|
|
switch taskType {
|
|
case TaskTypeErasureCoding:
|
|
switch cleanupType {
|
|
case CleanupVolumeReplica:
|
|
impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
|
|
return impact
|
|
case CleanupECShards:
|
|
return CalculateECShardCleanupImpact(volumeSize)
|
|
default:
|
|
impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
|
|
return impact
|
|
}
|
|
default:
|
|
impact, _ := CalculateTaskStorageImpact(taskType, volumeSize)
|
|
return impact
|
|
}
|
|
}
|
|
|
|
// SourceCleanupType indicates what type of data needs to be cleaned up from a
// source. It determines which slot kind the cleanup frees: volume slots for a
// replica, shard slots for existing EC shards.
type SourceCleanupType int

const (
	CleanupVolumeReplica SourceCleanupType = iota // Clean up volume replica (frees volume slots)
	CleanupECShards                               // Clean up existing EC shards (frees shard slots)
)
|
|
|
|
// TaskSourceSpec represents a source specification for task creation.
// When StorageImpact or EstimatedSize is nil, AddPendingTask auto-derives the
// value from the task type, cleanup type, and volume size.
type TaskSourceSpec struct {
	ServerID      string             // Source server ID
	DiskID        uint32             // Disk on the source server
	CleanupType   SourceCleanupType  // For EC: volume replica vs existing shards
	StorageImpact *StorageSlotChange // Optional: manual override
	EstimatedSize *int64             // Optional: manual override
}
|
|
|
|
// TaskDestinationSpec represents a destination specification for task creation.
// When StorageImpact or EstimatedSize is nil, AddPendingTask auto-derives the
// value from the task type and volume size.
type TaskDestinationSpec struct {
	ServerID      string             // Destination server ID
	DiskID        uint32             // Disk on the destination server
	StorageImpact *StorageSlotChange // Optional: manual override
	EstimatedSize *int64             // Optional: manual override
}
|
|
|
|
// TaskSpec represents a complete task specification, consumed by
// AddPendingTask. A task moves data for one volume from one or more sources
// to one or more destinations.
type TaskSpec struct {
	TaskID       string                // Unique ID; also the key used by AssignTask/CompleteTask
	TaskType     TaskType              // Kind of task (e.g. erasure coding)
	VolumeID     uint32                // Volume the task operates on
	VolumeSize   int64                 // Used for auto-calculation when manual impacts not provided
	Sources      []TaskSourceSpec      // Can be single or multiple
	Destinations []TaskDestinationSpec // Can be single or multiple
}
|
|
|
|
// TaskSourceLocation represents a source location for task creation.
//
// Deprecated: use TaskSourceSpec instead.
type TaskSourceLocation struct {
	ServerID    string            // Source server ID
	DiskID      uint32            // Disk on the source server
	CleanupType SourceCleanupType // What type of cleanup is needed
}
|