Files
seaweedFS/weed/admin/maintenance/maintenance_queue.go
Chris Lu 076d504044 fix(admin): reduce memory usage and verbose logging for large clusters (#8927)
* fix(admin): reduce memory usage and verbose logging for large clusters (#8919)

The admin server used excessive memory and produced thousands of log lines
on clusters with many volumes (e.g., 33k volumes). Three root causes:

1. Scanner duplicated all volume metrics: getVolumeHealthMetrics() created
   VolumeHealthMetrics objects, then convertToTaskMetrics() copied them all
   into identical types.VolumeHealthMetrics. Now uses the task-system type
   directly, eliminating the duplicate allocation and removing convertToTaskMetrics.

2. All previous task states loaded at startup: LoadTasksFromPersistence read
   and deserialized every .pb file from disk, logging each one. With thousands
   of balance tasks persisted, this caused massive startup I/O, memory usage,
   and log noise (including unguarded DEBUG glog.Infof per task). Now starts
   with an empty queue — the scanner re-detects current needs from live cluster
   state. Terminal tasks are purged from memory and disk when new scan results
   arrive.

3. Verbose per-volume/per-node logging: V(2) and V(3) logs produced thousands
   of lines per scan. Per-volume logs bumped to V(4), per-node/rack/disk logs
   bumped to V(3). Topology summary now logs counts instead of full node ID arrays.

Also removes lastTopologyInfo field from MaintenanceScanner — the raw protobuf
topology is returned as a local value and not retained between 30-minute scans.

* fix(admin): delete stale task files at startup, add DeleteAllTaskStates

Old task .pb files from previous runs were left on disk. The periodic
CleanupCompletedTasks still loads all files to find completed ones —
the same expensive 4GB path from the pprof profile.

Now at startup, DeleteAllTaskStates removes all .pb files by scanning
the directory without reading or deserializing them. The scanner will
re-detect any tasks still needed from live cluster state.

* fix(admin): don't persist terminal tasks to disk

CompleteTask was saving failed/completed tasks to disk where they'd
accumulate. The periodic cleanup only triggered for completed tasks,
not failed ones. Now terminal tasks are deleted from disk immediately
and only kept in memory for the current session's UI.

* fix(admin): cap in-memory tasks to 100 per job type

Without a limit, the task map grows unbounded — balance could create
thousands of pending tasks for a cluster with many imbalanced volumes.
Now AddTask rejects new tasks when a job type already has 100 in the
queue. The scanner will re-detect skipped volumes on the next scan.

* fix(admin): address PR review - memory-only purge, active-only capacity

- purgeTerminalTasks now only cleans in-memory map (terminal tasks are
  already deleted from disk by CompleteTask)
- Per-type capacity limit counts only active tasks (pending/assigned/
  in_progress), not terminal ones
- When at capacity, purge terminal tasks first before rejecting

* fix(admin): fix orphaned comment, add TaskStatusCancelled to terminal switch

- Move hasQueuedOrActiveTaskForVolume comment to its function definition
- Add TaskStatusCancelled to the terminal state switch in CompleteTask
  so cancelled task files are deleted from disk
2026-04-04 18:45:57 -07:00

1189 lines
37 KiB
Go

package maintenance
import (
"crypto/rand"
"fmt"
"sort"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
)
// NewMaintenanceQueue creates an empty maintenance queue governed by the
// given policy. The integration and persistence hooks are attached later
// via SetIntegration / SetPersistence.
func NewMaintenanceQueue(policy *MaintenancePolicy) *MaintenanceQueue {
	return &MaintenanceQueue{
		tasks:        make(map[string]*MaintenanceTask),
		workers:      make(map[string]*MaintenanceWorker),
		pendingTasks: []*MaintenanceTask{},
		policy:       policy,
	}
}
// SetIntegration wires the queue to the maintenance integration, which is
// used for topology capacity accounting and task scheduler lookups.
func (mq *MaintenanceQueue) SetIntegration(integration *MaintenanceIntegration) {
	mq.integration = integration
	glog.V(1).Infof("Maintenance queue configured with integration")
}
// SetPersistence wires the queue to a TaskPersistence backend used to save,
// delete, and clean up on-disk task state.
func (mq *MaintenanceQueue) SetPersistence(persistence TaskPersistence) {
	mq.persistence = persistence
	glog.V(1).Infof("Maintenance queue configured with task persistence")
}
// LoadTasksFromPersistence is called on startup. Previous task states are NOT
// loaded into memory — the maintenance scanner will re-detect current needs
// from the live cluster state. Stale task files from previous runs are deleted
// from disk (a failure to delete is logged but not fatal). Always returns nil.
func (mq *MaintenanceQueue) LoadTasksFromPersistence() error {
	if p := mq.persistence; p != nil {
		if err := p.DeleteAllTaskStates(); err != nil {
			glog.Warningf("Failed to clean up old task files: %v", err)
		}
	}
	glog.Infof("Task queue initialized (previous tasks will be re-detected by scanner)")
	return nil
}
// saveTaskState writes a task to persistent storage, if persistence is
// configured. Errors are logged, not returned — persistence is best-effort.
func (mq *MaintenanceQueue) saveTaskState(task *MaintenanceTask) {
	if mq.persistence == nil {
		return
	}
	if err := mq.persistence.SaveTaskState(task); err != nil {
		glog.Errorf("Failed to save task state for %s: %v", task.ID, err)
	}
}
// deleteTaskState removes a task's persisted state file, if persistence is
// configured. Failures are logged at V(2) only, since the file may simply
// not exist (e.g. the task was never saved).
func (mq *MaintenanceQueue) deleteTaskState(taskID string) {
	if mq.persistence == nil {
		return
	}
	if err := mq.persistence.DeleteTaskState(taskID); err != nil {
		glog.V(2).Infof("Failed to delete task state for %s: %v", taskID, err)
	}
}
// cleanupCompletedTasks asks the persistence layer to remove old completed
// tasks beyond its retention limit. No-op when persistence is not configured.
func (mq *MaintenanceQueue) cleanupCompletedTasks() {
	if mq.persistence == nil {
		return
	}
	if err := mq.persistence.CleanupCompletedTasks(); err != nil {
		glog.Errorf("Failed to cleanup completed tasks: %v", err)
	}
}
// MaxTasksPerType caps the number of active (pending/assigned/in-progress)
// tasks a single task type may hold in the queue. AddTask rejects new tasks
// beyond this limit; the scanner re-detects skipped volumes on a later scan.
const MaxTasksPerType = 100
// AddTask adds a new maintenance task to the queue with deduplication.
//
// A task is rejected (logged at V(1)) when:
//   - its type already has MaxTasksPerType active tasks, even after purging
//     terminal tasks to free capacity,
//   - its volume already has a queued or active task of any type, or
//   - an identical task (same type + volume + server, still active) exists.
//
// Accepted tasks receive defaults (generated ID, pending status, 3 retries,
// creation metadata), are inserted into the priority-sorted pending list, and
// are persisted outside the lock.
func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
	mq.mutex.Lock()
	// Enforce per-type capacity limit (only counting active tasks)
	if mq.countActiveTasksByType(task.Type) >= MaxTasksPerType {
		// Purge terminal tasks first, then recheck
		mq.purgeTerminalTasksLocked()
		if mq.countActiveTasksByType(task.Type) >= MaxTasksPerType {
			mq.mutex.Unlock()
			glog.V(1).Infof("Task skipped (type %s at capacity %d): volume %d on %s",
				task.Type, MaxTasksPerType, task.VolumeID, task.Server)
			return
		}
	}
	// Enforce one queued/active task per volume (across all task types).
	if mq.hasQueuedOrActiveTaskForVolume(task.VolumeID) {
		mq.mutex.Unlock()
		glog.V(1).Infof("Task skipped (volume busy): %s for volume %d on %s (already queued or running)",
			task.Type, task.VolumeID, task.Server)
		return
	}
	// Check for duplicate tasks (same type + volume + server, still active)
	if mq.hasDuplicateTask(task) {
		mq.mutex.Unlock()
		glog.V(1).Infof("Task skipped (duplicate): %s for volume %d on %s (already queued or running)",
			task.Type, task.VolumeID, task.Server)
		return
	}
	// Fill in defaults for fields the caller left unset.
	if task.ID == "" {
		task.ID = generateTaskID()
	}
	task.Status = TaskStatusPending
	task.CreatedAt = time.Now()
	task.MaxRetries = 3 // Default retry count
	// Initialize assignment history and set creation context
	task.AssignmentHistory = make([]*TaskAssignmentRecord, 0)
	if task.CreatedBy == "" {
		task.CreatedBy = "maintenance-system"
	}
	if task.CreationContext == "" {
		task.CreationContext = "Automatic task creation based on system monitoring"
	}
	if task.Tags == nil {
		task.Tags = make(map[string]string)
	}
	mq.tasks[task.ID] = task
	mq.pendingTasks = append(mq.pendingTasks, task)
	// Sort pending tasks by priority (higher first), then schedule time (earlier first)
	sort.Slice(mq.pendingTasks, func(i, j int) bool {
		if mq.pendingTasks[i].Priority != mq.pendingTasks[j].Priority {
			return mq.pendingTasks[i].Priority > mq.pendingTasks[j].Priority
		}
		return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
	})
	scheduleInfo := ""
	if !task.ScheduledAt.IsZero() && time.Until(task.ScheduledAt) > time.Minute {
		scheduleInfo = fmt.Sprintf(", scheduled for %v", task.ScheduledAt.Format("15:04:05"))
	}
	// Snapshot task state while lock is still held to avoid data race;
	// also capture log fields from the snapshot so the live task pointer
	// is not accessed after mq.mutex is released.
	taskSnapshot := snapshotTask(task)
	mq.mutex.Unlock()
	// Save task state to persistence outside the lock to avoid blocking
	// RegisterWorker and HTTP handlers (GetTasks) during disk I/O
	mq.saveTaskState(taskSnapshot)
	glog.Infof("Task queued: %s (%s) volume %d on %s, priority %d%s, reason: %s",
		taskSnapshot.ID, taskSnapshot.Type, taskSnapshot.VolumeID, taskSnapshot.Server, taskSnapshot.Priority, scheduleInfo, taskSnapshot.Reason)
}
// countActiveTasksByType returns how many tasks of the given type are still
// active (pending, assigned, or in progress). Terminal tasks are ignored.
// Caller must hold mq.mutex.
func (mq *MaintenanceQueue) countActiveTasksByType(taskType MaintenanceTaskType) int {
	active := 0
	for _, t := range mq.tasks {
		if t.Type != taskType {
			continue
		}
		if t.Status == TaskStatusPending || t.Status == TaskStatusAssigned || t.Status == TaskStatusInProgress {
			active++
		}
	}
	return active
}
// purgeTerminalTasksLocked deletes completed, failed, and cancelled tasks
// from the in-memory map. Caller must hold mq.mutex.
func (mq *MaintenanceQueue) purgeTerminalTasksLocked() {
	for id, t := range mq.tasks {
		if t.Status == TaskStatusCompleted || t.Status == TaskStatusFailed || t.Status == TaskStatusCancelled {
			delete(mq.tasks, id)
		}
	}
}
// hasQueuedOrActiveTaskForVolume reports whether any pending, assigned, or
// in-progress task (of any type) already targets the given volume. Volume 0
// is treated as "no volume" and never conflicts. Caller must hold mq.mutex.
func (mq *MaintenanceQueue) hasQueuedOrActiveTaskForVolume(volumeID uint32) bool {
	if volumeID == 0 {
		return false
	}
	for _, t := range mq.tasks {
		if t.VolumeID != volumeID {
			continue
		}
		switch t.Status {
		case TaskStatusPending, TaskStatusAssigned, TaskStatusInProgress:
			return true
		}
	}
	return false
}
// hasDuplicateTask reports whether an equivalent task — same type, volume,
// and server — is already pending, assigned, or in progress. Terminal tasks
// do not count as duplicates.
func (mq *MaintenanceQueue) hasDuplicateTask(newTask *MaintenanceTask) bool {
	for _, t := range mq.tasks {
		if t.Type != newTask.Type || t.VolumeID != newTask.VolumeID || t.Server != newTask.Server {
			continue
		}
		switch t.Status {
		case TaskStatusPending, TaskStatusAssigned, TaskStatusInProgress:
			return true
		}
	}
	return false
}
// CancelPendingTasksByType cancels all pending tasks of a given type.
// This is called before each detection cycle to ensure stale proposals
// from previous cycles are cleaned up before creating new ones.
// Returns the number of tasks cancelled. Cancelled tasks stay in mq.tasks
// (with terminal status) until purged; only the pending list is rebuilt.
//
// NOTE(review): cancelled snapshots are saved to disk here, while
// CompleteTask deletes terminal task state from disk — confirm whether
// persisting cancelled tasks is intended (startup deletes all files anyway).
func (mq *MaintenanceQueue) CancelPendingTasksByType(taskType MaintenanceTaskType) int {
	mq.mutex.Lock()
	var remaining []*MaintenanceTask
	var cancelledSnapshots []*MaintenanceTask
	cancelled := 0
	for _, task := range mq.pendingTasks {
		if task.Type == taskType {
			task.Status = TaskStatusCancelled
			now := time.Now()
			task.CompletedAt = &now
			cancelled++
			// Snapshot while the lock is held so persistence below does not
			// touch the live task pointer.
			cancelledSnapshots = append(cancelledSnapshots, snapshotTask(task))
			glog.V(1).Infof("Cancelled stale pending task %s (%s) for volume %d before re-detection",
				task.ID, task.Type, task.VolumeID)
			// Release capacity in ActiveTopology and remove pending operation
			if mq.integration != nil {
				if at := mq.integration.GetActiveTopology(); at != nil {
					_ = at.CompleteTask(task.ID)
				}
			}
			mq.removePendingOperation(task.ID)
		} else {
			remaining = append(remaining, task)
		}
	}
	mq.pendingTasks = remaining
	mq.mutex.Unlock()
	// Persist cancelled state outside the lock to avoid blocking
	for _, snapshot := range cancelledSnapshots {
		mq.saveTaskState(snapshot)
	}
	return cancelled
}
// AddTasksFromResults converts detection results into maintenance tasks and
// enqueues them. Results lacking typed parameters (planning failed or no
// suitable destination) are rejected with a warning. Terminal tasks are
// purged from memory first so stale entries don't consume per-type capacity.
func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) {
	// Purge terminal tasks from memory before adding new ones
	mq.purgeTerminalTasks()
	for _, r := range results {
		if r.TypedParams == nil {
			glog.Warningf("Rejecting invalid task: %s for volume %d on %s - no typed parameters (insufficient destinations or planning failed)",
				r.TaskType, r.VolumeID, r.Server)
			continue
		}
		mq.AddTask(&MaintenanceTask{
			ID:         r.TaskID,
			Type:       r.TaskType,
			Priority:   r.Priority,
			VolumeID:   r.VolumeID,
			Server:     r.Server,
			Collection: r.Collection,
			// Copy typed protobuf parameters
			TypedParams: r.TypedParams,
			Reason:      r.Reason,
			ScheduledAt: r.ScheduleAt,
		})
	}
}
// purgeTerminalTasks removes completed/failed/cancelled tasks from memory.
// Terminal tasks are already deleted from disk by CompleteTask, so this
// only needs to clean up the in-memory map.
func (mq *MaintenanceQueue) purgeTerminalTasks() {
	mq.mutex.Lock()
	removed := len(mq.tasks)
	mq.purgeTerminalTasksLocked()
	removed -= len(mq.tasks)
	mq.mutex.Unlock()
	if removed > 0 {
		glog.V(1).Infof("Purged %d terminal tasks from memory", removed)
	}
}
// GetNextTask returns the next pending task the given worker can run,
// assigns it to the worker, and removes it from the pending list. It returns
// nil when no suitable task exists: the worker is unregistered or at
// capacity, or every pending task is filtered out by schedule time, volume
// conflict, capability mismatch, or per-type concurrency limits.
//
// Candidate selection happens under a read lock so multiple workers can
// search concurrently; the actual assignment re-validates the choice under
// the write lock before mutating state.
func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
	// Use read lock for initial checks and search
	mq.mutex.RLock()
	worker, exists := mq.workers[workerID]
	if !exists {
		mq.mutex.RUnlock()
		glog.V(2).Infof("Task assignment failed for worker %s: worker not registered", workerID)
		return nil
	}
	// Check if worker has capacity
	if worker.CurrentLoad >= worker.MaxConcurrent {
		mq.mutex.RUnlock()
		glog.V(2).Infof("Task assignment failed for worker %s: at capacity (%d/%d)", workerID, worker.CurrentLoad, worker.MaxConcurrent)
		return nil
	}
	now := time.Now()
	var selectedTask *MaintenanceTask
	selectedIndex := -1
	// Capture the pending count while the read lock is held: reading
	// len(mq.pendingTasks) after RUnlock (as the "no task" log below needs)
	// would race with concurrent appends in AddTask/CompleteTask.
	pendingCount := len(mq.pendingTasks)
	// Find the next suitable task (using read lock)
	for i, task := range mq.pendingTasks {
		// Check if it's time to execute the task
		if task.ScheduledAt.After(now) {
			glog.V(3).Infof("Task %s skipped for worker %s: scheduled for future (%v)", task.ID, workerID, task.ScheduledAt)
			continue
		}
		// Avoid scheduling concurrent operations on the same volume
		if activeTaskID, activeTaskType, hasActive := mq.activeTaskForVolume(task.VolumeID, task.ID); hasActive {
			glog.V(2).Infof("Task %s (%s) skipped for worker %s: volume %d is busy with task %s (%s)",
				task.ID, task.Type, workerID, task.VolumeID, activeTaskID, activeTaskType)
			continue
		}
		// Check if worker can handle this task type
		if !mq.workerCanHandle(task.Type, capabilities) {
			glog.V(3).Infof("Task %s (%s) skipped for worker %s: capability mismatch (worker has: %v)", task.ID, task.Type, workerID, capabilities)
			continue
		}
		// Check if this task type needs a cooldown period.
		// NOTE(review): canScheduleTaskNow runs while mq.mutex is read-held;
		// confirm it never re-acquires mq.mutex, since a recursive RLock can
		// deadlock when a writer is queued in between.
		if !mq.canScheduleTaskNow(task) {
			// Count running tasks of this type inline for the diagnostic,
			// rather than calling GetRunningTaskCount: that method takes
			// mq.mutex.RLock again, and a recursive RLock deadlocks if a
			// writer is waiting between the two acquisitions.
			runningCount := 0
			for _, t := range mq.tasks {
				if t.Type == task.Type && (t.Status == TaskStatusAssigned || t.Status == TaskStatusInProgress) {
					runningCount++
				}
			}
			maxConcurrent := mq.getMaxConcurrentForTaskType(task.Type)
			glog.V(2).Infof("Task %s (%s) skipped for worker %s: scheduling constraints not met (running: %d, max: %d)",
				task.ID, task.Type, workerID, runningCount, maxConcurrent)
			continue
		}
		// Found a suitable task
		selectedTask = task
		selectedIndex = i
		break
	}
	// Release read lock
	mq.mutex.RUnlock()
	// If no task found, return nil
	if selectedTask == nil {
		glog.V(4).Infof("No suitable tasks available for worker %s (checked %d pending tasks)", workerID, pendingCount)
		return nil
	}
	// Now acquire write lock to actually assign the task
	mq.mutex.Lock()
	// Capture ID before the re-check so it is available for logging after unlock.
	selectedTaskID := selectedTask.ID
	// Re-check that the task is still available (it might have been assigned to another worker)
	if selectedIndex >= len(mq.pendingTasks) || mq.pendingTasks[selectedIndex].ID != selectedTaskID {
		mq.mutex.Unlock()
		glog.V(2).Infof("Task %s no longer available for worker %s: assigned to another worker", selectedTaskID, workerID)
		return nil
	}
	// Re-check volume conflict after acquiring write lock
	if activeTaskID, activeTaskType, hasActive := mq.activeTaskForVolume(selectedTask.VolumeID, selectedTaskID); hasActive {
		mq.mutex.Unlock()
		glog.V(2).Infof("Task %s no longer available for worker %s: volume %d is busy with task %s (%s)",
			selectedTaskID, workerID, selectedTask.VolumeID, activeTaskID, activeTaskType)
		return nil
	}
	// Record assignment history
	workerAddress := ""
	if worker, exists := mq.workers[workerID]; exists {
		workerAddress = worker.Address
	}
	// Create assignment record
	assignmentRecord := &TaskAssignmentRecord{
		WorkerID:      workerID,
		WorkerAddress: workerAddress,
		AssignedAt:    now,
		Reason:        "Task assigned to available worker",
	}
	// Initialize assignment history if nil
	if selectedTask.AssignmentHistory == nil {
		selectedTask.AssignmentHistory = make([]*TaskAssignmentRecord, 0)
	}
	selectedTask.AssignmentHistory = append(selectedTask.AssignmentHistory, assignmentRecord)
	// Assign the task
	selectedTask.Status = TaskStatusAssigned
	selectedTask.WorkerID = workerID
	selectedTask.StartedAt = &now
	// Notify ActiveTopology to reserve capacity (move from pending to assigned)
	if mq.integration != nil {
		if at := mq.integration.GetActiveTopology(); at != nil {
			if err := at.AssignTask(selectedTask.ID); err != nil {
				glog.Warningf("Failed to update ActiveTopology for task assignment %s: %v. Rolling back assignment.", selectedTask.ID, err)
				// Rollback assignment in MaintenanceQueue
				selectedTask.Status = TaskStatusPending
				selectedTask.WorkerID = ""
				selectedTask.StartedAt = nil
				if len(selectedTask.AssignmentHistory) > 0 {
					selectedTask.AssignmentHistory = selectedTask.AssignmentHistory[:len(selectedTask.AssignmentHistory)-1]
				}
				mq.mutex.Unlock()
				// Return nil so the task is not removed from pendingTasks and not returned to the worker
				return nil
			}
		}
	}
	// Remove from pending tasks
	mq.pendingTasks = append(mq.pendingTasks[:selectedIndex], mq.pendingTasks[selectedIndex+1:]...)
	// Update worker load
	if worker, exists := mq.workers[workerID]; exists {
		worker.CurrentLoad++
	}
	// Track pending operation
	mq.trackPendingOperation(selectedTask)
	// Snapshot task state while lock is still held to avoid data race
	selectedSnapshot := snapshotTask(selectedTask)
	mq.mutex.Unlock()
	// Save task state to persistence outside the lock
	mq.saveTaskState(selectedSnapshot)
	glog.Infof("Task assigned: %s (%s) → worker %s (volume %d, server %s)",
		selectedSnapshot.ID, selectedSnapshot.Type, workerID, selectedSnapshot.VolumeID, selectedSnapshot.Server)
	return selectedTask
}
// CompleteTask marks a task as finished and releases its worker/topology
// capacity. With a non-empty error string the task is either re-queued for
// retry (up to MaxRetries, with a 15-minute delay, unless the error is
// non-retriable) or marked permanently failed; with an empty error it is
// marked completed.
//
// Persistence rule: only retried (pending) tasks are written to disk;
// terminal tasks have their on-disk state deleted and live only in memory
// for the current session's UI.
//
// Note: the parameter named "error" shadows Go's predeclared error type
// inside this function; it carries the worker-reported failure message
// ("" means success).
func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
	mq.mutex.Lock()
	task, exists := mq.tasks[taskID]
	if !exists {
		mq.mutex.Unlock()
		glog.Warningf("Attempted to complete non-existent task: %s", taskID)
		return
	}
	// Notify ActiveTopology to release capacity (move from assigned to recent)
	// We do this for both success and failure cases to release the capacity
	if mq.integration != nil {
		if at := mq.integration.GetActiveTopology(); at != nil {
			if task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress {
				// Ignore error as task might not be in ActiveTopology (e.g. after restart)
				_ = at.CompleteTask(taskID)
			}
		}
	}
	completedTime := time.Now()
	task.CompletedAt = &completedTime
	// Calculate task duration
	var duration time.Duration
	if task.StartedAt != nil {
		duration = completedTime.Sub(*task.StartedAt)
	}
	// Capture workerID before it may be cleared during retry
	originalWorkerID := task.WorkerID
	var taskToSave *MaintenanceTask
	// logFn defers the actual logging until after the mutex is released.
	// NOTE(review): the closures below still read task.Type after unlock —
	// confirm Type is immutable once the task is created, else snapshot it too.
	var logFn func()
	if error != "" {
		task.Status = TaskStatusFailed
		task.Error = error
		// Check if task should be retried (skip retry for permanent errors)
		if task.RetryCount < task.MaxRetries && !isNonRetriableError(error) {
			// Record unassignment due to failure/retry
			if task.WorkerID != "" && len(task.AssignmentHistory) > 0 {
				lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1]
				if lastAssignment.UnassignedAt == nil {
					unassignedTime := completedTime
					lastAssignment.UnassignedAt = &unassignedTime
					lastAssignment.Reason = fmt.Sprintf("Task failed, scheduling retry (attempt %d/%d): %s",
						task.RetryCount+1, task.MaxRetries, error)
				}
			}
			// Reset the task back to a clean pending state for the retry.
			task.RetryCount++
			task.Status = TaskStatusPending
			task.WorkerID = ""
			task.StartedAt = nil
			task.CompletedAt = nil
			task.Error = ""
			task.ScheduledAt = time.Now().Add(15 * time.Minute) // Retry delay
			mq.pendingTasks = append(mq.pendingTasks, task)
			// Resync with ActiveTopology (re-add as pending)
			if mq.integration != nil {
				mq.integration.SyncTask(task)
			}
			taskToSave = task
			// Copy fields read by the closure so it avoids the live pointer
			// after the mutex is released.
			retryCount, maxRetries := task.RetryCount, task.MaxRetries
			logFn = func() {
				glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s",
					taskID, task.Type, retryCount, maxRetries, originalWorkerID, duration, error)
			}
		} else {
			// Record unassignment due to permanent failure
			if task.WorkerID != "" && len(task.AssignmentHistory) > 0 {
				lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1]
				if lastAssignment.UnassignedAt == nil {
					unassignedTime := completedTime
					lastAssignment.UnassignedAt = &unassignedTime
					lastAssignment.Reason = fmt.Sprintf("Task failed permanently after %d retries: %s", task.MaxRetries, error)
				}
			}
			taskToSave = task
			maxRetries := task.MaxRetries
			logFn = func() {
				glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s",
					taskID, task.Type, originalWorkerID, duration, maxRetries, error)
			}
		}
	} else {
		task.Status = TaskStatusCompleted
		task.Progress = 100
		taskToSave = task
		volumeID := task.VolumeID
		logFn = func() {
			glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d",
				taskID, task.Type, originalWorkerID, duration, volumeID)
		}
	}
	// Update worker load and capture state before releasing lock
	if originalWorkerID != "" {
		if worker, exists := mq.workers[originalWorkerID]; exists {
			worker.CurrentTask = nil
			worker.CurrentLoad--
			if worker.CurrentLoad == 0 {
				worker.Status = "active"
			}
		}
	}
	taskStatus := task.Status
	// Snapshot task state while lock is still held to avoid data race
	var taskToSaveSnapshot *MaintenanceTask
	if taskToSave != nil {
		taskToSaveSnapshot = snapshotTask(taskToSave)
	}
	mq.mutex.Unlock()
	// Only persist non-terminal tasks (retries). Completed/failed tasks stay
	// in memory for the UI but are not written to disk — they would just
	// accumulate and slow down future startups.
	if taskToSaveSnapshot != nil {
		switch taskStatus {
		case TaskStatusPending:
			// Retry — save so the task survives a restart
			mq.saveTaskState(taskToSaveSnapshot)
		case TaskStatusCompleted, TaskStatusFailed, TaskStatusCancelled:
			// Terminal — delete the file if one exists from a previous state
			mq.deleteTaskState(taskToSaveSnapshot.ID)
		}
	}
	if logFn != nil {
		logFn()
	}
	// Remove pending operation (unless it's being retried)
	if taskStatus != TaskStatusPending {
		mq.removePendingOperation(taskID)
	}
}
// isNonRetriableError reports whether the worker's error message describes a
// permanent failure that retrying cannot fix, such as the volume no longer
// existing on the source server.
func isNonRetriableError(errMsg string) bool {
	return strings.Index(errMsg, "not found") >= 0
}
// UpdateTaskProgress updates the progress (0-100) of a running task and
// forces its status to in-progress. State is persisted only at significant
// points: start (0%), finish (>=100%), or every 10% of accumulated change.
// NOTE(review): persistence uses a 10% step while milestone logging uses a
// 25% step — presumably intentional (persist more often than log); confirm.
func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64) {
	mq.mutex.Lock()
	task, exists := mq.tasks[taskID]
	if !exists {
		mq.mutex.Unlock()
		glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress)
		return
	}
	oldProgress := task.Progress
	task.Progress = progress
	task.Status = TaskStatusInProgress
	// Update pending operation status while lock is held
	mq.updatePendingOperationStatus(taskID, "in_progress")
	// Determine whether to persist and capture log fields before unlocking
	shouldSave := progress == 0 || progress >= 100 || progress-oldProgress >= 10
	var taskSnapshot *MaintenanceTask
	if shouldSave {
		taskSnapshot = snapshotTask(task)
	}
	taskType, workerID, volumeID := task.Type, task.WorkerID, task.VolumeID
	mq.mutex.Unlock()
	// Log progress at significant milestones or changes
	if progress == 0 {
		glog.V(1).Infof("Task started: %s (%s) worker %s, volume %d",
			taskID, taskType, workerID, volumeID)
	} else if progress >= 100 {
		glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
			taskID, taskType, workerID, progress)
	} else if progress-oldProgress >= 25 { // Log every 25% increment
		glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
			taskID, taskType, workerID, progress)
	}
	// Save task state outside the lock to avoid blocking readers
	if taskSnapshot != nil {
		mq.saveTaskState(taskSnapshot)
	}
}
// RegisterWorker registers a worker or refreshes a reconnecting one. On
// reconnect the previous CurrentLoad is preserved so in-flight assignments
// are still accounted for; brand-new workers start at zero load.
func (mq *MaintenanceQueue) RegisterWorker(worker *MaintenanceWorker) {
	mq.mutex.Lock()
	defer mq.mutex.Unlock()
	existing, reconnecting := mq.workers[worker.ID]
	if reconnecting {
		glog.Infof("Worker reconnected: %s at %s (capabilities: %v, max concurrent: %d)",
			worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
		// Preserve current load when reconnecting
		worker.CurrentLoad = existing.CurrentLoad
	} else {
		glog.Infof("Worker registered: %s at %s (capabilities: %v, max concurrent: %d)",
			worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
		worker.CurrentLoad = 0
	}
	worker.LastHeartbeat = time.Now()
	worker.Status = "active"
	mq.workers[worker.ID] = worker
}
// UpdateWorkerHeartbeat refreshes a worker's heartbeat timestamp, logging a
// notice when the worker had been silent for more than two minutes.
// Heartbeats from unknown workers are logged at V(2) and ignored.
func (mq *MaintenanceQueue) UpdateWorkerHeartbeat(workerID string) {
	mq.mutex.Lock()
	defer mq.mutex.Unlock()
	worker, ok := mq.workers[workerID]
	if !ok {
		glog.V(2).Infof("Heartbeat from unknown worker: %s", workerID)
		return
	}
	previous := worker.LastHeartbeat
	worker.LastHeartbeat = time.Now()
	// Log if worker was offline for a while
	if time.Since(previous) > 2*time.Minute {
		glog.Infof("Worker %s heartbeat resumed after %v", workerID, time.Since(previous))
	}
}
// GetRunningTaskCount returns how many tasks of the given type are currently
// assigned or in progress. Takes the read lock; do not call while already
// holding mq.mutex.
func (mq *MaintenanceQueue) GetRunningTaskCount(taskType MaintenanceTaskType) int {
	mq.mutex.RLock()
	defer mq.mutex.RUnlock()
	running := 0
	for _, t := range mq.tasks {
		if t.Type != taskType {
			continue
		}
		switch t.Status {
		case TaskStatusAssigned, TaskStatusInProgress:
			running++
		}
	}
	return running
}
// WasTaskRecentlyCompleted reports whether a task of the same type completed
// successfully for this volume/server within the type's repeat-prevention
// interval, measured back from the supplied time.
func (mq *MaintenanceQueue) WasTaskRecentlyCompleted(taskType MaintenanceTaskType, volumeID uint32, server string, now time.Time) bool {
	mq.mutex.RLock()
	defer mq.mutex.RUnlock()
	// Anything completed after this cutoff counts as "recent".
	cutoff := now.Add(-mq.getRepeatPreventionInterval(taskType))
	for _, task := range mq.tasks {
		if task.Type != taskType || task.VolumeID != volumeID || task.Server != server {
			continue
		}
		if task.Status != TaskStatusCompleted || task.CompletedAt == nil {
			continue
		}
		if task.CompletedAt.After(cutoff) {
			return true
		}
	}
	return false
}
// getRepeatPreventionInterval returns how long to suppress re-creation of a
// task for the same target after completion. It prefers the task scheduler's
// default interval, then the policy configuration (hours), then falls back
// to a minimal one-hour default.
func (mq *MaintenanceQueue) getRepeatPreventionInterval(taskType MaintenanceTaskType) time.Duration {
	// First choice: the task scheduler's own default.
	if mq.integration != nil {
		if scheduler := mq.integration.GetTaskScheduler(taskType); scheduler != nil {
			if d := scheduler.GetDefaultRepeatInterval(); d > 0 {
				glog.V(3).Infof("Using task scheduler default repeat interval for %s: %v", taskType, d)
				return d
			}
		}
	}
	// Second choice: the policy configuration, expressed in hours.
	if mq.policy != nil {
		if hours := GetRepeatInterval(mq.policy, taskType); hours > 0 {
			d := time.Duration(hours) * time.Hour
			glog.V(3).Infof("Using policy configuration repeat interval for %s: %v", taskType, d)
			return d
		}
	}
	// Last resort: a minimal safe default.
	glog.V(2).Infof("No scheduler or policy configuration found for task type %s, using minimal default: 1h", taskType)
	return time.Hour
}
// GetTasks returns tasks matching the optional status and type filters
// (empty string = no filter), sorted newest-first and truncated to limit
// entries (limit <= 0 means unlimited). Terminal-status listings sort by
// completion time; everything else sorts by creation time.
func (mq *MaintenanceQueue) GetTasks(status MaintenanceTaskStatus, taskType MaintenanceTaskType, limit int) []*MaintenanceTask {
	mq.mutex.RLock()
	defer mq.mutex.RUnlock()
	var matched []*MaintenanceTask
	for _, task := range mq.tasks {
		if (status == "" || task.Status == status) && (taskType == "" || task.Type == taskType) {
			matched = append(matched, task)
		}
	}
	switch status {
	case TaskStatusCompleted, TaskStatusFailed, TaskStatusCancelled:
		// Terminal listings: most recently completed first; tasks without a
		// completion time sort last, ordered among themselves by creation time.
		sort.Slice(matched, func(i, j int) bool {
			a, b := matched[i].CompletedAt, matched[j].CompletedAt
			switch {
			case a == nil && b == nil:
				return matched[i].CreatedAt.After(matched[j].CreatedAt)
			case a == nil:
				return false
			case b == nil:
				return true
			default:
				return a.After(*b)
			}
		})
	default:
		// Default to creation time (newest first)
		sort.Slice(matched, func(i, j int) bool {
			return matched[i].CreatedAt.After(matched[j].CreatedAt)
		})
	}
	// Apply limit after sorting
	if limit > 0 && len(matched) > limit {
		matched = matched[:limit]
	}
	return matched
}
// GetWorkers returns all registered workers in unspecified (map-iteration)
// order. Returns a nil slice when no workers are registered, matching the
// queue's other accessors.
func (mq *MaintenanceQueue) GetWorkers() []*MaintenanceWorker {
	mq.mutex.RLock()
	defer mq.mutex.RUnlock()
	var all []*MaintenanceWorker
	for _, w := range mq.workers {
		all = append(all, w)
	}
	return all
}
// generateTaskID returns a short unique identifier of the form
// "xxxxxxxx-NNNN": eight random lowercase-alphanumeric characters plus the
// last four digits of the current Unix timestamp. If crypto/rand fails
// (practically never), it falls back to a nanosecond-timestamp ID.
func generateTaskID() string {
	const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
	randBytes := make([]byte, 8)
	if _, err := rand.Read(randBytes); err != nil {
		// Fallback to timestamp-based ID if crypto/rand fails
		return fmt.Sprintf("task-%d", time.Now().UnixNano())
	}
	// Map each random byte onto the charset.
	id := make([]byte, 8)
	for i, rb := range randBytes {
		id[i] = charset[int(rb)%len(charset)]
	}
	// Append a timestamp suffix (last 4 digits) to further reduce collisions.
	return fmt.Sprintf("%s-%04d", id, time.Now().Unix()%10000)
}
// CleanupOldTasks removes terminal (completed, failed, or cancelled) tasks
// whose completion time is older than the retention window, and returns how
// many were removed. Tasks without a completion timestamp are kept.
//
// Cancelled tasks are included in the terminal set for consistency with
// purgeTerminalTasksLocked and CompleteTask's terminal-state handling;
// previously they were skipped here and could linger in memory forever.
func (mq *MaintenanceQueue) CleanupOldTasks(retention time.Duration) int {
	mq.mutex.Lock()
	defer mq.mutex.Unlock()
	cutoff := time.Now().Add(-retention)
	removed := 0
	for id, task := range mq.tasks {
		if task.CompletedAt == nil || !task.CompletedAt.Before(cutoff) {
			continue
		}
		switch task.Status {
		case TaskStatusCompleted, TaskStatusFailed, TaskStatusCancelled:
			delete(mq.tasks, id)
			removed++
		}
	}
	glog.V(2).Infof("Cleaned up %d old maintenance tasks", removed)
	return removed
}
// RemoveStaleWorkers removes workers whose last heartbeat is older than the
// timeout, fails any tasks still assigned to them, and returns how many
// workers were removed.
//
// NOTE(review): tasks failed here are neither retried nor removed from
// pendingOperations, and their on-disk state is not updated — confirm that
// cleanup happens elsewhere (e.g. via the scanner's re-detection).
func (mq *MaintenanceQueue) RemoveStaleWorkers(timeout time.Duration) int {
	mq.mutex.Lock()
	defer mq.mutex.Unlock()
	cutoff := time.Now().Add(-timeout)
	removed := 0
	for id, worker := range mq.workers {
		if worker.LastHeartbeat.Before(cutoff) {
			// Mark any assigned tasks as failed and record unassignment
			for _, task := range mq.tasks {
				if task.WorkerID == id && (task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress) {
					// Record unassignment due to worker becoming unavailable
					if len(task.AssignmentHistory) > 0 {
						lastAssignment := task.AssignmentHistory[len(task.AssignmentHistory)-1]
						if lastAssignment.UnassignedAt == nil {
							unassignedTime := time.Now()
							lastAssignment.UnassignedAt = &unassignedTime
							lastAssignment.Reason = "Worker became unavailable (stale heartbeat)"
						}
					}
					task.Status = TaskStatusFailed
					task.Error = "Worker became unavailable"
					completedTime := time.Now()
					task.CompletedAt = &completedTime
					// Notify ActiveTopology to release capacity
					if mq.integration != nil {
						if at := mq.integration.GetActiveTopology(); at != nil {
							_ = at.CompleteTask(task.ID)
						}
					}
				}
			}
			delete(mq.workers, id)
			removed++
			glog.Warningf("Removed stale maintenance worker %s", id)
		}
	}
	return removed
}
// GetStats aggregates queue-wide statistics: total tasks, per-status and
// per-type counts, today's completed/failed counts, the average duration of
// tasks that finished today, and the number of active/busy workers.
func (mq *MaintenanceQueue) GetStats() *MaintenanceStats {
	mq.mutex.RLock()
	defer mq.mutex.RUnlock()
	stats := &MaintenanceStats{
		TotalTasks:    len(mq.tasks),
		TasksByStatus: make(map[MaintenanceTaskStatus]int),
		TasksByType:   make(map[MaintenanceTaskType]int),
		ActiveWorkers: 0,
	}
	today := time.Now().Truncate(24 * time.Hour)
	var (
		totalDuration time.Duration
		timedTasks    int
	)
	for _, task := range mq.tasks {
		stats.TasksByStatus[task.Status]++
		stats.TasksByType[task.Type]++
		// Daily counters and durations only consider tasks finished today.
		if task.CompletedAt == nil || !task.CompletedAt.After(today) {
			continue
		}
		switch task.Status {
		case TaskStatusCompleted:
			stats.CompletedToday++
		case TaskStatusFailed:
			stats.FailedToday++
		}
		if task.StartedAt != nil {
			totalDuration += task.CompletedAt.Sub(*task.StartedAt)
			timedTasks++
		}
	}
	for _, worker := range mq.workers {
		if worker.Status == "active" || worker.Status == "busy" {
			stats.ActiveWorkers++
		}
	}
	if timedTasks > 0 {
		stats.AverageTaskTime = totalDuration / time.Duration(timedTasks)
	}
	return stats
}
// workerCanHandle reports whether taskType appears in the worker's
// advertised capability list.
func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabilities []MaintenanceTaskType) bool {
	for i := range capabilities {
		if capabilities[i] == taskType {
			return true
		}
	}
	return false
}
// activeTaskForVolume returns the ID and type of the assigned or in-progress
// task targeting volumeID, skipping excludeTaskID; the bool reports whether
// one was found. Caller must hold mq.mutex (read or write).
func (mq *MaintenanceQueue) activeTaskForVolume(volumeID uint32, excludeTaskID string) (string, MaintenanceTaskType, bool) {
	// Volume ID 0 is the "no volume" sentinel; nothing is active for it.
	if volumeID == 0 {
		return "", "", false
	}
	for _, task := range mq.tasks {
		if task.ID == excludeTaskID || task.VolumeID != volumeID {
			continue
		}
		isActive := task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress
		if isActive {
			return task.ID, task.Type, true
		}
	}
	return "", "", false
}
// canScheduleTaskNow determines if a task can be scheduled right now.
// It currently bypasses the integration task scheduler (see note below)
// and relies solely on the per-type concurrency fallback.
func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
	glog.V(2).Infof("Checking if task %s (type: %s) can be scheduled", task.ID, task.Type)

	// TEMPORARY FIX: the integration task scheduler was overly restrictive,
	// so only the fallback concurrency check is consulted for now.
	glog.V(2).Infof("Using fallback logic for task scheduling")
	decision := mq.canExecuteTaskType(task.Type)
	glog.V(2).Infof("Fallback decision for task %s: %v", task.ID, decision)
	return decision

	// NOTE: Original integration code disabled temporarily
	// Try task scheduling logic first
	/*
		if mq.integration != nil {
			glog.Infof("DEBUG canScheduleTaskNow: Using integration task scheduler")
			// Get all running tasks and available workers
			runningTasks := mq.getRunningTasks()
			availableWorkers := mq.getAvailableWorkers()
			glog.Infof("DEBUG canScheduleTaskNow: Running tasks: %d, Available workers: %d", len(runningTasks), len(availableWorkers))
			canSchedule := mq.integration.CanScheduleWithTaskSchedulers(task, runningTasks, availableWorkers)
			glog.Infof("DEBUG canScheduleTaskNow: Task scheduler decision for task %s (%s): %v", task.ID, task.Type, canSchedule)
			return canSchedule
		}
	*/
}
// canExecuteTaskType reports whether another task of this type may run,
// i.e. the running count is below the configured concurrency limit
// (fallback scheduling logic).
func (mq *MaintenanceQueue) canExecuteTaskType(taskType MaintenanceTaskType) bool {
	running := mq.GetRunningTaskCount(taskType)
	limit := mq.getMaxConcurrentForTaskType(taskType)
	allowed := running < limit
	glog.V(3).Infof("canExecuteTaskType for %s: running=%d, max=%d, canExecute=%v", taskType, running, limit, allowed)
	return allowed
}
// getMaxConcurrentForTaskType returns the maximum concurrent tasks allowed
// for a task type. Precedence: the task scheduler's default, then the
// policy configuration, then a conservative limit of 1.
func (mq *MaintenanceQueue) getMaxConcurrentForTaskType(taskType MaintenanceTaskType) int {
	// Prefer the limit advertised by the registered task scheduler.
	if mq.integration != nil {
		if scheduler := mq.integration.GetTaskScheduler(taskType); scheduler != nil {
			if limit := scheduler.GetMaxConcurrent(); limit > 0 {
				glog.V(3).Infof("Using task scheduler max concurrent for %s: %d", taskType, limit)
				return limit
			}
		}
	}
	// No scheduler (or it declined to set a limit): consult the policy.
	if mq.policy != nil {
		if limit := GetMaxConcurrent(mq.policy, taskType); limit > 0 {
			glog.V(3).Infof("Using policy configuration max concurrent for %s: %d", taskType, limit)
			return limit
		}
	}
	// Ultimate fallback - minimal safe default.
	glog.V(2).Infof("No scheduler or policy configuration found for task type %s, using minimal default: 1", taskType)
	return 1
}
// trackPendingOperation registers the task with the pending-operations
// tracker so capacity planning can account for in-flight work. No-op when
// the integration, tracker, or the task's typed parameters are absent.
func (mq *MaintenanceQueue) trackPendingOperation(task *MaintenanceTask) {
	if mq.integration == nil {
		return
	}
	pendingOps := mq.integration.GetPendingOperations()
	if pendingOps == nil {
		return
	}
	// Tasks without typed parameters carry no source/target info to track.
	if task.TypedParams == nil {
		glog.V(2).Infof("Skipping pending operation tracking for task %s - no typed parameters", task.ID)
		return
	}

	// Map the maintenance task type to a pending operation type;
	// unrecognized types are tracked as generic volume moves.
	opType := OpTypeVolumeMove
	switch task.Type {
	case MaintenanceTaskType("balance"):
		opType = OpTypeVolumeBalance
	case MaintenanceTaskType("erasure_coding"):
		opType = OpTypeErasureCoding
	case MaintenanceTaskType("vacuum"):
		opType = OpTypeVacuum
	case MaintenanceTaskType("replication"):
		opType = OpTypeReplication
	}

	// Destination and size come from the unified targets array — the only
	// source of truth. Size defaults to 1GB when the target has no estimate.
	destNode := ""
	estimatedSize := uint64(1024 * 1024 * 1024)
	if targets := task.TypedParams.Targets; len(targets) > 0 {
		destNode = targets[0].Node
		if sz := targets[0].EstimatedSize; sz > 0 {
			estimatedSize = sz
		}
	}

	// Source node from the unified sources array, when present.
	sourceNode := ""
	if sources := task.TypedParams.Sources; len(sources) > 0 {
		sourceNode = sources[0].Node
	}

	pendingOps.AddOperation(&PendingOperation{
		VolumeID:      task.VolumeID,
		OperationType: opType,
		SourceNode:    sourceNode,
		DestNode:      destNode,
		TaskID:        task.ID,
		StartTime:     time.Now(),
		EstimatedSize: estimatedSize,
		Collection:    task.Collection,
		Status:        "assigned",
	})
}
// removePendingOperation drops the task from the pending-operations
// tracker; no-op when the integration or tracker is absent.
func (mq *MaintenanceQueue) removePendingOperation(taskID string) {
	if mq.integration == nil {
		return
	}
	if pendingOps := mq.integration.GetPendingOperations(); pendingOps != nil {
		pendingOps.RemoveOperation(taskID)
	}
}
// updatePendingOperationStatus records the task's new status in the
// pending-operations tracker; no-op when the integration or tracker is absent.
func (mq *MaintenanceQueue) updatePendingOperationStatus(taskID string, status string) {
	if mq.integration == nil {
		return
	}
	if pendingOps := mq.integration.GetPendingOperations(); pendingOps != nil {
		pendingOps.UpdateOperationStatus(taskID, status)
	}
}
// snapshotTask returns a copy of t with its slice, map, and time-pointer
// fields duplicated, so the snapshot can be handed to saveTaskState after
// mq.mutex is released without racing against concurrent mutation of the
// live task struct. Must be called with mq.mutex held.
func snapshotTask(t *MaintenanceTask) *MaintenanceTask {
	snap := *t // scalar and pointer-sized fields copy by value

	// AssignmentHistory: clone the slice header; the pointed-to records are
	// append-only after creation, so sharing the record pointers is safe.
	if t.AssignmentHistory != nil {
		history := make([]*TaskAssignmentRecord, len(t.AssignmentHistory))
		copy(history, t.AssignmentHistory)
		snap.AssignmentHistory = history
	}

	// Tags: full key/value copy to avoid a concurrent map read/write race.
	if t.Tags != nil {
		tags := make(map[string]string, len(t.Tags))
		for key, val := range t.Tags {
			tags[key] = val
		}
		snap.Tags = tags
	}

	// Time pointers: copy the pointed-to values so a concurrent
	// reassignment (e.g. a retry path clearing StartedAt) cannot race with
	// maintenanceTaskToProtobuf reading them from the snapshot.
	if t.StartedAt != nil {
		started := *t.StartedAt
		snap.StartedAt = &started
	}
	if t.CompletedAt != nil {
		completed := *t.CompletedAt
		snap.CompletedAt = &completed
	}
	return &snap
}