diff --git a/test/erasure_coding/admin_dockertest/ec_integration_test.go b/test/erasure_coding/admin_dockertest/ec_integration_test.go index 28856f4f1..1bd1b2095 100644 --- a/test/erasure_coding/admin_dockertest/ec_integration_test.go +++ b/test/erasure_coding/admin_dockertest/ec_integration_test.go @@ -194,28 +194,6 @@ func TestEcEndToEnd(t *testing.T) { // 1. Configure plugin job types for fast EC detection/execution. t.Log("Configuring plugin job types via API...") - schedulerConfig := map[string]interface{}{ - "idle_sleep_seconds": 1, - } - jsonBody, err := json.Marshal(schedulerConfig) - if err != nil { - t.Fatalf("Failed to marshal scheduler config: %v", err) - } - req, err := http.NewRequest("PUT", AdminUrl+"/api/plugin/scheduler-config", bytes.NewBuffer(jsonBody)) - if err != nil { - t.Fatalf("Failed to create scheduler config request: %v", err) - } - req.Header.Set("Content-Type", "application/json") - resp, err := client.Do(req) - if err != nil { - t.Fatalf("Failed to update scheduler config: %v", err) - } - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - t.Fatalf("Failed to update scheduler config (status %d): %s", resp.StatusCode, string(body)) - } - resp.Body.Close() - // Disable volume balance to reduce interference for this EC-focused test. balanceConfig := map[string]interface{}{ "job_type": "volume_balance", @@ -223,16 +201,16 @@ func TestEcEndToEnd(t *testing.T) { "enabled": false, }, } - jsonBody, err = json.Marshal(balanceConfig) + jsonBody, err := json.Marshal(balanceConfig) if err != nil { t.Fatalf("Failed to marshal volume_balance config: %v", err) } - req, err = http.NewRequest("PUT", AdminUrl+"/api/plugin/job-types/volume_balance/config", bytes.NewBuffer(jsonBody)) + req, err := http.NewRequest("PUT", AdminUrl+"/api/plugin/job-types/volume_balance/config", bytes.NewBuffer(jsonBody)) if err != nil { t.Fatalf("Failed to create volume_balance config request: %v", err) } req.Header.Set("Content-Type", "application/json") - resp, err = client.Do(req) + resp, err := client.Do(req) if err != nil { t.Fatalf("Failed to update volume_balance config: %v", err) } diff --git a/weed/admin/dash/plugin_api.go b/weed/admin/dash/plugin_api.go index 632800405..f6756d4b3 100644 --- a/weed/admin/dash/plugin_api.go +++ b/weed/admin/dash/plugin_api.go @@ -235,53 +235,6 @@ func (s *AdminServer) GetPluginSchedulerStatusAPI(w http.ResponseWriter, r *http writeJSON(w, http.StatusOK, response) } -// GetPluginSchedulerConfigAPI returns scheduler configuration. -func (s *AdminServer) GetPluginSchedulerConfigAPI(w http.ResponseWriter, r *http.Request) { - pluginSvc := s.GetPlugin() - if pluginSvc == nil { - writeJSONError(w, http.StatusNotFound, "plugin is not enabled") - return - } - - writeJSON(w, http.StatusOK, pluginSvc.GetSchedulerConfig()) -} - -// UpdatePluginSchedulerConfigAPI updates scheduler configuration. -func (s *AdminServer) UpdatePluginSchedulerConfigAPI(w http.ResponseWriter, r *http.Request) { - pluginSvc := s.GetPlugin() - if pluginSvc == nil { - writeJSONError(w, http.StatusNotFound, "plugin is not enabled") - return - } - - var req struct { - IdleSleepSeconds *int32 `json:"idle_sleep_seconds"` - } - - if err := decodeJSONBody(newJSONMaxReader(w, r), &req); err != nil { - if errors.Is(err, io.EOF) { - writeJSONError(w, http.StatusBadRequest, "request body is required") - return - } - writeJSONError(w, http.StatusBadRequest, "invalid request body: "+err.Error()) - return - } - if req.IdleSleepSeconds == nil { - writeJSONError(w, http.StatusBadRequest, "idle_sleep_seconds is required") - return - } - - updated, err := pluginSvc.UpdateSchedulerConfig(plugin.SchedulerConfig{ - IdleSleepSeconds: *req.IdleSleepSeconds, - }) - if err != nil { - writeJSONError(w, http.StatusInternalServerError, err.Error()) - return - } - - writeJSON(w, http.StatusOK, updated) -} - // RequestPluginJobTypeSchemaAPI asks a worker for one job type schema. func (s *AdminServer) RequestPluginJobTypeSchemaAPI(w http.ResponseWriter, r *http.Request) { jobType := strings.TrimSpace(mux.Vars(r)["jobType"]) diff --git a/weed/admin/handlers/admin_handlers.go b/weed/admin/handlers/admin_handlers.go index b110c9bff..dc82bbfbd 100644 --- a/weed/admin/handlers/admin_handlers.go +++ b/weed/admin/handlers/admin_handlers.go @@ -247,8 +247,6 @@ func (h *AdminHandlers) registerAPIRoutes(api *mux.Router, enforceWrite bool) { pluginApi.HandleFunc("/status", h.adminServer.GetPluginStatusAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/workers", h.adminServer.GetPluginWorkersAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/job-types", h.adminServer.GetPluginJobTypesAPI).Methods(http.MethodGet) - pluginApi.HandleFunc("/scheduler-config", h.adminServer.GetPluginSchedulerConfigAPI).Methods(http.MethodGet) - pluginApi.Handle("/scheduler-config", wrapWrite(h.adminServer.UpdatePluginSchedulerConfigAPI)).Methods(http.MethodPut) pluginApi.HandleFunc("/jobs", h.adminServer.GetPluginJobsAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/jobs/{jobId}", h.adminServer.GetPluginJobAPI).Methods(http.MethodGet) pluginApi.HandleFunc("/jobs/{jobId}/detail", h.adminServer.GetPluginJobDetailAPI).Methods(http.MethodGet) diff --git a/weed/admin/plugin/config_store.go b/weed/admin/plugin/config_store.go index c36f7bd67..f237b6354 100644 --- a/weed/admin/plugin/config_store.go +++ b/weed/admin/plugin/config_store.go @@ -30,7 +30,6 @@ const ( runsJSONFileName = "runs.json" trackedJobsJSONFileName = "tracked_jobs.json" activitiesJSONFileName = "activities.json" - schedulerJSONFileName = "scheduler.json" defaultDirPerm = 0o755 defaultFilePerm = 0o644 ) @@ -54,7 +53,6 @@ type ConfigStore struct { memTrackedJobs []TrackedJob memActivities []JobActivity memJobDetails map[string]TrackedJob - memScheduler *SchedulerConfig } func NewConfigStore(adminDataDir string) (*ConfigStore, error) { @@ -95,60 +93,6 @@ func (s *ConfigStore) BaseDir() string { return s.baseDir } -func (s *ConfigStore) LoadSchedulerConfig() (*SchedulerConfig, error) { - s.mu.RLock() - if !s.configured { - cfg := s.memScheduler - s.mu.RUnlock() - if cfg == nil { - return nil, nil - } - clone := *cfg - return &clone, nil - } - s.mu.RUnlock() - - path := filepath.Join(s.baseDir, schedulerJSONFileName) - data, err := os.ReadFile(path) - if err != nil { - if os.IsNotExist(err) { - return nil, nil - } - return nil, fmt.Errorf("read scheduler config: %w", err) - } - - var cfg SchedulerConfig - if err := json.Unmarshal(data, &cfg); err != nil { - return nil, fmt.Errorf("unmarshal scheduler config: %w", err) - } - return &cfg, nil -} - -func (s *ConfigStore) SaveSchedulerConfig(config *SchedulerConfig) error { - if config == nil { - return fmt.Errorf("scheduler config is nil") - } - normalized := normalizeSchedulerConfig(*config) - - s.mu.Lock() - if !s.configured { - s.memScheduler = &normalized - s.mu.Unlock() - return nil - } - s.mu.Unlock() - - payload, err := json.MarshalIndent(normalized, "", " ") - if err != nil { - return fmt.Errorf("marshal scheduler config: %w", err) - } - path := filepath.Join(s.baseDir, schedulerJSONFileName) - if err := os.WriteFile(path, payload, defaultFilePerm); err != nil { - return fmt.Errorf("save scheduler config: %w", err) - } - return nil -} - func (s *ConfigStore) SaveDescriptor(jobType string, descriptor *plugin_pb.JobTypeDescriptor) error { if descriptor == nil { return fmt.Errorf("descriptor is nil") diff --git a/weed/admin/plugin/plugin.go b/weed/admin/plugin/plugin.go index 18ebcb891..a4bb5e575 100644 --- a/weed/admin/plugin/plugin.go +++ b/weed/admin/plugin/plugin.go @@ -78,9 +78,7 @@ type Plugin struct { schedulerRun map[string]*schedulerRunInfo schedulerLoopMu sync.Mutex schedulerLoopState schedulerLoopState - schedulerConfigMu sync.RWMutex - schedulerConfig SchedulerConfig - schedulerWakeCh chan struct{} + schedulerWakeCh chan struct{} dedupeMu sync.Mutex recentDedupeByType map[string]map[string]time.Time @@ -188,21 +186,6 @@ func New(options Options) (*Plugin, error) { } plugin.ctx, plugin.ctxCancel = context.WithCancel(context.Background()) - if cfg, err := plugin.store.LoadSchedulerConfig(); err != nil { - glog.Warningf("Plugin failed to load scheduler config: %v", err) - plugin.schedulerConfig = DefaultSchedulerConfig() - } else if cfg == nil { - defaults := DefaultSchedulerConfig() - plugin.schedulerConfig = defaults - if plugin.store.IsConfigured() { - if err := plugin.store.SaveSchedulerConfig(&defaults); err != nil { - glog.Warningf("Plugin failed to persist scheduler defaults: %v", err) - } - } - } else { - plugin.schedulerConfig = normalizeSchedulerConfig(*cfg) - } - if err := plugin.loadPersistedMonitorState(); err != nil { glog.Warningf("Plugin failed to load persisted monitoring state: %v", err) } @@ -426,31 +409,6 @@ func (r *Plugin) BaseDir() string { return r.store.BaseDir() } -func (r *Plugin) GetSchedulerConfig() SchedulerConfig { - if r == nil { - return DefaultSchedulerConfig() - } - r.schedulerConfigMu.RLock() - cfg := r.schedulerConfig - r.schedulerConfigMu.RUnlock() - return normalizeSchedulerConfig(cfg) -} - -func (r *Plugin) UpdateSchedulerConfig(cfg SchedulerConfig) (SchedulerConfig, error) { - if r == nil { - return DefaultSchedulerConfig(), fmt.Errorf("plugin is not initialized") - } - normalized := normalizeSchedulerConfig(cfg) - if err := r.store.SaveSchedulerConfig(&normalized); err != nil { - return SchedulerConfig{}, err - } - r.schedulerConfigMu.Lock() - r.schedulerConfig = normalized - r.schedulerConfigMu.Unlock() - r.wakeScheduler() - return normalized, nil -} - func (r *Plugin) acquireAdminLock(reason string) (func(), error) { if r == nil || r.lockManager == nil { return func() {}, nil diff --git a/weed/admin/plugin/plugin_scheduler.go b/weed/admin/plugin/plugin_scheduler.go index ec0eddf4b..5b6eba7ab 100644 --- a/weed/admin/plugin/plugin_scheduler.go +++ b/weed/admin/plugin/plugin_scheduler.go @@ -64,7 +64,7 @@ func (r *Plugin) schedulerLoop() { } r.setSchedulerLoopState("", "sleeping") - idleSleep := r.GetSchedulerConfig().IdleSleepDuration() + idleSleep := defaultSchedulerIdleSleep if nextRun := r.earliestNextDetectionAt(); !nextRun.IsZero() { if until := time.Until(nextRun); until <= 0 { idleSleep = 0 @@ -1134,22 +1134,6 @@ func secondsFromDuration(duration time.Duration) int32 { return int32(duration / time.Second) } -func waitForShutdownOrTimer(shutdown <-chan struct{}, duration time.Duration) bool { - if duration <= 0 { - return true - } - - timer := time.NewTimer(duration) - defer timer.Stop() - - select { - case <-shutdown: - return false - case <-timer.C: - return true - } -} - func waitForShutdownOrTimerWithContext(shutdown <-chan struct{}, ctx context.Context, duration time.Duration) bool { if duration <= 0 { return true diff --git a/weed/admin/plugin/scheduler_config.go b/weed/admin/plugin/scheduler_config.go index 1feb1a736..fe3fd2286 100644 --- a/weed/admin/plugin/scheduler_config.go +++ b/weed/admin/plugin/scheduler_config.go @@ -2,30 +2,4 @@ package plugin import "time" -const ( - defaultSchedulerIdleSleep = 613 * time.Second -) - -type SchedulerConfig struct { - IdleSleepSeconds int32 `json:"idle_sleep_seconds"` -} - -func DefaultSchedulerConfig() SchedulerConfig { - return SchedulerConfig{ - IdleSleepSeconds: int32(defaultSchedulerIdleSleep / time.Second), - } -} - -func normalizeSchedulerConfig(cfg SchedulerConfig) SchedulerConfig { - if cfg.IdleSleepSeconds <= 0 { - return DefaultSchedulerConfig() - } - return cfg -} - -func (c SchedulerConfig) IdleSleepDuration() time.Duration { - if c.IdleSleepSeconds <= 0 { - return defaultSchedulerIdleSleep - } - return time.Duration(c.IdleSleepSeconds) * time.Second -} +const defaultSchedulerIdleSleep = 61 * time.Second diff --git a/weed/admin/plugin/scheduler_status.go b/weed/admin/plugin/scheduler_status.go index a2d9e621b..75ae55aa9 100644 --- a/weed/admin/plugin/scheduler_status.go +++ b/weed/admin/plugin/scheduler_status.go @@ -216,22 +216,18 @@ func (r *Plugin) snapshotSchedulerLoopState() schedulerLoopState { func (r *Plugin) GetSchedulerStatus() SchedulerStatus { now := time.Now().UTC() loopState := r.snapshotSchedulerLoopState() - schedulerConfig := r.GetSchedulerConfig() status := SchedulerStatus{ Now: now, SchedulerTickSeconds: int(secondsFromDuration(r.schedulerTick)), InProcessJobs: r.listInProcessJobs(now), - IdleSleepSeconds: int(schedulerConfig.IdleSleepSeconds), + IdleSleepSeconds: int(defaultSchedulerIdleSleep / time.Second), CurrentJobType: loopState.currentJobType, CurrentPhase: loopState.currentPhase, LastIterationHadJobs: loopState.lastIterationHadJobs, } nextDetectionAt := r.earliestNextDetectionAt() if nextDetectionAt.IsZero() && loopState.currentPhase == "sleeping" && !loopState.lastIterationCompleted.IsZero() { - idleSleep := schedulerConfig.IdleSleepDuration() - if idleSleep > 0 { - nextDetectionAt = loopState.lastIterationCompleted.Add(idleSleep) - } + nextDetectionAt = loopState.lastIterationCompleted.Add(defaultSchedulerIdleSleep) } if !nextDetectionAt.IsZero() { at := nextDetectionAt