Plugin scheduler: sequential iterations with max runtime (#8496)

* pb: add job type max runtime setting

* plugin: default job type max runtime

* plugin: redesign scheduler loop

* admin ui: update scheduler settings

* plugin: fix scheduler loop state name

* plugin scheduler: restore backlog skip

* plugin scheduler: drop legacy detection helper

* admin api: require scheduler config body

* admin ui: preserve detection interval on save

* plugin scheduler: use job context and drain cancels

* plugin scheduler: respect detection intervals

* plugin scheduler: gate runs and drain queue

* ec test: reuse req/resp vars

* ec test: add scheduler debug logs

* Adjust scheduler idle sleep and initial run delay

* Clear pending job queue before scheduler runs

* Log next detection time in EC integration test

* Improve plugin scheduler debug logging in EC test

* Expose scheduler next detection time

* Log scheduler next detection time in EC test

* Wake scheduler on config or worker updates

* Expose scheduler sleep interval in UI

* Fix scheduler sleep save value selection

* Set scheduler idle sleep default to 613s

* Show scheduler next run time in plugin UI

---------

Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
Chris Lu
2026-03-03 23:09:49 -08:00
committed by GitHub
parent e1e5b4a8a6
commit 18ccc9b773
19 changed files with 1241 additions and 191 deletions

View File

@@ -13,13 +13,17 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)
var errExecutorAtCapacity = errors.New("executor is at capacity")
var (
errExecutorAtCapacity = errors.New("executor is at capacity")
errSchedulerShutdown = errors.New("scheduler shutdown")
)
const (
defaultSchedulerTick = 5 * time.Second
defaultScheduledDetectionInterval = 300 * time.Second
defaultScheduledDetectionTimeout = 45 * time.Second
defaultScheduledExecutionTimeout = 90 * time.Second
defaultScheduledJobTypeMaxRuntime = 30 * time.Minute
defaultScheduledMaxResults int32 = 1000
defaultScheduledExecutionConcurrency = 1
defaultScheduledPerWorkerConcurrency = 1
@@ -34,6 +38,7 @@ type schedulerPolicy struct {
DetectionInterval time.Duration
DetectionTimeout time.Duration
ExecutionTimeout time.Duration
JobTypeMaxRuntime time.Duration
RetryBackoff time.Duration
MaxResults int32
ExecutionConcurrency int
@@ -44,31 +49,72 @@ type schedulerPolicy struct {
func (r *Plugin) schedulerLoop() {
defer r.wg.Done()
ticker := time.NewTicker(r.schedulerTick)
defer ticker.Stop()
// Try once immediately on startup.
r.runSchedulerTick()
for {
select {
case <-r.shutdownCh:
return
case <-ticker.C:
r.runSchedulerTick()
default:
}
hadJobs := r.runSchedulerIteration()
r.recordSchedulerIterationComplete(hadJobs)
if hadJobs {
continue
}
r.setSchedulerLoopState("", "sleeping")
idleSleep := r.GetSchedulerConfig().IdleSleepDuration()
if nextRun := r.earliestNextDetectionAt(); !nextRun.IsZero() {
if until := time.Until(nextRun); until <= 0 {
idleSleep = 0
} else if until < idleSleep {
idleSleep = until
}
}
if idleSleep <= 0 {
continue
}
timer := time.NewTimer(idleSleep)
select {
case <-r.shutdownCh:
timer.Stop()
return
case <-r.schedulerWakeCh:
if !timer.Stop() {
<-timer.C
}
continue
case <-timer.C:
}
}
}
func (r *Plugin) runSchedulerTick() {
func (r *Plugin) runSchedulerIteration() bool {
r.expireStaleJobs(time.Now().UTC())
jobTypes := r.registry.DetectableJobTypes()
if len(jobTypes) == 0 {
return
r.setSchedulerLoopState("", "idle")
return false
}
r.setSchedulerLoopState("", "waiting_for_lock")
releaseLock, err := r.acquireAdminLock("plugin scheduler iteration")
if err != nil {
glog.Warningf("Plugin scheduler failed to acquire lock: %v", err)
r.setSchedulerLoopState("", "idle")
return false
}
if releaseLock != nil {
defer releaseLock()
}
active := make(map[string]struct{}, len(jobTypes))
schedulerIdleSleep := r.GetSchedulerConfig().IdleSleepDuration()
hadJobs := false
for _, jobType := range jobTypes {
active[jobType] = struct{}{}
@@ -81,20 +127,212 @@ func (r *Plugin) runSchedulerTick() {
r.clearSchedulerJobType(jobType)
continue
}
if !r.markDetectionDue(jobType, policy.DetectionInterval) {
initialDelay := time.Duration(0)
if runInfo := r.snapshotSchedulerRun(jobType); runInfo.lastRunStartedAt.IsZero() {
initialDelay = schedulerIdleSleep / 2
}
if !r.markDetectionDue(jobType, policy.DetectionInterval, initialDelay) {
continue
}
r.wg.Add(1)
go func(jt string, p schedulerPolicy) {
defer r.wg.Done()
r.runScheduledDetection(jt, p)
}(jobType, policy)
detected := r.runJobTypeIteration(jobType, policy)
if detected {
hadJobs = true
}
}
r.pruneSchedulerState(active)
r.pruneDetectorLeases(active)
r.setSchedulerLoopState("", "idle")
return hadJobs
}
func (r *Plugin) wakeScheduler() {
if r == nil {
return
}
select {
case r.schedulerWakeCh <- struct{}{}:
default:
}
}
func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) bool {
r.recordSchedulerRunStart(jobType)
r.clearWaitingJobQueue(jobType)
r.setSchedulerLoopState(jobType, "detecting")
r.markJobTypeInFlight(jobType)
defer r.finishDetection(jobType)
start := time.Now().UTC()
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: "scheduled detection started",
Stage: "detecting",
OccurredAt: timeToPtr(start),
})
if skip, waitingCount, waitingThreshold := r.shouldSkipDetectionForWaitingJobs(jobType, policy); skip {
r.recordSchedulerDetectionSkip(jobType, fmt.Sprintf("waiting backlog %d reached threshold %d", waitingCount, waitingThreshold))
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection skipped: waiting backlog %d reached threshold %d", waitingCount, waitingThreshold),
Stage: "skipped_waiting_backlog",
OccurredAt: timeToPtr(time.Now().UTC()),
})
r.recordSchedulerRunComplete(jobType, "skipped")
return false
}
maxRuntime := policy.JobTypeMaxRuntime
if maxRuntime <= 0 {
maxRuntime = defaultScheduledJobTypeMaxRuntime
}
jobCtx, cancel := context.WithTimeout(context.Background(), maxRuntime)
defer cancel()
clusterContext, err := r.loadSchedulerClusterContext(jobCtx)
if err != nil {
r.recordSchedulerDetectionError(jobType, err)
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection aborted: %v", err),
Stage: "failed",
OccurredAt: timeToPtr(time.Now().UTC()),
})
r.recordSchedulerRunComplete(jobType, "error")
return false
}
detectionTimeout := policy.DetectionTimeout
remaining := time.Until(start.Add(maxRuntime))
if remaining <= 0 {
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: "scheduled run timed out before detection",
Stage: "timeout",
OccurredAt: timeToPtr(time.Now().UTC()),
})
r.recordSchedulerRunComplete(jobType, "timeout")
return false
}
if detectionTimeout <= 0 {
detectionTimeout = defaultScheduledDetectionTimeout
}
if detectionTimeout > remaining {
detectionTimeout = remaining
}
detectCtx, cancelDetect := context.WithTimeout(jobCtx, detectionTimeout)
proposals, err := r.RunDetection(detectCtx, jobType, clusterContext, policy.MaxResults)
cancelDetect()
if err != nil {
r.recordSchedulerDetectionError(jobType, err)
stage := "failed"
status := "error"
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
stage = "timeout"
status = "timeout"
}
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection failed: %v", err),
Stage: stage,
OccurredAt: timeToPtr(time.Now().UTC()),
})
r.recordSchedulerRunComplete(jobType, status)
return false
}
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection completed: %d proposal(s)", len(proposals)),
Stage: "detected",
OccurredAt: timeToPtr(time.Now().UTC()),
})
r.recordSchedulerDetectionSuccess(jobType, len(proposals))
detected := len(proposals) > 0
filteredByActive, skippedActive := r.filterProposalsWithActiveJobs(jobType, proposals)
if skippedActive > 0 {
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection skipped %d proposal(s) due to active assigned/running jobs", skippedActive),
Stage: "deduped_active_jobs",
OccurredAt: timeToPtr(time.Now().UTC()),
})
}
if len(filteredByActive) == 0 {
r.recordSchedulerRunComplete(jobType, "success")
return detected
}
filtered := r.filterScheduledProposals(filteredByActive)
if len(filtered) != len(filteredByActive) {
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection deduped %d proposal(s) within this run", len(filteredByActive)-len(filtered)),
Stage: "deduped",
OccurredAt: timeToPtr(time.Now().UTC()),
})
}
if len(filtered) == 0 {
r.recordSchedulerRunComplete(jobType, "success")
return detected
}
r.setSchedulerLoopState(jobType, "executing")
remaining = time.Until(start.Add(maxRuntime))
if remaining <= 0 {
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: "scheduled execution skipped: job type max runtime reached",
Stage: "timeout",
OccurredAt: timeToPtr(time.Now().UTC()),
})
r.recordSchedulerRunComplete(jobType, "timeout")
return detected
}
execPolicy := policy
if execPolicy.ExecutionTimeout <= 0 {
execPolicy.ExecutionTimeout = defaultScheduledExecutionTimeout
}
if execPolicy.ExecutionTimeout > remaining {
execPolicy.ExecutionTimeout = remaining
}
successCount, errorCount, canceledCount := r.dispatchScheduledProposals(jobCtx, jobType, filtered, clusterContext, execPolicy)
status := "success"
if jobCtx.Err() != nil {
status = "timeout"
} else if errorCount > 0 || canceledCount > 0 {
status = "error"
}
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled execution finished: success=%d error=%d canceled=%d", successCount, errorCount, canceledCount),
Stage: "executed",
OccurredAt: timeToPtr(time.Now().UTC()),
})
r.recordSchedulerRunComplete(jobType, status)
return detected
}
func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, error) {
@@ -119,6 +357,7 @@ func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, err
DetectionInterval: durationFromSeconds(adminRuntime.DetectionIntervalSeconds, defaultScheduledDetectionInterval),
DetectionTimeout: durationFromSeconds(adminRuntime.DetectionTimeoutSeconds, defaultScheduledDetectionTimeout),
ExecutionTimeout: defaultScheduledExecutionTimeout,
JobTypeMaxRuntime: durationFromSeconds(adminRuntime.JobTypeMaxRuntimeSeconds, defaultScheduledJobTypeMaxRuntime),
RetryBackoff: durationFromSeconds(adminRuntime.RetryBackoffSeconds, defaultScheduledRetryBackoff),
MaxResults: adminRuntime.MaxJobsPerDetection,
ExecutionConcurrency: int(adminRuntime.GlobalExecutionConcurrency),
@@ -148,6 +387,9 @@ func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, err
if policy.RetryLimit < 0 {
policy.RetryLimit = 0
}
if policy.JobTypeMaxRuntime <= 0 {
policy.JobTypeMaxRuntime = defaultScheduledJobTypeMaxRuntime
}
// Plugin protocol currently has only detection timeout in admin settings.
execTimeout := time.Duration(adminRuntime.DetectionTimeoutSeconds*2) * time.Second
@@ -199,6 +441,7 @@ func (r *Plugin) ListSchedulerStates() ([]SchedulerJobTypeState, error) {
state.DetectionIntervalSeconds = secondsFromDuration(policy.DetectionInterval)
state.DetectionTimeoutSeconds = secondsFromDuration(policy.DetectionTimeout)
state.ExecutionTimeoutSeconds = secondsFromDuration(policy.ExecutionTimeout)
state.JobTypeMaxRuntimeSeconds = secondsFromDuration(policy.JobTypeMaxRuntime)
state.MaxJobsPerDetection = policy.MaxResults
state.GlobalExecutionConcurrency = policy.ExecutionConcurrency
state.PerWorkerExecutionConcurrency = policy.PerWorkerConcurrency
@@ -207,6 +450,19 @@ func (r *Plugin) ListSchedulerStates() ([]SchedulerJobTypeState, error) {
}
}
runInfo := r.snapshotSchedulerRun(jobType)
if !runInfo.lastRunStartedAt.IsZero() {
at := runInfo.lastRunStartedAt
state.LastRunStartedAt = &at
}
if !runInfo.lastRunCompletedAt.IsZero() {
at := runInfo.lastRunCompletedAt
state.LastRunCompletedAt = &at
}
if runInfo.lastRunStatus != "" {
state.LastRunStatus = runInfo.lastRunStatus
}
leasedWorkerID := r.getDetectorLease(jobType)
if leasedWorkerID != "" {
state.DetectorWorkerID = leasedWorkerID
@@ -258,10 +514,11 @@ func deriveSchedulerAdminRuntime(
PerWorkerExecutionConcurrency: defaults.PerWorkerExecutionConcurrency,
RetryLimit: defaults.RetryLimit,
RetryBackoffSeconds: defaults.RetryBackoffSeconds,
JobTypeMaxRuntimeSeconds: defaults.JobTypeMaxRuntimeSeconds,
}
}
func (r *Plugin) markDetectionDue(jobType string, interval time.Duration) bool {
func (r *Plugin) markDetectionDue(jobType string, interval, initialDelay time.Duration) bool {
now := time.Now().UTC()
r.schedulerMu.Lock()
@@ -275,12 +532,43 @@ func (r *Plugin) markDetectionDue(jobType string, interval time.Duration) bool {
if exists && now.Before(nextRun) {
return false
}
if !exists && initialDelay > 0 {
r.nextDetectionAt[jobType] = now.Add(initialDelay)
return false
}
r.nextDetectionAt[jobType] = now.Add(interval)
r.detectionInFlight[jobType] = true
return true
}
func (r *Plugin) earliestNextDetectionAt() time.Time {
if r == nil {
return time.Time{}
}
r.schedulerMu.Lock()
defer r.schedulerMu.Unlock()
var earliest time.Time
for _, nextRun := range r.nextDetectionAt {
if nextRun.IsZero() {
continue
}
if earliest.IsZero() || nextRun.Before(earliest) {
earliest = nextRun
}
}
return earliest
}
func (r *Plugin) markJobTypeInFlight(jobType string) {
r.schedulerMu.Lock()
r.detectionInFlight[jobType] = true
r.schedulerMu.Unlock()
}
func (r *Plugin) finishDetection(jobType string) {
r.schedulerMu.Lock()
delete(r.detectionInFlight, jobType)
@@ -318,125 +606,18 @@ func (r *Plugin) pruneDetectorLeases(activeJobTypes map[string]struct{}) {
}
}
func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) {
defer r.finishDetection(jobType)
releaseLock, lockErr := r.acquireAdminLock(fmt.Sprintf("plugin scheduled detection %s", jobType))
if lockErr != nil {
r.recordSchedulerDetectionError(jobType, lockErr)
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection aborted: failed to acquire lock: %v", lockErr),
Stage: "failed",
OccurredAt: timeToPtr(time.Now().UTC()),
})
return
}
if releaseLock != nil {
defer releaseLock()
}
start := time.Now().UTC()
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: "scheduled detection started",
Stage: "detecting",
OccurredAt: timeToPtr(start),
})
if skip, waitingCount, waitingThreshold := r.shouldSkipDetectionForWaitingJobs(jobType, policy); skip {
r.recordSchedulerDetectionSkip(jobType, fmt.Sprintf("waiting backlog %d reached threshold %d", waitingCount, waitingThreshold))
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection skipped: waiting backlog %d reached threshold %d", waitingCount, waitingThreshold),
Stage: "skipped_waiting_backlog",
OccurredAt: timeToPtr(time.Now().UTC()),
})
return
}
clusterContext, err := r.loadSchedulerClusterContext()
if err != nil {
r.recordSchedulerDetectionError(jobType, err)
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection aborted: %v", err),
Stage: "failed",
OccurredAt: timeToPtr(time.Now().UTC()),
})
return
}
ctx, cancel := context.WithTimeout(context.Background(), policy.DetectionTimeout)
proposals, err := r.RunDetection(ctx, jobType, clusterContext, policy.MaxResults)
cancel()
if err != nil {
r.recordSchedulerDetectionError(jobType, err)
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection failed: %v", err),
Stage: "failed",
OccurredAt: timeToPtr(time.Now().UTC()),
})
return
}
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection completed: %d proposal(s)", len(proposals)),
Stage: "detected",
OccurredAt: timeToPtr(time.Now().UTC()),
})
r.recordSchedulerDetectionSuccess(jobType, len(proposals))
filteredByActive, skippedActive := r.filterProposalsWithActiveJobs(jobType, proposals)
if skippedActive > 0 {
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection skipped %d proposal(s) due to active assigned/running jobs", skippedActive),
Stage: "deduped_active_jobs",
OccurredAt: timeToPtr(time.Now().UTC()),
})
}
if len(filteredByActive) == 0 {
return
}
filtered := r.filterScheduledProposals(filteredByActive)
if len(filtered) != len(filteredByActive) {
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled detection deduped %d proposal(s) within this run", len(filteredByActive)-len(filtered)),
Stage: "deduped",
OccurredAt: timeToPtr(time.Now().UTC()),
})
}
if len(filtered) == 0 {
return
}
r.dispatchScheduledProposals(jobType, filtered, clusterContext, policy)
}
func (r *Plugin) loadSchedulerClusterContext() (*plugin_pb.ClusterContext, error) {
func (r *Plugin) loadSchedulerClusterContext(ctx context.Context) (*plugin_pb.ClusterContext, error) {
if r.clusterContextProvider == nil {
return nil, fmt.Errorf("cluster context provider is not configured")
}
ctx, cancel := context.WithTimeout(context.Background(), defaultClusterContextTimeout)
if ctx == nil {
ctx = context.Background()
}
clusterCtx, cancel := context.WithTimeout(ctx, defaultClusterContextTimeout)
defer cancel()
clusterContext, err := r.clusterContextProvider(ctx)
clusterContext, err := r.clusterContextProvider(clusterCtx)
if err != nil {
return nil, err
}
@@ -447,11 +628,16 @@ func (r *Plugin) loadSchedulerClusterContext() (*plugin_pb.ClusterContext, error
}
func (r *Plugin) dispatchScheduledProposals(
ctx context.Context,
jobType string,
proposals []*plugin_pb.JobProposal,
clusterContext *plugin_pb.ClusterContext,
policy schedulerPolicy,
) {
) (int, int, int) {
if ctx == nil {
ctx = context.Background()
}
jobQueue := make(chan *plugin_pb.JobSpec, len(proposals))
for index, proposal := range proposals {
job := buildScheduledJobSpec(jobType, proposal, index)
@@ -459,7 +645,7 @@ func (r *Plugin) dispatchScheduledProposals(
select {
case <-r.shutdownCh:
close(jobQueue)
return
return 0, 0, 0
default:
jobQueue <- job
}
@@ -470,6 +656,7 @@ func (r *Plugin) dispatchScheduledProposals(
var statsMu sync.Mutex
successCount := 0
errorCount := 0
canceledCount := 0
workerCount := policy.ExecutionConcurrency
if workerCount < 1 {
@@ -481,6 +668,7 @@ func (r *Plugin) dispatchScheduledProposals(
go func() {
defer wg.Done()
jobLoop:
for job := range jobQueue {
select {
case <-r.shutdownCh:
@@ -488,19 +676,36 @@ func (r *Plugin) dispatchScheduledProposals(
default:
}
if ctx.Err() != nil {
r.cancelQueuedJob(job, ctx.Err())
statsMu.Lock()
canceledCount++
statsMu.Unlock()
continue
}
for {
select {
case <-r.shutdownCh:
return
default:
}
if ctx.Err() != nil {
r.cancelQueuedJob(job, ctx.Err())
statsMu.Lock()
canceledCount++
statsMu.Unlock()
continue jobLoop
}
executor, release, reserveErr := r.reserveScheduledExecutor(jobType, policy)
executor, release, reserveErr := r.reserveScheduledExecutor(ctx, jobType, policy)
if reserveErr != nil {
select {
case <-r.shutdownCh:
return
default:
if ctx.Err() != nil {
r.cancelQueuedJob(job, ctx.Err())
statsMu.Lock()
canceledCount++
statsMu.Unlock()
continue jobLoop
}
statsMu.Lock()
errorCount++
@@ -515,16 +720,23 @@ func (r *Plugin) dispatchScheduledProposals(
break
}
err := r.executeScheduledJobWithExecutor(executor, job, clusterContext, policy)
err := r.executeScheduledJobWithExecutor(ctx, executor, job, clusterContext, policy)
release()
if errors.Is(err, errExecutorAtCapacity) {
r.trackExecutionQueued(job)
if !waitForShutdownOrTimer(r.shutdownCh, policy.ExecutorReserveBackoff) {
if !waitForShutdownOrTimerWithContext(r.shutdownCh, ctx, policy.ExecutorReserveBackoff) {
return
}
continue
}
if err != nil {
if ctx.Err() != nil || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
r.cancelQueuedJob(job, err)
statsMu.Lock()
canceledCount++
statsMu.Unlock()
continue jobLoop
}
statsMu.Lock()
errorCount++
statsMu.Unlock()
@@ -550,23 +762,34 @@ func (r *Plugin) dispatchScheduledProposals(
wg.Wait()
r.appendActivity(JobActivity{
JobType: jobType,
Source: "admin_scheduler",
Message: fmt.Sprintf("scheduled execution finished: success=%d error=%d", successCount, errorCount),
Stage: "executed",
OccurredAt: timeToPtr(time.Now().UTC()),
})
drainErr := ctx.Err()
if drainErr == nil {
drainErr = errSchedulerShutdown
}
for job := range jobQueue {
r.cancelQueuedJob(job, drainErr)
canceledCount++
}
return successCount, errorCount, canceledCount
}
func (r *Plugin) reserveScheduledExecutor(
ctx context.Context,
jobType string,
policy schedulerPolicy,
) (*WorkerSession, func(), error) {
if ctx == nil {
ctx = context.Background()
}
deadline := time.Now().Add(policy.ExecutionTimeout)
if policy.ExecutionTimeout <= 0 {
deadline = time.Now().Add(10 * time.Minute) // Default cap
}
if ctxDeadline, ok := ctx.Deadline(); ok && ctxDeadline.Before(deadline) {
deadline = ctxDeadline
}
for {
select {
@@ -574,6 +797,9 @@ func (r *Plugin) reserveScheduledExecutor(
return nil, nil, fmt.Errorf("plugin is shutting down")
default:
}
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}
if time.Now().After(deadline) {
return nil, nil, fmt.Errorf("timed out waiting for executor capacity for %s", jobType)
@@ -581,7 +807,10 @@ func (r *Plugin) reserveScheduledExecutor(
executors, err := r.registry.ListExecutors(jobType)
if err != nil {
if !waitForShutdownOrTimer(r.shutdownCh, policy.ExecutorReserveBackoff) {
if !waitForShutdownOrTimerWithContext(r.shutdownCh, ctx, policy.ExecutorReserveBackoff) {
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}
return nil, nil, fmt.Errorf("plugin is shutting down")
}
continue
@@ -595,7 +824,10 @@ func (r *Plugin) reserveScheduledExecutor(
return executor, release, nil
}
if !waitForShutdownOrTimer(r.shutdownCh, policy.ExecutorReserveBackoff) {
if !waitForShutdownOrTimerWithContext(r.shutdownCh, ctx, policy.ExecutorReserveBackoff) {
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}
return nil, nil, fmt.Errorf("plugin is shutting down")
}
}
@@ -680,6 +912,7 @@ func schedulerWorkerExecutionLimit(executor *WorkerSession, jobType string, poli
}
func (r *Plugin) executeScheduledJobWithExecutor(
ctx context.Context,
executor *WorkerSession,
job *plugin_pb.JobSpec,
clusterContext *plugin_pb.ClusterContext,
@@ -697,8 +930,15 @@ func (r *Plugin) executeScheduledJobWithExecutor(
return fmt.Errorf("plugin is shutting down")
default:
}
if ctx != nil && ctx.Err() != nil {
return ctx.Err()
}
execCtx, cancel := context.WithTimeout(context.Background(), policy.ExecutionTimeout)
parent := ctx
if parent == nil {
parent = context.Background()
}
execCtx, cancel := context.WithTimeout(parent, policy.ExecutionTimeout)
_, err := r.executeJobWithExecutor(execCtx, executor, job, clusterContext, int32(attempt))
cancel()
if err == nil {
@@ -718,7 +958,10 @@ func (r *Plugin) executeScheduledJobWithExecutor(
Stage: "retry",
OccurredAt: timeToPtr(time.Now().UTC()),
})
if !waitForShutdownOrTimer(r.shutdownCh, policy.RetryBackoff) {
if !waitForShutdownOrTimerWithContext(r.shutdownCh, ctx, policy.RetryBackoff) {
if ctx != nil && ctx.Err() != nil {
return ctx.Err()
}
return fmt.Errorf("plugin is shutting down")
}
}
@@ -764,6 +1007,53 @@ func (r *Plugin) countWaitingTrackedJobs(jobType string) int {
return waiting
}
func (r *Plugin) clearWaitingJobQueue(jobType string) int {
normalizedJobType := strings.TrimSpace(jobType)
if normalizedJobType == "" {
return 0
}
jobIDs := make([]string, 0)
seen := make(map[string]struct{})
r.jobsMu.RLock()
for _, job := range r.jobs {
if job == nil {
continue
}
if strings.TrimSpace(job.JobType) != normalizedJobType {
continue
}
if !isWaitingTrackedJobState(job.State) {
continue
}
jobID := strings.TrimSpace(job.JobID)
if jobID == "" {
continue
}
if _, ok := seen[jobID]; ok {
continue
}
seen[jobID] = struct{}{}
jobIDs = append(jobIDs, jobID)
}
r.jobsMu.RUnlock()
if len(jobIDs) == 0 {
return 0
}
reason := fmt.Sprintf("cleared queued job before %s run", normalizedJobType)
for _, jobID := range jobIDs {
r.markJobCanceled(&plugin_pb.JobSpec{
JobId: jobID,
JobType: normalizedJobType,
}, reason)
}
return len(jobIDs)
}
func waitingBacklogThreshold(policy schedulerPolicy) int {
concurrency := policy.ExecutionConcurrency
if concurrency <= 0 {
@@ -861,6 +1151,27 @@ func waitForShutdownOrTimer(shutdown <-chan struct{}, duration time.Duration) bo
}
}
func waitForShutdownOrTimerWithContext(shutdown <-chan struct{}, ctx context.Context, duration time.Duration) bool {
if duration <= 0 {
return true
}
if ctx == nil {
ctx = context.Background()
}
timer := time.NewTimer(duration)
defer timer.Stop()
select {
case <-shutdown:
return false
case <-ctx.Done():
return false
case <-timer.C:
return true
}
}
// filterProposalsWithActiveJobs removes proposals whose dedupe keys already have active jobs.
// It first expires stale tracked jobs via expireStaleJobs, which can mutate scheduler state,
// so callers should treat this method as a stateful operation.