* feat: introduce scheduler lanes for independent per-workload scheduling
Split the single plugin scheduler loop into independent per-lane
goroutines so that volume management, iceberg compaction, and lifecycle
operations never block each other.
Each lane has its own:
- Goroutine (laneSchedulerLoop)
- Wake channel for immediate scheduling
- Admin lock scope (e.g. "plugin scheduler:default")
- Configurable idle sleep duration
- Loop state tracking
Three lanes are defined:
- default: vacuum, volume_balance, ec_balance, erasure_coding, admin_script
- iceberg: iceberg_maintenance
- lifecycle: s3_lifecycle (new, handler coming in a later commit)
Job types are mapped to lanes via a hardcoded map with LaneDefault as
the fallback. The SchedulerJobTypeState and SchedulerStatus types now
include a Lane field for API consumers.
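Below is a sketch of the job-type-to-lane lookup with a default fallback. `LaneDefault` and `JobTypeLane` appear in the scheduler code. The other constant names and the exact map contents are assumptions drawn from the lane list above; here the default-lane job types are simply left to the fallback.

```go
package main

import "fmt"

// SchedulerLane identifies an independent scheduling lane.
type SchedulerLane string

const (
	LaneDefault   SchedulerLane = "default"
	LaneIceberg   SchedulerLane = "iceberg"   // hypothetical constant name
	LaneLifecycle SchedulerLane = "lifecycle" // hypothetical constant name
)

// jobTypeLaneMap lists only non-default assignments (an assumption);
// anything not listed falls back to LaneDefault.
var jobTypeLaneMap = map[string]SchedulerLane{
	"iceberg_maintenance": LaneIceberg,
	"s3_lifecycle":        LaneLifecycle,
}

// JobTypeLane returns the lane a job type belongs to.
func JobTypeLane(jobType string) SchedulerLane {
	if lane, ok := jobTypeLaneMap[jobType]; ok {
		return lane
	}
	return LaneDefault
}

func main() {
	for _, jt := range []string{"vacuum", "iceberg_maintenance", "s3_lifecycle", "unknown_job"} {
		fmt.Printf("%s -> %s\n", jt, JobTypeLane(jt))
	}
}
```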
* feat: per-lane execution reservation pools for resource isolation
Each scheduler lane now maintains its own execution reservation map
so that a busy volume lane cannot consume execution slots needed by
iceberg or lifecycle lanes. The per-lane pool is used by default when
dispatching jobs through the lane scheduler; the global pool remains
as a fallback for the public DispatchProposals API.
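The isolation idea can be illustrated with a separate reservation map per lane. This is a hypothetical sketch: `reservationPool`, `tryReserve`, and `release` are illustrative names, and the real pools may track capacity differently.

```go
package main

import (
	"fmt"
	"sync"
)

// reservationPool counts execution slots reserved per executor.
// Hypothetical simplification of a per-lane reservation map.
type reservationPool struct {
	mu       sync.Mutex
	reserved map[string]int // executor ID -> slots in use
}

func newReservationPool() *reservationPool {
	return &reservationPool{reserved: make(map[string]int)}
}

// tryReserve takes one slot on executorID if fewer than limit are in use.
func (p *reservationPool) tryReserve(executorID string, limit int) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.reserved[executorID] >= limit {
		return false
	}
	p.reserved[executorID]++
	return true
}

// release returns one slot on executorID.
func (p *reservationPool) release(executorID string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.reserved[executorID] > 0 {
		p.reserved[executorID]--
	}
	if p.reserved[executorID] == 0 {
		delete(p.reserved, executorID)
	}
}

func main() {
	pools := map[string]*reservationPool{
		"default":   newReservationPool(),
		"iceberg":   newReservationPool(),
		"lifecycle": newReservationPool(),
	}
	const perWorkerLimit = 1

	// The default lane fills worker-1's only slot...
	fmt.Println(pools["default"].tryReserve("worker-1", perWorkerLimit)) // true
	fmt.Println(pools["default"].tryReserve("worker-1", perWorkerLimit)) // false
	// ...but the iceberg lane's own pool still has room on the same worker.
	fmt.Println(pools["iceberg"].tryReserve("worker-1", perWorkerLimit)) // true
}
```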
* feat: add per-lane scheduler status API and lane worker UI pages
- GET /api/plugin/lanes returns all lanes with status and job types
- GET /api/plugin/workers?lane=X filters workers by lane
- GET /api/plugin/scheduler-states?lane=X filters job types by lane
- GET /api/plugin/scheduler-status?lane=X returns lane-scoped status
- GET /plugin/lanes/{lane}/workers renders per-lane worker page
- SchedulerJobTypeState now includes a "lane" field
The lane worker pages show scheduler status, job type configuration,
and connected workers scoped to a single lane, with links back to
the main plugin overview.
* feat: add s3_lifecycle worker handler for object store lifecycle management
Implements a full plugin worker handler for S3 lifecycle management,
assigned to the new "lifecycle" scheduler lane.
Detection phase:
- Reads filer.conf to find buckets with TTL lifecycle rules
- Creates one job proposal per bucket with active lifecycle rules
- Supports bucket_filter wildcard pattern from admin config
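Bucket selection with a wildcard filter might be sketched like this. The sketch makes two assumptions: `bucket_filter` is a single shell-style glob interpreted by Go's `path.Match`, and an empty filter matches all buckets. The handler's real matching rules may differ. `bucketMatches` and `selectBuckets` are hypothetical names.

```go
package main

import (
	"fmt"
	"path"
)

// bucketMatches reports whether bucket matches the filter. An empty filter
// matches everything; a malformed pattern matches nothing.
func bucketMatches(filter, bucket string) bool {
	if filter == "" {
		return true
	}
	ok, err := path.Match(filter, bucket)
	return err == nil && ok
}

// selectBuckets keeps the buckets (already known to have lifecycle rules)
// that match the filter; each one would become one job proposal.
func selectBuckets(filter string, bucketsWithRules []string) []string {
	var out []string
	for _, b := range bucketsWithRules {
		if bucketMatches(filter, b) {
			out = append(out, b)
		}
	}
	return out
}

func main() {
	buckets := []string{"logs-2024", "logs-2025", "images"}
	fmt.Println(selectBuckets("logs-*", buckets)) // [logs-2024 logs-2025]
	fmt.Println(selectBuckets("", buckets))       // [logs-2024 logs-2025 images]
}
```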
Execution phase:
- Walks the bucket directory tree breadth-first
- Identifies expired objects by checking TtlSec + Crtime < now
- Deletes expired objects in configurable batches
- Reports progress with scanned/expired/error counts
- Supports dry_run mode for safe testing
Configurable via admin UI:
- batch_size: entries per filer listing page (default 1000)
- max_deletes_per_bucket: safety cap per run (default 10000)
- dry_run: detect without deleting
- delete_marker_cleanup: clean expired delete markers
- abort_mpu_days: abort stale multipart uploads
The handler integrates with the existing PutBucketLifecycle flow, which
sets TtlSec on entries via filer.conf path rules.
* feat: add per-lane submenu items under Workers sidebar menu
Replace the single "Workers" sidebar link with a collapsible submenu
containing three lane entries:
- Default (volume management + admin scripts) -> /plugin
- Iceberg (table compaction) -> /plugin/lanes/iceberg/workers
- Lifecycle (S3 object expiration) -> /plugin/lanes/lifecycle/workers
The submenu auto-expands when on any /plugin page and highlights the
active lane. Icons match each lane's job type descriptor (server,
snowflake, hourglass).
* feat: scope plugin pages to their scheduler lane
The plugin overview, configuration, detection, queue, and execution
pages now filter workers, job types, scheduler states, and scheduler
status to only show data for their lane.
- Plugin() templ function accepts a lane parameter (default: "default")
- JavaScript appends ?lane= to /api/plugin/workers, /job-types,
/scheduler-states, and /scheduler-status API calls
- GET /api/plugin/job-types now supports ?lane= filtering
- When ?job= is provided (e.g. ?job=iceberg_maintenance), the lane is
auto-derived from the job type so the page scopes correctly
This ensures /plugin shows only default-lane workers and
/plugin/configuration?job=iceberg_maintenance scopes to the iceberg lane.
* fix: remove "Lane" from lane worker page titles and capitalize properly
"lifecycle Lane Workers" -> "Lifecycle Workers"
"iceberg Lane Workers" -> "Iceberg Workers"
* refactor: promote lane items to top-level sidebar menu entries
Move Default, Iceberg, and Lifecycle from a collapsible submenu to
direct top-level items under the WORKERS heading. Removes the
intermediate "Workers" parent link and collapse toggle.
* admin: unify plugin lane routes and handlers
* admin: filter plugin jobs and activities by lane
* admin: reuse plugin UI for worker lane pages
* fix: use ServerAddress.ToGrpcAddress() for filer connections in lifecycle handler
ClusterContext addresses use ServerAddress format (host:httpPort.grpcPort).
Convert to the actual gRPC address via ToGrpcAddress() before dialing,
and add a Ping verification after connecting.
Fixes: "dial tcp: lookup tcp/8888.18888: unknown port"
* fix: resolve ServerAddress gRPC port in iceberg and lifecycle filer connections
ClusterContext addresses use ServerAddress format (host:httpPort.grpcPort).
Both the iceberg and lifecycle handlers now detect the compound format
and extract the gRPC port via ToGrpcAddress() before dialing. Plain
host:port addresses (e.g. from tests) are passed through unchanged.
Fixes: "dial tcp: lookup tcp/8888.18888: unknown port"
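The address handling can be illustrated with a standalone helper. `grpcDialAddress` is a hypothetical re-implementation for illustration, not `pb.ServerAddress.ToGrpcAddress()`. It shows why dialing `host:8888.18888` directly fails: the dialer treats `8888.18888` as a port or service name it cannot resolve. Plain `host:port` values pass through unchanged.

```go
package main

import (
	"fmt"
	"strings"
)

// grpcDialAddress turns "host:httpPort.grpcPort" into "host:grpcPort" and
// returns plain "host:port" (or bare host) values unchanged.
func grpcDialAddress(addr string) string {
	addr = strings.TrimSpace(addr)
	colon := strings.LastIndex(addr, ":")
	if colon < 0 {
		return addr // no port at all
	}
	host, ports := addr[:colon], addr[colon+1:]
	dot := strings.LastIndex(ports, ".")
	if dot < 0 {
		return addr // plain host:port, e.g. from tests
	}
	return host + ":" + ports[dot+1:]
}

func main() {
	for _, a := range []string{"filer:8888.18888", "127.0.0.1:18888", "localhost"} {
		fmt.Printf("%s -> %s\n", a, grpcDialAddress(a))
	}
}
```

Splitting on the last colon keeps dotted IPv4 hosts intact, because the dots in the host come before the port separator.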
* align url
* Potential fix for code scanning alert no. 335: Incorrect conversion between integer types
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
* fix: address PR review findings across scheduler lanes and lifecycle handler
- Fix variable shadowing: rename loop var `w` to `worker` in
GetPluginWorkersAPI to avoid shadowing the http.ResponseWriter param
- Fix stale GetSchedulerStatus: aggregate loop states across all lanes
instead of reading never-updated legacy schedulerLoopState
- Scope InProcessJobs to lane in GetLaneSchedulerStatus
- Fix AbortMPUDays=0 being treated as unset: change <= 0 to < 0 so that 0 disables MPU aborts
- Propagate listing errors in lifecycle bucket walk instead of swallowing
- Implement DeleteMarkerCleanup: scan for S3 delete marker entries and
remove them
- Implement AbortMPUDays: scan .uploads directory and remove stale
multipart uploads older than the configured threshold
- Fix success determination: mark job failed when result.errors > 0
even if no fatal error occurred
- Add regression test for jobTypeLaneMap to catch drift from handler
registrations
* fix: guard against nil result in lifecycle completion and trim filer addresses
- Guard result dereference in completion summary: use local vars
defaulting to 0 when result is nil to prevent panic
- Append trimmed filer addresses instead of originals so whitespace
is not passed to the gRPC dialer
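Both guards are small. A hypothetical sketch of each follows; `trimmedAddresses`, `lifecycleResult`, and `completionCounts` are illustrative names only.

```go
package main

import (
	"fmt"
	"strings"
)

// trimmedAddresses drops surrounding whitespace and empty entries, appending
// the trimmed value (not the original) so clean addresses reach the dialer.
func trimmedAddresses(raw []string) []string {
	out := make([]string, 0, len(raw))
	for _, a := range raw {
		if t := strings.TrimSpace(a); t != "" {
			out = append(out, t)
		}
	}
	return out
}

// lifecycleResult is a hypothetical stand-in for the handler's run result.
type lifecycleResult struct {
	Scanned, Expired, Errors int64
}

// completionCounts reads counters from a possibly nil result, defaulting to 0.
func completionCounts(r *lifecycleResult) (scanned, expired, errs int64) {
	if r != nil {
		scanned, expired, errs = r.Scanned, r.Expired, r.Errors
	}
	return scanned, expired, errs
}

func main() {
	fmt.Printf("%q\n", trimmedAddresses([]string{" filer1:8888 ", "", "filer2:8888\n"}))
	fmt.Println(completionCounts(nil)) // 0 0 0
}
```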
* fix: propagate ctx cancellation from deleteExpiredObjects and add config logging
- deleteExpiredObjects now returns a third error value when the context
is canceled mid-batch; the caller stops processing further batches
and returns the cancellation error to the job completion handler
- readBoolConfig and readInt64Config now log unexpected ConfigValue
types at V(1) for debugging, consistent with readStringConfig
* fix: propagate errors in lifecycle cleanup helpers and use correct delete marker key
- cleanupDeleteMarkers: return error on ctx cancellation and SeaweedList
failures instead of silently continuing
- abortIncompleteMPUs: log SeaweedList errors instead of discarding them
- isDeleteMarker: use ExtDeleteMarkerKey ("Seaweed-X-Amz-Delete-Marker")
instead of ExtLatestVersionIsDeleteMarker which is for the parent entry
- batchSize cap: use math.MaxInt instead of math.MaxInt32
* fix: propagate ctx cancellation from abortIncompleteMPUs and log unrecognized bool strings
- abortIncompleteMPUs now returns (aborted, errors, ctxErr) matching
cleanupDeleteMarkers; caller stops on cancellation or listing failure
- readBoolConfig logs unrecognized string values before falling back
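Below is a sketch of a tolerant boolean reader that logs unrecognized strings before falling back. It uses a plain `map[string]any` instead of the plugin's `ConfigValue` protobuf. `readBool` is therefore a hypothetical stand-in for `readBoolConfig`, not its implementation.

```go
package main

import (
	"log"
	"strconv"
	"strings"
)

// readBool accepts a bool or a string value for key and falls back to def,
// logging unrecognized strings and unexpected types instead of hiding them.
func readBool(values map[string]any, key string, def bool) bool {
	raw, ok := values[key]
	if !ok {
		return def
	}
	switch v := raw.(type) {
	case bool:
		return v
	case string:
		if b, err := strconv.ParseBool(strings.TrimSpace(v)); err == nil {
			return b
		}
		log.Printf("config %q: unrecognized bool string %q, using default %v", key, v, def)
		return def
	default:
		log.Printf("config %q: unexpected type %T, using default %v", key, raw, def)
		return def
	}
}

func main() {
	cfg := map[string]any{"dry_run": "yes"} // "yes" is not accepted by strconv.ParseBool
	log.Println("dry_run =", readBool(cfg, "dry_run", false))
}
```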
* fix: shared per-bucket budget across lifecycle phases and allow cleanup without expired objects
- Thread a shared remaining counter through TTL deletion, delete marker
cleanup, and MPU abort so the total operations per bucket never exceed
MaxDeletesPerBucket
- Remove early return when no TTL-expired objects found so delete marker
cleanup and MPU abort still run
- Add NOTE on cleanupDeleteMarkers about version-safety limitation
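The shared budget can be sketched as one counter passed through all three phases in order. `budget` and `runPhases` are hypothetical names, and the phase counts are plain inputs rather than real filer scans.

```go
package main

import "fmt"

// budget is a hypothetical shared counter of operations left for one bucket.
type budget struct{ remaining int }

// take consumes up to n operations and returns how many were granted.
func (b *budget) take(n int) int {
	if n > b.remaining {
		n = b.remaining
	}
	if n < 0 {
		n = 0
	}
	b.remaining -= n
	return n
}

// runPhases threads one budget through TTL deletion, delete marker cleanup,
// and MPU abort. Every phase runs even if an earlier one found nothing,
// but the combined total never exceeds maxPerBucket.
func runPhases(maxPerBucket, expired, markers, staleUploads int) (ttl, mk, mpu int) {
	b := &budget{remaining: maxPerBucket}
	ttl = b.take(expired)
	mk = b.take(markers)
	mpu = b.take(staleUploads)
	return ttl, mk, mpu
}

func main() {
	fmt.Println(runPhases(10, 7, 5, 3)) // 7 3 0
	fmt.Println(runPhases(10, 0, 2, 1)) // 0 2 1
}
```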
---------
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
1534 lines
41 KiB
Go
package plugin

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	"google.golang.org/protobuf/types/known/timestamppb"
)

var (
	errExecutorAtCapacity = errors.New("executor is at capacity")
	errSchedulerShutdown  = errors.New("scheduler shutdown")
)

const (
	defaultSchedulerTick                       = 5 * time.Second
	defaultScheduledDetectionInterval          = 300 * time.Second
	defaultScheduledDetectionTimeout           = 45 * time.Second
	defaultScheduledExecutionTimeout           = 90 * time.Second
	defaultScheduledJobTypeMaxRuntime          = 30 * time.Minute
	defaultScheduledMaxResults           int32 = 1000
	defaultScheduledExecutionConcurrency       = 1
	defaultScheduledPerWorkerConcurrency       = 1
	maxScheduledExecutionConcurrency           = 128
	defaultScheduledRetryBackoff               = 5 * time.Second
	defaultClusterContextTimeout               = 10 * time.Second
	defaultWaitingBacklogFloor                 = 8
	defaultWaitingBacklogMultiplier            = 4
	maxEstimatedRuntimeCap                     = 8 * time.Hour
)

type schedulerPolicy struct {
	DetectionInterval      time.Duration
	DetectionTimeout       time.Duration
	ExecutionTimeout       time.Duration
	JobTypeMaxRuntime      time.Duration
	RetryBackoff           time.Duration
	MaxResults             int32
	ExecutionConcurrency   int
	PerWorkerConcurrency   int
	RetryLimit             int
	ExecutorReserveBackoff time.Duration
}

// laneSchedulerLoop is the main scheduling goroutine for a single lane.
// Each lane runs independently with its own timing, lock scope, and wake channel.
func (r *Plugin) laneSchedulerLoop(ls *schedulerLaneState) {
	defer r.wg.Done()
	for {
		select {
		case <-r.shutdownCh:
			return
		default:
		}

		hadJobs := r.runLaneSchedulerIteration(ls)
		r.recordLaneIterationComplete(ls, hadJobs)

		if hadJobs {
			continue
		}

		r.setLaneLoopState(ls, "", "sleeping")
		idleSleep := LaneIdleSleep(ls.lane)
		if nextRun := r.earliestLaneDetectionAt(ls.lane); !nextRun.IsZero() {
			if until := time.Until(nextRun); until <= 0 {
				idleSleep = 0
			} else if until < idleSleep {
				idleSleep = until
			}
		}
		if idleSleep <= 0 {
			continue
		}

		timer := time.NewTimer(idleSleep)
		select {
		case <-r.shutdownCh:
			timer.Stop()
			return
		case <-ls.wakeCh:
			if !timer.Stop() {
				<-timer.C
			}
			continue
		case <-timer.C:
		}
	}
}

// schedulerLoop is kept for backward compatibility; it delegates to
// laneSchedulerLoop with the default lane. New code should not call this.
func (r *Plugin) schedulerLoop() {
	ls := r.lanes[LaneDefault]
	if ls == nil {
		ls = newLaneState(LaneDefault)
	}
	r.laneSchedulerLoop(ls)
}

// runLaneSchedulerIteration runs one scheduling pass for a single lane,
// processing only the job types assigned to that lane.
func (r *Plugin) runLaneSchedulerIteration(ls *schedulerLaneState) bool {
	r.expireStaleJobs(time.Now().UTC())

	allJobTypes := r.registry.DetectableJobTypes()
	// Filter to only job types belonging to this lane.
	var jobTypes []string
	for _, jt := range allJobTypes {
		if JobTypeLane(jt) == ls.lane {
			jobTypes = append(jobTypes, jt)
		}
	}
	if len(jobTypes) == 0 {
		r.setLaneLoopState(ls, "", "idle")
		return false
	}

	r.setLaneLoopState(ls, "", "waiting_for_lock")
	lockName := fmt.Sprintf("plugin scheduler:%s", ls.lane)
	releaseLock, err := r.acquireAdminLock(lockName)
	if err != nil {
		glog.Warningf("Plugin scheduler [%s] failed to acquire lock: %v", ls.lane, err)
		r.setLaneLoopState(ls, "", "idle")
		return false
	}
	if releaseLock != nil {
		defer releaseLock()
	}

	// Seed the prune set with every detectable job type, not just this lane's.
	// pruneSchedulerState and pruneDetectorLeases delete everything missing
	// from the set, so a lane-only set would wipe other lanes' schedules and
	// detector leases on every pass.
	active := make(map[string]struct{}, len(allJobTypes))
	for _, jt := range allJobTypes {
		active[jt] = struct{}{}
	}
	hadJobs := false

	for _, jobType := range jobTypes {
		policy, enabled, err := r.loadSchedulerPolicy(jobType)
		if err != nil {
			glog.Warningf("Plugin scheduler [%s] failed to load policy for %s: %v", ls.lane, jobType, err)
			continue
		}
		if !enabled {
			r.clearSchedulerJobType(jobType)
			continue
		}
		initialDelay := time.Duration(0)
		if runInfo := r.snapshotSchedulerRun(jobType); runInfo.lastRunStartedAt.IsZero() {
			initialDelay = 5 * time.Second
		}
		if !r.markDetectionDue(jobType, policy.DetectionInterval, initialDelay) {
			continue
		}

		detected := r.runJobTypeIteration(jobType, policy)
		if detected {
			hadJobs = true
		}
	}

	r.pruneSchedulerState(active)
	r.pruneDetectorLeases(active)
	r.setLaneLoopState(ls, "", "idle")
	return hadJobs
}

// runSchedulerIteration is kept for backward compatibility. It runs a
// single iteration across ALL job types (equivalent to the old single-loop
// behavior) under one global lock. schedulerLoop() no longer calls it; it
// now delegates to laneSchedulerLoop.
func (r *Plugin) runSchedulerIteration() bool {
	// Unlike the lane iteration, this processes every detectable job type.
	r.expireStaleJobs(time.Now().UTC())

	jobTypes := r.registry.DetectableJobTypes()
	if len(jobTypes) == 0 {
		r.setSchedulerLoopState("", "idle")
		return false
	}

	r.setSchedulerLoopState("", "waiting_for_lock")
	releaseLock, err := r.acquireAdminLock("plugin scheduler iteration")
	if err != nil {
		glog.Warningf("Plugin scheduler failed to acquire lock: %v", err)
		r.setSchedulerLoopState("", "idle")
		return false
	}
	if releaseLock != nil {
		defer releaseLock()
	}

	active := make(map[string]struct{}, len(jobTypes))
	hadJobs := false

	for _, jobType := range jobTypes {
		active[jobType] = struct{}{}

		policy, enabled, err := r.loadSchedulerPolicy(jobType)
		if err != nil {
			glog.Warningf("Plugin scheduler failed to load policy for %s: %v", jobType, err)
			continue
		}
		if !enabled {
			r.clearSchedulerJobType(jobType)
			continue
		}
		initialDelay := time.Duration(0)
		if runInfo := r.snapshotSchedulerRun(jobType); runInfo.lastRunStartedAt.IsZero() {
			initialDelay = 5 * time.Second
		}
		if !r.markDetectionDue(jobType, policy.DetectionInterval, initialDelay) {
			continue
		}

		detected := r.runJobTypeIteration(jobType, policy)
		if detected {
			hadJobs = true
		}
	}

	r.pruneSchedulerState(active)
	r.pruneDetectorLeases(active)
	r.setSchedulerLoopState("", "idle")
	return hadJobs
}

// wakeLane wakes the scheduler goroutine for a specific lane.
func (r *Plugin) wakeLane(lane SchedulerLane) {
	if r == nil {
		return
	}
	if ls, ok := r.lanes[lane]; ok {
		select {
		case ls.wakeCh <- struct{}{}:
		default:
		}
	}
}

// wakeAllLanes wakes all lane scheduler goroutines.
func (r *Plugin) wakeAllLanes() {
	if r == nil {
		return
	}
	for _, ls := range r.lanes {
		select {
		case ls.wakeCh <- struct{}{}:
		default:
		}
	}
}

// wakeScheduler wakes every lane scheduler goroutine. It is kept for
// backward compatibility with callers that predate per-lane scheduling.
func (r *Plugin) wakeScheduler() {
	r.wakeAllLanes()
}

func (r *Plugin) runJobTypeIteration(jobType string, policy schedulerPolicy) bool {
	r.recordSchedulerRunStart(jobType)
	r.clearWaitingJobQueue(jobType)
	r.setSchedulerLoopState(jobType, "detecting")
	r.markJobTypeInFlight(jobType)
	defer r.finishDetection(jobType)

	start := time.Now().UTC()
	r.appendActivity(JobActivity{
		JobType:    jobType,
		Source:     "admin_scheduler",
		Message:    "scheduled detection started",
		Stage:      "detecting",
		OccurredAt: timeToPtr(start),
	})

	if skip, waitingCount, waitingThreshold := r.shouldSkipDetectionForWaitingJobs(jobType, policy); skip {
		r.recordSchedulerDetectionSkip(jobType, fmt.Sprintf("waiting backlog %d reached threshold %d", waitingCount, waitingThreshold))
		r.appendActivity(JobActivity{
			JobType:    jobType,
			Source:     "admin_scheduler",
			Message:    fmt.Sprintf("scheduled detection skipped: waiting backlog %d reached threshold %d", waitingCount, waitingThreshold),
			Stage:      "skipped_waiting_backlog",
			OccurredAt: timeToPtr(time.Now().UTC()),
		})
		r.recordSchedulerRunComplete(jobType, "skipped")
		return false
	}

	maxRuntime := policy.JobTypeMaxRuntime
	if maxRuntime <= 0 {
		maxRuntime = defaultScheduledJobTypeMaxRuntime
	}
	jobCtx, cancel := context.WithTimeout(context.Background(), maxRuntime)
	defer cancel()

	clusterContext, err := r.loadSchedulerClusterContext(jobCtx)
	if err != nil {
		r.recordSchedulerDetectionError(jobType, err)
		r.appendActivity(JobActivity{
			JobType:    jobType,
			Source:     "admin_scheduler",
			Message:    fmt.Sprintf("scheduled detection aborted: %v", err),
			Stage:      "failed",
			OccurredAt: timeToPtr(time.Now().UTC()),
		})
		r.recordSchedulerRunComplete(jobType, "error")
		return false
	}

	detectionTimeout := policy.DetectionTimeout
	remaining := time.Until(start.Add(maxRuntime))
	if remaining <= 0 {
		r.appendActivity(JobActivity{
			JobType:    jobType,
			Source:     "admin_scheduler",
			Message:    "scheduled run timed out before detection",
			Stage:      "timeout",
			OccurredAt: timeToPtr(time.Now().UTC()),
		})
		r.recordSchedulerRunComplete(jobType, "timeout")
		return false
	}
	if detectionTimeout <= 0 {
		detectionTimeout = defaultScheduledDetectionTimeout
	}
	if detectionTimeout > remaining {
		detectionTimeout = remaining
	}

	detectCtx, cancelDetect := context.WithTimeout(jobCtx, detectionTimeout)
	proposals, err := r.RunDetection(detectCtx, jobType, clusterContext, policy.MaxResults)
	cancelDetect()
	if err != nil {
		r.recordSchedulerDetectionError(jobType, err)
		stage := "failed"
		status := "error"
		if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
			stage = "timeout"
			status = "timeout"
		}
		r.appendActivity(JobActivity{
			JobType:    jobType,
			Source:     "admin_scheduler",
			Message:    fmt.Sprintf("scheduled detection failed: %v", err),
			Stage:      stage,
			OccurredAt: timeToPtr(time.Now().UTC()),
		})
		r.recordSchedulerRunComplete(jobType, status)
		return false
	}

	r.appendActivity(JobActivity{
		JobType:    jobType,
		Source:     "admin_scheduler",
		Message:    fmt.Sprintf("scheduled detection completed: %d proposal(s)", len(proposals)),
		Stage:      "detected",
		OccurredAt: timeToPtr(time.Now().UTC()),
	})
	r.recordSchedulerDetectionSuccess(jobType, len(proposals))

	detected := len(proposals) > 0

	filteredByActive, skippedActive := r.filterProposalsWithActiveJobs(jobType, proposals)
	if skippedActive > 0 {
		r.appendActivity(JobActivity{
			JobType:    jobType,
			Source:     "admin_scheduler",
			Message:    fmt.Sprintf("scheduled detection skipped %d proposal(s) due to active assigned/running jobs", skippedActive),
			Stage:      "deduped_active_jobs",
			OccurredAt: timeToPtr(time.Now().UTC()),
		})
	}

	if len(filteredByActive) == 0 {
		r.recordSchedulerRunComplete(jobType, "success")
		return detected
	}

	filtered := r.filterScheduledProposals(filteredByActive)
	if len(filtered) != len(filteredByActive) {
		r.appendActivity(JobActivity{
			JobType:    jobType,
			Source:     "admin_scheduler",
			Message:    fmt.Sprintf("scheduled detection deduped %d proposal(s) within this run", len(filteredByActive)-len(filtered)),
			Stage:      "deduped",
			OccurredAt: timeToPtr(time.Now().UTC()),
		})
	}

	if len(filtered) == 0 {
		r.recordSchedulerRunComplete(jobType, "success")
		return detected
	}

	r.setSchedulerLoopState(jobType, "executing")

	// Scan proposals for the maximum estimated_runtime_seconds so the
	// execution phase gets enough time for large jobs (e.g. vacuum on
	// big volumes). If any proposal needs more time than the remaining
	// JobTypeMaxRuntime, extend the execution context accordingly.
	var maxEstimatedRuntime time.Duration
	for _, p := range filtered {
		if p.Parameters != nil {
			if est, ok := p.Parameters["estimated_runtime_seconds"]; ok {
				if v := est.GetInt64Value(); v > 0 {
					if d := time.Duration(v) * time.Second; d > maxEstimatedRuntime {
						maxEstimatedRuntime = d
					}
				}
			}
		}
	}
	if maxEstimatedRuntime > maxEstimatedRuntimeCap {
		maxEstimatedRuntime = maxEstimatedRuntimeCap
	}

	remaining = time.Until(start.Add(maxRuntime))
	if remaining <= 0 {
		r.appendActivity(JobActivity{
			JobType:    jobType,
			Source:     "admin_scheduler",
			Message:    "scheduled execution skipped: job type max runtime reached",
			Stage:      "timeout",
			OccurredAt: timeToPtr(time.Now().UTC()),
		})
		r.recordSchedulerRunComplete(jobType, "timeout")
		return detected
	}

	// If the longest estimated job exceeds the remaining JobTypeMaxRuntime,
	// create a new execution context with enough headroom instead of using
	// jobCtx which would cancel too early.
	execCtx := jobCtx
	execCancel := context.CancelFunc(func() {})
	if maxEstimatedRuntime > 0 && maxEstimatedRuntime > remaining {
		execCtx, execCancel = context.WithTimeout(context.Background(), maxEstimatedRuntime)
		remaining = maxEstimatedRuntime
	}
	defer execCancel()

	execPolicy := policy
	if execPolicy.ExecutionTimeout <= 0 {
		execPolicy.ExecutionTimeout = defaultScheduledExecutionTimeout
	}
	if execPolicy.ExecutionTimeout > remaining {
		execPolicy.ExecutionTimeout = remaining
	}

	successCount, errorCount, canceledCount := r.dispatchScheduledProposals(execCtx, jobType, filtered, clusterContext, execPolicy)

	status := "success"
	if execCtx.Err() != nil {
		status = "timeout"
	} else if errorCount > 0 || canceledCount > 0 {
		status = "error"
	}

	r.appendActivity(JobActivity{
		JobType:    jobType,
		Source:     "admin_scheduler",
		Message:    fmt.Sprintf("scheduled execution finished: success=%d error=%d canceled=%d", successCount, errorCount, canceledCount),
		Stage:      "executed",
		OccurredAt: timeToPtr(time.Now().UTC()),
	})
	r.recordSchedulerRunComplete(jobType, status)
	return detected
}

func (r *Plugin) loadSchedulerPolicy(jobType string) (schedulerPolicy, bool, error) {
	cfg, err := r.store.LoadJobTypeConfig(jobType)
	if err != nil {
		return schedulerPolicy{}, false, err
	}
	descriptor, err := r.store.LoadDescriptor(jobType)
	if err != nil {
		return schedulerPolicy{}, false, err
	}

	adminRuntime := deriveSchedulerAdminRuntime(cfg, descriptor)
	if adminRuntime == nil {
		return schedulerPolicy{}, false, nil
	}
	if !adminRuntime.Enabled {
		return schedulerPolicy{}, false, nil
	}

	policy := schedulerPolicy{
		DetectionInterval:      durationFromSeconds(adminRuntime.DetectionIntervalSeconds, defaultScheduledDetectionInterval),
		DetectionTimeout:       durationFromSeconds(adminRuntime.DetectionTimeoutSeconds, defaultScheduledDetectionTimeout),
		ExecutionTimeout:       defaultScheduledExecutionTimeout,
		JobTypeMaxRuntime:      durationFromSeconds(adminRuntime.JobTypeMaxRuntimeSeconds, defaultScheduledJobTypeMaxRuntime),
		RetryBackoff:           durationFromSeconds(adminRuntime.RetryBackoffSeconds, defaultScheduledRetryBackoff),
		MaxResults:             adminRuntime.MaxJobsPerDetection,
		ExecutionConcurrency:   int(adminRuntime.GlobalExecutionConcurrency),
		PerWorkerConcurrency:   int(adminRuntime.PerWorkerExecutionConcurrency),
		RetryLimit:             int(adminRuntime.RetryLimit),
		ExecutorReserveBackoff: 200 * time.Millisecond,
	}

	if policy.DetectionInterval < r.schedulerTick {
		policy.DetectionInterval = r.schedulerTick
	}
	if policy.MaxResults <= 0 {
		policy.MaxResults = defaultScheduledMaxResults
	}
	if policy.ExecutionConcurrency <= 0 {
		policy.ExecutionConcurrency = defaultScheduledExecutionConcurrency
	}
	if policy.ExecutionConcurrency > maxScheduledExecutionConcurrency {
		policy.ExecutionConcurrency = maxScheduledExecutionConcurrency
	}
	if policy.PerWorkerConcurrency <= 0 {
		policy.PerWorkerConcurrency = defaultScheduledPerWorkerConcurrency
	}
	if policy.PerWorkerConcurrency > policy.ExecutionConcurrency {
		policy.PerWorkerConcurrency = policy.ExecutionConcurrency
	}
	if policy.RetryLimit < 0 {
		policy.RetryLimit = 0
	}
	if policy.JobTypeMaxRuntime <= 0 {
		policy.JobTypeMaxRuntime = defaultScheduledJobTypeMaxRuntime
	}

	// The plugin protocol currently exposes only a detection timeout in admin
	// settings, so the execution timeout is derived as twice that value and
	// never drops below the default. Converting to time.Duration before
	// doubling keeps the multiplication in int64 rather than in the field's
	// own integer type.
	execTimeout := time.Duration(adminRuntime.DetectionTimeoutSeconds) * 2 * time.Second
	if execTimeout < defaultScheduledExecutionTimeout {
		execTimeout = defaultScheduledExecutionTimeout
	}
	policy.ExecutionTimeout = execTimeout

	return policy, true, nil
}

func (r *Plugin) ListSchedulerStates() ([]SchedulerJobTypeState, error) {
	jobTypes, err := r.ListKnownJobTypes()
	if err != nil {
		return nil, err
	}

	r.schedulerMu.Lock()
	nextDetectionAt := make(map[string]time.Time, len(r.nextDetectionAt))
	for jobType, nextRun := range r.nextDetectionAt {
		nextDetectionAt[jobType] = nextRun
	}
	detectionInFlight := make(map[string]bool, len(r.detectionInFlight))
	for jobType, inFlight := range r.detectionInFlight {
		detectionInFlight[jobType] = inFlight
	}
	r.schedulerMu.Unlock()

	states := make([]SchedulerJobTypeState, 0, len(jobTypes))
	for _, jobTypeInfo := range jobTypes {
		jobType := jobTypeInfo.JobType
		state := SchedulerJobTypeState{
			JobType:           jobType,
			Lane:              string(JobTypeLane(jobType)),
			DetectionInFlight: detectionInFlight[jobType],
		}

		if nextRun, ok := nextDetectionAt[jobType]; ok && !nextRun.IsZero() {
			nextRunUTC := nextRun.UTC()
			state.NextDetectionAt = &nextRunUTC
		}

		policy, enabled, loadErr := r.loadSchedulerPolicy(jobType)

		if loadErr != nil {
			state.PolicyError = loadErr.Error()
		} else {
			state.Enabled = enabled
			if enabled {
				state.DetectionIntervalSeconds = secondsFromDuration(policy.DetectionInterval)
				state.DetectionTimeoutSeconds = secondsFromDuration(policy.DetectionTimeout)
				state.ExecutionTimeoutSeconds = secondsFromDuration(policy.ExecutionTimeout)
				state.JobTypeMaxRuntimeSeconds = secondsFromDuration(policy.JobTypeMaxRuntime)
				state.MaxJobsPerDetection = policy.MaxResults
				state.GlobalExecutionConcurrency = policy.ExecutionConcurrency
				state.PerWorkerExecutionConcurrency = policy.PerWorkerConcurrency
				state.RetryLimit = policy.RetryLimit
				state.RetryBackoffSeconds = secondsFromDuration(policy.RetryBackoff)
			}
		}

		runInfo := r.snapshotSchedulerRun(jobType)
		if !runInfo.lastRunStartedAt.IsZero() {
			at := runInfo.lastRunStartedAt
			state.LastRunStartedAt = &at
		}
		if !runInfo.lastRunCompletedAt.IsZero() {
			at := runInfo.lastRunCompletedAt
			state.LastRunCompletedAt = &at
		}
		if runInfo.lastRunStatus != "" {
			state.LastRunStatus = runInfo.lastRunStatus
		}

		leasedWorkerID := r.getDetectorLease(jobType)
		if leasedWorkerID != "" {
			state.DetectorWorkerID = leasedWorkerID
			if worker, ok := r.registry.Get(leasedWorkerID); ok {
				if capability := worker.Capabilities[jobType]; capability != nil && capability.CanDetect {
					state.DetectorAvailable = true
				}
			}
		}
		if state.DetectorWorkerID == "" {
			detector, detectorErr := r.registry.PickDetector(jobType)
			if detectorErr == nil && detector != nil {
				state.DetectorAvailable = true
				state.DetectorWorkerID = detector.WorkerID
			}
		}

		executors, executorErr := r.registry.ListExecutors(jobType)
		if executorErr == nil {
			state.ExecutorWorkerCount = len(executors)
		}

		states = append(states, state)
	}

	return states, nil
}

func deriveSchedulerAdminRuntime(
	cfg *plugin_pb.PersistedJobTypeConfig,
	descriptor *plugin_pb.JobTypeDescriptor,
) *plugin_pb.AdminRuntimeConfig {
	if cfg != nil && cfg.AdminRuntime != nil {
		adminConfig := *cfg.AdminRuntime
		return &adminConfig
	}

	if descriptor == nil || descriptor.AdminRuntimeDefaults == nil {
		return nil
	}

	defaults := descriptor.AdminRuntimeDefaults
	return &plugin_pb.AdminRuntimeConfig{
		Enabled:                       defaults.Enabled,
		DetectionIntervalSeconds:      defaults.DetectionIntervalSeconds,
		DetectionTimeoutSeconds:       defaults.DetectionTimeoutSeconds,
		MaxJobsPerDetection:           defaults.MaxJobsPerDetection,
		GlobalExecutionConcurrency:    defaults.GlobalExecutionConcurrency,
		PerWorkerExecutionConcurrency: defaults.PerWorkerExecutionConcurrency,
		RetryLimit:                    defaults.RetryLimit,
		RetryBackoffSeconds:           defaults.RetryBackoffSeconds,
		JobTypeMaxRuntimeSeconds:      defaults.JobTypeMaxRuntimeSeconds,
	}
}

func (r *Plugin) markDetectionDue(jobType string, interval, initialDelay time.Duration) bool {
	now := time.Now().UTC()

	r.schedulerMu.Lock()
	defer r.schedulerMu.Unlock()

	if r.detectionInFlight[jobType] {
		return false
	}

	nextRun, exists := r.nextDetectionAt[jobType]
	if exists && now.Before(nextRun) {
		return false
	}
	if !exists && initialDelay > 0 {
		r.nextDetectionAt[jobType] = now.Add(initialDelay)
		return false
	}

	r.nextDetectionAt[jobType] = now.Add(interval)
	r.detectionInFlight[jobType] = true
	return true
}

// earliestLaneDetectionAt returns the earliest next detection time among
// job types that belong to the given lane.
func (r *Plugin) earliestLaneDetectionAt(lane SchedulerLane) time.Time {
	if r == nil {
		return time.Time{}
	}

	r.schedulerMu.Lock()
	defer r.schedulerMu.Unlock()

	var earliest time.Time
	for jobType, nextRun := range r.nextDetectionAt {
		if JobTypeLane(jobType) != lane {
			continue
		}
		if nextRun.IsZero() {
			continue
		}
		if earliest.IsZero() || nextRun.Before(earliest) {
			earliest = nextRun
		}
	}

	return earliest
}

// earliestNextDetectionAt returns the earliest next detection time across
// all job types regardless of lane. Kept for backward compatibility and
// the global scheduler status API.
func (r *Plugin) earliestNextDetectionAt() time.Time {
	if r == nil {
		return time.Time{}
	}

	r.schedulerMu.Lock()
	defer r.schedulerMu.Unlock()

	var earliest time.Time
	for _, nextRun := range r.nextDetectionAt {
		if nextRun.IsZero() {
			continue
		}
		if earliest.IsZero() || nextRun.Before(earliest) {
			earliest = nextRun
		}
	}

	return earliest
}

func (r *Plugin) markJobTypeInFlight(jobType string) {
	r.schedulerMu.Lock()
	r.detectionInFlight[jobType] = true
	r.schedulerMu.Unlock()
}

func (r *Plugin) finishDetection(jobType string) {
	r.schedulerMu.Lock()
	delete(r.detectionInFlight, jobType)
	r.schedulerMu.Unlock()
}

func (r *Plugin) pruneSchedulerState(activeJobTypes map[string]struct{}) {
|
|
r.schedulerMu.Lock()
|
|
defer r.schedulerMu.Unlock()
|
|
|
|
for jobType := range r.nextDetectionAt {
|
|
if _, ok := activeJobTypes[jobType]; !ok {
|
|
delete(r.nextDetectionAt, jobType)
|
|
delete(r.detectionInFlight, jobType)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Plugin) clearSchedulerJobType(jobType string) {
|
|
r.schedulerMu.Lock()
|
|
delete(r.nextDetectionAt, jobType)
|
|
delete(r.detectionInFlight, jobType)
|
|
r.schedulerMu.Unlock()
|
|
r.clearDetectorLease(jobType, "")
|
|
}
|
|
|
|
func (r *Plugin) pruneDetectorLeases(activeJobTypes map[string]struct{}) {
|
|
r.detectorLeaseMu.Lock()
|
|
defer r.detectorLeaseMu.Unlock()
|
|
|
|
for jobType := range r.detectorLeases {
|
|
if _, ok := activeJobTypes[jobType]; !ok {
|
|
delete(r.detectorLeases, jobType)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Plugin) loadSchedulerClusterContext(ctx context.Context) (*plugin_pb.ClusterContext, error) {
	if r.clusterContextProvider == nil {
		return nil, fmt.Errorf("cluster context provider is not configured")
	}

	if ctx == nil {
		ctx = context.Background()
	}
	clusterCtx, cancel := context.WithTimeout(ctx, defaultClusterContextTimeout)
	defer cancel()

	clusterContext, err := r.clusterContextProvider(clusterCtx)
	if err != nil {
		return nil, err
	}
	if clusterContext == nil {
		return nil, fmt.Errorf("cluster context provider returned nil")
	}
	return clusterContext, nil
}

// dispatchScheduledProposals turns proposals into scheduled jobs and executes
// them on up to policy.ExecutionConcurrency goroutines, requeueing a job and
// backing off while its executor reports being at capacity. It returns the
// number of jobs that succeeded, failed, and were canceled.
func (r *Plugin) dispatchScheduledProposals(
	ctx context.Context,
	jobType string,
	proposals []*plugin_pb.JobProposal,
	clusterContext *plugin_pb.ClusterContext,
	policy schedulerPolicy,
) (int, int, int) {
	if ctx == nil {
		ctx = context.Background()
	}

	// The queue is buffered for every proposal, so enqueueing never blocks.
	jobQueue := make(chan *plugin_pb.JobSpec, len(proposals))
	for index, proposal := range proposals {
		job := buildScheduledJobSpec(jobType, proposal, index)
		r.trackExecutionQueued(job)
		select {
		case <-r.shutdownCh:
			// Cancel this job and every job already queued so that none is
			// left tracked as queued after shutdown.
			r.cancelQueuedJob(job, errSchedulerShutdown)
			canceled := 1
			close(jobQueue)
			for queued := range jobQueue {
				r.cancelQueuedJob(queued, errSchedulerShutdown)
				canceled++
			}
			return 0, 0, canceled
		default:
			jobQueue <- job
		}
	}
	close(jobQueue)

	var wg sync.WaitGroup
	var statsMu sync.Mutex
	successCount := 0
	errorCount := 0
	canceledCount := 0

	// cancelJob marks a dequeued job as canceled and counts it.
	cancelJob := func(job *plugin_pb.JobSpec, cause error) {
		r.cancelQueuedJob(job, cause)
		statsMu.Lock()
		canceledCount++
		statsMu.Unlock()
	}

	workerCount := policy.ExecutionConcurrency
	if workerCount < 1 {
		workerCount = 1
	}

	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

		jobLoop:
			for job := range jobQueue {
				// Reserve an executor and run the job, retrying while the
				// executor reports that it is at capacity.
				for {
					select {
					case <-r.shutdownCh:
						// Jobs still in the queue are drained after wg.Wait.
						cancelJob(job, errSchedulerShutdown)
						return
					default:
					}
					if ctx.Err() != nil {
						cancelJob(job, ctx.Err())
						continue jobLoop
					}

					executor, release, reserveErr := r.reserveScheduledExecutor(ctx, jobType, policy)
					if reserveErr != nil {
						if ctx.Err() != nil {
							cancelJob(job, ctx.Err())
							continue jobLoop
						}
						statsMu.Lock()
						errorCount++
						statsMu.Unlock()
						r.appendActivity(JobActivity{
							JobType:    jobType,
							Source:     "admin_scheduler",
							Message:    fmt.Sprintf("scheduled execution reservation failed: %v", reserveErr),
							Stage:      "failed",
							OccurredAt: timeToPtr(time.Now().UTC()),
						})
						continue jobLoop
					}

					err := r.executeScheduledJobWithExecutor(ctx, executor, job, clusterContext, policy)
					release()
					if errors.Is(err, errExecutorAtCapacity) {
						// Requeue and back off. If the wait is cut short by
						// shutdown or cancellation, the checks at the top of
						// this loop cancel the job.
						r.trackExecutionQueued(job)
						waitForShutdownOrTimerWithContext(r.shutdownCh, ctx, policy.ExecutorReserveBackoff)
						continue
					}
					if err != nil {
						if ctx.Err() != nil || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
							cancelJob(job, err)
							continue jobLoop
						}
						statsMu.Lock()
						errorCount++
						statsMu.Unlock()
						r.appendActivity(JobActivity{
							JobID:      job.JobId,
							JobType:    job.JobType,
							Source:     "admin_scheduler",
							Message:    fmt.Sprintf("scheduled execution failed: %v", err),
							Stage:      "failed",
							OccurredAt: timeToPtr(time.Now().UTC()),
						})
						continue jobLoop
					}

					statsMu.Lock()
					successCount++
					statsMu.Unlock()
					continue jobLoop
				}
			}
		}()
	}

	wg.Wait()

	// Workers return early on shutdown; cancel the jobs they never dequeued.
	drainErr := ctx.Err()
	if drainErr == nil {
		drainErr = errSchedulerShutdown
	}
	for job := range jobQueue {
		r.cancelQueuedJob(job, drainErr)
		canceledCount++
	}

	return successCount, errorCount, canceledCount
}

func (r *Plugin) reserveScheduledExecutor(
	ctx context.Context,
	jobType string,
	policy schedulerPolicy,
) (*WorkerSession, func(), error) {
	if ctx == nil {
		ctx = context.Background()
	}

	// Stop waiting for capacity after the execution timeout (10 minutes if
	// unset) or the context deadline, whichever comes first.
	waitLimit := policy.ExecutionTimeout
	if waitLimit <= 0 {
		waitLimit = 10 * time.Minute // Default cap
	}
	deadline := time.Now().Add(waitLimit)
	if ctxDeadline, ok := ctx.Deadline(); ok && ctxDeadline.Before(deadline) {
		deadline = ctxDeadline
	}

	for {
		select {
		case <-r.shutdownCh:
			return nil, nil, fmt.Errorf("plugin is shutting down")
		default:
		}
		if ctx.Err() != nil {
			return nil, nil, ctx.Err()
		}

		if time.Now().After(deadline) {
			return nil, nil, fmt.Errorf("timed out waiting for executor capacity for %s", jobType)
		}

		executors, err := r.registry.ListExecutors(jobType)
		if err == nil {
			for _, executor := range executors {
				if release, ok := r.tryReserveExecutorCapacity(executor, jobType, policy); ok {
					return executor, release, nil
				}
			}
		}

		// Listing failed or no executor had free capacity; back off and retry.
		if !waitForShutdownOrTimerWithContext(r.shutdownCh, ctx, policy.ExecutorReserveBackoff) {
			if ctx.Err() != nil {
				return nil, nil, ctx.Err()
			}
			return nil, nil, fmt.Errorf("plugin is shutting down")
		}
	}
}

// tryReserveExecutorCapacity reserves an execution slot on executor from the
// reservation pool of jobType's lane, as returned by JobTypeLane.
func (r *Plugin) tryReserveExecutorCapacity(
	executor *WorkerSession,
	jobType string,
	policy schedulerPolicy,
) (func(), bool) {
	return r.tryReserveExecutorCapacityForLane(executor, jobType, policy, JobTypeLane(jobType))
}

// tryReserveExecutorCapacityForLane reserves an execution slot on the
// per-lane reservation pool so that lanes cannot starve each other. Each lane
// counts its own reservations per worker, so one lane's reservations do not
// count against another lane's limit; the slots the worker reports as used in
// its heartbeat apply to every lane. If the lane has no state, the global
// reservation pool is used instead. On success it returns a release function
// that should be called once the job finishes.
func (r *Plugin) tryReserveExecutorCapacityForLane(
	executor *WorkerSession,
	jobType string,
	policy schedulerPolicy,
	lane SchedulerLane,
) (func(), bool) {
	if executor == nil || strings.TrimSpace(executor.WorkerID) == "" {
		return nil, false
	}

	limit := schedulerWorkerExecutionLimit(executor, jobType, policy)
	if limit <= 0 {
		return nil, false
	}
	heartbeatUsed := 0
	if executor.Heartbeat != nil && executor.Heartbeat.ExecutionSlotsUsed > 0 {
		heartbeatUsed = int(executor.Heartbeat.ExecutionSlotsUsed)
	}

	workerID := strings.TrimSpace(executor.WorkerID)

	ls := r.lanes[lane]
	if ls == nil {
		// Fall back to the global reservations if the lane state is missing.
		r.schedulerExecMu.Lock()
		reserved := r.schedulerExecReservations[workerID]
		if heartbeatUsed+reserved >= limit {
			r.schedulerExecMu.Unlock()
			return nil, false
		}
		r.schedulerExecReservations[workerID] = reserved + 1
		r.schedulerExecMu.Unlock()
		release := func() { r.releaseExecutorCapacity(workerID) }
		return release, true
	}

	ls.execMu.Lock()
	reserved := ls.execRes[workerID]
	if heartbeatUsed+reserved >= limit {
		ls.execMu.Unlock()
		return nil, false
	}
	ls.execRes[workerID] = reserved + 1
	ls.execMu.Unlock()

	release := func() {
		r.releaseExecutorCapacityForLane(workerID, lane)
	}
	return release, true
}

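// Worked example (hypothetical numbers): a worker reports ExecutionSlotsTotal
// = 2 and ExecutionSlotsUsed = 0, and the policy and capability limits are at
// least 2, so schedulerWorkerExecutionLimit returns 2. Two default-lane
// reservations succeed and a third fails (0 + 2 >= 2), yet an iceberg-lane
// reservation on the same worker still succeeds because the iceberg pool holds
// no reservations for it. The worker can therefore be offered more jobs than
// it has slots; if it rejects one as being at capacity,
// dispatchScheduledProposals requeues the job and backs off.
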
// releaseExecutorCapacityForLane releases a reservation from the per-lane pool.
func (r *Plugin) releaseExecutorCapacityForLane(workerID string, lane SchedulerLane) {
	workerID = strings.TrimSpace(workerID)
	if workerID == "" {
		return
	}

	ls := r.lanes[lane]
	if ls == nil {
		r.releaseExecutorCapacity(workerID)
		return
	}

	ls.execMu.Lock()
	defer ls.execMu.Unlock()

	current := ls.execRes[workerID]
	if current <= 1 {
		delete(ls.execRes, workerID)
		return
	}
	ls.execRes[workerID] = current - 1
}

func (r *Plugin) releaseExecutorCapacity(workerID string) {
	workerID = strings.TrimSpace(workerID)
	if workerID == "" {
		return
	}

	r.schedulerExecMu.Lock()
	defer r.schedulerExecMu.Unlock()

	current := r.schedulerExecReservations[workerID]
	if current <= 1 {
		delete(r.schedulerExecReservations, workerID)
		return
	}
	r.schedulerExecReservations[workerID] = current - 1
}

// schedulerWorkerExecutionLimit returns how many scheduled jobs of jobType the
// executor may run at once: the policy's PerWorkerConcurrency (or
// defaultScheduledPerWorkerConcurrency when unset), lowered to the worker's
// advertised MaxExecutionConcurrency for jobType and to the heartbeat's
// ExecutionSlotsTotal whenever those are positive and smaller.
func schedulerWorkerExecutionLimit(executor *WorkerSession, jobType string, policy schedulerPolicy) int {
	limit := policy.PerWorkerConcurrency
	if limit <= 0 {
		limit = defaultScheduledPerWorkerConcurrency
	}

	if capability := executor.Capabilities[jobType]; capability != nil && capability.MaxExecutionConcurrency > 0 {
		capLimit := int(capability.MaxExecutionConcurrency)
		if capLimit < limit {
			limit = capLimit
		}
	}

	if executor.Heartbeat != nil && executor.Heartbeat.ExecutionSlotsTotal > 0 {
		heartbeatLimit := int(executor.Heartbeat.ExecutionSlotsTotal)
		if heartbeatLimit < limit {
			limit = heartbeatLimit
		}
	}

	if limit < 0 {
		return 0
	}
	return limit
}

func (r *Plugin) executeScheduledJobWithExecutor(
	ctx context.Context,
	executor *WorkerSession,
	job *plugin_pb.JobSpec,
	clusterContext *plugin_pb.ClusterContext,
	policy schedulerPolicy,
) error {
	maxAttempts := policy.RetryLimit + 1
	if maxAttempts < 1 {
		maxAttempts = 1
	}

	parent := ctx
	if parent == nil {
		parent = context.Background()
	}

	// Use the job's estimated runtime, capped at maxEstimatedRuntimeCap, when it
	// is larger than the policy's execution timeout. This lets handlers such as
	// vacuum scale the timeout with volume size so that large volumes are not
	// killed. A non-positive policy timeout falls back to
	// defaultScheduledExecutionTimeout, because a zero timeout would cancel the
	// attempt immediately.
	timeout := policy.ExecutionTimeout
	if timeout <= 0 {
		timeout = defaultScheduledExecutionTimeout
	}
	if est, ok := job.Parameters["estimated_runtime_seconds"]; ok {
		if v := est.GetInt64Value(); v > 0 {
			estimated := time.Duration(v) * time.Second
			if estimated > maxEstimatedRuntimeCap {
				estimated = maxEstimatedRuntimeCap
			}
			if estimated > timeout {
				timeout = estimated
			}
		}
	}

	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		select {
		case <-r.shutdownCh:
			return fmt.Errorf("plugin is shutting down")
		default:
		}
		if parent.Err() != nil {
			return parent.Err()
		}

		execCtx, cancel := context.WithTimeout(parent, timeout)
		_, err := r.executeJobWithExecutor(execCtx, executor, job, clusterContext, int32(attempt))
		cancel()
		if err == nil {
			return nil
		}
		if isExecutorAtCapacityError(err) {
			return errExecutorAtCapacity
		}
		lastErr = err

		if attempt < maxAttempts {
			r.appendActivity(JobActivity{
				JobID:      job.JobId,
				JobType:    job.JobType,
				Source:     "admin_scheduler",
				Message:    fmt.Sprintf("retrying job attempt %d/%d after error: %v", attempt, maxAttempts, err),
				Stage:      "retry",
				OccurredAt: timeToPtr(time.Now().UTC()),
			})
			if !waitForShutdownOrTimerWithContext(r.shutdownCh, parent, policy.RetryBackoff) {
				if parent.Err() != nil {
					return parent.Err()
				}
				return fmt.Errorf("plugin is shutting down")
			}
		}
	}

	if lastErr == nil {
		lastErr = fmt.Errorf("execution failed without an explicit error")
	}
	return lastErr
}

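// Worked example (hypothetical values) of the attempt timeout chosen by
// executeScheduledJobWithExecutor above, with policy.ExecutionTimeout = 30m:
//
//	estimated_runtime_seconds = 600   -> 30m (the estimate is smaller)
//	estimated_runtime_seconds = 7200  -> 2h  (if maxEstimatedRuntimeCap >= 2h)
//	estimated_runtime_seconds = 7200  -> 1h  (if maxEstimatedRuntimeCap = 1h)
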
func (r *Plugin) shouldSkipDetectionForWaitingJobs(jobType string, policy schedulerPolicy) (bool, int, int) {
	waitingCount := r.countWaitingTrackedJobs(jobType)
	threshold := waitingBacklogThreshold(policy)
	if threshold <= 0 {
		return false, waitingCount, threshold
	}
	return waitingCount >= threshold, waitingCount, threshold
}

func (r *Plugin) countWaitingTrackedJobs(jobType string) int {
	normalizedJobType := strings.TrimSpace(jobType)
	if normalizedJobType == "" {
		return 0
	}

	waiting := 0
	r.jobsMu.RLock()
	for _, job := range r.jobs {
		if job == nil {
			continue
		}
		if strings.TrimSpace(job.JobType) != normalizedJobType {
			continue
		}
		if !isWaitingTrackedJobState(job.State) {
			continue
		}
		waiting++
	}
	r.jobsMu.RUnlock()

	return waiting
}

func (r *Plugin) clearWaitingJobQueue(jobType string) int {
	normalizedJobType := strings.TrimSpace(jobType)
	if normalizedJobType == "" {
		return 0
	}

	jobIDs := make([]string, 0)
	seen := make(map[string]struct{})

	r.jobsMu.RLock()
	for _, job := range r.jobs {
		if job == nil {
			continue
		}
		if strings.TrimSpace(job.JobType) != normalizedJobType {
			continue
		}
		if !isWaitingTrackedJobState(job.State) {
			continue
		}
		jobID := strings.TrimSpace(job.JobID)
		if jobID == "" {
			continue
		}
		if _, ok := seen[jobID]; ok {
			continue
		}
		seen[jobID] = struct{}{}
		jobIDs = append(jobIDs, jobID)
	}
	r.jobsMu.RUnlock()

	if len(jobIDs) == 0 {
		return 0
	}

	reason := fmt.Sprintf("cleared queued job before %s run", normalizedJobType)
	for _, jobID := range jobIDs {
		r.markJobCanceled(&plugin_pb.JobSpec{
			JobId:   jobID,
			JobType: normalizedJobType,
		}, reason)
	}

	return len(jobIDs)
}

// waitingBacklogThreshold returns the number of pending jobs at which
// detection is skipped: ExecutionConcurrency (or
// defaultScheduledExecutionConcurrency when unset) times
// defaultWaitingBacklogMultiplier, raised to at least
// defaultWaitingBacklogFloor and capped at MaxResults when MaxResults > 0.
func waitingBacklogThreshold(policy schedulerPolicy) int {
	concurrency := policy.ExecutionConcurrency
	if concurrency <= 0 {
		concurrency = defaultScheduledExecutionConcurrency
	}
	threshold := concurrency * defaultWaitingBacklogMultiplier
	if threshold < defaultWaitingBacklogFloor {
		threshold = defaultWaitingBacklogFloor
	}
	if policy.MaxResults > 0 && threshold > int(policy.MaxResults) {
		threshold = int(policy.MaxResults)
	}
	return threshold
}

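// Worked example (hypothetical constants; the real values of
// defaultWaitingBacklogMultiplier and defaultWaitingBacklogFloor are defined
// elsewhere): with ExecutionConcurrency = 2, a multiplier of 4 and a floor of
// 10, waitingBacklogThreshold computes 2 * 4 = 8, raises it to the floor of
// 10, and with MaxResults = 5 caps it at 5, so detection is skipped once 5 jobs
// of the type are pending.
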
func isExecutorAtCapacityError(err error) bool {
	if err == nil {
		return false
	}
	if errors.Is(err, errExecutorAtCapacity) {
		return true
	}
	return strings.Contains(strings.ToLower(err.Error()), "executor is at capacity")
}

func buildScheduledJobSpec(jobType string, proposal *plugin_pb.JobProposal, index int) *plugin_pb.JobSpec {
	now := timestamppb.Now()

	jobID := fmt.Sprintf("%s-scheduled-%d-%d", jobType, now.AsTime().UnixNano(), index)

	job := &plugin_pb.JobSpec{
		JobId:       jobID,
		JobType:     jobType,
		Priority:    plugin_pb.JobPriority_JOB_PRIORITY_NORMAL,
		Parameters:  map[string]*plugin_pb.ConfigValue{},
		Labels:      map[string]string{},
		CreatedAt:   now,
		ScheduledAt: now,
	}

	if proposal == nil {
		return job
	}

	if proposal.JobType != "" {
		job.JobType = proposal.JobType
	}
	job.Summary = proposal.Summary
	job.Detail = proposal.Detail
	if proposal.Priority != plugin_pb.JobPriority_JOB_PRIORITY_UNSPECIFIED {
		job.Priority = proposal.Priority
	}
	job.DedupeKey = proposal.DedupeKey
	job.Parameters = CloneConfigValueMap(proposal.Parameters)
	if proposal.Labels != nil {
		job.Labels = make(map[string]string, len(proposal.Labels))
		for k, v := range proposal.Labels {
			job.Labels[k] = v
		}
	}
	if proposal.NotBefore != nil {
		job.ScheduledAt = proposal.NotBefore
	}

	return job
}

func durationFromSeconds(seconds int32, defaultValue time.Duration) time.Duration {
	if seconds <= 0 {
		return defaultValue
	}
	return time.Duration(seconds) * time.Second
}

func secondsFromDuration(duration time.Duration) int32 {
	if duration <= 0 {
		return 0
	}
	return int32(duration / time.Second)
}

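// waitForShutdownOrTimerWithContext waits for duration and reports whether the
// full duration elapsed. It returns false as soon as shutdown is closed or ctx
// is done. A non-positive duration does not wait, but still returns false if
// shutdown or ctx has already fired.
func waitForShutdownOrTimerWithContext(shutdown <-chan struct{}, ctx context.Context, duration time.Duration) bool {
	if ctx == nil {
		ctx = context.Background()
	}
	if duration <= 0 {
		select {
		case <-shutdown:
			return false
		case <-ctx.Done():
			return false
		default:
			return true
		}
	}

	timer := time.NewTimer(duration)
	defer timer.Stop()

	select {
	case <-shutdown:
		return false
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}
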
// filterProposalsWithActiveJobs removes proposals whose dedupe keys already have active jobs.
// It first expires stale tracked jobs via expireStaleJobs, which can mutate scheduler state,
// so callers should treat this method as a stateful operation.
func (r *Plugin) filterProposalsWithActiveJobs(jobType string, proposals []*plugin_pb.JobProposal) ([]*plugin_pb.JobProposal, int) {
	if len(proposals) == 0 {
		return proposals, 0
	}

	r.expireStaleJobs(time.Now().UTC())

	normalizedJobType := strings.TrimSpace(jobType)
	activeKeys := make(map[string]struct{})
	r.jobsMu.RLock()
	for _, job := range r.jobs {
		if job == nil {
			continue
		}
		if strings.TrimSpace(job.JobType) != normalizedJobType {
			continue
		}
		if !isActiveTrackedJobState(job.State) {
			continue
		}

		// Jobs without a dedupe key are keyed by their job ID.
		key := strings.TrimSpace(job.DedupeKey)
		if key == "" {
			key = strings.TrimSpace(job.JobID)
		}
		if key == "" {
			continue
		}
		activeKeys[key] = struct{}{}
	}
	r.jobsMu.RUnlock()

	if len(activeKeys) == 0 {
		return proposals, 0
	}

	filtered := make([]*plugin_pb.JobProposal, 0, len(proposals))
	skipped := 0
	for _, proposal := range proposals {
		if proposal == nil {
			continue
		}
		key := proposalExecutionKey(proposal)
		if key != "" {
			if _, exists := activeKeys[key]; exists {
				skipped++
				continue
			}
		}
		filtered = append(filtered, proposal)
	}

	return filtered, skipped
}

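// Example (hypothetical keys): if a tracked job of this type is "running" with
// DedupeKey "vol-7", filterProposalsWithActiveJobs skips a proposal with
// DedupeKey "vol-7" (counting it in the returned total) and keeps one with
// DedupeKey "vol-8". A proposal without a dedupe key is matched by its
// ProposalId instead.
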
func proposalExecutionKey(proposal *plugin_pb.JobProposal) string {
	if proposal == nil {
		return ""
	}
	key := strings.TrimSpace(proposal.DedupeKey)
	if key != "" {
		return key
	}
	return strings.TrimSpace(proposal.ProposalId)
}

func isActiveTrackedJobState(state string) bool {
	normalized := strings.ToLower(strings.TrimSpace(state))
	switch normalized {
	case "pending", "assigned", "running", "in_progress", "job_state_pending", "job_state_assigned", "job_state_running":
		return true
	default:
		return false
	}
}

func isWaitingTrackedJobState(state string) bool {
	normalized := strings.ToLower(strings.TrimSpace(state))
	return normalized == "pending" || normalized == "job_state_pending"
}

// DispatchProposals dispatches a batch of proposals using the same capacity-aware
// dispatch logic as the scheduler loop: concurrent execution, executor reservation
// with backoff, and per-job retries up to the policy's retry limit. The scheduler
// policy is loaded from the persisted job type config; if it cannot be loaded or
// the job type is not enabled, a default policy is used so that manual runs
// always work.
func (r *Plugin) DispatchProposals(
	ctx context.Context,
	jobType string,
	proposals []*plugin_pb.JobProposal,
	clusterContext *plugin_pb.ClusterContext,
) (successCount, errorCount, canceledCount int) {
	if len(proposals) == 0 {
		return 0, 0, 0
	}

	policy, enabled, err := r.loadSchedulerPolicy(jobType)
	if err != nil || !enabled {
		policy = schedulerPolicy{
			ExecutionConcurrency:   defaultScheduledExecutionConcurrency,
			PerWorkerConcurrency:   defaultScheduledPerWorkerConcurrency,
			ExecutionTimeout:       defaultScheduledExecutionTimeout,
			RetryBackoff:           defaultScheduledRetryBackoff,
			ExecutorReserveBackoff: 200 * time.Millisecond,
		}
	}

	return r.dispatchScheduledProposals(ctx, jobType, proposals, clusterContext, policy)
}

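// Usage sketch (hypothetical caller; "vacuum" and proposals are placeholders):
// a manual run can reuse the scheduler's dispatch path.
//
//	clusterContext, err := r.loadSchedulerClusterContext(ctx)
//	if err != nil {
//		return err
//	}
//	succeeded, failed, canceled := r.DispatchProposals(ctx, "vacuum", proposals, clusterContext)
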
// filterScheduledProposals drops nil proposals and any proposal whose dedupe
// key (or, without one, its proposal ID) already appeared earlier in the
// batch. Proposals with neither a dedupe key nor a proposal ID are always kept.
func (r *Plugin) filterScheduledProposals(proposals []*plugin_pb.JobProposal) []*plugin_pb.JobProposal {
	filtered := make([]*plugin_pb.JobProposal, 0, len(proposals))
	seenInRun := make(map[string]struct{}, len(proposals))

	for _, proposal := range proposals {
		if proposal == nil {
			continue
		}

		key := proposal.DedupeKey
		if key == "" {
			key = proposal.ProposalId
		}
		if key == "" {
			filtered = append(filtered, proposal)
			continue
		}

		if _, exists := seenInRun[key]; exists {
			continue
		}

		seenInRun[key] = struct{}{}
		filtered = append(filtered, proposal)
	}

	return filtered
}

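// Example (hypothetical keys): for proposals with dedupe keys "a", "a", "b"
// and one proposal with neither a dedupe key nor a proposal ID,
// filterScheduledProposals returns the first "a", then "b", then the keyless
// proposal, in their original order.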