Files
seaweedFS/weed/admin/plugin/plugin_monitor.go
Chris Lu 8ec9ff4a12 Refactor plugin system and migrate worker runtime (#8369)
* admin: add plugin runtime UI page and route wiring

* pb: add plugin gRPC contract and generated bindings

* admin/plugin: implement worker registry, runtime, monitoring, and config store

* admin/dash: wire plugin runtime and expose plugin workflow APIs

* command: add flags to enable plugin runtime

* admin: rename remaining plugin v2 wording to plugin

* admin/plugin: add detectable job type registry helper

* admin/plugin: add scheduled detection and dispatch orchestration

* admin/plugin: prefetch job type descriptors when workers connect

* admin/plugin: add known job type discovery API and UI

* admin/plugin: refresh design doc to match current implementation

* admin/plugin: enforce per-worker scheduler concurrency limits

* admin/plugin: use descriptor runtime defaults for scheduler policy

* admin/ui: auto-load first known plugin job type on page open

* admin/plugin: bootstrap persisted config from descriptor defaults

* admin/plugin: dedupe scheduled proposals by dedupe key

* admin/ui: add job type and state filters for plugin monitoring

* admin/ui: add per-job-type plugin activity summary

* admin/plugin: split descriptor read API from schema refresh

* admin/ui: keep plugin summary metrics global while tables are filtered

* admin/plugin: retry executor reservation before timing out

* admin/plugin: expose scheduler states for monitoring

* admin/ui: show per-job-type scheduler states in plugin monitor

* pb/plugin: rename protobuf package to plugin

* admin/plugin: rename pluginRuntime wiring to plugin

* admin/plugin: remove runtime naming from plugin APIs and UI

* admin/plugin: rename runtime files to plugin naming

* admin/plugin: persist jobs and activities for monitor recovery

* admin/plugin: lease one detector worker per job type

* admin/ui: show worker load from plugin heartbeats

* admin/plugin: skip stale workers for detector and executor picks

* plugin/worker: add plugin worker command and stream runtime scaffold

* plugin/worker: implement vacuum detect and execute handlers

* admin/plugin: document external vacuum plugin worker starter

* command: update plugin.worker help to reflect implemented flow

* command/admin: drop legacy Plugin V2 label

* plugin/worker: validate vacuum job type and respect min interval

* plugin/worker: test no-op detect when min interval not elapsed

* command/admin: document plugin.worker external process

* plugin/worker: advertise configured concurrency in hello

* command/plugin.worker: add jobType handler selection

* command/plugin.worker: test handler selection by job type

* command/plugin.worker: persist worker id in workingDir

* admin/plugin: document plugin.worker jobType and workingDir flags

* plugin/worker: support cancel request for in-flight work

* plugin/worker: test cancel request acknowledgements

* command/plugin.worker: document workingDir and jobType behavior

* plugin/worker: emit executor activity events for monitor

* plugin/worker: test executor activity builder

* admin/plugin: send last successful run in detection request

* admin/plugin: send cancel request when detect or execute context ends

* admin/plugin: document worker cancel request responsibility

* admin/handlers: expose plugin scheduler states API in no-auth mode

* admin/handlers: test plugin scheduler states route registration

* admin/plugin: keep worker id on worker-generated activity records

* admin/plugin: test worker id propagation in monitor activities

* admin/dash: always initialize plugin service

* command/admin: remove plugin enable flags and default to enabled

* admin/dash: drop pluginEnabled constructor parameter

* admin/plugin UI: stop checking plugin enabled state

* admin/plugin: remove docs for plugin enable flags

* admin/dash: remove unused plugin enabled check method

* admin/dash: fallback to in-memory plugin init when dataDir fails

* admin/plugin API: expose worker gRPC port in status

* command/plugin.worker: resolve admin gRPC port via plugin status

* split plugin UI into overview/configuration/monitoring pages

* Update layout_templ.go

* add volume_balance plugin worker handler

* wire plugin.worker CLI for volume_balance job type

* add erasure_coding plugin worker handler

* wire plugin.worker CLI for erasure_coding job type

* support multi-job handlers in plugin worker runtime

* allow plugin.worker jobType as comma-separated list

* admin/plugin UI: rename to Workers and simplify config view

* plugin worker: queue detection requests instead of capacity reject

* Update plugin_worker.go

* plugin volume_balance: remove force_move/timeout from worker config UI

* plugin erasure_coding: enforce local working dir and cleanup

* admin/plugin UI: rename admin settings to job scheduling

* admin/plugin UI: persist and robustly render detection results

* admin/plugin: record and return detection trace metadata

* admin/plugin UI: show detection process and decision trace

* plugin: surface detector decision trace as activities

* mini: start a plugin worker by default

* admin/plugin UI: split monitoring into detection and execution tabs

* plugin worker: emit detection decision trace for EC and balance

* admin workers UI: split monitoring into detection and execution pages

* plugin scheduler: skip proposals for active assigned/running jobs

* admin workers UI: add job queue tab

* plugin worker: add dummy stress detector and executor job type

* admin workers UI: reorder tabs to detection queue execution

* admin workers UI: regenerate plugin template

* plugin defaults: include dummy stress and add stress tests

* plugin dummy stress: rotate detection selections across runs

* plugin scheduler: remove cross-run proposal dedupe

* plugin queue: track pending scheduled jobs

* plugin scheduler: wait for executor capacity before dispatch

* plugin scheduler: skip detection when waiting backlog is high

* plugin: add disk-backed job detail API and persistence

* admin ui: show plugin job detail modal from job id links

* plugin: generate unique job ids instead of reusing proposal ids

* plugin worker: emit heartbeats on work state changes

* plugin registry: round-robin tied executor and detector picks

* add temporary EC overnight stress runner

* plugin job details: persist and render EC execution plans

* ec volume details: color data and parity shard badges

* shard labels: keep parity ids numeric and color-only distinction

* admin: remove legacy maintenance UI routes and templates

* admin: remove dead maintenance endpoint helpers

* Update layout_templ.go

* remove dummy_stress worker and command support

* refactor plugin UI to job-type top tabs and sub-tabs

* migrate weed worker command to plugin runtime

* remove plugin.worker command and keep worker runtime with metrics

* update helm worker args for jobType and execution flags

* set plugin scheduling defaults to global 16 and per-worker 4

* stress: fix RPC context reuse and remove redundant variables in ec_stress_runner

* admin/plugin: fix lifecycle races, safe channel operations, and terminal state constants

* admin/dash: randomize job IDs and fix priority zero-value overwrite in plugin API

* admin/handlers: implement buffered rendering to prevent response corruption

* admin/plugin: implement debounced persistence flusher and optimize BuildJobDetail memory lookups

* admin/plugin: fix priority overwrite and implement bounded wait in scheduler reserve

* admin/plugin: implement atomic file writes and fix run record side effects

* admin/plugin: use P prefix for parity shard labels in execution plans

* admin/plugin: enable parallel execution for cancellation tests

* admin: refactor time.Time fields to pointers for better JSON omitempty support

* admin/plugin: implement pointer-safe time assignments and comparisons in plugin core

* admin/plugin: fix time assignment and sorting logic in plugin monitor after pointer refactor

* admin/plugin: update scheduler activity tracking to use time pointers

* admin/plugin: fix time-based run history trimming after pointer refactor

* admin/dash: fix JobSpec struct literal in plugin API after pointer refactor

* admin/view: add D/P prefixes to EC shard badges for UI consistency

* admin/plugin: use lifecycle-aware context for schema prefetching

* Update ec_volume_details_templ.go

* admin/stress: fix proposal sorting and log volume cleanup errors

* stress: refine ec stress runner with math/rand and collection name

- Added Collection field to VolumeEcShardsDeleteRequest for correct filename construction.
- Replaced crypto/rand with seeded math/rand PRNG for bulk payloads.
- Added documentation for EcMinAge zero-value behavior.
- Added logging for ignored errors in volume/shard deletion.

* admin: return internal server error for plugin store failures

Changed error status code from 400 Bad Request to 500 Internal Server Error for failures in GetPluginJobDetail to correctly reflect server-side errors.

* admin: implement safe channel sends and graceful shutdown sync

- Added sync.WaitGroup to Plugin struct to manage background goroutines.
- Implemented safeSendCh helper using recover() to prevent panics on closed channels.
- Ensured Shutdown() waits for all background operations to complete.

* admin: robustify plugin monitor with nil-safe time and record init

- Standardized nil-safe assignment for *time.Time pointers (CreatedAt, UpdatedAt, CompletedAt).
- Ensured persistJobDetailSnapshot initializes new records correctly if they don't exist on disk.
- Fixed debounced persistence to trigger immediate write on job completion.

* admin: improve scheduler shutdown behavior and logic guards

- Replaced brittle error string matching with explicit r.shutdownCh selection for shutdown detection.
- Removed redundant nil guard in buildScheduledJobSpec.
- Standardized WaitGroup usage for schedulerLoop.

* admin: implement deep copy for job parameters and atomic write fixes

- Implemented deepCopyGenericValue and used it in cloneTrackedJob to prevent shared state.
- Ensured atomicWriteFile creates parent directories before writing.

* admin: remove unreachable branch in shard classification

Removed an unreachable 'totalShards <= 0' check in classifyShardID as dataShards and parityShards are already guarded.

* admin: secure UI links and use canonical shard constants

- Added rel="noopener noreferrer" to external links for security.
- Replaced magic number 14 with erasure_coding.TotalShardsCount.
- Used renderEcShardBadge for missing shard list consistency.

* admin: stabilize plugin tests and fix regressions

- Composed a robust plugin_monitor_test.go to handle asynchronous persistence.
- Updated all time.Time literals to use timeToPtr helper.
- Added explicit Shutdown() calls in tests to synchronize with debounced writes.
- Fixed syntax errors and orphaned struct literals in tests.

* Potential fix for code scanning alert no. 278: Slice memory allocation with excessive size value

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>

* Potential fix for code scanning alert no. 283: Uncontrolled data used in path expression

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>

* admin: finalize refinements for error handling, scheduler, and race fixes

- Standardized HTTP 500 status codes for store failures in plugin_api.go.
- Tracked scheduled detection goroutines with sync.WaitGroup for safe shutdown.
- Fixed race condition in safeSendDetectionComplete by extracting channel under lock.
- Implemented deep copy for JobActivity details.
- Used defaultDirPerm constant in atomicWriteFile.

* test(ec): migrate admin dockertest to plugin APIs

* admin/plugin_api: fix RunPluginJobTypeAPI to return 500 for server-side detection/filter errors

* admin/plugin_api: fix ExecutePluginJobAPI to return 500 for job execution failures

* admin/plugin_api: limit parseProtoJSONBody request body to 1MB to prevent unbounded memory usage

* admin/plugin: consolidate regex to package-level validJobTypePattern; add char validation to sanitizeJobID

* admin/plugin: fix racy Shutdown channel close with sync.Once

* admin/plugin: track sendLoop and recv goroutines in WorkerStream with r.wg

* admin/plugin: document writeProtoFiles atomicity — .pb is source of truth, .json is human-readable only

* admin/plugin: extract activityLess helper to deduplicate nil-safe OccurredAt sort comparators

* test/ec: check http.NewRequest errors to prevent nil req panics

* test/ec: replace deprecated ioutil/math/rand, fix stale step comment 5.1→3.1

* plugin(ec): raise default detection and scheduling throughput limits

* topology: include empty disks in volume list and EC capacity fallback

* topology: remove hard 10-task cap for detection planning

* Update ec_volume_details_templ.go

* adjust default

* fix tests

---------

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
2026-02-18 13:42:41 -08:00

897 lines
22 KiB
Go

package plugin
import (
"encoding/json"
"sort"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"google.golang.org/protobuf/encoding/protojson"
)
const (
	// maxTrackedJobsTotal caps the in-memory (and persisted) tracked job map;
	// pruneTrackedJobsLocked evicts the oldest terminal jobs beyond this.
	maxTrackedJobsTotal = 1000
	// maxActivityRecords caps the in-memory activity ring and the persisted
	// activity snapshot.
	maxActivityRecords = 4000
	// maxRelatedJobs clamps the relatedLimit accepted by BuildJobDetail.
	maxRelatedJobs = 100
)

// Lowercased terminal job-state strings, derived from the protobuf enum
// names so the string forms stay in sync with plugin_pb.JobState.
var (
	StateSucceeded = strings.ToLower(plugin_pb.JobState_JOB_STATE_SUCCEEDED.String())
	StateFailed    = strings.ToLower(plugin_pb.JobState_JOB_STATE_FAILED.String())
	StateCanceled  = strings.ToLower(plugin_pb.JobState_JOB_STATE_CANCELED.String())
)
// activityLess reports whether activity a occurred after activity b (newest-first order).
// A nil OccurredAt is treated as the zero time.
// activityLess reports whether activity a occurred after activity b
// (newest-first order). A nil OccurredAt is treated as the zero time.
func activityLess(a, b JobActivity) bool {
	occurred := func(act JobActivity) time.Time {
		if act.OccurredAt == nil {
			return time.Time{}
		}
		return *act.OccurredAt
	}
	return occurred(a).After(occurred(b))
}
// loadPersistedMonitorState restores tracked jobs and activity records from
// the on-disk store into memory. It returns the first store read error
// encountered; partial in-memory state is not rolled back on failure.
// NOTE(review): presumably called once during Plugin startup — it still takes
// the locks, so concurrent use is safe either way.
func (r *Plugin) loadPersistedMonitorState() error {
	trackedJobs, err := r.store.LoadTrackedJobs()
	if err != nil {
		return err
	}
	activities, err := r.store.LoadActivities()
	if err != nil {
		return err
	}
	if len(trackedJobs) > 0 {
		r.jobsMu.Lock()
		for i := range trackedJobs {
			job := trackedJobs[i]
			// Skip records without a usable job id.
			if strings.TrimSpace(job.JobID) == "" {
				continue
			}
			// Backward compatibility: migrate older inline detail payloads
			// out of tracked_jobs.json into dedicated per-job detail files.
			if hasTrackedJobRichDetails(job) {
				if err := r.store.SaveJobDetail(job); err != nil {
					// Migration is best-effort; keep loading remaining jobs.
					glog.Warningf("Plugin failed to migrate detail snapshot for job %s: %v", job.JobID, err)
				}
			}
			// The in-memory map holds only lightweight status fields; rich
			// details live in the per-job detail files.
			stripTrackedJobDetailFields(&job)
			jobCopy := job
			r.jobs[job.JobID] = &jobCopy
		}
		r.pruneTrackedJobsLocked()
		r.jobsMu.Unlock()
	}
	// Keep only the newest maxActivityRecords entries; assumes the persisted
	// slice is ordered oldest-first — TODO confirm against the store.
	if len(activities) > maxActivityRecords {
		activities = activities[len(activities)-maxActivityRecords:]
	}
	if len(activities) > 0 {
		r.activitiesMu.Lock()
		r.activities = append([]JobActivity(nil), activities...)
		r.activitiesMu.Unlock()
	}
	return nil
}
// ListTrackedJobs returns clones of the tracked jobs, optionally filtered by
// job type and (case-insensitive) state, sorted newest-first by UpdatedAt
// with JobID as a deterministic tiebreaker, truncated to limit when limit > 0.
func (r *Plugin) ListTrackedJobs(jobType string, state string, limit int) []TrackedJob {
	r.jobsMu.RLock()
	defer r.jobsMu.RUnlock()
	wantType := strings.TrimSpace(jobType)
	wantState := strings.TrimSpace(strings.ToLower(state))
	matched := make([]TrackedJob, 0, len(r.jobs))
	for _, tracked := range r.jobs {
		if tracked == nil {
			continue
		}
		if wantType != "" && tracked.JobType != wantType {
			continue
		}
		if wantState != "" && strings.ToLower(tracked.State) != wantState {
			continue
		}
		matched = append(matched, cloneTrackedJob(*tracked))
	}
	updatedOf := func(job TrackedJob) time.Time {
		if job.UpdatedAt == nil {
			return time.Time{}
		}
		return *job.UpdatedAt
	}
	sort.Slice(matched, func(i, j int) bool {
		ui, uj := updatedOf(matched[i]), updatedOf(matched[j])
		if ui.Equal(uj) {
			return matched[i].JobID < matched[j].JobID
		}
		return ui.After(uj)
	})
	if limit > 0 && limit < len(matched) {
		matched = matched[:limit]
	}
	return matched
}
// GetTrackedJob returns a defensive copy of the tracked job for jobID,
// and whether the job is currently tracked.
func (r *Plugin) GetTrackedJob(jobID string) (*TrackedJob, bool) {
	r.jobsMu.RLock()
	defer r.jobsMu.RUnlock()
	if tracked, found := r.jobs[jobID]; found && tracked != nil {
		copied := cloneTrackedJob(*tracked)
		return &copied, true
	}
	return nil, false
}
// ListActivities returns activity records, optionally filtered by job type,
// ordered newest-first and truncated to limit when limit > 0.
func (r *Plugin) ListActivities(jobType string, limit int) []JobActivity {
	r.activitiesMu.RLock()
	defer r.activitiesMu.RUnlock()
	wantType := strings.TrimSpace(jobType)
	filtered := make([]JobActivity, 0, len(r.activities))
	for _, record := range r.activities {
		if wantType == "" || record.JobType == wantType {
			filtered = append(filtered, record)
		}
	}
	sort.Slice(filtered, func(i, j int) bool {
		return activityLess(filtered[i], filtered[j])
	})
	if limit > 0 && limit < len(filtered) {
		filtered = filtered[:limit]
	}
	return filtered
}
// ListJobActivities returns the activity timeline for a single job, ordered
// oldest-first, keeping only the newest limit entries when limit > 0.
// A blank jobID yields nil.
func (r *Plugin) ListJobActivities(jobID string, limit int) []JobActivity {
	wantID := strings.TrimSpace(jobID)
	if wantID == "" {
		return nil
	}
	r.activitiesMu.RLock()
	timeline := make([]JobActivity, 0, len(r.activities))
	for _, record := range r.activities {
		if strings.TrimSpace(record.JobID) == wantID {
			timeline = append(timeline, record)
		}
	}
	r.activitiesMu.RUnlock()
	// Sort outside the lock: oldest-first for a readable job timeline.
	sort.Slice(timeline, func(i, j int) bool {
		return !activityLess(timeline[i], timeline[j])
	})
	if limit > 0 && limit < len(timeline) {
		timeline = timeline[len(timeline)-limit:]
	}
	return timeline
}
// BuildJobDetail assembles the detail view for a job: the persisted detail
// record merged with the in-memory tracked status, the job's activity
// timeline, the matching run-history record (if any), and up to relatedLimit
// other tracked jobs of the same type.
//
// It returns found=false with a nil detail when jobID is blank or the job is
// unknown to both the tracked map and the detail store. A non-nil error may
// accompany found=true when a later store read (run history) fails.
func (r *Plugin) BuildJobDetail(jobID string, activityLimit int, relatedLimit int) (*JobDetail, bool, error) {
	normalizedJobID := strings.TrimSpace(jobID)
	if normalizedJobID == "" {
		return nil, false, nil
	}
	// Clamp relatedLimit to a safe range to avoid excessive memory allocation from untrusted input.
	if relatedLimit <= 0 {
		relatedLimit = 0
	} else if relatedLimit > maxRelatedJobs {
		relatedLimit = maxRelatedJobs
	}
	r.jobsMu.RLock()
	trackedSnapshot, ok := r.jobs[normalizedJobID]
	if ok && trackedSnapshot != nil {
		// Clone under the lock; strip detail-only fields since the tracked
		// map only contributes status, not rich details.
		candidate := cloneTrackedJob(*trackedSnapshot)
		stripTrackedJobDetailFields(&candidate)
		trackedSnapshot = &candidate
	} else {
		trackedSnapshot = nil
	}
	r.jobsMu.RUnlock()
	detailJob, err := r.store.LoadJobDetail(normalizedJobID)
	if err != nil {
		return nil, false, err
	}
	if trackedSnapshot == nil && detailJob == nil {
		return nil, false, nil
	}
	// Fall back to the tracked snapshot when no detail file exists yet.
	if detailJob == nil && trackedSnapshot != nil {
		clone := cloneTrackedJob(*trackedSnapshot)
		detailJob = &clone
	}
	if detailJob == nil {
		return nil, false, nil
	}
	// The tracked map holds the freshest status fields; fill gaps in the
	// persisted detail from it.
	if trackedSnapshot != nil {
		mergeTrackedStatusIntoDetail(detailJob, trackedSnapshot)
	}
	detailJob.Parameters = enrichTrackedJobParameters(detailJob.JobType, detailJob.Parameters)
	// Copy activities out under the lock; filtering happens lock-free below.
	r.activitiesMu.RLock()
	activities := append([]JobActivity(nil), r.activities...)
	r.activitiesMu.RUnlock()
	detail := &JobDetail{
		Job:         detailJob,
		Activities:  filterJobActivitiesFromSlice(activities, normalizedJobID, activityLimit),
		LastUpdated: timeToPtr(time.Now().UTC()),
	}
	if history, err := r.store.LoadRunHistory(detailJob.JobType); err != nil {
		return nil, true, err
	} else if history != nil {
		// Prefer a successful run record; fall back to an error record.
		for i := range history.SuccessfulRuns {
			record := history.SuccessfulRuns[i]
			if strings.TrimSpace(record.JobID) == normalizedJobID {
				recordCopy := record
				detail.RunRecord = &recordCopy
				break
			}
		}
		if detail.RunRecord == nil {
			for i := range history.ErrorRuns {
				record := history.ErrorRuns[i]
				if strings.TrimSpace(record.JobID) == normalizedJobID {
					recordCopy := record
					detail.RunRecord = &recordCopy
					break
				}
			}
		}
	}
	if relatedLimit > 0 {
		related := make([]TrackedJob, 0, relatedLimit)
		r.jobsMu.RLock()
		for _, candidate := range r.jobs {
			// Guard against nil map entries, consistent with the nil checks
			// in ListTrackedJobs and persistTrackedJobsSnapshot.
			if candidate == nil {
				continue
			}
			if strings.TrimSpace(candidate.JobType) != strings.TrimSpace(detailJob.JobType) {
				continue
			}
			if strings.TrimSpace(candidate.JobID) == normalizedJobID {
				continue
			}
			cloned := cloneTrackedJob(*candidate)
			stripTrackedJobDetailFields(&cloned)
			related = append(related, cloned)
			if len(related) >= relatedLimit {
				break
			}
		}
		r.jobsMu.RUnlock()
		detail.RelatedJobs = related
	}
	return detail, true, nil
}
// filterJobActivitiesFromSlice extracts the activities belonging to jobID
// from all, sorted oldest-first, keeping only the newest limit entries when
// limit > 0. A blank jobID or empty input yields nil.
func filterJobActivitiesFromSlice(all []JobActivity, jobID string, limit int) []JobActivity {
	if len(all) == 0 || strings.TrimSpace(jobID) == "" {
		return nil
	}
	matched := make([]JobActivity, 0, len(all))
	for _, record := range all {
		if strings.TrimSpace(record.JobID) == jobID {
			matched = append(matched, record)
		}
	}
	sort.Slice(matched, func(i, j int) bool {
		return !activityLess(matched[i], matched[j]) // oldest-first for job timeline
	})
	if limit > 0 && limit < len(matched) {
		matched = matched[len(matched)-limit:]
	}
	return matched
}
// stripTrackedJobDetailFields clears the heavyweight detail-only fields so
// the record carries only lightweight status suitable for the tracked map
// and the tracked-jobs snapshot file. Safe to call with a nil job.
func stripTrackedJobDetailFields(job *TrackedJob) {
	if job == nil {
		return
	}
	job.Detail = ""
	job.Parameters, job.Labels, job.ResultOutputValues = nil, nil, nil
}
// hasTrackedJobRichDetails reports whether the record carries any of the
// heavyweight detail payload fields (used to detect legacy inline details
// that must be migrated to per-job detail files).
func hasTrackedJobRichDetails(job TrackedJob) bool {
	switch {
	case strings.TrimSpace(job.Detail) != "":
		return true
	case len(job.Parameters) > 0, len(job.Labels) > 0, len(job.ResultOutputValues) > 0:
		return true
	}
	return false
}
// mergeTrackedStatusIntoDetail fills zero-valued fields of detail from
// tracked. Non-zero fields in detail always win; tracked only supplies
// what is missing. Safe to call with nil arguments (no-op).
func mergeTrackedStatusIntoDetail(detail *TrackedJob, tracked *TrackedJob) {
	if detail == nil || tracked == nil {
		return
	}
	fillStr := func(dst *string, src string) {
		if *dst == "" {
			*dst = src
		}
	}
	fillTime := func(dst **time.Time, src *time.Time) {
		if *dst == nil || (*dst).IsZero() {
			*dst = src
		}
	}
	fillStr(&detail.JobType, tracked.JobType)
	fillStr(&detail.RequestID, tracked.RequestID)
	fillStr(&detail.WorkerID, tracked.WorkerID)
	fillStr(&detail.DedupeKey, tracked.DedupeKey)
	fillStr(&detail.Summary, tracked.Summary)
	fillStr(&detail.State, tracked.State)
	if detail.Progress == 0 {
		detail.Progress = tracked.Progress
	}
	fillStr(&detail.Stage, tracked.Stage)
	fillStr(&detail.Message, tracked.Message)
	if detail.Attempt == 0 {
		detail.Attempt = tracked.Attempt
	}
	fillTime(&detail.CreatedAt, tracked.CreatedAt)
	fillTime(&detail.UpdatedAt, tracked.UpdatedAt)
	fillTime(&detail.CompletedAt, tracked.CompletedAt)
	fillStr(&detail.ErrorMessage, tracked.ErrorMessage)
	fillStr(&detail.ResultSummary, tracked.ResultSummary)
}
// handleJobProgressUpdate applies a worker-reported progress update to the
// tracked job map and records corresponding activity entries. Updates with a
// blank JobId mutate no tracked job and are attributed to the detection
// phase ("worker_detection") when they carry a message or stage.
func (r *Plugin) handleJobProgressUpdate(workerID string, update *plugin_pb.JobProgressUpdate) {
	if update == nil {
		return
	}
	now := time.Now().UTC()
	resolvedWorkerID := strings.TrimSpace(workerID)
	if strings.TrimSpace(update.JobId) != "" {
		r.jobsMu.Lock()
		job := r.jobs[update.JobId]
		if job == nil {
			// First sighting of this job id: create a tracked record on the fly.
			job = &TrackedJob{
				JobID:     update.JobId,
				JobType:   update.JobType,
				RequestID: update.RequestId,
				WorkerID:  resolvedWorkerID,
				CreatedAt: timeToPtr(now),
			}
			r.jobs[update.JobId] = job
		}
		if update.JobType != "" {
			job.JobType = update.JobType
		}
		if update.RequestId != "" {
			job.RequestID = update.RequestId
		}
		// Prefer the worker id already bound to the job; otherwise adopt the
		// id from this update so the activity records below carry it too.
		if job.WorkerID != "" {
			resolvedWorkerID = job.WorkerID
		} else if resolvedWorkerID != "" {
			job.WorkerID = resolvedWorkerID
		}
		job.State = strings.ToLower(update.State.String())
		job.Progress = update.ProgressPercent
		job.Stage = update.Stage
		job.Message = update.Message
		job.UpdatedAt = timeToPtr(now)
		r.pruneTrackedJobsLocked()
		// Mark dirty; the debounced persistence loop flushes later.
		r.dirtyJobs = true
		r.jobsMu.Unlock()
	}
	r.trackWorkerActivities(update.JobType, update.JobId, update.RequestId, resolvedWorkerID, update.Activities)
	if update.Message != "" || update.Stage != "" {
		source := "worker_progress"
		if strings.TrimSpace(update.JobId) == "" {
			source = "worker_detection"
		}
		r.appendActivity(JobActivity{
			JobID:      update.JobId,
			JobType:    update.JobType,
			RequestID:  update.RequestId,
			WorkerID:   resolvedWorkerID,
			Source:     source,
			Message:    update.Message,
			Stage:      update.Stage,
			OccurredAt: timeToPtr(now),
		})
	}
}
// trackExecutionStart records that a job has been assigned to a worker: it
// upserts the in-memory tracked record into the "assigned" state, persists a
// full detail snapshot (including the job spec's parameters and labels), and
// appends an "admin_dispatch" activity. A nil spec or blank job id is a no-op.
func (r *Plugin) trackExecutionStart(requestID, workerID string, job *plugin_pb.JobSpec, attempt int32) {
	if job == nil || strings.TrimSpace(job.JobId) == "" {
		return
	}
	now := time.Now().UTC()
	r.jobsMu.Lock()
	tracked := r.jobs[job.JobId]
	if tracked == nil {
		tracked = &TrackedJob{
			JobID:     job.JobId,
			CreatedAt: timeToPtr(now),
		}
		r.jobs[job.JobId] = tracked
	}
	tracked.JobType = job.JobType
	tracked.RequestID = requestID
	tracked.WorkerID = workerID
	tracked.DedupeKey = job.DedupeKey
	tracked.Summary = job.Summary
	tracked.State = strings.ToLower(plugin_pb.JobState_JOB_STATE_ASSIGNED.String())
	tracked.Progress = 0
	tracked.Stage = "assigned"
	tracked.Message = "job assigned to worker"
	tracked.Attempt = attempt
	// Backfill CreatedAt for pre-existing records that never had it set.
	if tracked.CreatedAt == nil || tracked.CreatedAt.IsZero() {
		tracked.CreatedAt = timeToPtr(now)
	}
	tracked.UpdatedAt = timeToPtr(now)
	// Snapshot under the lock so the detail write below sees consistent state.
	trackedSnapshot := cloneTrackedJob(*tracked)
	r.pruneTrackedJobsLocked()
	r.dirtyJobs = true
	r.jobsMu.Unlock()
	r.persistJobDetailSnapshot(job.JobId, func(detail *TrackedJob) {
		detail.JobID = job.JobId
		detail.JobType = job.JobType
		detail.RequestID = requestID
		detail.WorkerID = workerID
		detail.DedupeKey = job.DedupeKey
		detail.Summary = job.Summary
		detail.Detail = job.Detail
		detail.Parameters = enrichTrackedJobParameters(job.JobType, configValueMapToPlain(job.Parameters))
		// Copy labels so the persisted detail does not alias the proto map.
		if len(job.Labels) > 0 {
			labels := make(map[string]string, len(job.Labels))
			for key, value := range job.Labels {
				labels[key] = value
			}
			detail.Labels = labels
		} else {
			detail.Labels = nil
		}
		detail.State = trackedSnapshot.State
		detail.Progress = trackedSnapshot.Progress
		detail.Stage = trackedSnapshot.Stage
		detail.Message = trackedSnapshot.Message
		detail.Attempt = attempt
		if detail.CreatedAt == nil || detail.CreatedAt.IsZero() {
			detail.CreatedAt = trackedSnapshot.CreatedAt
		}
		detail.UpdatedAt = trackedSnapshot.UpdatedAt
	})
	r.appendActivity(JobActivity{
		JobID:      job.JobId,
		JobType:    job.JobType,
		RequestID:  requestID,
		WorkerID:   workerID,
		Source:     "admin_dispatch",
		Message:    "job assigned",
		Stage:      "assigned",
		OccurredAt: timeToPtr(now),
	})
}
// trackExecutionQueued records that a scheduled job is waiting for executor
// capacity: it upserts the tracked record into the pending/"queued" state,
// persists a detail snapshot (with the job spec's parameters and labels),
// and appends an "admin_scheduler" activity. A nil spec or blank job id is a
// no-op. Unlike trackExecutionStart, no request or worker id is known yet.
func (r *Plugin) trackExecutionQueued(job *plugin_pb.JobSpec) {
	if job == nil || strings.TrimSpace(job.JobId) == "" {
		return
	}
	now := time.Now().UTC()
	r.jobsMu.Lock()
	tracked := r.jobs[job.JobId]
	if tracked == nil {
		tracked = &TrackedJob{
			JobID:     job.JobId,
			CreatedAt: timeToPtr(now),
		}
		r.jobs[job.JobId] = tracked
	}
	tracked.JobType = job.JobType
	tracked.DedupeKey = job.DedupeKey
	tracked.Summary = job.Summary
	tracked.State = strings.ToLower(plugin_pb.JobState_JOB_STATE_PENDING.String())
	tracked.Progress = 0
	tracked.Stage = "queued"
	tracked.Message = "waiting for available executor"
	// Backfill CreatedAt for pre-existing records that never had it set.
	if tracked.CreatedAt == nil || tracked.CreatedAt.IsZero() {
		tracked.CreatedAt = timeToPtr(now)
	}
	tracked.UpdatedAt = timeToPtr(now)
	// Snapshot under the lock so the detail write below sees consistent state.
	trackedSnapshot := cloneTrackedJob(*tracked)
	r.pruneTrackedJobsLocked()
	r.dirtyJobs = true
	r.jobsMu.Unlock()
	r.persistJobDetailSnapshot(job.JobId, func(detail *TrackedJob) {
		detail.JobID = job.JobId
		detail.JobType = job.JobType
		detail.DedupeKey = job.DedupeKey
		detail.Summary = job.Summary
		detail.Detail = job.Detail
		detail.Parameters = enrichTrackedJobParameters(job.JobType, configValueMapToPlain(job.Parameters))
		// Copy labels so the persisted detail does not alias the proto map.
		if len(job.Labels) > 0 {
			labels := make(map[string]string, len(job.Labels))
			for key, value := range job.Labels {
				labels[key] = value
			}
			detail.Labels = labels
		} else {
			detail.Labels = nil
		}
		detail.State = trackedSnapshot.State
		detail.Progress = trackedSnapshot.Progress
		detail.Stage = trackedSnapshot.Stage
		detail.Message = trackedSnapshot.Message
		if detail.CreatedAt == nil || detail.CreatedAt.IsZero() {
			detail.CreatedAt = trackedSnapshot.CreatedAt
		}
		detail.UpdatedAt = trackedSnapshot.UpdatedAt
	})
	r.appendActivity(JobActivity{
		JobID:      job.JobId,
		JobType:    job.JobType,
		Source:     "admin_scheduler",
		Message:    "job queued for execution",
		Stage:      "queued",
		OccurredAt: timeToPtr(now),
	})
}
// trackExecutionCompletion folds a worker completion report into the tracked
// job map, persists the final detail snapshot, and appends a
// "worker_completion" activity. It returns a clone of the final tracked
// record for callers that need the terminal state, or nil when the report is
// nil or has no job id. If the report carries a CompletedAt timestamp, that
// time (in UTC) is used for UpdatedAt/CompletedAt instead of the local clock.
func (r *Plugin) trackExecutionCompletion(completed *plugin_pb.JobCompleted) *TrackedJob {
	if completed == nil || strings.TrimSpace(completed.JobId) == "" {
		return nil
	}
	now := time.Now().UTC()
	if completed.CompletedAt != nil {
		now = completed.CompletedAt.AsTime().UTC()
	}
	r.jobsMu.Lock()
	tracked := r.jobs[completed.JobId]
	if tracked == nil {
		// Completion for an unknown job: create a record so it is not lost.
		tracked = &TrackedJob{
			JobID:     completed.JobId,
			CreatedAt: timeToPtr(now),
		}
		r.jobs[completed.JobId] = tracked
	}
	if completed.JobType != "" {
		tracked.JobType = completed.JobType
	}
	if completed.RequestId != "" {
		tracked.RequestID = completed.RequestId
	}
	if completed.Success {
		tracked.State = strings.ToLower(plugin_pb.JobState_JOB_STATE_SUCCEEDED.String())
		tracked.Progress = 100
		tracked.Stage = "completed"
		if completed.Result != nil {
			tracked.ResultSummary = completed.Result.Summary
		}
		tracked.Message = tracked.ResultSummary
		if tracked.Message == "" {
			tracked.Message = "completed"
		}
		// Clear any error left over from a previous failed attempt.
		tracked.ErrorMessage = ""
	} else {
		tracked.State = strings.ToLower(plugin_pb.JobState_JOB_STATE_FAILED.String())
		tracked.Stage = "failed"
		tracked.ErrorMessage = completed.ErrorMessage
		tracked.Message = completed.ErrorMessage
	}
	tracked.UpdatedAt = timeToPtr(now)
	tracked.CompletedAt = timeToPtr(now)
	r.pruneTrackedJobsLocked()
	// Snapshot under the lock; used for persistence, the activity, and the
	// return value.
	clone := cloneTrackedJob(*tracked)
	r.dirtyJobs = true
	r.jobsMu.Unlock()
	r.persistJobDetailSnapshot(completed.JobId, func(detail *TrackedJob) {
		detail.JobID = completed.JobId
		if completed.JobType != "" {
			detail.JobType = completed.JobType
		}
		if completed.RequestId != "" {
			detail.RequestID = completed.RequestId
		}
		detail.State = clone.State
		detail.Progress = clone.Progress
		detail.Stage = clone.Stage
		detail.Message = clone.Message
		detail.ErrorMessage = clone.ErrorMessage
		detail.ResultSummary = clone.ResultSummary
		if completed.Success && completed.Result != nil {
			detail.ResultOutputValues = configValueMapToPlain(completed.Result.OutputValues)
		} else {
			detail.ResultOutputValues = nil
		}
		if detail.CreatedAt == nil || detail.CreatedAt.IsZero() {
			detail.CreatedAt = clone.CreatedAt
		}
		if detail.UpdatedAt == nil || detail.UpdatedAt.IsZero() {
			detail.UpdatedAt = clone.UpdatedAt
		}
		if detail.CompletedAt == nil || detail.CompletedAt.IsZero() {
			detail.CompletedAt = clone.CompletedAt
		}
	})
	r.appendActivity(JobActivity{
		JobID:      completed.JobId,
		JobType:    completed.JobType,
		RequestID:  completed.RequestId,
		WorkerID:   clone.WorkerID,
		Source:     "worker_completion",
		Message:    clone.Message,
		Stage:      clone.Stage,
		OccurredAt: timeToPtr(now),
	})
	return &clone
}
// trackWorkerActivities converts worker-reported activity events into
// JobActivity records. Each event's CreatedAt timestamp is honored when
// present; otherwise the current UTC time is used. Nil events are skipped.
func (r *Plugin) trackWorkerActivities(jobType, jobID, requestID, workerID string, events []*plugin_pb.ActivityEvent) {
	for _, event := range events {
		if event == nil {
			continue
		}
		occurred := time.Now().UTC()
		if event.CreatedAt != nil {
			occurred = event.CreatedAt.AsTime().UTC()
		}
		record := JobActivity{
			JobID:      jobID,
			JobType:    jobType,
			RequestID:  requestID,
			WorkerID:   workerID,
			Source:     strings.ToLower(event.Source.String()),
			Message:    event.Message,
			Stage:      event.Stage,
			Details:    configValueMapToPlain(event.Details),
			OccurredAt: timeToPtr(occurred),
		}
		r.appendActivity(record)
	}
}
// appendActivity adds one record to the in-memory activity ring, stamping a
// current-UTC OccurredAt when none is set, trimming to maxActivityRecords,
// and marking the activities dirty for the debounced persistence loop.
func (r *Plugin) appendActivity(activity JobActivity) {
	if activity.OccurredAt == nil || activity.OccurredAt.IsZero() {
		activity.OccurredAt = timeToPtr(time.Now().UTC())
	}
	r.activitiesMu.Lock()
	defer r.activitiesMu.Unlock()
	r.activities = append(r.activities, activity)
	if overflow := len(r.activities) - maxActivityRecords; overflow > 0 {
		r.activities = r.activities[overflow:]
	}
	r.dirtyActivities = true
}
// pruneTrackedJobsLocked evicts the oldest terminal (succeeded/failed/
// canceled) jobs until the map is back at maxTrackedJobsTotal entries.
// Non-terminal jobs are never evicted, so the map may remain over the cap
// when too few jobs have finished. Caller must hold r.jobsMu (write lock).
func (r *Plugin) pruneTrackedJobsLocked() {
	if len(r.jobs) <= maxTrackedJobsTotal {
		return
	}
	type sortableJob struct {
		jobID     string
		updatedAt time.Time
	}
	terminalJobs := make([]sortableJob, 0, len(r.jobs))
	for jobID, job := range r.jobs {
		// Guard against nil entries, consistent with the nil checks in
		// ListTrackedJobs and persistTrackedJobsSnapshot (previously this
		// would panic on job.State).
		if job == nil {
			continue
		}
		if job.State == StateSucceeded ||
			job.State == StateFailed ||
			job.State == StateCanceled {
			updAt := time.Time{}
			if job.UpdatedAt != nil {
				updAt = *job.UpdatedAt
			}
			terminalJobs = append(terminalJobs, sortableJob{jobID, updAt})
		}
	}
	if len(terminalJobs) == 0 {
		return
	}
	// Oldest-updated terminal jobs are evicted first.
	sort.Slice(terminalJobs, func(i, j int) bool {
		return terminalJobs[i].updatedAt.Before(terminalJobs[j].updatedAt)
	})
	// The early return above guarantees toDelete >= 1 here.
	toDelete := len(r.jobs) - maxTrackedJobsTotal
	if toDelete > len(terminalJobs) {
		toDelete = len(terminalJobs)
	}
	for i := 0; i < toDelete; i++ {
		delete(r.jobs, terminalJobs[i].jobID)
	}
}
// configValueMapToPlain converts a protobuf ConfigValue map into plain
// JSON-style Go values by round-tripping through protojson. Conversion is
// best-effort: an empty input or any marshal/unmarshal failure yields nil.
func configValueMapToPlain(values map[string]*plugin_pb.ConfigValue) map[string]interface{} {
	if len(values) == 0 {
		return nil
	}
	wrapper := &plugin_pb.ValueMap{Fields: values}
	payload, err := protojson.MarshalOptions{UseProtoNames: true}.Marshal(wrapper)
	if err != nil {
		return nil
	}
	var decoded map[string]interface{}
	if json.Unmarshal(payload, &decoded) != nil {
		return nil
	}
	if fields, ok := decoded["fields"].(map[string]interface{}); ok {
		return fields
	}
	return nil
}
// persistTrackedJobsSnapshot writes the current tracked job map to the
// store, newest-first (JobID as tiebreaker) and capped at
// maxTrackedJobsTotal entries. Detail-only fields are stripped; those are
// persisted separately per job. Clears the dirty flag before snapshotting.
func (r *Plugin) persistTrackedJobsSnapshot() {
	r.jobsMu.Lock()
	r.dirtyJobs = false
	snapshot := make([]TrackedJob, 0, len(r.jobs))
	for _, tracked := range r.jobs {
		if tracked == nil || strings.TrimSpace(tracked.JobID) == "" {
			continue
		}
		clone := cloneTrackedJob(*tracked)
		stripTrackedJobDetailFields(&clone)
		snapshot = append(snapshot, clone)
	}
	r.jobsMu.Unlock()
	if len(snapshot) == 0 {
		return
	}
	updatedOf := func(job TrackedJob) time.Time {
		if job.UpdatedAt == nil {
			return time.Time{}
		}
		return *job.UpdatedAt
	}
	sort.Slice(snapshot, func(i, j int) bool {
		ui, uj := updatedOf(snapshot[i]), updatedOf(snapshot[j])
		if ui.Equal(uj) {
			return snapshot[i].JobID < snapshot[j].JobID
		}
		return ui.After(uj)
	})
	if len(snapshot) > maxTrackedJobsTotal {
		snapshot = snapshot[:maxTrackedJobsTotal]
	}
	if err := r.store.SaveTrackedJobs(snapshot); err != nil {
		glog.Warningf("Plugin failed to persist tracked jobs: %v", err)
	}
}
// persistJobDetailSnapshot loads (or initializes) the on-disk detail record
// for jobID, lets apply mutate it, and writes it back. jobDetailsMu
// serializes the read-modify-write so concurrent callers cannot lose
// updates. Invalid job ids are silently dropped; store failures are logged
// and otherwise ignored (persistence here is best-effort).
func (r *Plugin) persistJobDetailSnapshot(jobID string, apply func(detail *TrackedJob)) {
	// sanitizeJobID also guards the id used as part of a file path.
	normalizedJobID, _ := sanitizeJobID(jobID)
	if normalizedJobID == "" {
		return
	}
	r.jobDetailsMu.Lock()
	defer r.jobDetailsMu.Unlock()
	detail, err := r.store.LoadJobDetail(normalizedJobID)
	if err != nil {
		glog.Warningf("Plugin failed to load job detail snapshot for %s: %v", normalizedJobID, err)
		return
	}
	if detail == nil {
		// No record on disk yet: start from a fresh one.
		detail = &TrackedJob{
			JobID: normalizedJobID,
		}
	}
	if apply != nil {
		apply(detail)
	}
	if err := r.store.SaveJobDetail(*detail); err != nil {
		glog.Warningf("Plugin failed to persist job detail snapshot for %s: %v", normalizedJobID, err)
	}
}
// persistActivitiesSnapshot writes the in-memory activity records to the
// store, keeping only the newest maxActivityRecords entries. Clears the
// dirty flag before snapshotting; save failures are logged, not returned.
func (r *Plugin) persistActivitiesSnapshot() {
	r.activitiesMu.Lock()
	r.dirtyActivities = false
	snapshot := append([]JobActivity(nil), r.activities...)
	r.activitiesMu.Unlock()
	if len(snapshot) == 0 {
		return
	}
	if overflow := len(snapshot) - maxActivityRecords; overflow > 0 {
		snapshot = snapshot[overflow:]
	}
	if err := r.store.SaveActivities(snapshot); err != nil {
		glog.Warningf("Plugin failed to persist activities: %v", err)
	}
}
// persistenceLoop is the background debounced-persistence goroutine: on each
// tick it flushes whichever of the tracked-jobs and activities snapshots are
// marked dirty, and on shutdown it performs a final unconditional flush of
// both. It exits only when shutdownCh is closed; r.wg tracks its lifetime
// (the caller is expected to have incremented the WaitGroup before starting
// this goroutine).
func (r *Plugin) persistenceLoop() {
	defer r.wg.Done()
	for {
		select {
		case <-r.shutdownCh:
			// Final flush so no dirty state is lost on shutdown.
			r.persistTrackedJobsSnapshot()
			r.persistActivitiesSnapshot()
			return
		case <-r.persistTicker.C:
			// Read dirty flags under their read locks, then flush outside.
			r.jobsMu.RLock()
			needsJobsFlush := r.dirtyJobs
			r.jobsMu.RUnlock()
			if needsJobsFlush {
				r.persistTrackedJobsSnapshot()
			}
			r.activitiesMu.RLock()
			needsActivitiesFlush := r.dirtyActivities
			r.activitiesMu.RUnlock()
			if needsActivitiesFlush {
				r.persistActivitiesSnapshot()
			}
		}
	}
}