simplify plugin scheduler: remove configurable IdleSleepSeconds, use constant 61s

The SchedulerConfig struct and its persistence/API were unnecessary
indirection. Replace with a simple constant (reduced from 613s to 61s)
so the scheduler re-checks for detectable job types promptly after
going idle, improving the clean-install experience.
This commit is contained in:
Chris Lu
2026-03-09 22:41:03 -07:00
parent 8ad58e7002
commit 1bd7a98a4a
8 changed files with 8 additions and 223 deletions

View File

@@ -194,28 +194,6 @@ func TestEcEndToEnd(t *testing.T) {
// 1. Configure plugin job types for fast EC detection/execution.
t.Log("Configuring plugin job types via API...")
schedulerConfig := map[string]interface{}{
"idle_sleep_seconds": 1,
}
jsonBody, err := json.Marshal(schedulerConfig)
if err != nil {
t.Fatalf("Failed to marshal scheduler config: %v", err)
}
req, err := http.NewRequest("PUT", AdminUrl+"/api/plugin/scheduler-config", bytes.NewBuffer(jsonBody))
if err != nil {
t.Fatalf("Failed to create scheduler config request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Failed to update scheduler config: %v", err)
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
t.Fatalf("Failed to update scheduler config (status %d): %s", resp.StatusCode, string(body))
}
resp.Body.Close()
// Disable volume balance to reduce interference for this EC-focused test.
balanceConfig := map[string]interface{}{
"job_type": "volume_balance",
@@ -223,16 +201,16 @@ func TestEcEndToEnd(t *testing.T) {
"enabled": false,
},
}
jsonBody, err = json.Marshal(balanceConfig)
jsonBody, err := json.Marshal(balanceConfig)
if err != nil {
t.Fatalf("Failed to marshal volume_balance config: %v", err)
}
req, err = http.NewRequest("PUT", AdminUrl+"/api/plugin/job-types/volume_balance/config", bytes.NewBuffer(jsonBody))
req, err := http.NewRequest("PUT", AdminUrl+"/api/plugin/job-types/volume_balance/config", bytes.NewBuffer(jsonBody))
if err != nil {
t.Fatalf("Failed to create volume_balance config request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err = client.Do(req)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Failed to update volume_balance config: %v", err)
}

View File

@@ -235,53 +235,6 @@ func (s *AdminServer) GetPluginSchedulerStatusAPI(w http.ResponseWriter, r *http
writeJSON(w, http.StatusOK, response)
}
// GetPluginSchedulerConfigAPI returns scheduler configuration.
func (s *AdminServer) GetPluginSchedulerConfigAPI(w http.ResponseWriter, r *http.Request) {
pluginSvc := s.GetPlugin()
if pluginSvc == nil {
writeJSONError(w, http.StatusNotFound, "plugin is not enabled")
return
}
writeJSON(w, http.StatusOK, pluginSvc.GetSchedulerConfig())
}
// UpdatePluginSchedulerConfigAPI updates scheduler configuration.
func (s *AdminServer) UpdatePluginSchedulerConfigAPI(w http.ResponseWriter, r *http.Request) {
pluginSvc := s.GetPlugin()
if pluginSvc == nil {
writeJSONError(w, http.StatusNotFound, "plugin is not enabled")
return
}
var req struct {
IdleSleepSeconds *int32 `json:"idle_sleep_seconds"`
}
if err := decodeJSONBody(newJSONMaxReader(w, r), &req); err != nil {
if errors.Is(err, io.EOF) {
writeJSONError(w, http.StatusBadRequest, "request body is required")
return
}
writeJSONError(w, http.StatusBadRequest, "invalid request body: "+err.Error())
return
}
if req.IdleSleepSeconds == nil {
writeJSONError(w, http.StatusBadRequest, "idle_sleep_seconds is required")
return
}
updated, err := pluginSvc.UpdateSchedulerConfig(plugin.SchedulerConfig{
IdleSleepSeconds: *req.IdleSleepSeconds,
})
if err != nil {
writeJSONError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, updated)
}
// RequestPluginJobTypeSchemaAPI asks a worker for one job type schema.
func (s *AdminServer) RequestPluginJobTypeSchemaAPI(w http.ResponseWriter, r *http.Request) {
jobType := strings.TrimSpace(mux.Vars(r)["jobType"])

View File

@@ -247,8 +247,6 @@ func (h *AdminHandlers) registerAPIRoutes(api *mux.Router, enforceWrite bool) {
pluginApi.HandleFunc("/status", h.adminServer.GetPluginStatusAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/workers", h.adminServer.GetPluginWorkersAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/job-types", h.adminServer.GetPluginJobTypesAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/scheduler-config", h.adminServer.GetPluginSchedulerConfigAPI).Methods(http.MethodGet)
pluginApi.Handle("/scheduler-config", wrapWrite(h.adminServer.UpdatePluginSchedulerConfigAPI)).Methods(http.MethodPut)
pluginApi.HandleFunc("/jobs", h.adminServer.GetPluginJobsAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/jobs/{jobId}", h.adminServer.GetPluginJobAPI).Methods(http.MethodGet)
pluginApi.HandleFunc("/jobs/{jobId}/detail", h.adminServer.GetPluginJobDetailAPI).Methods(http.MethodGet)

View File

@@ -30,7 +30,6 @@ const (
runsJSONFileName = "runs.json"
trackedJobsJSONFileName = "tracked_jobs.json"
activitiesJSONFileName = "activities.json"
schedulerJSONFileName = "scheduler.json"
defaultDirPerm = 0o755
defaultFilePerm = 0o644
)
@@ -54,7 +53,6 @@ type ConfigStore struct {
memTrackedJobs []TrackedJob
memActivities []JobActivity
memJobDetails map[string]TrackedJob
memScheduler *SchedulerConfig
}
func NewConfigStore(adminDataDir string) (*ConfigStore, error) {
@@ -95,60 +93,6 @@ func (s *ConfigStore) BaseDir() string {
return s.baseDir
}
func (s *ConfigStore) LoadSchedulerConfig() (*SchedulerConfig, error) {
s.mu.RLock()
if !s.configured {
cfg := s.memScheduler
s.mu.RUnlock()
if cfg == nil {
return nil, nil
}
clone := *cfg
return &clone, nil
}
s.mu.RUnlock()
path := filepath.Join(s.baseDir, schedulerJSONFileName)
data, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, fmt.Errorf("read scheduler config: %w", err)
}
var cfg SchedulerConfig
if err := json.Unmarshal(data, &cfg); err != nil {
return nil, fmt.Errorf("unmarshal scheduler config: %w", err)
}
return &cfg, nil
}
func (s *ConfigStore) SaveSchedulerConfig(config *SchedulerConfig) error {
if config == nil {
return fmt.Errorf("scheduler config is nil")
}
normalized := normalizeSchedulerConfig(*config)
s.mu.Lock()
if !s.configured {
s.memScheduler = &normalized
s.mu.Unlock()
return nil
}
s.mu.Unlock()
payload, err := json.MarshalIndent(normalized, "", " ")
if err != nil {
return fmt.Errorf("marshal scheduler config: %w", err)
}
path := filepath.Join(s.baseDir, schedulerJSONFileName)
if err := os.WriteFile(path, payload, defaultFilePerm); err != nil {
return fmt.Errorf("save scheduler config: %w", err)
}
return nil
}
func (s *ConfigStore) SaveDescriptor(jobType string, descriptor *plugin_pb.JobTypeDescriptor) error {
if descriptor == nil {
return fmt.Errorf("descriptor is nil")

View File

@@ -78,9 +78,7 @@ type Plugin struct {
schedulerRun map[string]*schedulerRunInfo
schedulerLoopMu sync.Mutex
schedulerLoopState schedulerLoopState
schedulerConfigMu sync.RWMutex
schedulerConfig SchedulerConfig
schedulerWakeCh chan struct{}
schedulerWakeCh chan struct{}
dedupeMu sync.Mutex
recentDedupeByType map[string]map[string]time.Time
@@ -188,21 +186,6 @@ func New(options Options) (*Plugin, error) {
}
plugin.ctx, plugin.ctxCancel = context.WithCancel(context.Background())
if cfg, err := plugin.store.LoadSchedulerConfig(); err != nil {
glog.Warningf("Plugin failed to load scheduler config: %v", err)
plugin.schedulerConfig = DefaultSchedulerConfig()
} else if cfg == nil {
defaults := DefaultSchedulerConfig()
plugin.schedulerConfig = defaults
if plugin.store.IsConfigured() {
if err := plugin.store.SaveSchedulerConfig(&defaults); err != nil {
glog.Warningf("Plugin failed to persist scheduler defaults: %v", err)
}
}
} else {
plugin.schedulerConfig = normalizeSchedulerConfig(*cfg)
}
if err := plugin.loadPersistedMonitorState(); err != nil {
glog.Warningf("Plugin failed to load persisted monitoring state: %v", err)
}
@@ -426,31 +409,6 @@ func (r *Plugin) BaseDir() string {
return r.store.BaseDir()
}
func (r *Plugin) GetSchedulerConfig() SchedulerConfig {
if r == nil {
return DefaultSchedulerConfig()
}
r.schedulerConfigMu.RLock()
cfg := r.schedulerConfig
r.schedulerConfigMu.RUnlock()
return normalizeSchedulerConfig(cfg)
}
func (r *Plugin) UpdateSchedulerConfig(cfg SchedulerConfig) (SchedulerConfig, error) {
if r == nil {
return DefaultSchedulerConfig(), fmt.Errorf("plugin is not initialized")
}
normalized := normalizeSchedulerConfig(cfg)
if err := r.store.SaveSchedulerConfig(&normalized); err != nil {
return SchedulerConfig{}, err
}
r.schedulerConfigMu.Lock()
r.schedulerConfig = normalized
r.schedulerConfigMu.Unlock()
r.wakeScheduler()
return normalized, nil
}
func (r *Plugin) acquireAdminLock(reason string) (func(), error) {
if r == nil || r.lockManager == nil {
return func() {}, nil

View File

@@ -64,7 +64,7 @@ func (r *Plugin) schedulerLoop() {
}
r.setSchedulerLoopState("", "sleeping")
idleSleep := r.GetSchedulerConfig().IdleSleepDuration()
idleSleep := defaultSchedulerIdleSleep
if nextRun := r.earliestNextDetectionAt(); !nextRun.IsZero() {
if until := time.Until(nextRun); until <= 0 {
idleSleep = 0
@@ -1134,22 +1134,6 @@ func secondsFromDuration(duration time.Duration) int32 {
return int32(duration / time.Second)
}
func waitForShutdownOrTimer(shutdown <-chan struct{}, duration time.Duration) bool {
if duration <= 0 {
return true
}
timer := time.NewTimer(duration)
defer timer.Stop()
select {
case <-shutdown:
return false
case <-timer.C:
return true
}
}
func waitForShutdownOrTimerWithContext(shutdown <-chan struct{}, ctx context.Context, duration time.Duration) bool {
if duration <= 0 {
return true

View File

@@ -2,30 +2,4 @@ package plugin
import "time"
const (
defaultSchedulerIdleSleep = 613 * time.Second
)
type SchedulerConfig struct {
IdleSleepSeconds int32 `json:"idle_sleep_seconds"`
}
func DefaultSchedulerConfig() SchedulerConfig {
return SchedulerConfig{
IdleSleepSeconds: int32(defaultSchedulerIdleSleep / time.Second),
}
}
func normalizeSchedulerConfig(cfg SchedulerConfig) SchedulerConfig {
if cfg.IdleSleepSeconds <= 0 {
return DefaultSchedulerConfig()
}
return cfg
}
func (c SchedulerConfig) IdleSleepDuration() time.Duration {
if c.IdleSleepSeconds <= 0 {
return defaultSchedulerIdleSleep
}
return time.Duration(c.IdleSleepSeconds) * time.Second
}
const defaultSchedulerIdleSleep = 61 * time.Second

View File

@@ -216,22 +216,18 @@ func (r *Plugin) snapshotSchedulerLoopState() schedulerLoopState {
func (r *Plugin) GetSchedulerStatus() SchedulerStatus {
now := time.Now().UTC()
loopState := r.snapshotSchedulerLoopState()
schedulerConfig := r.GetSchedulerConfig()
status := SchedulerStatus{
Now: now,
SchedulerTickSeconds: int(secondsFromDuration(r.schedulerTick)),
InProcessJobs: r.listInProcessJobs(now),
IdleSleepSeconds: int(schedulerConfig.IdleSleepSeconds),
IdleSleepSeconds: int(defaultSchedulerIdleSleep / time.Second),
CurrentJobType: loopState.currentJobType,
CurrentPhase: loopState.currentPhase,
LastIterationHadJobs: loopState.lastIterationHadJobs,
}
nextDetectionAt := r.earliestNextDetectionAt()
if nextDetectionAt.IsZero() && loopState.currentPhase == "sleeping" && !loopState.lastIterationCompleted.IsZero() {
idleSleep := schedulerConfig.IdleSleepDuration()
if idleSleep > 0 {
nextDetectionAt = loopState.lastIterationCompleted.Add(idleSleep)
}
nextDetectionAt = loopState.lastIterationCompleted.Add(defaultSchedulerIdleSleep)
}
if !nextDetectionAt.IsZero() {
at := nextDetectionAt