diff --git a/weed/admin/dash/config_persistence.go b/weed/admin/dash/config_persistence.go index cf55d48eb..ed0e60b28 100644 --- a/weed/admin/dash/config_persistence.go +++ b/weed/admin/dash/config_persistence.go @@ -1091,7 +1091,7 @@ func (cp *ConfigPersistence) loadTaskStateLocked(taskID string) (*maintenance.Ma // Convert protobuf to maintenance task task := cp.protobufToMaintenanceTask(taskStateFile.Task) - glog.V(2).Infof("Loaded task state for task %s from %s", taskID, taskFilePath) + glog.V(3).Infof("Loaded task state for task %s from %s", taskID, taskFilePath) return task, nil } @@ -1135,6 +1135,43 @@ func (cp *ConfigPersistence) loadAllTaskStatesLocked() ([]*maintenance.Maintenan return tasks, nil } +// DeleteAllTaskStates removes all task state .pb files from disk without reading them. +// Used at startup to clean up stale files from previous runs — the scanner will +// re-detect any tasks that are still needed from live cluster state. +func (cp *ConfigPersistence) DeleteAllTaskStates() error { + cp.tasksMu.Lock() + defer cp.tasksMu.Unlock() + + if cp.dataDir == "" { + return nil + } + + tasksDir := filepath.Join(cp.dataDir, TasksSubdir) + entries, err := os.ReadDir(tasksDir) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("failed to read tasks directory: %w", err) + } + + var removed int + for _, entry := range entries { + if !entry.IsDir() && filepath.Ext(entry.Name()) == ".pb" { + if err := os.Remove(filepath.Join(tasksDir, entry.Name())); err != nil && !os.IsNotExist(err) { + glog.Warningf("Failed to delete task file %s: %v", entry.Name(), err) + } else { + removed++ + } + } + } + + if removed > 0 { + glog.Infof("Cleaned up %d stale task files from disk", removed) + } + return nil +} + // DeleteTaskState removes a task state file from disk func (cp *ConfigPersistence) DeleteTaskState(taskID string) error { cp.tasksMu.Lock() diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go index dc6546d40..4b0e29bce 100644 --- a/weed/admin/maintenance/maintenance_queue.go +++ b/weed/admin/maintenance/maintenance_queue.go @@ -33,80 +33,16 @@ func (mq *MaintenanceQueue) SetPersistence(persistence TaskPersistence) { glog.V(1).Infof("Maintenance queue configured with task persistence") } -// LoadTasksFromPersistence loads tasks from persistent storage on startup +// LoadTasksFromPersistence is called on startup. Previous task states are NOT loaded +// into memory — the maintenance scanner will re-detect current needs from the live +// cluster state. Stale task files from previous runs are deleted from disk. func (mq *MaintenanceQueue) LoadTasksFromPersistence() error { - if mq.persistence == nil { - glog.V(1).Infof("No task persistence configured, skipping task loading") - return nil - } - - mq.mutex.Lock() - defer mq.mutex.Unlock() - - glog.Infof("Loading tasks from persistence...") - - tasks, err := mq.persistence.LoadAllTaskStates() - if err != nil { - return fmt.Errorf("failed to load task states: %w", err) - } - - glog.Infof("DEBUG LoadTasksFromPersistence: Found %d tasks in persistence", len(tasks)) - - // Reset task maps - mq.tasks = make(map[string]*MaintenanceTask) - mq.pendingTasks = make([]*MaintenanceTask, 0) - - // Load tasks by status - for _, task := range tasks { - glog.Infof("DEBUG LoadTasksFromPersistence: Loading task %s (type: %s, status: %s, scheduled: %v)", task.ID, task.Type, task.Status, task.ScheduledAt) - mq.tasks[task.ID] = task - - switch task.Status { - case TaskStatusPending: - glog.Infof("DEBUG LoadTasksFromPersistence: Adding task %s to pending queue", task.ID) - mq.pendingTasks = append(mq.pendingTasks, task) - case TaskStatusAssigned, TaskStatusInProgress: - // For assigned/in-progress tasks, we need to check if the worker is still available - // If not, we should fail them and make them eligible for retry - if task.WorkerID != "" { - if _, exists := mq.workers[task.WorkerID]; !exists { - glog.Warningf("Task %s was assigned to unavailable worker %s, marking as failed", task.ID, task.WorkerID) - task.Status = TaskStatusFailed - task.Error = "Worker unavailable after restart" - completedTime := time.Now() - task.CompletedAt = &completedTime - - // Check if it should be retried - if task.RetryCount < task.MaxRetries { - task.RetryCount++ - task.Status = TaskStatusPending - task.WorkerID = "" - task.StartedAt = nil - task.CompletedAt = nil - task.Error = "" - task.ScheduledAt = time.Now().Add(1 * time.Minute) // Retry after restart delay - glog.Infof("DEBUG LoadTasksFromPersistence: Retrying task %s, adding to pending queue", task.ID) - mq.pendingTasks = append(mq.pendingTasks, task) - } - } - } - } - - // Sync task with ActiveTopology for capacity tracking - if mq.integration != nil { - mq.integration.SyncTask(task) + if mq.persistence != nil { + if err := mq.persistence.DeleteAllTaskStates(); err != nil { + glog.Warningf("Failed to clean up old task files: %v", err) } } - - // Sort pending tasks by priority and schedule time - sort.Slice(mq.pendingTasks, func(i, j int) bool { - if mq.pendingTasks[i].Priority != mq.pendingTasks[j].Priority { - return mq.pendingTasks[i].Priority > mq.pendingTasks[j].Priority - } - return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt) - }) - - glog.Infof("Loaded %d tasks from persistence (%d pending)", len(tasks), len(mq.pendingTasks)) + glog.Infof("Task queue initialized (previous tasks will be re-detected by scanner)") return nil } @@ -119,6 +55,14 @@ func (mq *MaintenanceQueue) saveTaskState(task *MaintenanceTask) { } } +func (mq *MaintenanceQueue) deleteTaskState(taskID string) { + if mq.persistence != nil { + if err := mq.persistence.DeleteTaskState(taskID); err != nil { + glog.V(2).Infof("Failed to delete task state for %s: %v", taskID, err) + } + } +} + // cleanupCompletedTasks removes old completed tasks beyond the retention limit func (mq *MaintenanceQueue) cleanupCompletedTasks() { if mq.persistence != nil { @@ -128,10 +72,24 @@ func (mq *MaintenanceQueue) cleanupCompletedTasks() { } } +const MaxTasksPerType = 100 + // AddTask adds a new maintenance task to the queue with deduplication func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) { mq.mutex.Lock() + // Enforce per-type capacity limit (only counting active tasks) + if mq.countActiveTasksByType(task.Type) >= MaxTasksPerType { + // Purge terminal tasks first, then recheck + mq.purgeTerminalTasksLocked() + if mq.countActiveTasksByType(task.Type) >= MaxTasksPerType { + mq.mutex.Unlock() + glog.V(1).Infof("Task skipped (type %s at capacity %d): volume %d on %s", + task.Type, MaxTasksPerType, task.VolumeID, task.Server) + return + } + } + // Enforce one queued/active task per volume (across all task types). if mq.hasQueuedOrActiveTaskForVolume(task.VolumeID) { mq.mutex.Unlock() @@ -197,6 +155,30 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) { taskSnapshot.ID, taskSnapshot.Type, taskSnapshot.VolumeID, taskSnapshot.Server, taskSnapshot.Priority, scheduleInfo, taskSnapshot.Reason) } +// countActiveTasksByType returns the number of active (non-terminal) tasks of a given type. Caller must hold mq.mutex. +func (mq *MaintenanceQueue) countActiveTasksByType(taskType MaintenanceTaskType) int { + count := 0 + for _, t := range mq.tasks { + if t.Type == taskType { + switch t.Status { + case TaskStatusPending, TaskStatusAssigned, TaskStatusInProgress: + count++ + } + } + } + return count +} + +// purgeTerminalTasksLocked removes terminal tasks from the in-memory map. Caller must hold mq.mutex. +func (mq *MaintenanceQueue) purgeTerminalTasksLocked() { + for id, task := range mq.tasks { + switch task.Status { + case TaskStatusCompleted, TaskStatusFailed, TaskStatusCancelled: + delete(mq.tasks, id) + } + } +} + // hasQueuedOrActiveTaskForVolume checks if any pending/assigned/in-progress task already exists for this volume. // Caller must hold mq.mutex. func (mq *MaintenanceQueue) hasQueuedOrActiveTaskForVolume(volumeID uint32) bool { @@ -273,6 +255,9 @@ func (mq *MaintenanceQueue) CancelPendingTasksByType(taskType MaintenanceTaskTyp // AddTasksFromResults converts detection results to tasks and adds them to the queue func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) { + // Purge terminal tasks from memory before adding new ones + mq.purgeTerminalTasks() + for _, result := range results { // Validate that task has proper typed parameters if result.TypedParams == nil { @@ -297,6 +282,21 @@ func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) } } +// purgeTerminalTasks removes completed/failed/cancelled tasks from memory. +// Terminal tasks are already deleted from disk by CompleteTask, so this +// only needs to clean up the in-memory map. +func (mq *MaintenanceQueue) purgeTerminalTasks() { + mq.mutex.Lock() + before := len(mq.tasks) + mq.purgeTerminalTasksLocked() + purged := before - len(mq.tasks) + mq.mutex.Unlock() + + if purged > 0 { + glog.V(1).Infof("Purged %d terminal tasks from memory", purged) + } +} + // GetNextTask returns the next available task for a worker func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask { // Use read lock for initial checks and search @@ -570,7 +570,6 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { } } taskStatus := task.Status - taskCount := len(mq.tasks) // Snapshot task state while lock is still held to avoid data race var taskToSaveSnapshot *MaintenanceTask if taskToSave != nil { @@ -578,9 +577,18 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { } mq.mutex.Unlock() - // Save task state to persistence outside the lock + // Only persist non-terminal tasks (retries). Completed/failed tasks stay + // in memory for the UI but are not written to disk — they would just + // accumulate and slow down future startups. if taskToSaveSnapshot != nil { - mq.saveTaskState(taskToSaveSnapshot) + switch taskStatus { + case TaskStatusPending: + // Retry — save so the task survives a restart + mq.saveTaskState(taskToSaveSnapshot) + case TaskStatusCompleted, TaskStatusFailed, TaskStatusCancelled: + // Terminal — delete the file if one exists from a previous state + mq.deleteTaskState(taskToSaveSnapshot.ID) + } } if logFn != nil { @@ -591,13 +599,6 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) { if taskStatus != TaskStatusPending { mq.removePendingOperation(taskID) } - - // Periodically cleanup old completed tasks (when total task count is a multiple of 10) - if taskStatus == TaskStatusCompleted { - if taskCount%10 == 0 { - go mq.cleanupCompletedTasks() - } - } } // isNonRetriableError returns true for errors that will never succeed on retry, diff --git a/weed/admin/maintenance/maintenance_queue_test.go b/weed/admin/maintenance/maintenance_queue_test.go index 959addf3f..bd8fe6839 100644 --- a/weed/admin/maintenance/maintenance_queue_test.go +++ b/weed/admin/maintenance/maintenance_queue_test.go @@ -693,10 +693,11 @@ func (m *MockPersistence) SaveTaskState(task *MaintenanceTask) error func (m *MockPersistence) LoadTaskState(taskID string) (*MaintenanceTask, error) { return nil, nil } func (m *MockPersistence) LoadAllTaskStates() ([]*MaintenanceTask, error) { return m.tasks, nil } func (m *MockPersistence) DeleteTaskState(taskID string) error { return nil } +func (m *MockPersistence) DeleteAllTaskStates() error { return nil } func (m *MockPersistence) CleanupCompletedTasks() error { return nil } func (m *MockPersistence) SaveTaskPolicy(taskType string, policy *TaskPolicy) error { return nil } -func TestMaintenanceQueue_LoadTasksCapacitySync(t *testing.T) { +func TestMaintenanceQueue_LoadTasksStartsEmpty(t *testing.T) { // Setup policy := &MaintenancePolicy{ TaskPolicies: map[string]*worker_pb.TaskPolicy{ @@ -704,56 +705,25 @@ func TestMaintenanceQueue_LoadTasksCapacitySync(t *testing.T) { }, } mq := NewMaintenanceQueue(policy) - integration := NewMaintenanceIntegration(mq, policy) - mq.SetIntegration(integration) - at := integration.GetActiveTopology() - topologyInfo := &master_pb.TopologyInfo{ - DataCenterInfos: []*master_pb.DataCenterInfo{ - { - Id: "dc1", - RackInfos: []*master_pb.RackInfo{ - { - Id: "rack1", - DataNodeInfos: []*master_pb.DataNodeInfo{ - { - Id: "server1", - DiskInfos: map[string]*master_pb.DiskInfo{ - "hdd1": {DiskId: 1, VolumeCount: 1, MaxVolumeCount: 10}, - "hdd2": {DiskId: 2, VolumeCount: 0, MaxVolumeCount: 10}, - }, - }, - }, - }, - }, - }, - }, - } - at.UpdateTopology(topologyInfo) - - // Setup mock persistence with a pending task - taskID := "load_test_123" + // Setup mock persistence with tasks — these should NOT be loaded mockTask := &MaintenanceTask{ - ID: taskID, + ID: "old_task_123", Type: "balance", Status: TaskStatusPending, - TypedParams: &worker_pb.TaskParams{ - TaskId: taskID, - Sources: []*worker_pb.TaskSource{{Node: "server1", DiskId: 1}}, - Targets: []*worker_pb.TaskTarget{{Node: "server1", DiskId: 2}}, - }, } mq.SetPersistence(&MockPersistence{tasks: []*MaintenanceTask{mockTask}}) - // Load tasks + // LoadTasksFromPersistence should be a no-op — scanner will re-detect err := mq.LoadTasksFromPersistence() if err != nil { - t.Fatalf("Failed to load tasks: %v", err) + t.Fatalf("LoadTasksFromPersistence failed: %v", err) } - // Verify capacity is reserved in ActiveTopology after loading (9 left) - if at.GetEffectiveAvailableCapacity("server1", 2) != 9 { - t.Errorf("Expected capacity 9 after loading tasks, got %d", at.GetEffectiveAvailableCapacity("server1", 2)) + // Queue should be empty — tasks will be re-detected by scanner + stats := mq.GetStats() + if stats.TotalTasks != 0 { + t.Errorf("Expected 0 tasks after startup, got %d", stats.TotalTasks) } } diff --git a/weed/admin/maintenance/maintenance_scanner.go b/weed/admin/maintenance/maintenance_scanner.go index ddbf44f55..df5a3f065 100644 --- a/weed/admin/maintenance/maintenance_scanner.go +++ b/weed/admin/maintenance/maintenance_scanner.go @@ -32,21 +32,18 @@ func NewMaintenanceScanner(adminClient AdminClient, policy *MaintenancePolicy, q // ScanForMaintenanceTasks analyzes the cluster and generates maintenance tasks func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult, error) { - // Get volume health metrics - volumeMetrics, err := ms.getVolumeHealthMetrics() + // Get volume health metrics directly in task-system format, along with topology info + taskMetrics, topologyInfo, err := ms.getVolumeHealthMetrics() if err != nil { return nil, fmt.Errorf("failed to get volume health metrics: %w", err) } // Use task system for all task types if ms.integration != nil { - // Convert metrics to task system format - taskMetrics := ms.convertToTaskMetrics(volumeMetrics) - // Update topology information for complete cluster view (including empty servers) // This must happen before task detection to ensure EC placement can consider all servers - if ms.lastTopologyInfo != nil { - if err := ms.integration.UpdateTopologyInfo(ms.lastTopologyInfo); err != nil { + if topologyInfo != nil { + if err := ms.integration.UpdateTopologyInfo(topologyInfo); err != nil { glog.Errorf("Failed to update topology info for empty servers: %v", err) // Don't fail the scan - continue with just volume-bearing servers } else { @@ -70,9 +67,12 @@ func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult, return []*TaskDetectionResult{}, nil } -// getVolumeHealthMetrics collects health information for all volumes -func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, error) { - var metrics []*VolumeHealthMetrics +// getVolumeHealthMetrics collects health information for all volumes. +// Returns metrics in task-system format directly (no intermediate copy) and +// the topology info for updating the active topology. +func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*types.VolumeHealthMetrics, *master_pb.TopologyInfo, error) { + var metrics []*types.VolumeHealthMetrics + var topologyInfo *master_pb.TopologyInfo glog.V(1).Infof("Collecting volume health metrics from master") err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error { @@ -89,30 +89,28 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, volumeSizeLimitBytes := uint64(resp.VolumeSizeLimitMb) * 1024 * 1024 // Convert MB to bytes - // Track all nodes discovered in topology - var allNodesInTopology []string - var nodesWithVolumes []string - var nodesWithoutVolumes []string + // Track node counts for summary logging (avoid accumulating full ID slices) + var totalNodes, nodesWithVolumes, nodesWithoutVolumes int for _, dc := range resp.TopologyInfo.DataCenterInfos { - glog.V(2).Infof("Processing datacenter: %s", dc.Id) + glog.V(3).Infof("Processing datacenter: %s", dc.Id) for _, rack := range dc.RackInfos { - glog.V(2).Infof("Processing rack: %s in datacenter: %s", rack.Id, dc.Id) + glog.V(3).Infof("Processing rack: %s in datacenter: %s", rack.Id, dc.Id) for _, node := range rack.DataNodeInfos { - allNodesInTopology = append(allNodesInTopology, node.Id) - glog.V(2).Infof("Found volume server in topology: %s (disks: %d)", node.Id, len(node.DiskInfos)) + totalNodes++ + glog.V(3).Infof("Found volume server in topology: %s (disks: %d)", node.Id, len(node.DiskInfos)) hasVolumes := false // Process each disk on this node for diskType, diskInfo := range node.DiskInfos { if len(diskInfo.VolumeInfos) > 0 { hasVolumes = true - glog.V(2).Infof("Volume server %s disk %s has %d volumes", node.Id, diskType, len(diskInfo.VolumeInfos)) + glog.V(3).Infof("Volume server %s disk %s has %d volumes", node.Id, diskType, len(diskInfo.VolumeInfos)) } // Process volumes on this specific disk for _, volInfo := range diskInfo.VolumeInfos { - metric := &VolumeHealthMetrics{ + metric := &types.VolumeHealthMetrics{ VolumeID: volInfo.Id, Server: node.Id, ServerAddress: node.Address, @@ -138,7 +136,7 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, } metric.Age = time.Since(metric.LastModified) - glog.V(3).Infof("Volume %d on %s:%s (ID %d): size=%d, limit=%d, fullness=%.2f", + glog.V(4).Infof("Volume %d on %s:%s (ID %d): size=%d, limit=%d, fullness=%.2f", metric.VolumeID, metric.Server, metric.DiskType, metric.DiskId, metric.Size, volumeSizeLimitBytes, metric.FullnessRatio) metrics = append(metrics, metric) @@ -146,29 +144,27 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, } if hasVolumes { - nodesWithVolumes = append(nodesWithVolumes, node.Id) + nodesWithVolumes++ } else { - nodesWithoutVolumes = append(nodesWithoutVolumes, node.Id) + nodesWithoutVolumes++ glog.V(1).Infof("Volume server %s found in topology but has no volumes", node.Id) } } } } - glog.Infof("Topology discovery complete:") - glog.Infof(" - Total volume servers in topology: %d (%v)", len(allNodesInTopology), allNodesInTopology) - glog.Infof(" - Volume servers with volumes: %d (%v)", len(nodesWithVolumes), nodesWithVolumes) - glog.Infof(" - Volume servers without volumes: %d (%v)", len(nodesWithoutVolumes), nodesWithoutVolumes) + glog.Infof("Topology discovery: %d volume servers (%d with volumes, %d without)", + totalNodes, nodesWithVolumes, nodesWithoutVolumes) - // Store topology info for volume shard tracker - ms.lastTopologyInfo = resp.TopologyInfo + // Return topology info as a local value (not retained on the scanner struct) + topologyInfo = resp.TopologyInfo return nil }) if err != nil { glog.Errorf("Failed to get volume health metrics: %v", err) - return nil, err + return nil, nil, err } glog.V(1).Infof("Successfully collected metrics for %d actual volumes with disk ID information", len(metrics)) @@ -176,13 +172,13 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, // Count actual replicas and identify EC volumes ms.enrichVolumeMetrics(metrics) - return metrics, nil + return metrics, topologyInfo, nil } // enrichVolumeMetrics adds additional information like replica counts -func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics) { +func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*types.VolumeHealthMetrics) { // Group volumes by ID to count replicas - volumeGroups := make(map[uint32][]*VolumeHealthMetrics) + volumeGroups := make(map[uint32][]*types.VolumeHealthMetrics) for _, metric := range metrics { volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric) } @@ -193,41 +189,9 @@ func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics for _, replica := range replicas { replica.ReplicaCount = replicaCount } - glog.V(3).Infof("Volume %d has %d replicas", volumeID, replicaCount) + glog.V(4).Infof("Volume %d has %d replicas", volumeID, replicaCount) } // TODO: Identify EC volumes by checking volume structure // This would require querying volume servers for EC shard information } - -// convertToTaskMetrics converts existing volume metrics to task system format -func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetrics) []*types.VolumeHealthMetrics { - var simplified []*types.VolumeHealthMetrics - - for _, metric := range metrics { - simplified = append(simplified, &types.VolumeHealthMetrics{ - VolumeID: metric.VolumeID, - Server: metric.Server, - ServerAddress: metric.ServerAddress, - DiskType: metric.DiskType, - DiskId: metric.DiskId, - DataCenter: metric.DataCenter, - Rack: metric.Rack, - Collection: metric.Collection, - Size: metric.Size, - DeletedBytes: metric.DeletedBytes, - GarbageRatio: metric.GarbageRatio, - LastModified: metric.LastModified, - Age: metric.Age, - ReplicaCount: metric.ReplicaCount, - ExpectedReplicas: metric.ExpectedReplicas, - IsReadOnly: metric.IsReadOnly, - HasRemoteCopy: metric.HasRemoteCopy, - IsECVolume: metric.IsECVolume, - FullnessRatio: metric.FullnessRatio, - }) - } - - glog.V(2).Infof("Converted %d volume metrics with disk ID information for task detection", len(simplified)) - return simplified -} diff --git a/weed/admin/maintenance/maintenance_types.go b/weed/admin/maintenance/maintenance_types.go index bb8f0a737..811893953 100644 --- a/weed/admin/maintenance/maintenance_types.go +++ b/weed/admin/maintenance/maintenance_types.go @@ -130,6 +130,7 @@ type TaskPersistence interface { LoadTaskState(taskID string) (*MaintenanceTask, error) LoadAllTaskStates() ([]*MaintenanceTask, error) DeleteTaskState(taskID string) error + DeleteAllTaskStates() error CleanupCompletedTasks() error // Policy persistence @@ -206,12 +207,11 @@ type MaintenanceQueue struct { // MaintenanceScanner analyzes the cluster and generates maintenance tasks type MaintenanceScanner struct { - adminClient AdminClient - policy *MaintenancePolicy - queue *MaintenanceQueue - lastScan map[MaintenanceTaskType]time.Time - integration *MaintenanceIntegration - lastTopologyInfo *master_pb.TopologyInfo + adminClient AdminClient + policy *MaintenancePolicy + queue *MaintenanceQueue + lastScan map[MaintenanceTaskType]time.Time + integration *MaintenanceIntegration } // TaskDetectionResult represents the result of scanning for maintenance needs diff --git a/weed/admin/topology/topology_management.go b/weed/admin/topology/topology_management.go index 1343deab8..8073ae3da 100644 --- a/weed/admin/topology/topology_management.go +++ b/weed/admin/topology/topology_management.go @@ -87,7 +87,7 @@ func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) e } diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId) - glog.V(2).Infof("UpdateTopology: adding disk key=%q nodeId=%q diskId=%d diskType=%q address=%q grpcPort=%d volumes=%d maxVolumes=%d", + glog.V(3).Infof("UpdateTopology: adding disk key=%q nodeId=%q diskId=%d diskType=%q address=%q grpcPort=%d volumes=%d maxVolumes=%d", diskKey, nodeInfo.Id, diskInfo.DiskId, diskType, nodeInfo.Address, nodeInfo.GrpcPort, diskInfo.VolumeCount, diskInfo.MaxVolumeCount) node.disks[diskInfo.DiskId] = disk at.disks[diskKey] = disk