add admin script worker (#8491)
* admin: add plugin lock coordination * shell: allow bypassing lock checks * plugin worker: add admin script handler * mini: include admin_script in plugin defaults * admin script UI: drop name and enlarge text * admin script: add default script * admin_script: make run interval configurable * plugin: gate other jobs during admin_script runs * plugin: use last completed admin_script run * admin: backfill plugin config defaults * templ Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * comparable to default version Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * default to run Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * format Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * shell: respect pre-set noLock for fix.replication * shell: add force no-lock mode for admin scripts * volume balance worker already exists Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * admin: expose scheduler status JSON * shell: add sleep command * shell: restrict sleep syntax * Revert "shell: respect pre-set noLock for fix.replication" This reverts commit 2b14e8b82602a740d3a473c085e3b3a14f1ddbb3. * templ Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * fix import Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * less logs Co-Authored-By: Copilot <223556219+Copilot@users.noreply.github.com> * Reduce master client logs on canceled contexts * Update mini default job type count --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
148
weed/admin/dash/admin_lock_manager.go
Normal file
148
weed/admin/dash/admin_lock_manager.go
Normal file
@@ -0,0 +1,148 @@
|
||||
package dash
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/wdclient"
|
||||
"github.com/seaweedfs/seaweedfs/weed/wdclient/exclusive_locks"
|
||||
)
|
||||
|
||||
const (
	// adminLockName is the lock name handed to exclusive_locks.NewExclusiveLocker.
	// NOTE(review): presumably the same name the interactive shell locks on, so
	// the admin server and shell sessions exclude each other — confirm.
	adminLockName = "shell"

	// adminLockClientName is the client name used when a caller does not
	// supply one to NewAdminLockManager.
	adminLockClientName = "admin-plugin"
)
|
||||
|
||||
// AdminLockManager coordinates exclusive admin locks with reference counting.
|
||||
// It is safe for concurrent use.
|
||||
type AdminLockManager struct {
|
||||
locker *exclusive_locks.ExclusiveLocker
|
||||
clientName string
|
||||
|
||||
mu sync.Mutex
|
||||
cond *sync.Cond
|
||||
acquiring bool
|
||||
holdCount int
|
||||
|
||||
lastAcquiredAt time.Time
|
||||
lastReleasedAt time.Time
|
||||
waitingSince time.Time
|
||||
waitingReason string
|
||||
currentReason string
|
||||
}
|
||||
|
||||
func NewAdminLockManager(masterClient *wdclient.MasterClient, clientName string) *AdminLockManager {
|
||||
if masterClient == nil {
|
||||
return nil
|
||||
}
|
||||
if clientName == "" {
|
||||
clientName = adminLockClientName
|
||||
}
|
||||
manager := &AdminLockManager{
|
||||
locker: exclusive_locks.NewExclusiveLocker(masterClient, adminLockName),
|
||||
clientName: clientName,
|
||||
}
|
||||
manager.cond = sync.NewCond(&manager.mu)
|
||||
return manager
|
||||
}
|
||||
|
||||
func (m *AdminLockManager) Acquire(reason string) (func(), error) {
|
||||
if m == nil || m.locker == nil {
|
||||
return func() {}, nil
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
if reason != "" {
|
||||
m.locker.SetMessage(reason)
|
||||
m.currentReason = reason
|
||||
}
|
||||
for m.acquiring {
|
||||
m.cond.Wait()
|
||||
}
|
||||
if m.holdCount == 0 {
|
||||
m.acquiring = true
|
||||
m.waitingSince = time.Now().UTC()
|
||||
m.waitingReason = reason
|
||||
m.mu.Unlock()
|
||||
m.locker.RequestLock(m.clientName)
|
||||
m.mu.Lock()
|
||||
m.acquiring = false
|
||||
m.holdCount = 1
|
||||
m.lastAcquiredAt = time.Now().UTC()
|
||||
m.waitingSince = time.Time{}
|
||||
m.waitingReason = ""
|
||||
m.cond.Broadcast()
|
||||
m.mu.Unlock()
|
||||
return m.Release, nil
|
||||
}
|
||||
m.holdCount++
|
||||
if reason != "" {
|
||||
m.currentReason = reason
|
||||
}
|
||||
m.mu.Unlock()
|
||||
return m.Release, nil
|
||||
}
|
||||
|
||||
func (m *AdminLockManager) Release() {
|
||||
if m == nil || m.locker == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
if m.holdCount <= 0 {
|
||||
m.mu.Unlock()
|
||||
return
|
||||
}
|
||||
m.holdCount--
|
||||
shouldRelease := m.holdCount == 0
|
||||
m.mu.Unlock()
|
||||
|
||||
if shouldRelease {
|
||||
m.mu.Lock()
|
||||
m.lastReleasedAt = time.Now().UTC()
|
||||
m.currentReason = ""
|
||||
m.mu.Unlock()
|
||||
m.locker.ReleaseLock()
|
||||
}
|
||||
}
|
||||
|
||||
// LockStatus is a point-in-time, JSON-serializable snapshot of an
// AdminLockManager. Optional fields are omitted from the JSON when unset.
type LockStatus struct {
	// Held reports whether at least one reference is outstanding.
	Held bool `json:"held"`
	// HoldCount is the number of outstanding references.
	HoldCount int `json:"hold_count"`
	// Acquiring reports whether a lock request is currently in flight.
	Acquiring bool `json:"acquiring"`
	// Message is the most recently recorded acquisition reason.
	Message string `json:"message,omitempty"`
	// WaitingReason is the reason attached to the in-flight lock request.
	WaitingReason string `json:"waiting_reason,omitempty"`
	// LastAcquiredAt is when the lock was last obtained; nil if never.
	LastAcquiredAt *time.Time `json:"last_acquired_at,omitempty"`
	// LastReleasedAt is when the lock was last released; nil if never.
	LastReleasedAt *time.Time `json:"last_released_at,omitempty"`
	// WaitingSince is when the in-flight lock request started; nil when idle.
	WaitingSince *time.Time `json:"waiting_since,omitempty"`
}
|
||||
|
||||
func (m *AdminLockManager) Status() LockStatus {
|
||||
if m == nil {
|
||||
return LockStatus{}
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
status := LockStatus{
|
||||
Held: m.holdCount > 0,
|
||||
HoldCount: m.holdCount,
|
||||
Acquiring: m.acquiring,
|
||||
Message: m.currentReason,
|
||||
WaitingReason: m.waitingReason,
|
||||
}
|
||||
if !m.lastAcquiredAt.IsZero() {
|
||||
at := m.lastAcquiredAt
|
||||
status.LastAcquiredAt = &at
|
||||
}
|
||||
if !m.lastReleasedAt.IsZero() {
|
||||
at := m.lastReleasedAt
|
||||
status.LastReleasedAt = &at
|
||||
}
|
||||
if !m.waitingSince.IsZero() {
|
||||
at := m.waitingSince
|
||||
status.WaitingSince = &at
|
||||
}
|
||||
return status
|
||||
}
|
||||
@@ -98,6 +98,7 @@ type AdminServer struct {
|
||||
// Maintenance system
|
||||
maintenanceManager *maintenance.MaintenanceManager
|
||||
plugin *adminplugin.Plugin
|
||||
pluginLock *AdminLockManager
|
||||
expireJobHandler func(jobID string, reason string) (*adminplugin.TrackedJob, bool, error)
|
||||
|
||||
// Topic retention purger
|
||||
@@ -135,6 +136,8 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string,
|
||||
ctx := context.Background()
|
||||
go masterClient.KeepConnectedToMaster(ctx)
|
||||
|
||||
lockManager := NewAdminLockManager(masterClient, adminLockClientName)
|
||||
|
||||
server := &AdminServer{
|
||||
masterClient: masterClient,
|
||||
templateFS: templateFS,
|
||||
@@ -146,6 +149,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string,
|
||||
collectionStatsCacheThreshold: defaultStatsCacheTimeout,
|
||||
s3TablesManager: newS3TablesManager(),
|
||||
icebergPort: icebergPort,
|
||||
pluginLock: lockManager,
|
||||
}
|
||||
|
||||
// Initialize topic retention purger
|
||||
@@ -229,6 +233,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string,
|
||||
ClusterContextProvider: func(_ context.Context) (*plugin_pb.ClusterContext, error) {
|
||||
return server.buildDefaultPluginClusterContext(), nil
|
||||
},
|
||||
LockManager: lockManager,
|
||||
})
|
||||
if err != nil && dataDir != "" {
|
||||
glog.Warningf("Failed to initialize plugin with dataDir=%q: %v. Falling back to in-memory plugin state.", dataDir, err)
|
||||
@@ -237,6 +242,7 @@ func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string,
|
||||
ClusterContextProvider: func(_ context.Context) (*plugin_pb.ClusterContext, error) {
|
||||
return server.buildDefaultPluginClusterContext(), nil
|
||||
},
|
||||
LockManager: lockManager,
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
@@ -890,6 +896,13 @@ func (s *AdminServer) GetPlugin() *adminplugin.Plugin {
|
||||
return s.plugin
|
||||
}
|
||||
|
||||
func (s *AdminServer) acquirePluginLock(reason string) (func(), error) {
|
||||
if s == nil || s.pluginLock == nil {
|
||||
return func() {}, nil
|
||||
}
|
||||
return s.pluginLock.Acquire(reason)
|
||||
}
|
||||
|
||||
// RequestPluginJobTypeDescriptor asks one worker for job type schema and returns the descriptor.
|
||||
func (s *AdminServer) RequestPluginJobTypeDescriptor(ctx context.Context, jobType string, forceRefresh bool) (*plugin_pb.JobTypeDescriptor, error) {
|
||||
if s.plugin == nil {
|
||||
@@ -932,6 +945,13 @@ func (s *AdminServer) RunPluginDetection(
|
||||
if s.plugin == nil {
|
||||
return nil, fmt.Errorf("plugin is not enabled")
|
||||
}
|
||||
releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detection %s", jobType))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if releaseLock != nil {
|
||||
defer releaseLock()
|
||||
}
|
||||
return s.plugin.RunDetection(ctx, jobType, clusterContext, maxResults)
|
||||
}
|
||||
|
||||
@@ -957,6 +977,13 @@ func (s *AdminServer) RunPluginDetectionWithReport(
|
||||
if s.plugin == nil {
|
||||
return nil, fmt.Errorf("plugin is not enabled")
|
||||
}
|
||||
releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detection %s", jobType))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if releaseLock != nil {
|
||||
defer releaseLock()
|
||||
}
|
||||
return s.plugin.RunDetectionWithReport(ctx, jobType, clusterContext, maxResults)
|
||||
}
|
||||
|
||||
@@ -970,6 +997,17 @@ func (s *AdminServer) ExecutePluginJob(
|
||||
if s.plugin == nil {
|
||||
return nil, fmt.Errorf("plugin is not enabled")
|
||||
}
|
||||
jobType := ""
|
||||
if job != nil {
|
||||
jobType = strings.TrimSpace(job.JobType)
|
||||
}
|
||||
releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin execution %s", jobType))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if releaseLock != nil {
|
||||
defer releaseLock()
|
||||
}
|
||||
return s.plugin.ExecuteJob(ctx, job, clusterContext, attempt)
|
||||
}
|
||||
|
||||
|
||||
@@ -214,6 +214,27 @@ func (s *AdminServer) GetPluginSchedulerStatesAPI(w http.ResponseWriter, r *http
|
||||
writeJSON(w, http.StatusOK, states)
|
||||
}
|
||||
|
||||
// GetPluginSchedulerStatusAPI returns scheduler status including in-process jobs and lock state.
|
||||
func (s *AdminServer) GetPluginSchedulerStatusAPI(w http.ResponseWriter, r *http.Request) {
|
||||
pluginSvc := s.GetPlugin()
|
||||
if pluginSvc == nil {
|
||||
writeJSON(w, http.StatusOK, map[string]interface{}{
|
||||
"enabled": false,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
response := map[string]interface{}{
|
||||
"enabled": true,
|
||||
"scheduler": pluginSvc.GetSchedulerStatus(),
|
||||
}
|
||||
if s.pluginLock != nil {
|
||||
response["lock"] = s.pluginLock.Status()
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, response)
|
||||
}
|
||||
|
||||
// RequestPluginJobTypeSchemaAPI asks a worker for one job type schema.
|
||||
func (s *AdminServer) RequestPluginJobTypeSchemaAPI(w http.ResponseWriter, r *http.Request) {
|
||||
jobType := strings.TrimSpace(mux.Vars(r)["jobType"])
|
||||
@@ -277,6 +298,9 @@ func (s *AdminServer) GetPluginJobTypeConfigAPI(w http.ResponseWriter, r *http.R
|
||||
AdminRuntime: &plugin_pb.AdminRuntimeConfig{},
|
||||
}
|
||||
}
|
||||
if descriptor, err := s.LoadPluginJobTypeDescriptor(jobType); err == nil && descriptor != nil {
|
||||
applyDescriptorDefaultsToPersistedConfig(config, descriptor)
|
||||
}
|
||||
|
||||
renderProtoJSON(w, http.StatusOK, config)
|
||||
}
|
||||
@@ -455,6 +479,14 @@ func (s *AdminServer) RunPluginJobTypeAPI(w http.ResponseWriter, r *http.Request
|
||||
writeJSONError(w, http.StatusBadRequest, "jobType is required")
|
||||
return
|
||||
}
|
||||
releaseLock, err := s.acquirePluginLock(fmt.Sprintf("plugin detect+execute %s", jobType))
|
||||
if err != nil {
|
||||
writeJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
if releaseLock != nil {
|
||||
defer releaseLock()
|
||||
}
|
||||
|
||||
var req struct {
|
||||
ClusterContext json.RawMessage `json:"cluster_context"`
|
||||
@@ -771,6 +803,90 @@ func buildJobSpecFromProposal(jobType string, proposal *plugin_pb.JobProposal, i
|
||||
return jobSpec
|
||||
}
|
||||
|
||||
func applyDescriptorDefaultsToPersistedConfig(
|
||||
config *plugin_pb.PersistedJobTypeConfig,
|
||||
descriptor *plugin_pb.JobTypeDescriptor,
|
||||
) {
|
||||
if config == nil || descriptor == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if config.AdminConfigValues == nil {
|
||||
config.AdminConfigValues = map[string]*plugin_pb.ConfigValue{}
|
||||
}
|
||||
if config.WorkerConfigValues == nil {
|
||||
config.WorkerConfigValues = map[string]*plugin_pb.ConfigValue{}
|
||||
}
|
||||
if config.AdminRuntime == nil {
|
||||
config.AdminRuntime = &plugin_pb.AdminRuntimeConfig{}
|
||||
}
|
||||
|
||||
if descriptor.AdminConfigForm != nil {
|
||||
for key, value := range descriptor.AdminConfigForm.DefaultValues {
|
||||
if value == nil {
|
||||
continue
|
||||
}
|
||||
current := config.AdminConfigValues[key]
|
||||
if current == nil {
|
||||
config.AdminConfigValues[key] = proto.Clone(value).(*plugin_pb.ConfigValue)
|
||||
continue
|
||||
}
|
||||
if strings.EqualFold(descriptor.JobType, "admin_script") &&
|
||||
key == "script" &&
|
||||
isBlankStringConfigValue(current) {
|
||||
config.AdminConfigValues[key] = proto.Clone(value).(*plugin_pb.ConfigValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
if descriptor.WorkerConfigForm != nil {
|
||||
for key, value := range descriptor.WorkerConfigForm.DefaultValues {
|
||||
if value == nil {
|
||||
continue
|
||||
}
|
||||
if config.WorkerConfigValues[key] != nil {
|
||||
continue
|
||||
}
|
||||
config.WorkerConfigValues[key] = proto.Clone(value).(*plugin_pb.ConfigValue)
|
||||
}
|
||||
}
|
||||
if descriptor.AdminRuntimeDefaults != nil {
|
||||
runtime := config.AdminRuntime
|
||||
defaults := descriptor.AdminRuntimeDefaults
|
||||
if runtime.DetectionIntervalSeconds <= 0 {
|
||||
runtime.DetectionIntervalSeconds = defaults.DetectionIntervalSeconds
|
||||
}
|
||||
if runtime.DetectionTimeoutSeconds <= 0 {
|
||||
runtime.DetectionTimeoutSeconds = defaults.DetectionTimeoutSeconds
|
||||
}
|
||||
if runtime.MaxJobsPerDetection <= 0 {
|
||||
runtime.MaxJobsPerDetection = defaults.MaxJobsPerDetection
|
||||
}
|
||||
if runtime.GlobalExecutionConcurrency <= 0 {
|
||||
runtime.GlobalExecutionConcurrency = defaults.GlobalExecutionConcurrency
|
||||
}
|
||||
if runtime.PerWorkerExecutionConcurrency <= 0 {
|
||||
runtime.PerWorkerExecutionConcurrency = defaults.PerWorkerExecutionConcurrency
|
||||
}
|
||||
if runtime.RetryBackoffSeconds <= 0 {
|
||||
runtime.RetryBackoffSeconds = defaults.RetryBackoffSeconds
|
||||
}
|
||||
if runtime.RetryLimit < 0 {
|
||||
runtime.RetryLimit = defaults.RetryLimit
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isBlankStringConfigValue(value *plugin_pb.ConfigValue) bool {
|
||||
if value == nil {
|
||||
return true
|
||||
}
|
||||
kind, ok := value.Kind.(*plugin_pb.ConfigValue_StringValue)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return strings.TrimSpace(kind.StringValue) == ""
|
||||
}
|
||||
|
||||
func parsePositiveInt(raw string, defaultValue int) int {
|
||||
value, err := strconv.Atoi(strings.TrimSpace(raw))
|
||||
if err != nil || value <= 0 {
|
||||
|
||||
@@ -140,3 +140,83 @@ func TestBuildJobSpecFromProposalDoesNotReuseProposalID(t *testing.T) {
|
||||
t.Fatalf("dedupe key must be preserved: got=%s want=%s", jobA.DedupeKey, proposal.DedupeKey)
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyDescriptorDefaultsToPersistedConfigBackfillsAdminDefaults(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
config := &plugin_pb.PersistedJobTypeConfig{
|
||||
JobType: "admin_script",
|
||||
AdminConfigValues: map[string]*plugin_pb.ConfigValue{},
|
||||
WorkerConfigValues: map[string]*plugin_pb.ConfigValue{},
|
||||
AdminRuntime: &plugin_pb.AdminRuntimeConfig{},
|
||||
}
|
||||
descriptor := &plugin_pb.JobTypeDescriptor{
|
||||
JobType: "admin_script",
|
||||
AdminConfigForm: &plugin_pb.ConfigForm{
|
||||
DefaultValues: map[string]*plugin_pb.ConfigValue{
|
||||
"script": {
|
||||
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "volume.balance -apply"},
|
||||
},
|
||||
"run_interval_minutes": {
|
||||
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 17},
|
||||
},
|
||||
},
|
||||
},
|
||||
AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
|
||||
DetectionIntervalSeconds: 60,
|
||||
DetectionTimeoutSeconds: 300,
|
||||
},
|
||||
}
|
||||
|
||||
applyDescriptorDefaultsToPersistedConfig(config, descriptor)
|
||||
|
||||
script := config.AdminConfigValues["script"]
|
||||
if script == nil {
|
||||
t.Fatalf("expected script default to be backfilled")
|
||||
}
|
||||
scriptKind, ok := script.Kind.(*plugin_pb.ConfigValue_StringValue)
|
||||
if !ok || scriptKind.StringValue == "" {
|
||||
t.Fatalf("expected non-empty script default, got=%+v", script)
|
||||
}
|
||||
if config.AdminRuntime.DetectionIntervalSeconds != 60 {
|
||||
t.Fatalf("expected runtime detection interval default to be backfilled")
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyDescriptorDefaultsToPersistedConfigReplacesBlankAdminScript(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
config := &plugin_pb.PersistedJobTypeConfig{
|
||||
JobType: "admin_script",
|
||||
AdminConfigValues: map[string]*plugin_pb.ConfigValue{
|
||||
"script": {
|
||||
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: " "},
|
||||
},
|
||||
},
|
||||
AdminRuntime: &plugin_pb.AdminRuntimeConfig{},
|
||||
}
|
||||
descriptor := &plugin_pb.JobTypeDescriptor{
|
||||
JobType: "admin_script",
|
||||
AdminConfigForm: &plugin_pb.ConfigForm{
|
||||
DefaultValues: map[string]*plugin_pb.ConfigValue{
|
||||
"script": {
|
||||
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "volume.fix.replication -apply"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
applyDescriptorDefaultsToPersistedConfig(config, descriptor)
|
||||
|
||||
script := config.AdminConfigValues["script"]
|
||||
if script == nil {
|
||||
t.Fatalf("expected script config value")
|
||||
}
|
||||
scriptKind, ok := script.Kind.(*plugin_pb.ConfigValue_StringValue)
|
||||
if !ok {
|
||||
t.Fatalf("expected string script config value, got=%T", script.Kind)
|
||||
}
|
||||
if scriptKind.StringValue != "volume.fix.replication -apply" {
|
||||
t.Fatalf("expected blank script to be replaced by default, got=%q", scriptKind.StringValue)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,6 +234,7 @@ func (h *AdminHandlers) registerAPIRoutes(api *mux.Router, enforceWrite bool) {
|
||||
pluginApi.HandleFunc("/jobs/{jobId}/detail", h.adminServer.GetPluginJobDetailAPI).Methods(http.MethodGet)
|
||||
pluginApi.HandleFunc("/activities", h.adminServer.GetPluginActivitiesAPI).Methods(http.MethodGet)
|
||||
pluginApi.HandleFunc("/scheduler-states", h.adminServer.GetPluginSchedulerStatesAPI).Methods(http.MethodGet)
|
||||
pluginApi.HandleFunc("/scheduler-status", h.adminServer.GetPluginSchedulerStatusAPI).Methods(http.MethodGet)
|
||||
pluginApi.HandleFunc("/job-types/{jobType}/descriptor", h.adminServer.GetPluginJobTypeDescriptorAPI).Methods(http.MethodGet)
|
||||
pluginApi.HandleFunc("/job-types/{jobType}/schema", h.adminServer.RequestPluginJobTypeSchemaAPI).Methods(http.MethodPost)
|
||||
pluginApi.HandleFunc("/job-types/{jobType}/config", h.adminServer.GetPluginJobTypeConfigAPI).Methods(http.MethodGet)
|
||||
|
||||
7
weed/admin/plugin/lock_manager.go
Normal file
7
weed/admin/plugin/lock_manager.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package plugin
|
||||
|
||||
// LockManager provides a shared exclusive lock for admin-managed detection/execution.
//
// Acquire takes the lock for the stated reason and returns a release function
// that must be called when the protected work finishes.
type LockManager interface {
	// Acquire obtains the lock; reason describes what the lock protects.
	Acquire(reason string) (release func(), err error)
}
|
||||
@@ -24,6 +24,7 @@ const (
|
||||
defaultHeartbeatInterval = 30
|
||||
defaultReconnectDelay = 5
|
||||
defaultPendingSchemaBuffer = 1
|
||||
adminScriptJobType = "admin_script"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
@@ -32,6 +33,7 @@ type Options struct {
|
||||
SendTimeout time.Duration
|
||||
SchedulerTick time.Duration
|
||||
ClusterContextProvider func(context.Context) (*plugin_pb.ClusterContext, error)
|
||||
LockManager LockManager
|
||||
}
|
||||
|
||||
// JobTypeInfo contains metadata about a plugin job type.
|
||||
@@ -52,6 +54,7 @@ type Plugin struct {
|
||||
|
||||
schedulerTick time.Duration
|
||||
clusterContextProvider func(context.Context) (*plugin_pb.ClusterContext, error)
|
||||
lockManager LockManager
|
||||
|
||||
schedulerMu sync.Mutex
|
||||
nextDetectionAt map[string]time.Time
|
||||
@@ -62,6 +65,9 @@ type Plugin struct {
|
||||
|
||||
schedulerExecMu sync.Mutex
|
||||
schedulerExecReservations map[string]int
|
||||
adminScriptRunMu sync.RWMutex
|
||||
schedulerDetectionMu sync.Mutex
|
||||
schedulerDetection map[string]*schedulerDetectionInfo
|
||||
|
||||
dedupeMu sync.Mutex
|
||||
recentDedupeByType map[string]map[string]time.Time
|
||||
@@ -148,6 +154,7 @@ func New(options Options) (*Plugin, error) {
|
||||
sendTimeout: sendTimeout,
|
||||
schedulerTick: schedulerTick,
|
||||
clusterContextProvider: options.ClusterContextProvider,
|
||||
lockManager: options.LockManager,
|
||||
sessions: make(map[string]*streamSession),
|
||||
pendingSchema: make(map[string]chan *plugin_pb.ConfigSchemaResponse),
|
||||
pendingDetection: make(map[string]*pendingDetectionState),
|
||||
@@ -156,6 +163,7 @@ func New(options Options) (*Plugin, error) {
|
||||
detectionInFlight: make(map[string]bool),
|
||||
detectorLeases: make(map[string]string),
|
||||
schedulerExecReservations: make(map[string]int),
|
||||
schedulerDetection: make(map[string]*schedulerDetectionInfo),
|
||||
recentDedupeByType: make(map[string]map[string]time.Time),
|
||||
jobs: make(map[string]*TrackedJob),
|
||||
activities: make([]JobActivity, 0, 256),
|
||||
@@ -382,6 +390,13 @@ func (r *Plugin) BaseDir() string {
|
||||
return r.store.BaseDir()
|
||||
}
|
||||
|
||||
func (r *Plugin) acquireAdminLock(reason string) (func(), error) {
|
||||
if r == nil || r.lockManager == nil {
|
||||
return func() {}, nil
|
||||
}
|
||||
return r.lockManager.Acquire(reason)
|
||||
}
|
||||
|
||||
// RunDetectionWithReport requests one detector worker and returns proposals with request metadata.
|
||||
func (r *Plugin) RunDetectionWithReport(
|
||||
ctx context.Context,
|
||||
@@ -389,6 +404,9 @@ func (r *Plugin) RunDetectionWithReport(
|
||||
clusterContext *plugin_pb.ClusterContext,
|
||||
maxResults int32,
|
||||
) (*DetectionReport, error) {
|
||||
releaseGate := r.acquireDetectionExecutionGate(jobType, false)
|
||||
defer releaseGate()
|
||||
|
||||
detector, err := r.pickDetector(jobType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -403,7 +421,10 @@ func (r *Plugin) RunDetectionWithReport(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lastSuccessfulRun := r.loadLastSuccessfulRun(jobType)
|
||||
lastCompletedRun := r.loadLastSuccessfulRun(jobType)
|
||||
if strings.EqualFold(strings.TrimSpace(jobType), adminScriptJobType) {
|
||||
lastCompletedRun = r.loadLastCompletedRun(jobType)
|
||||
}
|
||||
|
||||
state := &pendingDetectionState{
|
||||
complete: make(chan *plugin_pb.DetectionComplete, 1),
|
||||
@@ -444,7 +465,7 @@ func (r *Plugin) RunDetectionWithReport(
|
||||
AdminConfigValues: adminConfigValues,
|
||||
WorkerConfigValues: workerConfigValues,
|
||||
ClusterContext: clusterContext,
|
||||
LastSuccessfulRun: lastSuccessfulRun,
|
||||
LastSuccessfulRun: lastCompletedRun,
|
||||
MaxResults: maxResults,
|
||||
},
|
||||
},
|
||||
@@ -531,11 +552,14 @@ func (r *Plugin) ExecuteJob(
|
||||
if job == nil {
|
||||
return nil, fmt.Errorf("job is nil")
|
||||
}
|
||||
if strings.TrimSpace(job.JobType) == "" {
|
||||
jobType := strings.TrimSpace(job.JobType)
|
||||
if jobType == "" {
|
||||
return nil, fmt.Errorf("job_type is required")
|
||||
}
|
||||
releaseGate := r.acquireDetectionExecutionGate(jobType, true)
|
||||
defer releaseGate()
|
||||
|
||||
executor, err := r.registry.PickExecutor(job.JobType)
|
||||
executor, err := r.registry.PickExecutor(jobType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -543,6 +567,23 @@ func (r *Plugin) ExecuteJob(
|
||||
return r.executeJobWithExecutor(ctx, executor, job, clusterContext, attempt)
|
||||
}
|
||||
|
||||
func (r *Plugin) acquireDetectionExecutionGate(jobType string, execution bool) func() {
|
||||
normalizedJobType := strings.ToLower(strings.TrimSpace(jobType))
|
||||
if execution && normalizedJobType == adminScriptJobType {
|
||||
r.adminScriptRunMu.Lock()
|
||||
return func() {
|
||||
r.adminScriptRunMu.Unlock()
|
||||
}
|
||||
}
|
||||
if normalizedJobType != adminScriptJobType {
|
||||
r.adminScriptRunMu.RLock()
|
||||
return func() {
|
||||
r.adminScriptRunMu.RUnlock()
|
||||
}
|
||||
}
|
||||
return func() {}
|
||||
}
|
||||
|
||||
func (r *Plugin) executeJobWithExecutor(
|
||||
ctx context.Context,
|
||||
executor *WorkerSession,
|
||||
@@ -1291,6 +1332,41 @@ func (r *Plugin) loadLastSuccessfulRun(jobType string) *timestamppb.Timestamp {
|
||||
return timestamppb.New(latest.UTC())
|
||||
}
|
||||
|
||||
func (r *Plugin) loadLastCompletedRun(jobType string) *timestamppb.Timestamp {
|
||||
history, err := r.store.LoadRunHistory(jobType)
|
||||
if err != nil {
|
||||
glog.Warningf("Plugin failed to load run history for %s: %v", jobType, err)
|
||||
return nil
|
||||
}
|
||||
if history == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var latest time.Time
|
||||
for i := range history.SuccessfulRuns {
|
||||
completedAt := history.SuccessfulRuns[i].CompletedAt
|
||||
if completedAt == nil || completedAt.IsZero() {
|
||||
continue
|
||||
}
|
||||
if latest.IsZero() || completedAt.After(latest) {
|
||||
latest = *completedAt
|
||||
}
|
||||
}
|
||||
for i := range history.ErrorRuns {
|
||||
completedAt := history.ErrorRuns[i].CompletedAt
|
||||
if completedAt == nil || completedAt.IsZero() {
|
||||
continue
|
||||
}
|
||||
if latest.IsZero() || completedAt.After(latest) {
|
||||
latest = *completedAt
|
||||
}
|
||||
}
|
||||
if latest.IsZero() {
|
||||
return nil
|
||||
}
|
||||
return timestamppb.New(latest.UTC())
|
||||
}
|
||||
|
||||
func CloneConfigValueMap(in map[string]*plugin_pb.ConfigValue) map[string]*plugin_pb.ConfigValue {
|
||||
if len(in) == 0 {
|
||||
return map[string]*plugin_pb.ConfigValue{}
|
||||
|
||||
@@ -4,8 +4,10 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func TestRunDetectionSendsCancelOnContextDone(t *testing.T) {
|
||||
@@ -110,3 +112,165 @@ func TestExecuteJobSendsCancelOnContextDone(t *testing.T) {
|
||||
t.Fatalf("expected context canceled error, got %v", runErr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAdminScriptExecutionBlocksOtherDetection(t *testing.T) {
|
||||
pluginSvc, err := New(Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("New plugin error: %v", err)
|
||||
}
|
||||
defer pluginSvc.Shutdown()
|
||||
|
||||
const adminWorkerID = "worker-admin-script"
|
||||
const otherWorkerID = "worker-vacuum"
|
||||
|
||||
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
||||
WorkerId: adminWorkerID,
|
||||
Capabilities: []*plugin_pb.JobTypeCapability{
|
||||
{JobType: "admin_script", CanExecute: true, MaxExecutionConcurrency: 1},
|
||||
},
|
||||
})
|
||||
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
||||
WorkerId: otherWorkerID,
|
||||
Capabilities: []*plugin_pb.JobTypeCapability{
|
||||
{JobType: "vacuum", CanDetect: true, MaxDetectionConcurrency: 1},
|
||||
},
|
||||
})
|
||||
adminSession := &streamSession{workerID: adminWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)}
|
||||
otherSession := &streamSession{workerID: otherWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)}
|
||||
pluginSvc.putSession(adminSession)
|
||||
pluginSvc.putSession(otherSession)
|
||||
|
||||
adminErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
_, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{
|
||||
JobId: "job-admin-script-1",
|
||||
JobType: "admin_script",
|
||||
}, &plugin_pb.ClusterContext{}, 1)
|
||||
adminErrCh <- runErr
|
||||
}()
|
||||
|
||||
adminExecMessage := <-adminSession.outgoing
|
||||
if adminExecMessage.GetExecuteJobRequest() == nil {
|
||||
t.Fatalf("expected admin_script execute request")
|
||||
}
|
||||
|
||||
detectErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
_, runErr := pluginSvc.RunDetection(context.Background(), "vacuum", &plugin_pb.ClusterContext{}, 10)
|
||||
detectErrCh <- runErr
|
||||
}()
|
||||
|
||||
select {
|
||||
case unexpected := <-otherSession.outgoing:
|
||||
t.Fatalf("expected vacuum detection to wait while admin_script runs, got message: %+v", unexpected)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
|
||||
pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{
|
||||
RequestId: adminExecMessage.RequestId,
|
||||
JobId: "job-admin-script-1",
|
||||
JobType: "admin_script",
|
||||
Success: true,
|
||||
CompletedAt: timestamppb.Now(),
|
||||
})
|
||||
if runErr := <-adminErrCh; runErr != nil {
|
||||
t.Fatalf("admin_script ExecuteJob error: %v", runErr)
|
||||
}
|
||||
|
||||
detectMessage := <-otherSession.outgoing
|
||||
detectRequest := detectMessage.GetRunDetectionRequest()
|
||||
if detectRequest == nil {
|
||||
t.Fatalf("expected vacuum detection request after admin_script completion")
|
||||
}
|
||||
pluginSvc.handleDetectionComplete(otherWorkerID, &plugin_pb.DetectionComplete{
|
||||
RequestId: detectMessage.RequestId,
|
||||
JobType: "vacuum",
|
||||
Success: true,
|
||||
})
|
||||
if runErr := <-detectErrCh; runErr != nil {
|
||||
t.Fatalf("vacuum RunDetection error: %v", runErr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAdminScriptExecutionBlocksOtherExecution(t *testing.T) {
|
||||
pluginSvc, err := New(Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("New plugin error: %v", err)
|
||||
}
|
||||
defer pluginSvc.Shutdown()
|
||||
|
||||
const adminWorkerID = "worker-admin-script"
|
||||
const otherWorkerID = "worker-vacuum"
|
||||
|
||||
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
||||
WorkerId: adminWorkerID,
|
||||
Capabilities: []*plugin_pb.JobTypeCapability{
|
||||
{JobType: "admin_script", CanExecute: true, MaxExecutionConcurrency: 1},
|
||||
},
|
||||
})
|
||||
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
||||
WorkerId: otherWorkerID,
|
||||
Capabilities: []*plugin_pb.JobTypeCapability{
|
||||
{JobType: "vacuum", CanExecute: true, MaxExecutionConcurrency: 1},
|
||||
},
|
||||
})
|
||||
adminSession := &streamSession{workerID: adminWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)}
|
||||
otherSession := &streamSession{workerID: otherWorkerID, outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 8)}
|
||||
pluginSvc.putSession(adminSession)
|
||||
pluginSvc.putSession(otherSession)
|
||||
|
||||
adminErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
_, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{
|
||||
JobId: "job-admin-script-2",
|
||||
JobType: "admin_script",
|
||||
}, &plugin_pb.ClusterContext{}, 1)
|
||||
adminErrCh <- runErr
|
||||
}()
|
||||
|
||||
adminExecMessage := <-adminSession.outgoing
|
||||
if adminExecMessage.GetExecuteJobRequest() == nil {
|
||||
t.Fatalf("expected admin_script execute request")
|
||||
}
|
||||
|
||||
otherErrCh := make(chan error, 1)
|
||||
go func() {
|
||||
_, runErr := pluginSvc.ExecuteJob(context.Background(), &plugin_pb.JobSpec{
|
||||
JobId: "job-vacuum-1",
|
||||
JobType: "vacuum",
|
||||
}, &plugin_pb.ClusterContext{}, 1)
|
||||
otherErrCh <- runErr
|
||||
}()
|
||||
|
||||
select {
|
||||
case unexpected := <-otherSession.outgoing:
|
||||
t.Fatalf("expected vacuum execute to wait while admin_script runs, got message: %+v", unexpected)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
|
||||
pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{
|
||||
RequestId: adminExecMessage.RequestId,
|
||||
JobId: "job-admin-script-2",
|
||||
JobType: "admin_script",
|
||||
Success: true,
|
||||
CompletedAt: timestamppb.Now(),
|
||||
})
|
||||
if runErr := <-adminErrCh; runErr != nil {
|
||||
t.Fatalf("admin_script ExecuteJob error: %v", runErr)
|
||||
}
|
||||
|
||||
otherExecMessage := <-otherSession.outgoing
|
||||
if otherExecMessage.GetExecuteJobRequest() == nil {
|
||||
t.Fatalf("expected vacuum execute request after admin_script completion")
|
||||
}
|
||||
pluginSvc.handleJobCompleted(&plugin_pb.JobCompleted{
|
||||
RequestId: otherExecMessage.RequestId,
|
||||
JobId: "job-vacuum-1",
|
||||
JobType: "vacuum",
|
||||
Success: true,
|
||||
CompletedAt: timestamppb.Now(),
|
||||
})
|
||||
if runErr := <-otherErrCh; runErr != nil {
|
||||
t.Fatalf("vacuum ExecuteJob error: %v", runErr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,3 +195,64 @@ func TestRunDetectionWithReportCapturesDetectionActivities(t *testing.T) {
|
||||
t.Fatalf("expected requested/proposal/completed activities, got stages=%v", stages)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunDetectionAdminScriptUsesLastCompletedRun(t *testing.T) {
|
||||
pluginSvc, err := New(Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("New plugin error: %v", err)
|
||||
}
|
||||
defer pluginSvc.Shutdown()
|
||||
|
||||
jobType := "admin_script"
|
||||
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
||||
WorkerId: "worker-admin-script",
|
||||
Capabilities: []*plugin_pb.JobTypeCapability{
|
||||
{JobType: jobType, CanDetect: true, MaxDetectionConcurrency: 1},
|
||||
},
|
||||
})
|
||||
session := &streamSession{workerID: "worker-admin-script", outgoing: make(chan *plugin_pb.AdminToWorkerMessage, 1)}
|
||||
pluginSvc.putSession(session)
|
||||
|
||||
successCompleted := time.Date(2026, 2, 1, 10, 0, 0, 0, time.UTC)
|
||||
errorCompleted := successCompleted.Add(45 * time.Minute)
|
||||
if err := pluginSvc.store.AppendRunRecord(jobType, &JobRunRecord{
|
||||
Outcome: RunOutcomeSuccess,
|
||||
CompletedAt: timeToPtr(successCompleted),
|
||||
}); err != nil {
|
||||
t.Fatalf("AppendRunRecord success run: %v", err)
|
||||
}
|
||||
if err := pluginSvc.store.AppendRunRecord(jobType, &JobRunRecord{
|
||||
Outcome: RunOutcomeError,
|
||||
CompletedAt: timeToPtr(errorCompleted),
|
||||
}); err != nil {
|
||||
t.Fatalf("AppendRunRecord error run: %v", err)
|
||||
}
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
_, runErr := pluginSvc.RunDetection(context.Background(), jobType, &plugin_pb.ClusterContext{}, 10)
|
||||
errCh <- runErr
|
||||
}()
|
||||
|
||||
message := <-session.outgoing
|
||||
detectRequest := message.GetRunDetectionRequest()
|
||||
if detectRequest == nil {
|
||||
t.Fatalf("expected run detection request message")
|
||||
}
|
||||
if detectRequest.LastSuccessfulRun == nil {
|
||||
t.Fatalf("expected last_successful_run to be set")
|
||||
}
|
||||
if got := detectRequest.LastSuccessfulRun.AsTime().UTC(); !got.Equal(errorCompleted) {
|
||||
t.Fatalf("unexpected last_successful_run, got=%s want=%s", got, errorCompleted)
|
||||
}
|
||||
|
||||
pluginSvc.handleDetectionComplete("worker-admin-script", &plugin_pb.DetectionComplete{
|
||||
RequestId: message.RequestId,
|
||||
JobType: jobType,
|
||||
Success: true,
|
||||
})
|
||||
|
||||
if runErr := <-errCh; runErr != nil {
|
||||
t.Fatalf("RunDetection error: %v", runErr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -321,6 +321,22 @@ func (r *Plugin) pruneDetectorLeases(activeJobTypes map[string]struct{}) {
|
||||
func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) {
|
||||
defer r.finishDetection(jobType)
|
||||
|
||||
releaseLock, lockErr := r.acquireAdminLock(fmt.Sprintf("plugin scheduled detection %s", jobType))
|
||||
if lockErr != nil {
|
||||
r.recordSchedulerDetectionError(jobType, lockErr)
|
||||
r.appendActivity(JobActivity{
|
||||
JobType: jobType,
|
||||
Source: "admin_scheduler",
|
||||
Message: fmt.Sprintf("scheduled detection aborted: failed to acquire lock: %v", lockErr),
|
||||
Stage: "failed",
|
||||
OccurredAt: timeToPtr(time.Now().UTC()),
|
||||
})
|
||||
return
|
||||
}
|
||||
if releaseLock != nil {
|
||||
defer releaseLock()
|
||||
}
|
||||
|
||||
start := time.Now().UTC()
|
||||
r.appendActivity(JobActivity{
|
||||
JobType: jobType,
|
||||
@@ -331,6 +347,7 @@ func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) {
|
||||
})
|
||||
|
||||
if skip, waitingCount, waitingThreshold := r.shouldSkipDetectionForWaitingJobs(jobType, policy); skip {
|
||||
r.recordSchedulerDetectionSkip(jobType, fmt.Sprintf("waiting backlog %d reached threshold %d", waitingCount, waitingThreshold))
|
||||
r.appendActivity(JobActivity{
|
||||
JobType: jobType,
|
||||
Source: "admin_scheduler",
|
||||
@@ -343,6 +360,7 @@ func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) {
|
||||
|
||||
clusterContext, err := r.loadSchedulerClusterContext()
|
||||
if err != nil {
|
||||
r.recordSchedulerDetectionError(jobType, err)
|
||||
r.appendActivity(JobActivity{
|
||||
JobType: jobType,
|
||||
Source: "admin_scheduler",
|
||||
@@ -357,6 +375,7 @@ func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) {
|
||||
proposals, err := r.RunDetection(ctx, jobType, clusterContext, policy.MaxResults)
|
||||
cancel()
|
||||
if err != nil {
|
||||
r.recordSchedulerDetectionError(jobType, err)
|
||||
r.appendActivity(JobActivity{
|
||||
JobType: jobType,
|
||||
Source: "admin_scheduler",
|
||||
@@ -374,6 +393,7 @@ func (r *Plugin) runScheduledDetection(jobType string, policy schedulerPolicy) {
|
||||
Stage: "detected",
|
||||
OccurredAt: timeToPtr(time.Now().UTC()),
|
||||
})
|
||||
r.recordSchedulerDetectionSuccess(jobType, len(proposals))
|
||||
|
||||
filteredByActive, skippedActive := r.filterProposalsWithActiveJobs(jobType, proposals)
|
||||
if skippedActive > 0 {
|
||||
|
||||
234
weed/admin/plugin/scheduler_status.go
Normal file
234
weed/admin/plugin/scheduler_status.go
Normal file
@@ -0,0 +1,234 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type SchedulerStatus struct {
|
||||
Now time.Time `json:"now"`
|
||||
SchedulerTickSeconds int `json:"scheduler_tick_seconds"`
|
||||
Waiting []SchedulerWaitingStatus `json:"waiting,omitempty"`
|
||||
InProcessJobs []SchedulerJobStatus `json:"in_process_jobs,omitempty"`
|
||||
JobTypes []SchedulerJobTypeStatus `json:"job_types,omitempty"`
|
||||
}
|
||||
|
||||
// SchedulerWaitingStatus describes one reason the scheduler is waiting.
// GetSchedulerStatus emits the reasons "detection_in_flight" and
// "next_detection_at".
type SchedulerWaitingStatus struct {
	// Reason is a short machine-readable identifier of the wait cause.
	Reason string `json:"reason"`

	// JobType is the job type the wait applies to, when there is one.
	JobType string `json:"job_type,omitempty"`

	// Since is an optional start time of the wait.
	Since *time.Time `json:"since,omitempty"`

	// Until is an optional end time of the wait; for "next_detection_at" it is
	// the scheduled next detection time.
	Until *time.Time `json:"until,omitempty"`

	// Details carries optional free-form extra information.
	Details map[string]interface{} `json:"details,omitempty"`
}
|
||||
|
||||
// SchedulerJobStatus is the status view of a single tracked job that is
// currently in an active state.
type SchedulerJobStatus struct {
	// JobID identifies the job.
	JobID string `json:"job_id"`

	// JobType is the plugin job type of the job.
	JobType string `json:"job_type"`

	// State is the tracked job state, lower-cased.
	State string `json:"state"`

	// Stage is the job's current stage, if reported.
	Stage string `json:"stage,omitempty"`

	// WorkerID is the worker associated with the job, if any.
	WorkerID string `json:"worker_id,omitempty"`

	// Message is the latest message recorded on the job.
	Message string `json:"message,omitempty"`

	// Progress is the progress value recorded on the job.
	Progress float64 `json:"progress,omitempty"`

	// CreatedAt is the job's creation time, when known.
	CreatedAt *time.Time `json:"created_at,omitempty"`

	// UpdatedAt is the job's last update time, when known.
	UpdatedAt *time.Time `json:"updated_at,omitempty"`

	// DurationSeconds is the elapsed time at snapshot time, measured from
	// CreatedAt when set, otherwise from UpdatedAt.
	DurationSeconds float64 `json:"duration_seconds,omitempty"`
}
|
||||
|
||||
// SchedulerJobTypeStatus summarizes scheduling and detection state for one
// job type.
type SchedulerJobTypeStatus struct {
	// JobType is the job type this entry describes.
	JobType string `json:"job_type"`

	// Enabled mirrors the Enabled flag of the scheduler state.
	Enabled bool `json:"enabled"`

	// DetectionInFlight mirrors the scheduler state's in-flight flag.
	DetectionInFlight bool `json:"detection_in_flight"`

	// NextDetectionAt is the scheduled time of the next detection, if any.
	NextDetectionAt *time.Time `json:"next_detection_at,omitempty"`

	// DetectionIntervalSeconds is the detection interval in seconds.
	DetectionIntervalSeconds int32 `json:"detection_interval_seconds,omitempty"`

	// LastDetectedAt is when the last successful detection was recorded.
	LastDetectedAt *time.Time `json:"last_detected_at,omitempty"`

	// LastDetectedCount is the count recorded with the last successful detection.
	LastDetectedCount int `json:"last_detected_count,omitempty"`

	// LastDetectionError is the most recently recorded detection error text.
	LastDetectionError string `json:"last_detection_error,omitempty"`

	// LastDetectionSkipped is the most recently recorded skip reason.
	LastDetectionSkipped string `json:"last_detection_skipped,omitempty"`
}
|
||||
|
||||
// schedulerDetectionInfo holds the latest detection outcome bookkeeping kept
// per job type: last success, last error and last skip.
type schedulerDetectionInfo struct {
	// lastDetectedAt and lastDetectedCount describe the last successful detection.
	lastDetectedAt    time.Time
	lastDetectedCount int

	// lastErrorAt and lastError describe the last recorded detection error.
	lastErrorAt time.Time
	lastError   string

	// lastSkippedAt and lastSkippedReason describe the last recorded skip.
	lastSkippedAt     time.Time
	lastSkippedReason string
}
|
||||
|
||||
func (r *Plugin) recordSchedulerDetectionSuccess(jobType string, count int) {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
r.schedulerDetectionMu.Lock()
|
||||
defer r.schedulerDetectionMu.Unlock()
|
||||
info := r.schedulerDetection[jobType]
|
||||
if info == nil {
|
||||
info = &schedulerDetectionInfo{}
|
||||
r.schedulerDetection[jobType] = info
|
||||
}
|
||||
info.lastDetectedAt = time.Now().UTC()
|
||||
info.lastDetectedCount = count
|
||||
info.lastError = ""
|
||||
info.lastSkippedReason = ""
|
||||
}
|
||||
|
||||
func (r *Plugin) recordSchedulerDetectionError(jobType string, err error) {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
r.schedulerDetectionMu.Lock()
|
||||
defer r.schedulerDetectionMu.Unlock()
|
||||
info := r.schedulerDetection[jobType]
|
||||
if info == nil {
|
||||
info = &schedulerDetectionInfo{}
|
||||
r.schedulerDetection[jobType] = info
|
||||
}
|
||||
info.lastErrorAt = time.Now().UTC()
|
||||
info.lastError = err.Error()
|
||||
}
|
||||
|
||||
func (r *Plugin) recordSchedulerDetectionSkip(jobType string, reason string) {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
if strings.TrimSpace(reason) == "" {
|
||||
return
|
||||
}
|
||||
r.schedulerDetectionMu.Lock()
|
||||
defer r.schedulerDetectionMu.Unlock()
|
||||
info := r.schedulerDetection[jobType]
|
||||
if info == nil {
|
||||
info = &schedulerDetectionInfo{}
|
||||
r.schedulerDetection[jobType] = info
|
||||
}
|
||||
info.lastSkippedAt = time.Now().UTC()
|
||||
info.lastSkippedReason = reason
|
||||
}
|
||||
|
||||
func (r *Plugin) snapshotSchedulerDetection(jobType string) schedulerDetectionInfo {
|
||||
if r == nil {
|
||||
return schedulerDetectionInfo{}
|
||||
}
|
||||
r.schedulerDetectionMu.Lock()
|
||||
defer r.schedulerDetectionMu.Unlock()
|
||||
info := r.schedulerDetection[jobType]
|
||||
if info == nil {
|
||||
return schedulerDetectionInfo{}
|
||||
}
|
||||
return *info
|
||||
}
|
||||
|
||||
func (r *Plugin) GetSchedulerStatus() SchedulerStatus {
|
||||
now := time.Now().UTC()
|
||||
status := SchedulerStatus{
|
||||
Now: now,
|
||||
SchedulerTickSeconds: int(secondsFromDuration(r.schedulerTick)),
|
||||
InProcessJobs: r.listInProcessJobs(now),
|
||||
}
|
||||
|
||||
states, err := r.ListSchedulerStates()
|
||||
if err != nil {
|
||||
return status
|
||||
}
|
||||
|
||||
waiting := make([]SchedulerWaitingStatus, 0)
|
||||
jobTypes := make([]SchedulerJobTypeStatus, 0, len(states))
|
||||
|
||||
for _, state := range states {
|
||||
jobType := state.JobType
|
||||
info := r.snapshotSchedulerDetection(jobType)
|
||||
|
||||
jobStatus := SchedulerJobTypeStatus{
|
||||
JobType: jobType,
|
||||
Enabled: state.Enabled,
|
||||
DetectionInFlight: state.DetectionInFlight,
|
||||
NextDetectionAt: state.NextDetectionAt,
|
||||
DetectionIntervalSeconds: state.DetectionIntervalSeconds,
|
||||
}
|
||||
if !info.lastDetectedAt.IsZero() {
|
||||
jobStatus.LastDetectedAt = timeToPtr(info.lastDetectedAt)
|
||||
jobStatus.LastDetectedCount = info.lastDetectedCount
|
||||
}
|
||||
if info.lastError != "" {
|
||||
jobStatus.LastDetectionError = info.lastError
|
||||
}
|
||||
if info.lastSkippedReason != "" {
|
||||
jobStatus.LastDetectionSkipped = info.lastSkippedReason
|
||||
}
|
||||
jobTypes = append(jobTypes, jobStatus)
|
||||
|
||||
if state.DetectionInFlight {
|
||||
waiting = append(waiting, SchedulerWaitingStatus{
|
||||
Reason: "detection_in_flight",
|
||||
JobType: jobType,
|
||||
})
|
||||
} else if state.Enabled && state.NextDetectionAt != nil && now.Before(*state.NextDetectionAt) {
|
||||
waiting = append(waiting, SchedulerWaitingStatus{
|
||||
Reason: "next_detection_at",
|
||||
JobType: jobType,
|
||||
Until: state.NextDetectionAt,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(jobTypes, func(i, j int) bool {
|
||||
return jobTypes[i].JobType < jobTypes[j].JobType
|
||||
})
|
||||
|
||||
status.Waiting = waiting
|
||||
status.JobTypes = jobTypes
|
||||
return status
|
||||
}
|
||||
|
||||
func (r *Plugin) listInProcessJobs(now time.Time) []SchedulerJobStatus {
|
||||
active := make([]SchedulerJobStatus, 0)
|
||||
if r == nil {
|
||||
return active
|
||||
}
|
||||
|
||||
r.jobsMu.RLock()
|
||||
for _, job := range r.jobs {
|
||||
if job == nil {
|
||||
continue
|
||||
}
|
||||
if !isActiveTrackedJobState(job.State) {
|
||||
continue
|
||||
}
|
||||
start := timeToPtr(now)
|
||||
if job.CreatedAt != nil && !job.CreatedAt.IsZero() {
|
||||
start = job.CreatedAt
|
||||
} else if job.UpdatedAt != nil && !job.UpdatedAt.IsZero() {
|
||||
start = job.UpdatedAt
|
||||
}
|
||||
durationSeconds := 0.0
|
||||
if start != nil {
|
||||
durationSeconds = now.Sub(*start).Seconds()
|
||||
}
|
||||
active = append(active, SchedulerJobStatus{
|
||||
JobID: job.JobID,
|
||||
JobType: job.JobType,
|
||||
State: strings.ToLower(job.State),
|
||||
Stage: job.Stage,
|
||||
WorkerID: job.WorkerID,
|
||||
Message: job.Message,
|
||||
Progress: job.Progress,
|
||||
CreatedAt: job.CreatedAt,
|
||||
UpdatedAt: job.UpdatedAt,
|
||||
DurationSeconds: durationSeconds,
|
||||
})
|
||||
}
|
||||
r.jobsMu.RUnlock()
|
||||
|
||||
sort.Slice(active, func(i, j int) bool {
|
||||
if active[i].DurationSeconds != active[j].DurationSeconds {
|
||||
return active[i].DurationSeconds > active[j].DurationSeconds
|
||||
}
|
||||
return active[i].JobID < active[j].JobID
|
||||
})
|
||||
|
||||
return active
|
||||
}
|
||||
64
weed/admin/plugin/scheduler_status_test.go
Normal file
64
weed/admin/plugin/scheduler_status_test.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
|
||||
)
|
||||
|
||||
func TestGetSchedulerStatusIncludesInProcessJobs(t *testing.T) {
|
||||
pluginSvc, err := New(Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("New: %v", err)
|
||||
}
|
||||
defer pluginSvc.Shutdown()
|
||||
|
||||
pluginSvc.trackExecutionStart("req-1", "worker-a", &plugin_pb.JobSpec{
|
||||
JobId: "job-1",
|
||||
JobType: "vacuum",
|
||||
}, 1)
|
||||
|
||||
status := pluginSvc.GetSchedulerStatus()
|
||||
if len(status.InProcessJobs) != 1 {
|
||||
t.Fatalf("expected one in-process job, got %d", len(status.InProcessJobs))
|
||||
}
|
||||
if status.InProcessJobs[0].JobID != "job-1" {
|
||||
t.Fatalf("unexpected job id: %s", status.InProcessJobs[0].JobID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetSchedulerStatusIncludesLastDetectionCount(t *testing.T) {
|
||||
pluginSvc, err := New(Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("New: %v", err)
|
||||
}
|
||||
defer pluginSvc.Shutdown()
|
||||
|
||||
const jobType = "vacuum"
|
||||
pluginSvc.registry.UpsertFromHello(&plugin_pb.WorkerHello{
|
||||
WorkerId: "worker-a",
|
||||
Capabilities: []*plugin_pb.JobTypeCapability{
|
||||
{JobType: jobType, CanDetect: true},
|
||||
},
|
||||
})
|
||||
|
||||
pluginSvc.recordSchedulerDetectionSuccess(jobType, 3)
|
||||
|
||||
status := pluginSvc.GetSchedulerStatus()
|
||||
found := false
|
||||
for _, jt := range status.JobTypes {
|
||||
if jt.JobType != jobType {
|
||||
continue
|
||||
}
|
||||
found = true
|
||||
if jt.LastDetectedCount != 3 {
|
||||
t.Fatalf("unexpected last detected count: got=%d want=3", jt.LastDetectedCount)
|
||||
}
|
||||
if jt.LastDetectedAt == nil {
|
||||
t.Fatalf("expected last detected at to be set")
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatalf("expected job type status for %s", jobType)
|
||||
}
|
||||
}
|
||||
@@ -511,6 +511,9 @@ templ Plugin(page string) {
|
||||
.plugin-form-root .card {
|
||||
border: 1px solid #dee2e6;
|
||||
}
|
||||
.plugin-form-root textarea {
|
||||
min-height: 12rem;
|
||||
}
|
||||
|
||||
.plugin-field-hidden {
|
||||
display: none;
|
||||
|
||||
File diff suppressed because one or more lines are too long
Reference in New Issue
Block a user