plugin scheduler: run iceberg and lifecycle lanes concurrently (#8821)
* plugin scheduler: run iceberg and lifecycle lanes concurrently The default lane serialises job types under a single admin lock because volume-management operations share global state. Iceberg and lifecycle lanes have no such constraint, so run each of their job types independently in separate goroutines. * Fix concurrent lane scheduler status * plugin scheduler: address review feedback - Extract collectDueJobTypes helper to deduplicate policy loading between locked and concurrent iteration paths. - Use atomic.Bool instead of sync.Mutex for hadJobs in the concurrent path. - Set lane loop state to "busy" before launching concurrent goroutines so the lane is not reported as idle while work runs. - Convert TestLaneRequiresLock to table-driven style. - Add TestRunLaneSchedulerIterationLockBehavior to verify the scheduler acquires the admin lock only for lanes that require it. - Fix flaky TestGetLaneSchedulerStatusShowsActiveConcurrentLaneWork by not starting background scheduler goroutines that race with the direct runJobTypeIteration call.
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
@@ -106,6 +107,14 @@ func (r *Plugin) schedulerLoop() {
|
||||
|
||||
// runLaneSchedulerIteration runs one scheduling pass for a single lane,
|
||||
// processing only the job types assigned to that lane.
|
||||
//
|
||||
// For lanes that require a lock (e.g. LaneDefault), all job types are
|
||||
// processed sequentially under one admin lock because their volume
|
||||
// management operations share global state.
|
||||
//
|
||||
// For lanes that do not require a lock (e.g. LaneIceberg, LaneLifecycle),
|
||||
// each job type runs independently in its own goroutine so they do not
|
||||
// block each other.
|
||||
func (r *Plugin) runLaneSchedulerIteration(ls *schedulerLaneState) bool {
|
||||
r.expireStaleJobs(time.Now().UTC())
|
||||
|
||||
@@ -122,21 +131,23 @@ func (r *Plugin) runLaneSchedulerIteration(ls *schedulerLaneState) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
r.setLaneLoopState(ls, "", "waiting_for_lock")
|
||||
lockName := fmt.Sprintf("plugin scheduler:%s", ls.lane)
|
||||
releaseLock, err := r.acquireAdminLock(lockName)
|
||||
if err != nil {
|
||||
glog.Warningf("Plugin scheduler [%s] failed to acquire lock: %v", ls.lane, err)
|
||||
r.setLaneLoopState(ls, "", "idle")
|
||||
return false
|
||||
}
|
||||
if releaseLock != nil {
|
||||
defer releaseLock()
|
||||
if LaneRequiresLock(ls.lane) {
|
||||
return r.runLaneSchedulerIterationLocked(ls, jobTypes)
|
||||
}
|
||||
return r.runLaneSchedulerIterationConcurrent(ls, jobTypes)
|
||||
}
|
||||
|
||||
active := make(map[string]struct{}, len(jobTypes))
|
||||
hadJobs := false
|
||||
// dueJobType pairs a job type with its resolved scheduling policy.
|
||||
type dueJobType struct {
|
||||
jobType string
|
||||
policy schedulerPolicy
|
||||
}
|
||||
|
||||
// collectDueJobTypes loads policies for all job types in the lane and
|
||||
// returns those whose detection interval has elapsed. It also returns
|
||||
// the full set of active job type names for later pruning.
|
||||
func (r *Plugin) collectDueJobTypes(ls *schedulerLaneState, jobTypes []string) (active map[string]struct{}, due []dueJobType) {
|
||||
active = make(map[string]struct{}, len(jobTypes))
|
||||
for _, jobType := range jobTypes {
|
||||
active[jobType] = struct{}{}
|
||||
|
||||
@@ -156,9 +167,31 @@ func (r *Plugin) runLaneSchedulerIteration(ls *schedulerLaneState) bool {
|
||||
if !r.markDetectionDue(jobType, policy.DetectionInterval, initialDelay) {
|
||||
continue
|
||||
}
|
||||
due = append(due, dueJobType{jobType: jobType, policy: policy})
|
||||
}
|
||||
return active, due
|
||||
}
|
||||
|
||||
detected := r.runJobTypeIteration(jobType, policy)
|
||||
if detected {
|
||||
// runLaneSchedulerIterationLocked processes job types sequentially under a
|
||||
// single admin lock. Used by the default lane where volume management
|
||||
// operations must be serialised.
|
||||
func (r *Plugin) runLaneSchedulerIterationLocked(ls *schedulerLaneState, jobTypes []string) bool {
|
||||
r.setLaneLoopState(ls, "", "waiting_for_lock")
|
||||
lockName := fmt.Sprintf("plugin scheduler:%s", ls.lane)
|
||||
releaseLock, err := r.acquireAdminLock(lockName)
|
||||
if err != nil {
|
||||
glog.Warningf("Plugin scheduler [%s] failed to acquire lock: %v", ls.lane, err)
|
||||
r.setLaneLoopState(ls, "", "idle")
|
||||
return false
|
||||
}
|
||||
if releaseLock != nil {
|
||||
defer releaseLock()
|
||||
}
|
||||
|
||||
active, due := r.collectDueJobTypes(ls, jobTypes)
|
||||
hadJobs := false
|
||||
for _, w := range due {
|
||||
if r.runJobTypeIteration(w.jobType, w.policy) {
|
||||
hadJobs = true
|
||||
}
|
||||
}
|
||||
@@ -169,6 +202,33 @@ func (r *Plugin) runLaneSchedulerIteration(ls *schedulerLaneState) bool {
|
||||
return hadJobs
|
||||
}
|
||||
|
||||
// runLaneSchedulerIterationConcurrent processes each job type in its own
|
||||
// goroutine so they run independently. Used by lanes (e.g. iceberg,
|
||||
// lifecycle) whose job types do not share global state.
|
||||
func (r *Plugin) runLaneSchedulerIterationConcurrent(ls *schedulerLaneState, jobTypes []string) bool {
|
||||
active, due := r.collectDueJobTypes(ls, jobTypes)
|
||||
|
||||
r.setLaneLoopState(ls, "", "busy")
|
||||
|
||||
var hadJobs atomic.Bool
|
||||
var wg sync.WaitGroup
|
||||
for _, w := range due {
|
||||
wg.Add(1)
|
||||
go func(jobType string, policy schedulerPolicy) {
|
||||
defer wg.Done()
|
||||
if r.runJobTypeIteration(jobType, policy) {
|
||||
hadJobs.Store(true)
|
||||
}
|
||||
}(w.jobType, w.policy)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
r.pruneSchedulerState(active)
|
||||
r.pruneDetectorLeases(active)
|
||||
r.setLaneLoopState(ls, "", "idle")
|
||||
return hadJobs.Load()
|
||||
}
|
||||
|
||||
// runSchedulerIteration is kept for backward compatibility. It runs a
|
||||
// single iteration across ALL job types (equivalent to the old single-loop
|
||||
// behavior). It is only used by the legacy schedulerLoop() fallback.
|
||||
@@ -267,7 +327,7 @@ func (r *Plugin) wakeScheduler() {
|
||||
func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) bool {
|
||||
r.recordSchedulerRunStart(jobType)
|
||||
r.clearWaitingJobQueue(jobType)
|
||||
r.setSchedulerLoopState(jobType, "detecting")
|
||||
r.setSchedulerLoopStateForJobType(jobType, "detecting")
|
||||
r.markJobTypeInFlight(jobType)
|
||||
defer r.finishDetection(jobType)
|
||||
|
||||
@@ -399,7 +459,7 @@ func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) boo
|
||||
return detected
|
||||
}
|
||||
|
||||
r.setSchedulerLoopState(jobType, "executing")
|
||||
r.setSchedulerLoopStateForJobType(jobType, "executing")
|
||||
|
||||
// Scan proposals for the maximum estimated_runtime_seconds so the
|
||||
// execution phase gets enough time for large jobs (e.g. vacuum on
|
||||
|
||||
@@ -3,6 +3,7 @@ package plugin
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -594,3 +595,81 @@ func TestPickDetectorReassignsWhenLeaseIsStale(t *testing.T) {
|
||||
t.Fatalf("expected detector lease to be updated to worker-a, got=%s", lease)
|
||||
}
|
||||
}
|
||||
|
||||
// trackingLockManager records whether Acquire was called and how many times.
|
||||
type trackingLockManager struct {
|
||||
mu sync.Mutex
|
||||
acquired int
|
||||
}
|
||||
|
||||
func (m *trackingLockManager) Acquire(reason string) (func(), error) {
|
||||
m.mu.Lock()
|
||||
m.acquired++
|
||||
m.mu.Unlock()
|
||||
return func() {}, nil
|
||||
}
|
||||
|
||||
func (m *trackingLockManager) count() int {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.acquired
|
||||
}
|
||||
|
||||
func TestRunLaneSchedulerIterationLockBehavior(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
lane SchedulerLane
|
||||
jobType string
|
||||
wantLock bool
|
||||
}{
|
||||
{"Default", LaneDefault, "vacuum", true},
|
||||
{"Iceberg", LaneIceberg, "iceberg_maintenance", false},
|
||||
{"Lifecycle", LaneLifecycle, "s3_lifecycle", false},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
lm := &trackingLockManager{}
|
||||
pluginSvc, err := New(Options{
|
||||
LockManager: lm,
|
||||
ClusterContextProvider: func(context.Context) (*plugin_pb.ClusterContext, error) {
|
||||
return &plugin_pb.ClusterContext{}, nil
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New: %v", err)
|
||||
}
|
||||
defer pluginSvc.Shutdown()
|
||||
|
||||
// Register a detectable worker for the job type.
|
||||
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
||||
WorkerId: "worker-a",
|
||||
Capabilities: []*plugin_pb.JobTypeCapability{
|
||||
{JobType: tt.jobType, CanDetect: true},
|
||||
},
|
||||
})
|
||||
|
||||
// Enable the job type so the scheduler picks it up.
|
||||
err = pluginSvc.SaveJobTypeConfig(&plugin_pb.PersistedJobTypeConfig{
|
||||
JobType: tt.jobType,
|
||||
AdminRuntime: &plugin_pb.AdminRuntimeConfig{
|
||||
Enabled: true,
|
||||
DetectionIntervalSeconds: 1,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("SaveJobTypeConfig: %v", err)
|
||||
}
|
||||
|
||||
ls := pluginSvc.lanes[tt.lane]
|
||||
pluginSvc.runLaneSchedulerIteration(ls)
|
||||
|
||||
if got := lm.count(); (got > 0) != tt.wantLock {
|
||||
t.Errorf("lock acquired %d times, wantLock=%v", got, tt.wantLock)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,6 +38,25 @@ var laneIdleSleep = map[SchedulerLane]time.Duration{
|
||||
LaneLifecycle: 5 * time.Minute,
|
||||
}
|
||||
|
||||
// laneRequiresLock maps each lane to whether its job types must be
|
||||
// serialised under a single admin lock. The default lane needs this
|
||||
// because volume-management operations share global state. Other
|
||||
// lanes run each job type independently.
|
||||
var laneRequiresLock = map[SchedulerLane]bool{
|
||||
LaneDefault: true,
|
||||
LaneIceberg: false,
|
||||
LaneLifecycle: false,
|
||||
}
|
||||
|
||||
// LaneRequiresLock returns true if the given lane needs a single admin
|
||||
// lock to serialise its job types. Unknown lanes default to true.
|
||||
func LaneRequiresLock(lane SchedulerLane) bool {
|
||||
if v, ok := laneRequiresLock[lane]; ok {
|
||||
return v
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// LaneIdleSleep returns the idle sleep duration for the given lane,
|
||||
// falling back to defaultSchedulerIdleSleep if the lane is unknown.
|
||||
func LaneIdleSleep(lane SchedulerLane) time.Duration {
|
||||
|
||||
@@ -27,6 +27,26 @@ func TestAllLanesHaveIdleSleep(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLaneRequiresLock(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
lane SchedulerLane
|
||||
want bool
|
||||
}{
|
||||
{"Default", LaneDefault, true},
|
||||
{"Iceberg", LaneIceberg, false},
|
||||
{"Lifecycle", LaneLifecycle, false},
|
||||
{"Unknown", "unknown_lane", true},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := LaneRequiresLock(tt.lane); got != tt.want {
|
||||
t.Errorf("LaneRequiresLock(%q) = %v, want %v", tt.lane, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestKnownJobTypesInMap(t *testing.T) {
|
||||
// Ensure the well-known job types are mapped. This catches drift
|
||||
// if a handler's job type string changes without updating the map.
|
||||
|
||||
@@ -195,6 +195,21 @@ func (r *Plugin) setSchedulerLoopState(jobType, phase string) {
|
||||
r.schedulerLoopMu.Unlock()
|
||||
}
|
||||
|
||||
// setSchedulerLoopStateForJobType keeps the aggregate scheduler state and the
|
||||
// owning lane state in sync while a specific job type is active.
|
||||
func (r *Plugin) setSchedulerLoopStateForJobType(jobType, phase string) {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
r.setSchedulerLoopState(jobType, phase)
|
||||
if jobType == "" {
|
||||
return
|
||||
}
|
||||
if ls := r.lanes[JobTypeLane(jobType)]; ls != nil {
|
||||
r.setLaneLoopState(ls, jobType, phase)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Plugin) recordSchedulerIterationComplete(hadJobs bool) {
|
||||
if r == nil {
|
||||
return
|
||||
@@ -251,7 +266,6 @@ func (r *Plugin) aggregateLaneLoopStates() schedulerLoopState {
|
||||
return agg
|
||||
}
|
||||
|
||||
|
||||
// --- Per-lane loop state helpers ---
|
||||
|
||||
func (r *Plugin) setLaneLoopState(ls *schedulerLaneState, jobType, phase string) {
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
||||
)
|
||||
@@ -62,3 +64,84 @@ func TestGetSchedulerStatusIncludesLastDetectionCount(t *testing.T) {
|
||||
t.Fatalf("expected job type status for %s", jobType)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetLaneSchedulerStatusShowsActiveConcurrentLaneWork(t *testing.T) {
|
||||
clusterContextStarted := make(chan struct{})
|
||||
releaseClusterContext := make(chan struct{})
|
||||
|
||||
// Create the Plugin without a ClusterContextProvider so no background
|
||||
// scheduler goroutines are started; they would race with the direct
|
||||
// runJobTypeIteration call below.
|
||||
pluginSvc, err := New(Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("New: %v", err)
|
||||
}
|
||||
defer pluginSvc.Shutdown()
|
||||
|
||||
// Set the provider after construction so runJobTypeIteration can use it.
|
||||
pluginSvc.clusterContextProvider = func(context.Context) (*plugin_pb.ClusterContext, error) {
|
||||
close(clusterContextStarted)
|
||||
<-releaseClusterContext
|
||||
return nil, context.Canceled
|
||||
}
|
||||
|
||||
const jobType = "s3_lifecycle"
|
||||
err = pluginSvc.SaveJobTypeConfig(&plugin_pb.PersistedJobTypeConfig{
|
||||
JobType: jobType,
|
||||
AdminRuntime: &plugin_pb.AdminRuntimeConfig{
|
||||
Enabled: true,
|
||||
DetectionIntervalSeconds: 30,
|
||||
DetectionTimeoutSeconds: 15,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("SaveJobTypeConfig: %v", err)
|
||||
}
|
||||
|
||||
policy, enabled, err := pluginSvc.loadSchedulerPolicy(jobType)
|
||||
if err != nil {
|
||||
t.Fatalf("loadSchedulerPolicy: %v", err)
|
||||
}
|
||||
if !enabled {
|
||||
t.Fatalf("expected enabled policy")
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
pluginSvc.runJobTypeIteration(jobType, policy)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-clusterContextStarted:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("timed out waiting for job type iteration to start")
|
||||
}
|
||||
|
||||
var laneStatus SchedulerStatus
|
||||
var aggregateStatus SchedulerStatus
|
||||
deadline := time.Now().Add(time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
laneStatus = pluginSvc.GetLaneSchedulerStatus(LaneLifecycle)
|
||||
aggregateStatus = pluginSvc.GetSchedulerStatus()
|
||||
if laneStatus.CurrentJobType == jobType && laneStatus.CurrentPhase == "detecting" &&
|
||||
aggregateStatus.CurrentJobType == jobType && aggregateStatus.CurrentPhase == "detecting" {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
if laneStatus.CurrentJobType != jobType || laneStatus.CurrentPhase != "detecting" {
|
||||
t.Fatalf("unexpected lane status while work is active: job=%q phase=%q", laneStatus.CurrentJobType, laneStatus.CurrentPhase)
|
||||
}
|
||||
if aggregateStatus.CurrentJobType != jobType || aggregateStatus.CurrentPhase != "detecting" {
|
||||
t.Fatalf("unexpected aggregate status while work is active: job=%q phase=%q", aggregateStatus.CurrentJobType, aggregateStatus.CurrentPhase)
|
||||
}
|
||||
|
||||
close(releaseClusterContext)
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("timed out waiting for job type iteration to finish")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user