admin: fix capacity leak in maintenance system by preserving Task IDs (#8214)

* admin: fix capacity leak in maintenance system by preserving Task IDs

Preserve the original TaskID generated during detection and sync task
states (Assign/Complete/Retry) with ActiveTopology. This ensures that
capacity reserved during task assignment is properly released when a
task completes or fails, preventing 'need 9, have 0' capacity exhaustion.

Fixes https://github.com/seaweedfs/seaweedfs/issues/8202

* Update weed/admin/maintenance/maintenance_queue.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/admin/maintenance/maintenance_queue.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* test: rename ActiveTopologySync to TaskIDPreservation

Rename the test case to more accurately reflect its scope, as suggested
by the code review bot.

* Add TestMaintenanceQueue_ActiveTopologySync to verify task state synchronization and capacity management

* Implement task assignment rollback and add verification test

* Enhance ActiveTopology.CompleteTask to support pending tasks

* Populate storage impact in MaintenanceIntegration.SyncTask

* Release capacity in RemoveStaleWorkers when worker becomes unavailable

* Release capacity in MaintenanceManager.CancelTask when pending task is cancelled

* Sync reloaded tasks with ActiveTopology in LoadTasksFromPersistence

* Add verification tests for consistent capacity management lifecycle

* Add TestMaintenanceQueue_RetryCapacitySync to verify capacity tracking during retries

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
Chris Lu
2026-02-04 20:39:34 -08:00
committed by GitHub
parent 2ecbae3611
commit 19c18d827a
6 changed files with 696 additions and 7 deletions

View File

@@ -2,7 +2,10 @@ package maintenance
import (
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
)
@@ -351,3 +354,618 @@ func TestCanScheduleTaskNow_WithPolicy(t *testing.T) {
t.Errorf("Expected canScheduleTaskNow to return false when at policy limit, got true")
}
}
// TestMaintenanceQueue_TaskIDPreservation verifies that the queue never
// replaces a caller-supplied task ID: neither AddTasksFromResults (IDs minted
// during detection) nor AddTask (manually constructed tasks) may rewrite it.
func TestMaintenanceQueue_TaskIDPreservation(t *testing.T) {
	// Minimal policy; no integration is wired up because this test only
	// exercises ID handling inside the queue itself.
	policy := &MaintenancePolicy{
		TaskPolicies:        make(map[string]*worker_pb.TaskPolicy),
		GlobalMaxConcurrent: 10,
	}
	queue := NewMaintenanceQueue(policy)

	// AddTasksFromResults must keep the TaskID produced during detection.
	const detectedID = "ec_task_123"
	queue.AddTasksFromResults([]*TaskDetectionResult{
		{
			TaskID:      detectedID,
			TaskType:    MaintenanceTaskType("erasure_coding"),
			VolumeID:    100,
			Server:      "server1",
			Priority:    PriorityNormal,
			TypedParams: &worker_pb.TaskParams{},
		},
	})

	switch queued, ok := queue.tasks[detectedID]; {
	case !ok:
		t.Errorf("Task with original ID %s not found in queue", detectedID)
	case queued.ID != detectedID:
		t.Errorf("Task ID mismatch: expected %s, got %s", detectedID, queued.ID)
	}

	// AddTask must also preserve an explicitly provided ID.
	manual := &MaintenanceTask{
		ID:     "manual_id_456",
		Type:   MaintenanceTaskType("vacuum"),
		Status: TaskStatusPending,
	}
	queue.AddTask(manual)
	if manual.ID != "manual_id_456" {
		t.Errorf("AddTask overwrote ID: expected manual_id_456, got %s", manual.ID)
	}
}
// TestMaintenanceQueue_ActiveTopologySync walks one task through the full
// pending -> assigned -> completed lifecycle and checks that the destination
// disk's reserved capacity in ActiveTopology stays in sync at each step.
func TestMaintenanceQueue_ActiveTopologySync(t *testing.T) {
	policy := &MaintenancePolicy{
		TaskPolicies: map[string]*worker_pb.TaskPolicy{
			"balance": {MaxConcurrent: 1},
		},
		GlobalMaxConcurrent: 10,
	}
	queue := NewMaintenanceQueue(policy)
	integration := NewMaintenanceIntegration(queue, policy)
	queue.SetIntegration(integration)

	topo := integration.GetActiveTopology()
	if topo == nil {
		t.Fatalf("ActiveTopology not found in integration")
	}

	// One server with two disks: disk 1 holds volume 100, disk 2 is empty and
	// acts as the balance destination.
	topo.UpdateTopology(&master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{{
			Id: "dc1",
			RackInfos: []*master_pb.RackInfo{{
				Id: "rack1",
				DataNodeInfos: []*master_pb.DataNodeInfo{{
					Id: "server1",
					DiskInfos: map[string]*master_pb.DiskInfo{
						"hdd": {
							DiskId:         1,
							VolumeCount:    1,
							MaxVolumeCount: 10,
							VolumeInfos: []*master_pb.VolumeInformationMessage{
								{Id: 100, Collection: "col1"},
							},
						},
						"hdd2": {
							DiskId:         2,
							VolumeCount:    0,
							MaxVolumeCount: 10,
						},
					},
				}},
			}},
		}},
	})

	// Register the pending task with ActiveTopology; this reserves one slot
	// on the destination disk.
	const syncTaskID = "sync_test_123"
	if err := topo.AddPendingTask(topology.TaskSpec{
		TaskID:       syncTaskID,
		TaskType:     topology.TaskTypeBalance,
		VolumeID:     100,
		VolumeSize:   1024 * 1024,
		Sources:      []topology.TaskSourceSpec{{ServerID: "server1", DiskID: 1}},
		Destinations: []topology.TaskDestinationSpec{{ServerID: "server1", DiskID: 2}},
	}); err != nil {
		t.Fatalf("Failed to add pending task to ActiveTopology: %v", err)
	}

	// Mirror the same task (same ID) in the maintenance queue.
	queue.AddTask(&MaintenanceTask{
		ID:         syncTaskID,
		Type:       MaintenanceTaskType("balance"),
		VolumeID:   100,
		Server:     "server1",
		Collection: "col1",
		TypedParams: &worker_pb.TaskParams{
			TaskId:  syncTaskID,
			Targets: []*worker_pb.TaskTarget{{Node: "server1", DiskId: 2}},
		},
	})

	// Destination disk server1:2 has MaxVolumeCount=10 and VolumeCount=0; the
	// pending reservation leaves 9 slots available.
	if got := topo.GetEffectiveAvailableCapacity("server1", 2); got != 9 {
		t.Errorf("Expected capacity 9 after AddPendingTask, got %d", got)
	}

	// Assigning the task to a worker must not double-reserve capacity.
	queue.workers["worker1"] = &MaintenanceWorker{
		ID:            "worker1",
		Status:        "active",
		Capabilities:  []MaintenanceTaskType{"balance"},
		MaxConcurrent: 10,
	}
	assigned := queue.GetNextTask("worker1", []MaintenanceTaskType{"balance"})
	if assigned == nil || assigned.ID != syncTaskID {
		t.Fatalf("Expected to get task %s, got %+v", syncTaskID, assigned)
	}
	if got := topo.GetEffectiveAvailableCapacity("server1", 2); got != 9 {
		t.Errorf("Capacity should still be 9 after assignment, got %d", got)
	}

	// Completing the task must release the reserved slot back to the disk.
	queue.CompleteTask(syncTaskID, "")
	if got := topo.GetEffectiveAvailableCapacity("server1", 2); got != 10 {
		t.Errorf("Capacity should have returned to 10 after completion, got %d", got)
	}
}
// TestMaintenanceQueue_StaleWorkerCapacityRelease verifies that when a worker
// stops heartbeating and RemoveStaleWorkers prunes it, the capacity reserved
// in ActiveTopology for its assigned task is released again.
func TestMaintenanceQueue_StaleWorkerCapacityRelease(t *testing.T) {
	policy := &MaintenancePolicy{
		TaskPolicies: map[string]*worker_pb.TaskPolicy{
			"balance": {MaxConcurrent: 1},
		},
	}
	mq := NewMaintenanceQueue(policy)
	integration := NewMaintenanceIntegration(mq, policy)
	mq.SetIntegration(integration)
	at := integration.GetActiveTopology()

	// One server, two disks: disk 1 is the source, disk 2 the destination.
	topologyInfo := &master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{{
			Id: "dc1",
			RackInfos: []*master_pb.RackInfo{{
				Id: "rack1",
				DataNodeInfos: []*master_pb.DataNodeInfo{{
					Id: "server1",
					DiskInfos: map[string]*master_pb.DiskInfo{
						"hdd1": {DiskId: 1, VolumeCount: 1, MaxVolumeCount: 10},
						"hdd2": {DiskId: 2, VolumeCount: 0, MaxVolumeCount: 10},
					},
				}},
			}},
		}},
	}
	at.UpdateTopology(topologyInfo)

	taskID := "stale_test_123"
	// AddPendingTask reserves one destination slot; fail fast if the
	// reservation itself fails, otherwise the capacity checks below would
	// assert against an unrelated state.
	if err := at.AddPendingTask(topology.TaskSpec{
		TaskID:       taskID,
		TaskType:     topology.TaskTypeBalance,
		VolumeID:     100,
		VolumeSize:   1024,
		Sources:      []topology.TaskSourceSpec{{ServerID: "server1", DiskID: 1}},
		Destinations: []topology.TaskDestinationSpec{{ServerID: "server1", DiskID: 2}},
	}); err != nil {
		t.Fatalf("Failed to add pending task to ActiveTopology: %v", err)
	}
	mq.AddTask(&MaintenanceTask{
		ID:       taskID,
		Type:     "balance",
		VolumeID: 100,
		Server:   "server1",
		TypedParams: &worker_pb.TaskParams{
			TaskId:  taskID,
			Targets: []*worker_pb.TaskTarget{{Node: "server1", DiskId: 2}},
		},
	})
	mq.workers["worker1"] = &MaintenanceWorker{
		ID:            "worker1",
		Status:        "active",
		Capabilities:  []MaintenanceTaskType{"balance"},
		MaxConcurrent: 1,
		LastHeartbeat: time.Now(),
	}

	// Assign the task and confirm the assignment actually happened.
	if assigned := mq.GetNextTask("worker1", []MaintenanceTaskType{"balance"}); assigned == nil || assigned.ID != taskID {
		t.Fatalf("Expected task %s to be assigned to worker1, got %+v", taskID, assigned)
	}
	// Verify capacity reserved (9 left).
	if got := at.GetEffectiveAvailableCapacity("server1", 2); got != 9 {
		t.Errorf("Expected capacity 9, got %d", got)
	}

	// Age the worker's heartbeat past the staleness threshold and prune it;
	// the task's reservation must be released (back to 10).
	mq.workers["worker1"].LastHeartbeat = time.Now().Add(-1 * time.Hour)
	mq.RemoveStaleWorkers(10 * time.Minute)
	if got := at.GetEffectiveAvailableCapacity("server1", 2); got != 10 {
		t.Errorf("Expected capacity 10 after removing stale worker, got %d", got)
	}
}
// TestMaintenanceManager_CancelTaskCapacityRelease verifies that cancelling a
// pending task through MaintenanceManager.CancelTask releases the capacity
// that AddPendingTask reserved for it in ActiveTopology.
func TestMaintenanceManager_CancelTaskCapacityRelease(t *testing.T) {
	config := DefaultMaintenanceConfig()
	mm := NewMaintenanceManager(nil, config)
	integration := mm.scanner.integration
	mq := mm.queue
	at := integration.GetActiveTopology()

	// One server, two disks: disk 1 is the source, disk 2 the destination.
	topologyInfo := &master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{{
			Id: "dc1",
			RackInfos: []*master_pb.RackInfo{{
				Id: "rack1",
				DataNodeInfos: []*master_pb.DataNodeInfo{{
					Id: "server1",
					DiskInfos: map[string]*master_pb.DiskInfo{
						"hdd1": {DiskId: 1, VolumeCount: 1, MaxVolumeCount: 10},
						"hdd2": {DiskId: 2, VolumeCount: 0, MaxVolumeCount: 10},
					},
				}},
			}},
		}},
	}
	at.UpdateTopology(topologyInfo)

	taskID := "cancel_test_123"
	// Note: AddPendingTask reserves capacity. Fail fast if the reservation
	// fails, otherwise the capacity assertions below are meaningless.
	if err := at.AddPendingTask(topology.TaskSpec{
		TaskID:       taskID,
		TaskType:     topology.TaskTypeBalance,
		VolumeID:     100,
		VolumeSize:   1024,
		Sources:      []topology.TaskSourceSpec{{ServerID: "server1", DiskID: 1}},
		Destinations: []topology.TaskDestinationSpec{{ServerID: "server1", DiskID: 2}},
	}); err != nil {
		t.Fatalf("Failed to add pending task to ActiveTopology: %v", err)
	}
	mq.AddTask(&MaintenanceTask{
		ID:       taskID,
		Type:     "balance",
		VolumeID: 100,
		Server:   "server1",
		TypedParams: &worker_pb.TaskParams{
			TaskId:  taskID,
			Targets: []*worker_pb.TaskTarget{{Node: "server1", DiskId: 2}},
		},
	})

	// Verify capacity reserved (9 left on server1:2).
	if got := at.GetEffectiveAvailableCapacity("server1", 2); got != 9 {
		t.Errorf("Expected capacity 9, got %d", got)
	}

	// Cancel the pending task; the reservation must be released (back to 10).
	if err := mm.CancelTask(taskID); err != nil {
		t.Fatalf("Failed to cancel task: %v", err)
	}
	if got := at.GetEffectiveAvailableCapacity("server1", 2); got != 10 {
		t.Errorf("Expected capacity 10 after cancelling task, got %d", got)
	}
}
// MockPersistence is a minimal in-memory stand-in for the queue's persistence
// layer: only LoadAllTaskStates returns data (a canned task list); every
// other operation is a successful no-op.
type MockPersistence struct {
	tasks []*MaintenanceTask
}

