Prevent overlapping maintenance tasks per volume (#8463)
* Prevent concurrent maintenance tasks per volume
* fix panic
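For orientation before the diff: the change makes the queue treat a volume as busy whenever any not-yet-completed task references it, regardless of task type. A minimal, self-contained sketch of that idea (the Task/Queue types and volumeBusy helper below are illustrative only, not the actual SeaweedFS structs, and the real queue also holds a mutex around these checks) could look like this:

    package main

    import "fmt"

    type Status string

    const (
        Pending    Status = "pending"
        Assigned   Status = "assigned"
        InProgress Status = "in_progress"
    )

    type Task struct {
        ID       string
        VolumeID uint32
        Status   Status
    }

    type Queue struct {
        tasks map[string]*Task
    }

    // volumeBusy reports whether any queued or active task already targets the volume.
    func (q *Queue) volumeBusy(volumeID uint32) bool {
        for _, t := range q.tasks {
            if t.VolumeID == volumeID &&
                (t.Status == Pending || t.Status == Assigned || t.Status == InProgress) {
                return true
            }
        }
        return false
    }

    // Add enqueues the task unless its volume already has a queued or running task.
    func (q *Queue) Add(t *Task) {
        if q.volumeBusy(t.VolumeID) {
            fmt.Printf("skip %s: volume %d is busy\n", t.ID, t.VolumeID)
            return
        }
        t.Status = Pending
        q.tasks[t.ID] = t
    }

    func main() {
        q := &Queue{tasks: map[string]*Task{}}
        q.Add(&Task{ID: "balance-1", VolumeID: 100})
        q.Add(&Task{ID: "ec-1", VolumeID: 100}) // skipped: volume 100 already has a pending task
        q.Add(&Task{ID: "vacuum-1", VolumeID: 200})
        fmt.Println("queued:", len(q.tasks)) // 2
    }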
@@ -131,6 +131,14 @@ func (mq *MaintenanceQueue) cleanupCompletedTasks() {

func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
    mq.mutex.Lock()

    // Enforce one queued/active task per volume (across all task types).
    if mq.hasQueuedOrActiveTaskForVolume(task.VolumeID) {
        mq.mutex.Unlock()
        glog.V(1).Infof("Task skipped (volume busy): %s for volume %d on %s (already queued or running)",
            task.Type, task.VolumeID, task.Server)
        return
    }

    // Check for duplicate tasks (same type + volume + not completed)
    if mq.hasDuplicateTask(task) {
        mq.mutex.Unlock()
@@ -188,6 +196,25 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
        taskSnapshot.ID, taskSnapshot.Type, taskSnapshot.VolumeID, taskSnapshot.Server, taskSnapshot.Priority, scheduleInfo, taskSnapshot.Reason)
}

// hasQueuedOrActiveTaskForVolume checks if any pending/assigned/in-progress task already exists for this volume.
// Caller must hold mq.mutex.
func (mq *MaintenanceQueue) hasQueuedOrActiveTaskForVolume(volumeID uint32) bool {
    if volumeID == 0 {
        return false
    }
    for _, existingTask := range mq.tasks {
        if existingTask.VolumeID != volumeID {
            continue
        }
        if existingTask.Status == TaskStatusPending ||
            existingTask.Status == TaskStatusAssigned ||
            existingTask.Status == TaskStatusInProgress {
            return true
        }
    }
    return false
}

// hasDuplicateTask checks if a similar task already exists (same type, volume, and not completed)
func (mq *MaintenanceQueue) hasDuplicateTask(newTask *MaintenanceTask) bool {
    for _, existingTask := range mq.tasks {
@@ -260,6 +287,13 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena
            continue
        }

        // Avoid scheduling concurrent operations on the same volume
        if activeTaskID, activeTaskType, hasActive := mq.activeTaskForVolume(task.VolumeID, task.ID); hasActive {
            glog.V(2).Infof("Task %s (%s) skipped for worker %s: volume %d is busy with task %s (%s)",
                task.ID, task.Type, workerID, task.VolumeID, activeTaskID, activeTaskType)
            continue
        }

        // Check if worker can handle this task type
        if !mq.workerCanHandle(task.Type, capabilities) {
            glog.V(3).Infof("Task %s (%s) skipped for worker %s: capability mismatch (worker has: %v)", task.ID, task.Type, workerID, capabilities)
@@ -304,6 +338,14 @@ func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []Maintena
        return nil
    }

    // Re-check volume conflict after acquiring write lock
    if activeTaskID, activeTaskType, hasActive := mq.activeTaskForVolume(selectedTask.VolumeID, selectedTaskID); hasActive {
        mq.mutex.Unlock()
        glog.V(2).Infof("Task %s no longer available for worker %s: volume %d is busy with task %s (%s)",
            selectedTaskID, workerID, selectedTask.VolumeID, activeTaskID, activeTaskType)
        return nil
    }

    // Record assignment history
    workerAddress := ""
    if worker, exists := mq.workers[workerID]; exists {
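Note on the re-check above: the volume-conflict test already ran once while scanning candidate tasks (previous hunk) and runs again here right after the write lock is taken. Presumably the candidate scan happens before the queue's write lock is held (the surrounding locking in GetNextTask is not fully visible in this excerpt), so a competing worker could be handed a conflicting task for the same volume in between; repeating the check under the write lock closes that window. A small sketch of the same check/re-check shape, using assumed names rather than the real queue types:

    package main

    import (
        "fmt"
        "sync"
    )

    type assigner struct {
        mu   sync.RWMutex
        busy map[uint32]string // volumeID -> ID of the task currently assigned to it
    }

    // tryAssign does a cheap check under the read lock, then repeats it under the
    // write lock before recording the assignment, so a racing assignment made
    // between the two checks is still caught.
    func (a *assigner) tryAssign(volumeID uint32, taskID string) bool {
        a.mu.RLock()
        _, conflict := a.busy[volumeID]
        a.mu.RUnlock()
        if conflict {
            return false // skip early without blocking writers
        }

        a.mu.Lock()
        defer a.mu.Unlock()
        if _, conflict := a.busy[volumeID]; conflict {
            return false // another goroutine won the race between the two checks
        }
        a.busy[volumeID] = taskID
        return true
    }

    func main() {
        a := &assigner{busy: map[uint32]string{}}
        fmt.Println(a.tryAssign(100, "t1")) // true
        fmt.Println(a.tryAssign(100, "t2")) // false: volume 100 already assigned
    }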
@@ -877,6 +919,28 @@ func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabi
    return false
}

// activeTaskForVolume returns the active task ID/type for a volume, if any.
// Caller must hold mq.mutex (read or write).
func (mq *MaintenanceQueue) activeTaskForVolume(volumeID uint32, excludeTaskID string) (string, MaintenanceTaskType, bool) {
    if volumeID == 0 {
        return "", "", false
    }

    for _, task := range mq.tasks {
        if task.ID == excludeTaskID {
            continue
        }
        if task.VolumeID != volumeID {
            continue
        }
        if task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress {
            return task.ID, task.Type, true
        }
    }

    return "", "", false
}

// canScheduleTaskNow determines if a task can be scheduled using task schedulers or fallback logic
func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
    glog.V(2).Infof("Checking if task %s (type: %s) can be scheduled", task.ID, task.Type)
@@ -969,3 +969,154 @@ func TestMaintenanceQueue_AssignTaskRollback(t *testing.T) {
        t.Errorf("Task %s should still be in pendingTasks slice", taskID)
    }
}

func TestGetNextTask_SkipsVolumeConflictsAcrossTypes(t *testing.T) {
    policy := &MaintenancePolicy{
        TaskPolicies: map[string]*worker_pb.TaskPolicy{
            "balance":        {MaxConcurrent: 2},
            "erasure_coding": {MaxConcurrent: 2},
            "vacuum":         {MaxConcurrent: 2},
        },
    }

    mq := NewMaintenanceQueue(policy)

    now := time.Now()
    mq.AddTask(&MaintenanceTask{
        ID:          "t1",
        Type:        MaintenanceTaskType("balance"),
        Priority:    PriorityHigh,
        VolumeID:    100,
        Server:      "server1",
        ScheduledAt: now.Add(-3 * time.Second),
    })
    t2 := &MaintenanceTask{
        ID:          "t2",
        Type:        MaintenanceTaskType("erasure_coding"),
        Priority:    PriorityNormal,
        VolumeID:    100,
        Server:      "server1",
        Status:      TaskStatusPending,
        ScheduledAt: now.Add(-2 * time.Second),
    }
    mq.mutex.Lock()
    mq.tasks[t2.ID] = t2
    mq.pendingTasks = append(mq.pendingTasks, t2)
    mq.mutex.Unlock()
    mq.AddTask(&MaintenanceTask{
        ID:          "t3",
        Type:        MaintenanceTaskType("vacuum"),
        Priority:    PriorityNormal,
        VolumeID:    200,
        Server:      "server1",
        ScheduledAt: now.Add(-1 * time.Second),
    })

    mq.workers["worker1"] = &MaintenanceWorker{
        ID:            "worker1",
        Status:        "active",
        Capabilities:  []MaintenanceTaskType{"balance", "erasure_coding", "vacuum"},
        MaxConcurrent: 2,
        LastHeartbeat: time.Now(),
    }
    mq.workers["worker2"] = &MaintenanceWorker{
        ID:            "worker2",
        Status:        "active",
        Capabilities:  []MaintenanceTaskType{"balance", "erasure_coding", "vacuum"},
        MaxConcurrent: 2,
        LastHeartbeat: time.Now(),
    }

    task1 := mq.GetNextTask("worker1", mq.workers["worker1"].Capabilities)
    if task1 == nil || task1.ID != "t1" {
        t.Fatalf("Expected first assignment to be t1, got %+v", task1)
    }

    task2 := mq.GetNextTask("worker2", mq.workers["worker2"].Capabilities)
    if task2 == nil {
        t.Fatalf("Expected a second task to be assigned, got nil")
    }
    if task2.ID != "t3" {
        t.Fatalf("Expected second assignment to skip volume 100 and pick t3, got %s", task2.ID)
    }

    if mq.tasks["t2"].Status != TaskStatusPending {
        t.Fatalf("Expected t2 to remain pending due to volume conflict, got %s", mq.tasks["t2"].Status)
    }
}

func TestAddTask_OnePendingTaskPerVolume(t *testing.T) {
    mq := NewMaintenanceQueue(&MaintenancePolicy{
        TaskPolicies: map[string]*worker_pb.TaskPolicy{
            "balance":        {MaxConcurrent: 1},
            "erasure_coding": {MaxConcurrent: 1},
        },
    })

    mq.AddTask(&MaintenanceTask{
        ID:       "t1",
        Type:     MaintenanceTaskType("balance"),
        VolumeID: 100,
        Server:   "server1",
    })
    mq.AddTask(&MaintenanceTask{
        ID:       "t2",
        Type:     MaintenanceTaskType("erasure_coding"),
        VolumeID: 100,
        Server:   "server1",
    })

    mq.mutex.RLock()
    defer mq.mutex.RUnlock()

    if len(mq.tasks) != 1 {
        t.Fatalf("Expected 1 task in queue, got %d", len(mq.tasks))
    }
    if len(mq.pendingTasks) != 1 {
        t.Fatalf("Expected 1 pending task, got %d", len(mq.pendingTasks))
    }
    if _, exists := mq.tasks["t1"]; !exists {
        t.Fatalf("Expected task t1 to be queued")
    }
    if _, exists := mq.tasks["t2"]; exists {
        t.Fatalf("Did not expect task t2 to be queued due to pending volume")
    }
}

func TestAddTask_RejectsWhenVolumeHasRunningTask(t *testing.T) {
    mq := NewMaintenanceQueue(&MaintenancePolicy{
        TaskPolicies: map[string]*worker_pb.TaskPolicy{
            "balance":        {MaxConcurrent: 1},
            "erasure_coding": {MaxConcurrent: 1},
        },
    })

    mq.AddTask(&MaintenanceTask{
        ID:       "t1",
        Type:     MaintenanceTaskType("balance"),
        VolumeID: 100,
        Server:   "server1",
    })

    // Simulate assignment to make it active
    mq.mutex.Lock()
    mq.tasks["t1"].Status = TaskStatusInProgress
    mq.mutex.Unlock()

    mq.AddTask(&MaintenanceTask{
        ID:       "t2",
        Type:     MaintenanceTaskType("erasure_coding"),
        VolumeID: 100,
        Server:   "server1",
    })

    mq.mutex.RLock()
    defer mq.mutex.RUnlock()

    if len(mq.tasks) != 1 {
        t.Fatalf("Expected 1 task in queue, got %d", len(mq.tasks))
    }
    if _, exists := mq.tasks["t2"]; exists {
        t.Fatalf("Did not expect task t2 to be queued due to active volume task")
    }
}
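The three tests added above can be run on their own with go test; the package path below is an assumption, since the changed file paths are not part of this excerpt:

    go test ./weed/admin/maintenance/ -run 'TestGetNextTask_SkipsVolumeConflictsAcrossTypes|TestAddTask_OnePendingTaskPerVolume|TestAddTask_RejectsWhenVolumeHasRunningTask' -v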