// SaveTaskState is a no-op.
func (m *MockPersistence) SaveTaskState(task *MaintenanceTask) error {
	return nil
}

// LoadTaskState never finds a task.
func (m *MockPersistence) LoadTaskState(taskID string) (*MaintenanceTask, error) {
	return nil, nil
}

// LoadAllTaskStates returns the canned task list.
func (m *MockPersistence) LoadAllTaskStates() ([]*MaintenanceTask, error) {
	return m.tasks, nil
}

// DeleteTaskState is a no-op.
func (m *MockPersistence) DeleteTaskState(taskID string) error {
	return nil
}

// CleanupCompletedTasks is a no-op.
func (m *MockPersistence) CleanupCompletedTasks() error {
	return nil
}

// SaveTaskPolicy is a no-op.
func (m *MockPersistence) SaveTaskPolicy(taskType string, policy *TaskPolicy) error {
	return nil
}
// TestMaintenanceQueue_LoadTasksCapacitySync verifies that tasks reloaded
// from persistence are synced back into ActiveTopology so their destination
// capacity is reserved again after a restart.
func TestMaintenanceQueue_LoadTasksCapacitySync(t *testing.T) {
	policy := &MaintenancePolicy{
		TaskPolicies: map[string]*worker_pb.TaskPolicy{
			"balance": {MaxConcurrent: 1},
		},
	}
	queue := NewMaintenanceQueue(policy)
	integration := NewMaintenanceIntegration(queue, policy)
	queue.SetIntegration(integration)

	topo := integration.GetActiveTopology()
	// One server, two disks: disk 1 is the source, disk 2 the destination.
	topo.UpdateTopology(&master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{{
			Id: "dc1",
			RackInfos: []*master_pb.RackInfo{{
				Id: "rack1",
				DataNodeInfos: []*master_pb.DataNodeInfo{{
					Id: "server1",
					DiskInfos: map[string]*master_pb.DiskInfo{
						"hdd1": {DiskId: 1, VolumeCount: 1, MaxVolumeCount: 10},
						"hdd2": {DiskId: 2, VolumeCount: 0, MaxVolumeCount: 10},
					},
				}},
			}},
		}},
	})

	// A persisted pending balance task moving a volume from disk 1 to disk 2.
	const persistedID = "load_test_123"
	queue.SetPersistence(&MockPersistence{tasks: []*MaintenanceTask{{
		ID:     persistedID,
		Type:   "balance",
		Status: TaskStatusPending,
		TypedParams: &worker_pb.TaskParams{
			TaskId:  persistedID,
			Sources: []*worker_pb.TaskSource{{Node: "server1", DiskId: 1}},
			Targets: []*worker_pb.TaskTarget{{Node: "server1", DiskId: 2}},
		},
	}}})

	if err := queue.LoadTasksFromPersistence(); err != nil {
		t.Fatalf("Failed to load tasks: %v", err)
	}

	// Loading the pending task must re-reserve the destination slot (10 -> 9).
	if got := topo.GetEffectiveAvailableCapacity("server1", 2); got != 9 {
		t.Errorf("Expected capacity 9 after loading tasks, got %d", got)
	}
}
// TestMaintenanceQueue_RetryCapacitySync verifies capacity tracking across a
// failed attempt: completing with an error moves the task back to pending
// (retry), which should release the in-flight reservation and then
// re-reserve it for the pending retry, leaving capacity unchanged.
func TestMaintenanceQueue_RetryCapacitySync(t *testing.T) {
	policy := &MaintenancePolicy{
		TaskPolicies: map[string]*worker_pb.TaskPolicy{
			"balance": {MaxConcurrent: 1},
		},
	}
	mq := NewMaintenanceQueue(policy)
	integration := NewMaintenanceIntegration(mq, policy)
	mq.SetIntegration(integration)
	at := integration.GetActiveTopology()

	// One server, two disks: disk 1 is the source, disk 2 the destination.
	topologyInfo := &master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{{
			Id: "dc1",
			RackInfos: []*master_pb.RackInfo{{
				Id: "rack1",
				DataNodeInfos: []*master_pb.DataNodeInfo{{
					Id: "server1",
					DiskInfos: map[string]*master_pb.DiskInfo{
						"hdd1": {DiskId: 1, VolumeCount: 1, MaxVolumeCount: 10},
						"hdd2": {DiskId: 2, VolumeCount: 0, MaxVolumeCount: 10},
					},
				}},
			}},
		}},
	}
	at.UpdateTopology(topologyInfo)

	taskID := "retry_test_123"
	// 1. Add task. AddPendingTask reserves capacity; fail fast if it errors.
	if err := at.AddPendingTask(topology.TaskSpec{
		TaskID:       taskID,
		TaskType:     topology.TaskTypeBalance,
		VolumeID:     100,
		VolumeSize:   1024,
		Sources:      []topology.TaskSourceSpec{{ServerID: "server1", DiskID: 1}},
		Destinations: []topology.TaskDestinationSpec{{ServerID: "server1", DiskID: 2}},
	}); err != nil {
		t.Fatalf("Failed to add pending task to ActiveTopology: %v", err)
	}
	mq.AddTask(&MaintenanceTask{
		ID:         taskID,
		Type:       "balance",
		VolumeID:   100,
		Server:     "server1",
		MaxRetries: 3,
		TypedParams: &worker_pb.TaskParams{
			TaskId:  taskID,
			Sources: []*worker_pb.TaskSource{{Node: "server1", DiskId: 1}},
			Targets: []*worker_pb.TaskTarget{{Node: "server1", DiskId: 2}},
		},
	})
	mq.workers["worker1"] = &MaintenanceWorker{
		ID:            "worker1",
		Status:        "active",
		Capabilities:  []MaintenanceTaskType{"balance"},
		MaxConcurrent: 1,
		LastHeartbeat: time.Now(),
	}

	// 2. Assign task and confirm the assignment actually happened.
	if assigned := mq.GetNextTask("worker1", []MaintenanceTaskType{"balance"}); assigned == nil || assigned.ID != taskID {
		t.Fatalf("Expected task %s to be assigned to worker1, got %+v", taskID, assigned)
	}
	// Verify capacity reserved (9 left).
	if got := at.GetEffectiveAvailableCapacity("server1", 2); got != 9 {
		t.Errorf("Initial assignment: Expected capacity 9, got %d", got)
	}

	// 3. Complete with error (triggers a retry since MaxRetries > 0).
	mq.CompleteTask(taskID, "simulated failure")

	// 4. Verify state after failure: task still exists, back to pending with
	// an incremented retry count. Guard the map lookup so a missing task
	// fails the test instead of panicking on a nil dereference.
	task, exists := mq.tasks[taskID]
	if !exists {
		t.Fatalf("Task %s not found in queue after failed completion", taskID)
	}
	if task.Status != TaskStatusPending {
		t.Errorf("Expected status pending for retry, got %v", task.Status)
	}
	if task.RetryCount != 1 {
		t.Errorf("Expected retry count 1, got %d", task.RetryCount)
	}

	// 5. Verify capacity in ActiveTopology: the failure releases the slot
	// (back to 10) and the retry's pending sync re-reserves it, so the net
	// capacity should still be 9.
	if got := at.GetEffectiveAvailableCapacity("server1", 2); got != 9 {
		t.Errorf("After retry sync: Expected capacity 9, got %d", got)
	}
}
// TestMaintenanceQueue_AssignTaskRollback verifies that when
// ActiveTopology.AssignTask fails inside GetNextTask, the queue rolls the
// task back to pending with no worker attached and no assignment history,
// and keeps it in the pendingTasks slice.
func TestMaintenanceQueue_AssignTaskRollback(t *testing.T) {
	policy := &MaintenancePolicy{
		TaskPolicies: map[string]*worker_pb.TaskPolicy{
			"balance": {MaxConcurrent: 1},
		},
		GlobalMaxConcurrent: 10,
	}
	queue := NewMaintenanceQueue(policy)
	integration := NewMaintenanceIntegration(queue, policy)
	queue.SetIntegration(integration)

	topo := integration.GetActiveTopology()
	topo.UpdateTopology(&master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{{
			Id: "dc1",
			RackInfos: []*master_pb.RackInfo{{
				Id: "rack1",
				DataNodeInfos: []*master_pb.DataNodeInfo{{
					Id: "server1",
					DiskInfos: map[string]*master_pb.DiskInfo{
						"hdd": {
							DiskId:         1,
							VolumeCount:    1,
							MaxVolumeCount: 1, // Only 1 slot
							VolumeInfos: []*master_pb.VolumeInformationMessage{
								{Id: 100, Collection: "col1"},
							},
						},
						"hdd2": {
							DiskId:         2,
							VolumeCount:    0,
							MaxVolumeCount: 0, // NO CAPACITY for target
						},
					},
				}},
			}},
		}},
	})

	const rollbackID = "rollback_test_123"
	// Deliberately add the task to the queue ONLY, not to ActiveTopology, so
	// AssignTask fails with "pending task not found" during GetNextTask.
	queue.AddTask(&MaintenanceTask{
		ID:         rollbackID,
		Type:       MaintenanceTaskType("balance"),
		VolumeID:   100,
		Server:     "server1",
		Collection: "col1",
		TypedParams: &worker_pb.TaskParams{
			TaskId:  rollbackID,
			Targets: []*worker_pb.TaskTarget{{Node: "server1", DiskId: 2}},
		},
	})
	queue.workers["worker1"] = &MaintenanceWorker{
		ID:            "worker1",
		Status:        "active",
		Capabilities:  []MaintenanceTaskType{"balance"},
		MaxConcurrent: 10,
	}

	// The failed assignment must surface to the worker as "no task".
	if got := queue.GetNextTask("worker1", []MaintenanceTaskType{"balance"}); got != nil {
		t.Errorf("Expected GetNextTask to return nil, got task %s", got.ID)
	}

	// The task must still exist, rolled back to pending with no worker and a
	// clean assignment history.
	queue.mutex.RLock()
	task, exists := queue.tasks[rollbackID]
	queue.mutex.RUnlock()
	if !exists {
		t.Fatalf("Task %s should still exist in MaintenanceQueue", rollbackID)
	}
	if task.Status != TaskStatusPending {
		t.Errorf("Expected task status %v, got %v", TaskStatusPending, task.Status)
	}
	if task.WorkerID != "" {
		t.Errorf("Expected task WorkerID to be empty, got %s", task.WorkerID)
	}
	if len(task.AssignmentHistory) != 0 {
		t.Errorf("Expected assignment history to be empty, got %d records", len(task.AssignmentHistory))
	}

	// And it must remain queued in the pendingTasks slice.
	queue.mutex.RLock()
	stillPending := false
	for _, candidate := range queue.pendingTasks {
		if candidate.ID == rollbackID {
			stillPending = true
			break
		}
	}
	queue.mutex.RUnlock()
	if !stillPending {
		t.Errorf("Task %s should still be in pendingTasks slice", rollbackID)
	}
}