Admin UI add maintenance menu (#6944)

* add ui for maintenance

* valid config loading. fix workers page.

* refactor

* grpc between admin and workers

* add a long-running bidirectional grpc call between admin and worker
* use the grpc call to heartbeat
* use the grpc call to communicate
* worker can remove the http client
* admin uses http port + 10000 as its default grpc port (see the sketch just below the commit metadata)

* one task one package

* handles connection failures gracefully with exponential backoff

* grpc with insecure tls

* grpc with optional tls

* fix detecting tls

* change time config from nanoseconds to seconds

* add tasks with 3 interfaces

* compiles, reducing hard-coded values

* remove a couple of tasks

* remove hard coded references

* reduce hard coded values

* remove hard coded values

* remove hard coded from templ

* refactor maintenance package

* fix import cycle

* simplify

* simplify

* auto register

* auto register factory

* auto register task types

* self register types

* refactor

* simplify

* remove one task

* register ui

* lazy init executor factories

* use registered task types

* DefaultWorkerConfig remove hard coded task types

* remove more hard coded

* implement get maintenance task

* dynamic task configuration

* "System Settings" should only have system level settings

* adjust menu for tasks

* ensure menu not collapsed

* render job configuration well

* use templ for ui of task configuration

* fix ordering

* fix bugs

* saving duration in seconds

* use value and unit for duration

* Delete WORKER_REFACTORING_PLAN.md

* Delete maintenance.json

* Delete custom_worker_example.go

* remove address from workers

* remove old code from ec task

* remove creating collection button

* reconnect with exponential backoff

* worker use security.toml

* start admin server with tls info from security.toml

* fix "weed admin" cli description
Chris Lu
2025-07-06 13:57:02 -07:00
committed by GitHub
parent 302e62d480
commit aa66852304
76 changed files with 18218 additions and 206 deletions
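One convention from the commit message above, that the admin's default gRPC port is its HTTP port plus 10000, can be illustrated with a minimal, hypothetical helper (the actual wiring lives in the admin/worker gRPC setup added by this commit):

// defaultAdminGrpcPort is a hypothetical helper illustrating the default-port
// convention described in the commit message: gRPC port = HTTP port + 10000.
func defaultAdminGrpcPort(httpPort int) int {
	return httpPort + 10000
}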


@@ -0,0 +1,409 @@
package maintenance
import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// MaintenanceIntegration bridges the task system with existing maintenance
type MaintenanceIntegration struct {
taskRegistry *types.TaskRegistry
uiRegistry *types.UIRegistry
// Bridge to existing system
maintenanceQueue *MaintenanceQueue
maintenancePolicy *MaintenancePolicy
// Type conversion maps
taskTypeMap map[types.TaskType]MaintenanceTaskType
revTaskTypeMap map[MaintenanceTaskType]types.TaskType
priorityMap map[types.TaskPriority]MaintenanceTaskPriority
revPriorityMap map[MaintenanceTaskPriority]types.TaskPriority
}
// NewMaintenanceIntegration creates the integration bridge
func NewMaintenanceIntegration(queue *MaintenanceQueue, policy *MaintenancePolicy) *MaintenanceIntegration {
integration := &MaintenanceIntegration{
taskRegistry: tasks.GetGlobalTypesRegistry(), // Use global types registry with auto-registered tasks
uiRegistry: tasks.GetGlobalUIRegistry(), // Use global UI registry with auto-registered UI providers
maintenanceQueue: queue,
maintenancePolicy: policy,
}
// Initialize type conversion maps
integration.initializeTypeMaps()
// Register all tasks
integration.registerAllTasks()
return integration
}
// initializeTypeMaps creates the type conversion maps for dynamic conversion
func (s *MaintenanceIntegration) initializeTypeMaps() {
// Initialize empty maps
s.taskTypeMap = make(map[types.TaskType]MaintenanceTaskType)
s.revTaskTypeMap = make(map[MaintenanceTaskType]types.TaskType)
// Build task type mappings dynamically from registered tasks after registration
// This will be called from registerAllTasks() after all tasks are registered
// Priority mappings (these are static and don't depend on registered tasks)
s.priorityMap = map[types.TaskPriority]MaintenanceTaskPriority{
types.TaskPriorityLow: PriorityLow,
types.TaskPriorityNormal: PriorityNormal,
types.TaskPriorityHigh: PriorityHigh,
}
// Reverse priority mappings
s.revPriorityMap = map[MaintenanceTaskPriority]types.TaskPriority{
PriorityLow: types.TaskPriorityLow,
PriorityNormal: types.TaskPriorityNormal,
PriorityHigh: types.TaskPriorityHigh,
PriorityCritical: types.TaskPriorityHigh, // Map critical to high
}
}
// buildTaskTypeMappings dynamically builds task type mappings from registered tasks
func (s *MaintenanceIntegration) buildTaskTypeMappings() {
// Clear existing mappings
s.taskTypeMap = make(map[types.TaskType]MaintenanceTaskType)
s.revTaskTypeMap = make(map[MaintenanceTaskType]types.TaskType)
// Build mappings from registered detectors
for workerTaskType := range s.taskRegistry.GetAllDetectors() {
// Convert types.TaskType to MaintenanceTaskType by string conversion
maintenanceTaskType := MaintenanceTaskType(string(workerTaskType))
s.taskTypeMap[workerTaskType] = maintenanceTaskType
s.revTaskTypeMap[maintenanceTaskType] = workerTaskType
glog.V(3).Infof("Dynamically mapped task type: %s <-> %s", workerTaskType, maintenanceTaskType)
}
glog.V(2).Infof("Built %d dynamic task type mappings", len(s.taskTypeMap))
}
// registerAllTasks registers all available tasks
func (s *MaintenanceIntegration) registerAllTasks() {
// Tasks are already auto-registered via import statements
// No manual registration needed
// Build dynamic type mappings from registered tasks
s.buildTaskTypeMappings()
// Configure tasks from policy
s.configureTasksFromPolicy()
registeredTaskTypes := make([]string, 0, len(s.taskTypeMap))
for _, maintenanceTaskType := range s.taskTypeMap {
registeredTaskTypes = append(registeredTaskTypes, string(maintenanceTaskType))
}
glog.V(1).Infof("Registered tasks: %v", registeredTaskTypes)
}
// configureTasksFromPolicy dynamically configures all registered tasks based on the maintenance policy
func (s *MaintenanceIntegration) configureTasksFromPolicy() {
if s.maintenancePolicy == nil {
return
}
// Configure all registered detectors and schedulers dynamically using policy configuration
configuredCount := 0
// Get all registered task types from the registry
for taskType, detector := range s.taskRegistry.GetAllDetectors() {
// Configure detector using policy-based configuration
s.configureDetectorFromPolicy(taskType, detector)
configuredCount++
}
for taskType, scheduler := range s.taskRegistry.GetAllSchedulers() {
// Configure scheduler using policy-based configuration
s.configureSchedulerFromPolicy(taskType, scheduler)
}
glog.V(1).Infof("Dynamically configured %d task types from maintenance policy", configuredCount)
}
// configureDetectorFromPolicy configures a detector using policy-based configuration
func (s *MaintenanceIntegration) configureDetectorFromPolicy(taskType types.TaskType, detector types.TaskDetector) {
// Try to configure using PolicyConfigurableDetector interface if supported
if configurableDetector, ok := detector.(types.PolicyConfigurableDetector); ok {
configurableDetector.ConfigureFromPolicy(s.maintenancePolicy)
glog.V(2).Infof("Configured detector %s using policy interface", taskType)
return
}
// Apply basic configuration that all detectors should support
if basicDetector, ok := detector.(interface{ SetEnabled(bool) }); ok {
// Convert task system type to maintenance task type for policy lookup
maintenanceTaskType, exists := s.taskTypeMap[taskType]
if exists {
enabled := s.maintenancePolicy.IsTaskEnabled(maintenanceTaskType)
basicDetector.SetEnabled(enabled)
glog.V(3).Infof("Set enabled=%v for detector %s", enabled, taskType)
}
}
// For detectors that don't implement PolicyConfigurableDetector interface,
// they should be updated to implement it for full policy-based configuration
glog.V(2).Infof("Detector %s should implement PolicyConfigurableDetector interface for full policy support", taskType)
}
// configureSchedulerFromPolicy configures a scheduler using policy-based configuration
func (s *MaintenanceIntegration) configureSchedulerFromPolicy(taskType types.TaskType, scheduler types.TaskScheduler) {
// Try to configure using PolicyConfigurableScheduler interface if supported
if configurableScheduler, ok := scheduler.(types.PolicyConfigurableScheduler); ok {
configurableScheduler.ConfigureFromPolicy(s.maintenancePolicy)
glog.V(2).Infof("Configured scheduler %s using policy interface", taskType)
return
}
// Apply basic configuration that all schedulers should support
maintenanceTaskType, exists := s.taskTypeMap[taskType]
if !exists {
glog.V(3).Infof("No maintenance task type mapping for %s, skipping configuration", taskType)
return
}
// Set enabled status if scheduler supports it
if enableableScheduler, ok := scheduler.(interface{ SetEnabled(bool) }); ok {
enabled := s.maintenancePolicy.IsTaskEnabled(maintenanceTaskType)
enableableScheduler.SetEnabled(enabled)
glog.V(3).Infof("Set enabled=%v for scheduler %s", enabled, taskType)
}
// Set max concurrent if scheduler supports it
if concurrentScheduler, ok := scheduler.(interface{ SetMaxConcurrent(int) }); ok {
maxConcurrent := s.maintenancePolicy.GetMaxConcurrent(maintenanceTaskType)
if maxConcurrent > 0 {
concurrentScheduler.SetMaxConcurrent(maxConcurrent)
glog.V(3).Infof("Set max concurrent=%d for scheduler %s", maxConcurrent, taskType)
}
}
// For schedulers that don't implement PolicyConfigurableScheduler interface,
// they should be updated to implement it for full policy-based configuration
glog.V(2).Infof("Scheduler %s should implement PolicyConfigurableScheduler interface for full policy support", taskType)
}
// ScanWithTaskDetectors performs a scan using the task system
func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.VolumeHealthMetrics) ([]*TaskDetectionResult, error) {
var allResults []*TaskDetectionResult
// Create cluster info
clusterInfo := &types.ClusterInfo{
TotalVolumes: len(volumeMetrics),
LastUpdated: time.Now(),
}
// Run detection for each registered task type
for taskType, detector := range s.taskRegistry.GetAllDetectors() {
if !detector.IsEnabled() {
continue
}
glog.V(2).Infof("Running detection for task type: %s", taskType)
results, err := detector.ScanForTasks(volumeMetrics, clusterInfo)
if err != nil {
glog.Errorf("Failed to scan for %s tasks: %v", taskType, err)
continue
}
// Convert results to existing system format
for _, result := range results {
existingResult := s.convertToExistingFormat(result)
if existingResult != nil {
allResults = append(allResults, existingResult)
}
}
glog.V(2).Infof("Found %d %s tasks", len(results), taskType)
}
return allResults, nil
}
// convertToExistingFormat converts task results to existing system format using dynamic mapping
func (s *MaintenanceIntegration) convertToExistingFormat(result *types.TaskDetectionResult) *TaskDetectionResult {
// Convert types using mapping tables
existingType, exists := s.taskTypeMap[result.TaskType]
if !exists {
glog.Warningf("Unknown task type %s, skipping conversion", result.TaskType)
// Return nil to indicate conversion failed - caller should handle this
return nil
}
existingPriority, exists := s.priorityMap[result.Priority]
if !exists {
glog.Warningf("Unknown priority %d, defaulting to normal", result.Priority)
existingPriority = PriorityNormal
}
return &TaskDetectionResult{
TaskType: existingType,
VolumeID: result.VolumeID,
Server: result.Server,
Collection: result.Collection,
Priority: existingPriority,
Reason: result.Reason,
Parameters: result.Parameters,
ScheduleAt: result.ScheduleAt,
}
}
// CanScheduleWithTaskSchedulers determines if a task can be scheduled using task schedulers with dynamic type conversion
func (s *MaintenanceIntegration) CanScheduleWithTaskSchedulers(task *MaintenanceTask, runningTasks []*MaintenanceTask, availableWorkers []*MaintenanceWorker) bool {
// Convert existing types to task types using mapping
taskType, exists := s.revTaskTypeMap[task.Type]
if !exists {
glog.V(2).Infof("Unknown task type %s for scheduling, falling back to existing logic", task.Type)
return false // Fallback to existing logic for unknown types
}
// Convert task objects
taskObject := s.convertTaskToTaskSystem(task)
if taskObject == nil {
glog.V(2).Infof("Failed to convert task %s for scheduling", task.ID)
return false
}
runningTaskObjects := s.convertTasksToTaskSystem(runningTasks)
workerObjects := s.convertWorkersToTaskSystem(availableWorkers)
// Get the appropriate scheduler
scheduler := s.taskRegistry.GetScheduler(taskType)
if scheduler == nil {
glog.V(2).Infof("No scheduler found for task type %s", taskType)
return false
}
return scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects)
}
// convertTaskToTaskSystem converts existing task to task system format using dynamic mapping
func (s *MaintenanceIntegration) convertTaskToTaskSystem(task *MaintenanceTask) *types.Task {
// Convert task type using mapping
taskType, exists := s.revTaskTypeMap[task.Type]
if !exists {
glog.Errorf("Unknown task type %s in conversion, cannot convert task", task.Type)
// Return nil to indicate conversion failed
return nil
}
// Convert priority using mapping
priority, exists := s.revPriorityMap[task.Priority]
if !exists {
glog.Warningf("Unknown priority %d in conversion, defaulting to normal", task.Priority)
priority = types.TaskPriorityNormal
}
return &types.Task{
ID: task.ID,
Type: taskType,
Priority: priority,
VolumeID: task.VolumeID,
Server: task.Server,
Collection: task.Collection,
Parameters: task.Parameters,
CreatedAt: task.CreatedAt,
}
}
// convertTasksToTaskSystem converts multiple tasks
func (s *MaintenanceIntegration) convertTasksToTaskSystem(tasks []*MaintenanceTask) []*types.Task {
var result []*types.Task
for _, task := range tasks {
converted := s.convertTaskToTaskSystem(task)
if converted != nil {
result = append(result, converted)
}
}
return result
}
// convertWorkersToTaskSystem converts workers to task system format using dynamic mapping
func (s *MaintenanceIntegration) convertWorkersToTaskSystem(workers []*MaintenanceWorker) []*types.Worker {
var result []*types.Worker
for _, worker := range workers {
capabilities := make([]types.TaskType, 0, len(worker.Capabilities))
for _, cap := range worker.Capabilities {
// Convert capability using mapping
taskType, exists := s.revTaskTypeMap[cap]
if exists {
capabilities = append(capabilities, taskType)
} else {
glog.V(3).Infof("Unknown capability %s for worker %s, skipping", cap, worker.ID)
}
}
result = append(result, &types.Worker{
ID: worker.ID,
Address: worker.Address,
Capabilities: capabilities,
MaxConcurrent: worker.MaxConcurrent,
CurrentLoad: worker.CurrentLoad,
})
}
return result
}
// GetTaskScheduler returns the scheduler for a task type using dynamic mapping
func (s *MaintenanceIntegration) GetTaskScheduler(taskType MaintenanceTaskType) types.TaskScheduler {
// Convert task type using mapping
taskSystemType, exists := s.revTaskTypeMap[taskType]
if !exists {
glog.V(3).Infof("Unknown task type %s for scheduler", taskType)
return nil
}
return s.taskRegistry.GetScheduler(taskSystemType)
}
// GetUIProvider returns the UI provider for a task type using dynamic mapping
func (s *MaintenanceIntegration) GetUIProvider(taskType MaintenanceTaskType) types.TaskUIProvider {
// Convert task type using mapping
taskSystemType, exists := s.revTaskTypeMap[taskType]
if !exists {
glog.V(3).Infof("Unknown task type %s for UI provider", taskType)
return nil
}
return s.uiRegistry.GetProvider(taskSystemType)
}
// GetAllTaskStats returns stats for all registered tasks
func (s *MaintenanceIntegration) GetAllTaskStats() []*types.TaskStats {
var stats []*types.TaskStats
for taskType, detector := range s.taskRegistry.GetAllDetectors() {
uiProvider := s.uiRegistry.GetProvider(taskType)
if uiProvider == nil {
continue
}
stat := &types.TaskStats{
TaskType: taskType,
DisplayName: uiProvider.GetDisplayName(),
Enabled: detector.IsEnabled(),
LastScan: time.Now().Add(-detector.ScanInterval()),
NextScan: time.Now().Add(detector.ScanInterval()),
ScanInterval: detector.ScanInterval(),
MaxConcurrent: s.taskRegistry.GetScheduler(taskType).GetMaxConcurrent(),
// Would need to get these from actual queue/stats
PendingTasks: 0,
RunningTasks: 0,
CompletedToday: 0,
FailedToday: 0,
}
stats = append(stats, stat)
}
return stats
}
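A minimal sketch of wiring this bridge by hand, assuming the worker task packages have already been imported for their side-effect registration and using this file's imports; NewMaintenanceScanner (further below) performs the same wiring automatically:

policy := &MaintenancePolicy{} // normally taken from MaintenanceConfig.Policy
queue := NewMaintenanceQueue(policy)
integration := NewMaintenanceIntegration(queue, policy)
queue.SetIntegration(integration)

// Run detection over whatever volume metrics are available and enqueue the results.
results, err := integration.ScanWithTaskDetectors([]*types.VolumeHealthMetrics{})
if err != nil {
	glog.Errorf("scan failed: %v", err)
} else {
	queue.AddTasksFromResults(results)
}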


@@ -0,0 +1,407 @@
package maintenance
import (
"fmt"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
)
// MaintenanceManager coordinates the maintenance system
type MaintenanceManager struct {
config *MaintenanceConfig
scanner *MaintenanceScanner
queue *MaintenanceQueue
adminClient AdminClient
running bool
stopChan chan struct{}
// Error handling and backoff
errorCount int
lastError error
lastErrorTime time.Time
backoffDelay time.Duration
mutex sync.RWMutex
}
// NewMaintenanceManager creates a new maintenance manager
func NewMaintenanceManager(adminClient AdminClient, config *MaintenanceConfig) *MaintenanceManager {
if config == nil {
config = DefaultMaintenanceConfig()
}
queue := NewMaintenanceQueue(config.Policy)
scanner := NewMaintenanceScanner(adminClient, config.Policy, queue)
return &MaintenanceManager{
config: config,
scanner: scanner,
queue: queue,
adminClient: adminClient,
stopChan: make(chan struct{}),
backoffDelay: time.Second, // Start with 1 second backoff
}
}
// Start begins the maintenance manager
func (mm *MaintenanceManager) Start() error {
if !mm.config.Enabled {
glog.V(1).Infof("Maintenance system is disabled")
return nil
}
// Validate configuration durations to prevent ticker panics
if err := mm.validateConfig(); err != nil {
return fmt.Errorf("invalid maintenance configuration: %v", err)
}
mm.running = true
// Start background processes
go mm.scanLoop()
go mm.cleanupLoop()
glog.Infof("Maintenance manager started with scan interval %ds", mm.config.ScanIntervalSeconds)
return nil
}
// validateConfig validates the maintenance configuration durations
func (mm *MaintenanceManager) validateConfig() error {
if mm.config.ScanIntervalSeconds <= 0 {
glog.Warningf("Invalid scan interval %ds, using default 30m", mm.config.ScanIntervalSeconds)
mm.config.ScanIntervalSeconds = 30 * 60 // 30 minutes in seconds
}
if mm.config.CleanupIntervalSeconds <= 0 {
glog.Warningf("Invalid cleanup interval %ds, using default 24h", mm.config.CleanupIntervalSeconds)
mm.config.CleanupIntervalSeconds = 24 * 60 * 60 // 24 hours in seconds
}
if mm.config.WorkerTimeoutSeconds <= 0 {
glog.Warningf("Invalid worker timeout %ds, using default 5m", mm.config.WorkerTimeoutSeconds)
mm.config.WorkerTimeoutSeconds = 5 * 60 // 5 minutes in seconds
}
if mm.config.TaskTimeoutSeconds <= 0 {
glog.Warningf("Invalid task timeout %ds, using default 2h", mm.config.TaskTimeoutSeconds)
mm.config.TaskTimeoutSeconds = 2 * 60 * 60 // 2 hours in seconds
}
if mm.config.RetryDelaySeconds <= 0 {
glog.Warningf("Invalid retry delay %ds, using default 15m", mm.config.RetryDelaySeconds)
mm.config.RetryDelaySeconds = 15 * 60 // 15 minutes in seconds
}
if mm.config.TaskRetentionSeconds <= 0 {
glog.Warningf("Invalid task retention %ds, using default 168h", mm.config.TaskRetentionSeconds)
mm.config.TaskRetentionSeconds = 7 * 24 * 60 * 60 // 7 days in seconds
}
return nil
}
// IsRunning returns whether the maintenance manager is currently running
func (mm *MaintenanceManager) IsRunning() bool {
return mm.running
}
// Stop terminates the maintenance manager
func (mm *MaintenanceManager) Stop() {
mm.running = false
close(mm.stopChan)
glog.Infof("Maintenance manager stopped")
}
// scanLoop periodically scans for maintenance tasks with adaptive timing
func (mm *MaintenanceManager) scanLoop() {
scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
currentTickerInterval := scanInterval
ticker := time.NewTicker(scanInterval)
defer ticker.Stop()
for mm.running {
select {
case <-mm.stopChan:
return
case <-ticker.C:
glog.V(1).Infof("Performing maintenance scan every %v", scanInterval)
mm.performScan()
// Adjust ticker interval based on error state
mm.mutex.RLock()
currentInterval := scanInterval
if mm.errorCount > 0 {
// Use backoff delay when there are errors
currentInterval = mm.backoffDelay
if currentInterval > scanInterval {
// Don't make it longer than the configured interval * 10
maxInterval := scanInterval * 10
if currentInterval > maxInterval {
currentInterval = maxInterval
}
}
}
mm.mutex.RUnlock()
// Reset the ticker if the desired interval differs from the one currently in use
if currentInterval != currentTickerInterval {
ticker.Stop()
ticker = time.NewTicker(currentInterval)
currentTickerInterval = currentInterval
}
}
}
}
// cleanupLoop periodically cleans up old tasks and stale workers
func (mm *MaintenanceManager) cleanupLoop() {
cleanupInterval := time.Duration(mm.config.CleanupIntervalSeconds) * time.Second
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for mm.running {
select {
case <-mm.stopChan:
return
case <-ticker.C:
mm.performCleanup()
}
}
}
// performScan executes a maintenance scan with error handling and backoff
func (mm *MaintenanceManager) performScan() {
mm.mutex.Lock()
defer mm.mutex.Unlock()
glog.V(2).Infof("Starting maintenance scan")
results, err := mm.scanner.ScanForMaintenanceTasks()
if err != nil {
mm.handleScanError(err)
return
}
// Scan succeeded, reset error tracking
mm.resetErrorTracking()
if len(results) > 0 {
mm.queue.AddTasksFromResults(results)
glog.V(1).Infof("Maintenance scan completed: added %d tasks", len(results))
} else {
glog.V(2).Infof("Maintenance scan completed: no tasks needed")
}
}
// handleScanError handles scan errors with exponential backoff and reduced logging
func (mm *MaintenanceManager) handleScanError(err error) {
now := time.Now()
mm.errorCount++
mm.lastError = err
mm.lastErrorTime = now
// Use exponential backoff with jitter
if mm.errorCount > 1 {
mm.backoffDelay = mm.backoffDelay * 2
if mm.backoffDelay > 5*time.Minute {
mm.backoffDelay = 5 * time.Minute // Cap at 5 minutes
}
}
// Reduce log frequency based on error count and time
shouldLog := false
if mm.errorCount <= 3 {
// Log first 3 errors immediately
shouldLog = true
} else if mm.errorCount <= 10 && mm.errorCount%3 == 0 {
// Log every 3rd error for errors 4-10
shouldLog = true
} else if mm.errorCount%10 == 0 {
// Log every 10th error after that
shouldLog = true
}
if shouldLog {
// Check if it's a connection error to provide better messaging
if isConnectionError(err) {
if mm.errorCount == 1 {
glog.Errorf("Maintenance scan failed: %v (will retry with backoff)", err)
} else {
glog.Errorf("Maintenance scan still failing after %d attempts: %v (backoff: %v)",
mm.errorCount, err, mm.backoffDelay)
}
} else {
glog.Errorf("Maintenance scan failed: %v", err)
}
} else {
// Use debug level for suppressed errors
glog.V(3).Infof("Maintenance scan failed (error #%d, suppressed): %v", mm.errorCount, err)
}
}
// resetErrorTracking resets error tracking when scan succeeds
func (mm *MaintenanceManager) resetErrorTracking() {
if mm.errorCount > 0 {
glog.V(1).Infof("Maintenance scan recovered after %d failed attempts", mm.errorCount)
mm.errorCount = 0
mm.lastError = nil
mm.backoffDelay = time.Second // Reset to initial delay
}
}
// isConnectionError checks if the error is a connection-related error
func isConnectionError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
return strings.Contains(errStr, "connection refused") ||
strings.Contains(errStr, "connection error") ||
strings.Contains(errStr, "dial tcp") ||
strings.Contains(errStr, "connection timeout") ||
strings.Contains(errStr, "no route to host") ||
strings.Contains(errStr, "network unreachable")
}
// performCleanup cleans up old tasks and stale workers
func (mm *MaintenanceManager) performCleanup() {
glog.V(2).Infof("Starting maintenance cleanup")
taskRetention := time.Duration(mm.config.TaskRetentionSeconds) * time.Second
workerTimeout := time.Duration(mm.config.WorkerTimeoutSeconds) * time.Second
removedTasks := mm.queue.CleanupOldTasks(taskRetention)
removedWorkers := mm.queue.RemoveStaleWorkers(workerTimeout)
if removedTasks > 0 || removedWorkers > 0 {
glog.V(1).Infof("Cleanup completed: removed %d old tasks and %d stale workers", removedTasks, removedWorkers)
}
}
// GetQueue returns the maintenance queue
func (mm *MaintenanceManager) GetQueue() *MaintenanceQueue {
return mm.queue
}
// GetConfig returns the maintenance configuration
func (mm *MaintenanceManager) GetConfig() *MaintenanceConfig {
return mm.config
}
// GetStats returns maintenance statistics
func (mm *MaintenanceManager) GetStats() *MaintenanceStats {
stats := mm.queue.GetStats()
mm.mutex.RLock()
defer mm.mutex.RUnlock()
stats.LastScanTime = time.Now() // Would need to track this properly
// Calculate next scan time based on current error state
scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
nextScanInterval := scanInterval
if mm.errorCount > 0 {
nextScanInterval = mm.backoffDelay
maxInterval := scanInterval * 10
if nextScanInterval > maxInterval {
nextScanInterval = maxInterval
}
}
stats.NextScanTime = time.Now().Add(nextScanInterval)
return stats
}
// GetErrorState returns the current error state for monitoring
func (mm *MaintenanceManager) GetErrorState() (errorCount int, lastError error, backoffDelay time.Duration) {
mm.mutex.RLock()
defer mm.mutex.RUnlock()
return mm.errorCount, mm.lastError, mm.backoffDelay
}
// GetTasks returns tasks with filtering
func (mm *MaintenanceManager) GetTasks(status MaintenanceTaskStatus, taskType MaintenanceTaskType, limit int) []*MaintenanceTask {
return mm.queue.GetTasks(status, taskType, limit)
}
// GetWorkers returns all registered workers
func (mm *MaintenanceManager) GetWorkers() []*MaintenanceWorker {
return mm.queue.GetWorkers()
}
// TriggerScan manually triggers a maintenance scan
func (mm *MaintenanceManager) TriggerScan() error {
if !mm.running {
return fmt.Errorf("maintenance manager is not running")
}
go mm.performScan()
return nil
}
// UpdateConfig updates the maintenance configuration
func (mm *MaintenanceManager) UpdateConfig(config *MaintenanceConfig) error {
if config == nil {
return fmt.Errorf("config cannot be nil")
}
mm.config = config
mm.queue.policy = config.Policy
mm.scanner.policy = config.Policy
glog.V(1).Infof("Maintenance configuration updated")
return nil
}
// CancelTask cancels a pending task
func (mm *MaintenanceManager) CancelTask(taskID string) error {
mm.queue.mutex.Lock()
defer mm.queue.mutex.Unlock()
task, exists := mm.queue.tasks[taskID]
if !exists {
return fmt.Errorf("task %s not found", taskID)
}
if task.Status == TaskStatusPending {
task.Status = TaskStatusCancelled
completedTime := time.Now()
task.CompletedAt = &completedTime
// Remove from pending tasks
for i, pendingTask := range mm.queue.pendingTasks {
if pendingTask.ID == taskID {
mm.queue.pendingTasks = append(mm.queue.pendingTasks[:i], mm.queue.pendingTasks[i+1:]...)
break
}
}
glog.V(2).Infof("Cancelled task %s", taskID)
return nil
}
return fmt.Errorf("task %s cannot be cancelled (status: %s)", taskID, task.Status)
}
// RegisterWorker registers a new worker
func (mm *MaintenanceManager) RegisterWorker(worker *MaintenanceWorker) {
mm.queue.RegisterWorker(worker)
}
// GetNextTask returns the next task for a worker
func (mm *MaintenanceManager) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
return mm.queue.GetNextTask(workerID, capabilities)
}
// CompleteTask marks a task as completed
func (mm *MaintenanceManager) CompleteTask(taskID string, error string) {
mm.queue.CompleteTask(taskID, error)
}
// UpdateTaskProgress updates task progress
func (mm *MaintenanceManager) UpdateTaskProgress(taskID string, progress float64) {
mm.queue.UpdateTaskProgress(taskID, progress)
}
// UpdateWorkerHeartbeat updates worker heartbeat
func (mm *MaintenanceManager) UpdateWorkerHeartbeat(workerID string) {
mm.queue.UpdateWorkerHeartbeat(workerID)
}
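A minimal usage sketch, assuming an AdminClient implementation named adminClient and this file's imports; the maintenance system is disabled by default, so it is enabled explicitly here:

config := DefaultMaintenanceConfig()
config.Enabled = true
manager := NewMaintenanceManager(adminClient, config)
if err := manager.Start(); err != nil {
	glog.Fatalf("failed to start maintenance manager: %v", err)
}
defer manager.Stop()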


@@ -0,0 +1,140 @@
package maintenance
import (
"errors"
"testing"
"time"
)
func TestMaintenanceManager_ErrorHandling(t *testing.T) {
config := DefaultMaintenanceConfig()
config.ScanIntervalSeconds = 1 // Short interval for testing (1 second)
manager := NewMaintenanceManager(nil, config)
// Test initial state
if manager.errorCount != 0 {
t.Errorf("Expected initial error count to be 0, got %d", manager.errorCount)
}
if manager.backoffDelay != time.Second {
t.Errorf("Expected initial backoff delay to be 1s, got %v", manager.backoffDelay)
}
// Test error handling
err := errors.New("dial tcp [::1]:19333: connect: connection refused")
manager.handleScanError(err)
if manager.errorCount != 1 {
t.Errorf("Expected error count to be 1, got %d", manager.errorCount)
}
if manager.lastError != err {
t.Errorf("Expected last error to be set")
}
// Test exponential backoff
initialDelay := manager.backoffDelay
manager.handleScanError(err)
if manager.backoffDelay != initialDelay*2 {
t.Errorf("Expected backoff delay to double, got %v", manager.backoffDelay)
}
if manager.errorCount != 2 {
t.Errorf("Expected error count to be 2, got %d", manager.errorCount)
}
// Test backoff cap
for i := 0; i < 10; i++ {
manager.handleScanError(err)
}
if manager.backoffDelay > 5*time.Minute {
t.Errorf("Expected backoff delay to be capped at 5 minutes, got %v", manager.backoffDelay)
}
// Test error reset
manager.resetErrorTracking()
if manager.errorCount != 0 {
t.Errorf("Expected error count to be reset to 0, got %d", manager.errorCount)
}
if manager.backoffDelay != time.Second {
t.Errorf("Expected backoff delay to be reset to 1s, got %v", manager.backoffDelay)
}
if manager.lastError != nil {
t.Errorf("Expected last error to be reset to nil")
}
}
func TestIsConnectionError(t *testing.T) {
tests := []struct {
err error
expected bool
}{
{nil, false},
{errors.New("connection refused"), true},
{errors.New("dial tcp [::1]:19333: connect: connection refused"), true},
{errors.New("connection error: desc = \"transport: Error while dialing\""), true},
{errors.New("connection timeout"), true},
{errors.New("no route to host"), true},
{errors.New("network unreachable"), true},
{errors.New("some other error"), false},
{errors.New("invalid argument"), false},
}
for _, test := range tests {
result := isConnectionError(test.err)
if result != test.expected {
t.Errorf("For error %v, expected %v, got %v", test.err, test.expected, result)
}
}
}
func TestMaintenanceManager_GetErrorState(t *testing.T) {
config := DefaultMaintenanceConfig()
manager := NewMaintenanceManager(nil, config)
// Test initial state
errorCount, lastError, backoffDelay := manager.GetErrorState()
if errorCount != 0 || lastError != nil || backoffDelay != time.Second {
t.Errorf("Expected initial state to be clean")
}
// Add some errors
err := errors.New("test error")
manager.handleScanError(err)
manager.handleScanError(err)
errorCount, lastError, backoffDelay = manager.GetErrorState()
if errorCount != 2 || lastError != err || backoffDelay != 2*time.Second {
t.Errorf("Expected error state to be tracked correctly: count=%d, err=%v, delay=%v",
errorCount, lastError, backoffDelay)
}
}
func TestMaintenanceManager_LogThrottling(t *testing.T) {
config := DefaultMaintenanceConfig()
manager := NewMaintenanceManager(nil, config)
// This is a basic test to ensure the error handling doesn't panic
// In practice, you'd want to capture log output to verify throttling
err := errors.New("test error")
// Generate many errors to test throttling
for i := 0; i < 25; i++ {
manager.handleScanError(err)
}
// Should not panic and should have capped backoff
if manager.backoffDelay > 5*time.Minute {
t.Errorf("Expected backoff to be capped at 5 minutes")
}
if manager.errorCount != 25 {
t.Errorf("Expected error count to be 25, got %d", manager.errorCount)
}
}


@@ -0,0 +1,500 @@
package maintenance
import (
"math/rand"
"sort"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
)
// NewMaintenanceQueue creates a new maintenance queue
func NewMaintenanceQueue(policy *MaintenancePolicy) *MaintenanceQueue {
queue := &MaintenanceQueue{
tasks: make(map[string]*MaintenanceTask),
workers: make(map[string]*MaintenanceWorker),
pendingTasks: make([]*MaintenanceTask, 0),
policy: policy,
}
return queue
}
// SetIntegration sets the integration reference
func (mq *MaintenanceQueue) SetIntegration(integration *MaintenanceIntegration) {
mq.integration = integration
glog.V(1).Infof("Maintenance queue configured with integration")
}
// AddTask adds a new maintenance task to the queue
func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
mq.mutex.Lock()
defer mq.mutex.Unlock()
task.ID = generateTaskID()
task.Status = TaskStatusPending
task.CreatedAt = time.Now()
task.MaxRetries = 3 // Default retry count
mq.tasks[task.ID] = task
mq.pendingTasks = append(mq.pendingTasks, task)
// Sort pending tasks by priority and schedule time
sort.Slice(mq.pendingTasks, func(i, j int) bool {
if mq.pendingTasks[i].Priority != mq.pendingTasks[j].Priority {
return mq.pendingTasks[i].Priority > mq.pendingTasks[j].Priority
}
return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
})
glog.V(2).Infof("Added maintenance task %s: %s for volume %d", task.ID, task.Type, task.VolumeID)
}
// AddTasksFromResults converts detection results to tasks and adds them to the queue
func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) {
for _, result := range results {
task := &MaintenanceTask{
Type: result.TaskType,
Priority: result.Priority,
VolumeID: result.VolumeID,
Server: result.Server,
Collection: result.Collection,
Parameters: result.Parameters,
Reason: result.Reason,
ScheduledAt: result.ScheduleAt,
}
mq.AddTask(task)
}
}
// GetNextTask returns the next available task for a worker
func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
mq.mutex.Lock()
defer mq.mutex.Unlock()
worker, exists := mq.workers[workerID]
if !exists {
return nil
}
// Check if worker has capacity
if worker.CurrentLoad >= worker.MaxConcurrent {
return nil
}
now := time.Now()
// Find the next suitable task
for i, task := range mq.pendingTasks {
// Check if it's time to execute the task
if task.ScheduledAt.After(now) {
continue
}
// Check if worker can handle this task type
if !mq.workerCanHandle(task.Type, capabilities) {
continue
}
// Check scheduling logic - use simplified system if available, otherwise fallback
if !mq.canScheduleTaskNow(task) {
continue
}
// Assign task to worker
task.Status = TaskStatusAssigned
task.WorkerID = workerID
startTime := now
task.StartedAt = &startTime
// Remove from pending tasks
mq.pendingTasks = append(mq.pendingTasks[:i], mq.pendingTasks[i+1:]...)
// Update worker
worker.CurrentTask = task
worker.CurrentLoad++
worker.Status = "busy"
glog.V(2).Infof("Assigned task %s to worker %s", task.ID, workerID)
return task
}
return nil
}
// CompleteTask marks a task as completed
func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
mq.mutex.Lock()
defer mq.mutex.Unlock()
task, exists := mq.tasks[taskID]
if !exists {
return
}
completedTime := time.Now()
task.CompletedAt = &completedTime
if error != "" {
task.Status = TaskStatusFailed
task.Error = error
// Check if task should be retried
if task.RetryCount < task.MaxRetries {
task.RetryCount++
task.Status = TaskStatusPending
task.WorkerID = ""
task.StartedAt = nil
task.CompletedAt = nil
task.Error = ""
task.ScheduledAt = time.Now().Add(15 * time.Minute) // Retry delay
mq.pendingTasks = append(mq.pendingTasks, task)
glog.V(2).Infof("Retrying task %s (attempt %d/%d)", taskID, task.RetryCount, task.MaxRetries)
} else {
glog.Errorf("Task %s failed permanently after %d retries: %s", taskID, task.MaxRetries, error)
}
} else {
task.Status = TaskStatusCompleted
task.Progress = 100
glog.V(2).Infof("Task %s completed successfully", taskID)
}
// Update worker
if task.WorkerID != "" {
if worker, exists := mq.workers[task.WorkerID]; exists {
worker.CurrentTask = nil
worker.CurrentLoad--
if worker.CurrentLoad == 0 {
worker.Status = "active"
}
}
}
}
// UpdateTaskProgress updates the progress of a running task
func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64) {
mq.mutex.Lock()
defer mq.mutex.Unlock()
if task, exists := mq.tasks[taskID]; exists {
task.Progress = progress
task.Status = TaskStatusInProgress
}
}
// RegisterWorker registers a new worker
func (mq *MaintenanceQueue) RegisterWorker(worker *MaintenanceWorker) {
mq.mutex.Lock()
defer mq.mutex.Unlock()
worker.LastHeartbeat = time.Now()
worker.Status = "active"
worker.CurrentLoad = 0
mq.workers[worker.ID] = worker
glog.V(1).Infof("Registered maintenance worker %s at %s", worker.ID, worker.Address)
}
// UpdateWorkerHeartbeat updates worker heartbeat
func (mq *MaintenanceQueue) UpdateWorkerHeartbeat(workerID string) {
mq.mutex.Lock()
defer mq.mutex.Unlock()
if worker, exists := mq.workers[workerID]; exists {
worker.LastHeartbeat = time.Now()
}
}
// GetRunningTaskCount returns the number of running tasks of a specific type
func (mq *MaintenanceQueue) GetRunningTaskCount(taskType MaintenanceTaskType) int {
mq.mutex.RLock()
defer mq.mutex.RUnlock()
count := 0
for _, task := range mq.tasks {
if task.Type == taskType && (task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress) {
count++
}
}
return count
}
// WasTaskRecentlyCompleted checks if a similar task was recently completed
func (mq *MaintenanceQueue) WasTaskRecentlyCompleted(taskType MaintenanceTaskType, volumeID uint32, server string, now time.Time) bool {
mq.mutex.RLock()
defer mq.mutex.RUnlock()
// Get the repeat prevention interval for this task type
interval := mq.getRepeatPreventionInterval(taskType)
cutoff := now.Add(-interval)
for _, task := range mq.tasks {
if task.Type == taskType &&
task.VolumeID == volumeID &&
task.Server == server &&
task.Status == TaskStatusCompleted &&
task.CompletedAt != nil &&
task.CompletedAt.After(cutoff) {
return true
}
}
return false
}
// getRepeatPreventionInterval returns the interval for preventing task repetition
func (mq *MaintenanceQueue) getRepeatPreventionInterval(taskType MaintenanceTaskType) time.Duration {
// First try to get default from task scheduler
if mq.integration != nil {
if scheduler := mq.integration.GetTaskScheduler(taskType); scheduler != nil {
defaultInterval := scheduler.GetDefaultRepeatInterval()
if defaultInterval > 0 {
glog.V(3).Infof("Using task scheduler default repeat interval for %s: %v", taskType, defaultInterval)
return defaultInterval
}
}
}
// Fallback to policy configuration if no scheduler available or scheduler doesn't provide default
if mq.policy != nil {
repeatIntervalHours := mq.policy.GetRepeatInterval(taskType)
if repeatIntervalHours > 0 {
interval := time.Duration(repeatIntervalHours) * time.Hour
glog.V(3).Infof("Using policy configuration repeat interval for %s: %v", taskType, interval)
return interval
}
}
// Ultimate fallback - but avoid hardcoded values where possible
glog.V(2).Infof("No scheduler or policy configuration found for task type %s, using minimal default: 1h", taskType)
return time.Hour // Minimal safe default
}
// GetTasks returns tasks with optional filtering
func (mq *MaintenanceQueue) GetTasks(status MaintenanceTaskStatus, taskType MaintenanceTaskType, limit int) []*MaintenanceTask {
mq.mutex.RLock()
defer mq.mutex.RUnlock()
var tasks []*MaintenanceTask
for _, task := range mq.tasks {
if status != "" && task.Status != status {
continue
}
if taskType != "" && task.Type != taskType {
continue
}
tasks = append(tasks, task)
if limit > 0 && len(tasks) >= limit {
break
}
}
// Sort by creation time (newest first)
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].CreatedAt.After(tasks[j].CreatedAt)
})
return tasks
}
// GetWorkers returns all registered workers
func (mq *MaintenanceQueue) GetWorkers() []*MaintenanceWorker {
mq.mutex.RLock()
defer mq.mutex.RUnlock()
var workers []*MaintenanceWorker
for _, worker := range mq.workers {
workers = append(workers, worker)
}
return workers
}
// generateTaskID generates a random ID for tasks
func generateTaskID() string {
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
b := make([]byte, 8)
for i := range b {
b[i] = charset[rand.Intn(len(charset))]
}
return string(b)
}
// CleanupOldTasks removes old completed and failed tasks
func (mq *MaintenanceQueue) CleanupOldTasks(retention time.Duration) int {
mq.mutex.Lock()
defer mq.mutex.Unlock()
cutoff := time.Now().Add(-retention)
removed := 0
for id, task := range mq.tasks {
if (task.Status == TaskStatusCompleted || task.Status == TaskStatusFailed) &&
task.CompletedAt != nil &&
task.CompletedAt.Before(cutoff) {
delete(mq.tasks, id)
removed++
}
}
glog.V(2).Infof("Cleaned up %d old maintenance tasks", removed)
return removed
}
// RemoveStaleWorkers removes workers that haven't sent heartbeat recently
func (mq *MaintenanceQueue) RemoveStaleWorkers(timeout time.Duration) int {
mq.mutex.Lock()
defer mq.mutex.Unlock()
cutoff := time.Now().Add(-timeout)
removed := 0
for id, worker := range mq.workers {
if worker.LastHeartbeat.Before(cutoff) {
// Mark any assigned tasks as failed
for _, task := range mq.tasks {
if task.WorkerID == id && (task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress) {
task.Status = TaskStatusFailed
task.Error = "Worker became unavailable"
completedTime := time.Now()
task.CompletedAt = &completedTime
}
}
delete(mq.workers, id)
removed++
glog.Warningf("Removed stale maintenance worker %s", id)
}
}
return removed
}
// GetStats returns maintenance statistics
func (mq *MaintenanceQueue) GetStats() *MaintenanceStats {
mq.mutex.RLock()
defer mq.mutex.RUnlock()
stats := &MaintenanceStats{
TotalTasks: len(mq.tasks),
TasksByStatus: make(map[MaintenanceTaskStatus]int),
TasksByType: make(map[MaintenanceTaskType]int),
ActiveWorkers: 0,
}
today := time.Now().Truncate(24 * time.Hour)
var totalDuration time.Duration
var completedTasks int
for _, task := range mq.tasks {
stats.TasksByStatus[task.Status]++
stats.TasksByType[task.Type]++
if task.CompletedAt != nil && task.CompletedAt.After(today) {
if task.Status == TaskStatusCompleted {
stats.CompletedToday++
} else if task.Status == TaskStatusFailed {
stats.FailedToday++
}
if task.StartedAt != nil {
duration := task.CompletedAt.Sub(*task.StartedAt)
totalDuration += duration
completedTasks++
}
}
}
for _, worker := range mq.workers {
if worker.Status == "active" || worker.Status == "busy" {
stats.ActiveWorkers++
}
}
if completedTasks > 0 {
stats.AverageTaskTime = totalDuration / time.Duration(completedTasks)
}
return stats
}
// workerCanHandle checks if a worker can handle a specific task type
func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabilities []MaintenanceTaskType) bool {
for _, capability := range capabilities {
if capability == taskType {
return true
}
}
return false
}
// canScheduleTaskNow determines if a task can be scheduled using task schedulers or fallback logic
func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
// Try task scheduling logic first
if mq.integration != nil {
// Get all running tasks and available workers
runningTasks := mq.getRunningTasks()
availableWorkers := mq.getAvailableWorkers()
canSchedule := mq.integration.CanScheduleWithTaskSchedulers(task, runningTasks, availableWorkers)
glog.V(3).Infof("Task scheduler decision for task %s (%s): %v", task.ID, task.Type, canSchedule)
return canSchedule
}
// Fallback to hardcoded logic
return mq.canExecuteTaskType(task.Type)
}
// canExecuteTaskType checks if we can execute more tasks of this type (concurrency limits) - fallback logic
func (mq *MaintenanceQueue) canExecuteTaskType(taskType MaintenanceTaskType) bool {
runningCount := mq.GetRunningTaskCount(taskType)
maxConcurrent := mq.getMaxConcurrentForTaskType(taskType)
return runningCount < maxConcurrent
}
// getMaxConcurrentForTaskType returns the maximum concurrent tasks allowed for a task type
func (mq *MaintenanceQueue) getMaxConcurrentForTaskType(taskType MaintenanceTaskType) int {
// First try to get default from task scheduler
if mq.integration != nil {
if scheduler := mq.integration.GetTaskScheduler(taskType); scheduler != nil {
maxConcurrent := scheduler.GetMaxConcurrent()
if maxConcurrent > 0 {
glog.V(3).Infof("Using task scheduler max concurrent for %s: %d", taskType, maxConcurrent)
return maxConcurrent
}
}
}
// Fallback to policy configuration if no scheduler available or scheduler doesn't provide default
if mq.policy != nil {
maxConcurrent := mq.policy.GetMaxConcurrent(taskType)
if maxConcurrent > 0 {
glog.V(3).Infof("Using policy configuration max concurrent for %s: %d", taskType, maxConcurrent)
return maxConcurrent
}
}
// Ultimate fallback - minimal safe default
glog.V(2).Infof("No scheduler or policy configuration found for task type %s, using minimal default: 1", taskType)
return 1
}
// getRunningTasks returns all currently running tasks
func (mq *MaintenanceQueue) getRunningTasks() []*MaintenanceTask {
var runningTasks []*MaintenanceTask
for _, task := range mq.tasks {
if task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress {
runningTasks = append(runningTasks, task)
}
}
return runningTasks
}
// getAvailableWorkers returns all workers that can take more work
func (mq *MaintenanceQueue) getAvailableWorkers() []*MaintenanceWorker {
var availableWorkers []*MaintenanceWorker
for _, worker := range mq.workers {
if worker.Status == "active" && worker.CurrentLoad < worker.MaxConcurrent {
availableWorkers = append(availableWorkers, worker)
}
}
return availableWorkers
}
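A minimal sketch of the worker-facing flow against an existing queue, with a hypothetical worker ID and capability name:

queue.RegisterWorker(&MaintenanceWorker{
	ID:            "worker-1",
	Address:       "worker-1:9334",
	Capabilities:  []MaintenanceTaskType{"vacuum"},
	MaxConcurrent: 2,
})
if task := queue.GetNextTask("worker-1", []MaintenanceTaskType{"vacuum"}); task != nil {
	queue.UpdateTaskProgress(task.ID, 50)
	queue.CompleteTask(task.ID, "") // an empty error string marks the task as completed
}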


@@ -0,0 +1,163 @@
package maintenance
import (
"context"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// NewMaintenanceScanner creates a new maintenance scanner
func NewMaintenanceScanner(adminClient AdminClient, policy *MaintenancePolicy, queue *MaintenanceQueue) *MaintenanceScanner {
scanner := &MaintenanceScanner{
adminClient: adminClient,
policy: policy,
queue: queue,
lastScan: make(map[MaintenanceTaskType]time.Time),
}
// Initialize integration
scanner.integration = NewMaintenanceIntegration(queue, policy)
// Set up bidirectional relationship
queue.SetIntegration(scanner.integration)
glog.V(1).Infof("Initialized maintenance scanner with task system")
return scanner
}
// ScanForMaintenanceTasks analyzes the cluster and generates maintenance tasks
func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult, error) {
// Get volume health metrics
volumeMetrics, err := ms.getVolumeHealthMetrics()
if err != nil {
return nil, fmt.Errorf("failed to get volume health metrics: %v", err)
}
// Use task system for all task types
if ms.integration != nil {
// Convert metrics to task system format
taskMetrics := ms.convertToTaskMetrics(volumeMetrics)
// Use task detection system
results, err := ms.integration.ScanWithTaskDetectors(taskMetrics)
if err != nil {
glog.Errorf("Task scanning failed: %v", err)
return nil, err
}
glog.V(1).Infof("Maintenance scan completed: found %d tasks", len(results))
return results, nil
}
// No integration available
glog.Warningf("No integration available, no tasks will be scheduled")
return []*TaskDetectionResult{}, nil
}
// getVolumeHealthMetrics collects health information for all volumes
func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, error) {
var metrics []*VolumeHealthMetrics
err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil {
return err
}
if resp.TopologyInfo == nil {
return nil
}
for _, dc := range resp.TopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
for _, diskInfo := range node.DiskInfos {
for _, volInfo := range diskInfo.VolumeInfos {
metric := &VolumeHealthMetrics{
VolumeID: volInfo.Id,
Server: node.Id,
Collection: volInfo.Collection,
Size: volInfo.Size,
DeletedBytes: volInfo.DeletedByteCount,
LastModified: time.Unix(int64(volInfo.ModifiedAtSecond), 0),
IsReadOnly: volInfo.ReadOnly,
IsECVolume: false, // Will be determined from volume structure
ReplicaCount: 1, // Will be counted
ExpectedReplicas: int(volInfo.ReplicaPlacement),
}
// Calculate derived metrics
if metric.Size > 0 {
metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size)
// Calculate fullness ratio (would need volume size limit)
// metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimit)
}
metric.Age = time.Since(metric.LastModified)
metrics = append(metrics, metric)
}
}
}
}
}
return nil
})
if err != nil {
return nil, err
}
// Count actual replicas and identify EC volumes
ms.enrichVolumeMetrics(metrics)
return metrics, nil
}
// enrichVolumeMetrics adds additional information like replica counts
func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics) {
// Group volumes by ID to count replicas
volumeGroups := make(map[uint32][]*VolumeHealthMetrics)
for _, metric := range metrics {
volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric)
}
// Update replica counts
for _, group := range volumeGroups {
actualReplicas := len(group)
for _, metric := range group {
metric.ReplicaCount = actualReplicas
}
}
}
// convertToTaskMetrics converts existing volume metrics to task system format
func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetrics) []*types.VolumeHealthMetrics {
var simplified []*types.VolumeHealthMetrics
for _, metric := range metrics {
simplified = append(simplified, &types.VolumeHealthMetrics{
VolumeID: metric.VolumeID,
Server: metric.Server,
Collection: metric.Collection,
Size: metric.Size,
DeletedBytes: metric.DeletedBytes,
GarbageRatio: metric.GarbageRatio,
LastModified: metric.LastModified,
Age: metric.Age,
ReplicaCount: metric.ReplicaCount,
ExpectedReplicas: metric.ExpectedReplicas,
IsReadOnly: metric.IsReadOnly,
HasRemoteCopy: metric.HasRemoteCopy,
IsECVolume: metric.IsECVolume,
FullnessRatio: metric.FullnessRatio,
})
}
return simplified
}
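A hypothetical stub of the AdminClient interface (defined in the types file below), useful in tests: it fails every master call, which drives the scanner's error path and, in turn, the manager's exponential backoff:

// failingAdminClient is a test-only stub; the err field is returned for every call.
type failingAdminClient struct{ err error }

func (c *failingAdminClient) WithMasterClient(fn func(client master_pb.SeaweedClient) error) error {
	return c.err
}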


@@ -0,0 +1,560 @@
package maintenance
import (
"html/template"
"sort"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// AdminClient interface defines what the maintenance system needs from the admin server
type AdminClient interface {
WithMasterClient(fn func(client master_pb.SeaweedClient) error) error
}
// MaintenanceTaskType represents different types of maintenance operations
type MaintenanceTaskType string
// GetRegisteredMaintenanceTaskTypes returns all registered task types as MaintenanceTaskType values
// sorted alphabetically for consistent menu ordering
func GetRegisteredMaintenanceTaskTypes() []MaintenanceTaskType {
typesRegistry := tasks.GetGlobalTypesRegistry()
var taskTypes []MaintenanceTaskType
for workerTaskType := range typesRegistry.GetAllDetectors() {
maintenanceTaskType := MaintenanceTaskType(string(workerTaskType))
taskTypes = append(taskTypes, maintenanceTaskType)
}
// Sort task types alphabetically to ensure consistent menu ordering
sort.Slice(taskTypes, func(i, j int) bool {
return string(taskTypes[i]) < string(taskTypes[j])
})
return taskTypes
}
// GetMaintenanceTaskType returns a specific task type if it's registered, or empty string if not found
func GetMaintenanceTaskType(taskTypeName string) MaintenanceTaskType {
typesRegistry := tasks.GetGlobalTypesRegistry()
for workerTaskType := range typesRegistry.GetAllDetectors() {
if string(workerTaskType) == taskTypeName {
return MaintenanceTaskType(taskTypeName)
}
}
return MaintenanceTaskType("")
}
// IsMaintenanceTaskTypeRegistered checks if a task type is registered
func IsMaintenanceTaskTypeRegistered(taskType MaintenanceTaskType) bool {
typesRegistry := tasks.GetGlobalTypesRegistry()
for workerTaskType := range typesRegistry.GetAllDetectors() {
if string(workerTaskType) == string(taskType) {
return true
}
}
return false
}
// MaintenanceTaskPriority represents task execution priority
type MaintenanceTaskPriority int
const (
PriorityLow MaintenanceTaskPriority = iota
PriorityNormal
PriorityHigh
PriorityCritical
)
// MaintenanceTaskStatus represents the current status of a task
type MaintenanceTaskStatus string
const (
TaskStatusPending MaintenanceTaskStatus = "pending"
TaskStatusAssigned MaintenanceTaskStatus = "assigned"
TaskStatusInProgress MaintenanceTaskStatus = "in_progress"
TaskStatusCompleted MaintenanceTaskStatus = "completed"
TaskStatusFailed MaintenanceTaskStatus = "failed"
TaskStatusCancelled MaintenanceTaskStatus = "cancelled"
)
// MaintenanceTask represents a single maintenance operation
type MaintenanceTask struct {
ID string `json:"id"`
Type MaintenanceTaskType `json:"type"`
Priority MaintenanceTaskPriority `json:"priority"`
Status MaintenanceTaskStatus `json:"status"`
VolumeID uint32 `json:"volume_id,omitempty"`
Server string `json:"server,omitempty"`
Collection string `json:"collection,omitempty"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
Reason string `json:"reason"`
CreatedAt time.Time `json:"created_at"`
ScheduledAt time.Time `json:"scheduled_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
WorkerID string `json:"worker_id,omitempty"`
Error string `json:"error,omitempty"`
Progress float64 `json:"progress"` // 0-100
RetryCount int `json:"retry_count"`
MaxRetries int `json:"max_retries"`
}
// TaskPolicy represents configuration for a specific task type
type TaskPolicy struct {
Enabled bool `json:"enabled"`
MaxConcurrent int `json:"max_concurrent"`
RepeatInterval int `json:"repeat_interval"` // Hours to wait before repeating
CheckInterval int `json:"check_interval"` // Hours between checks
Configuration map[string]interface{} `json:"configuration"` // Task-specific config
}
// MaintenancePolicy defines policies for maintenance operations using a dynamic structure
type MaintenancePolicy struct {
// Task-specific policies mapped by task type
TaskPolicies map[MaintenanceTaskType]*TaskPolicy `json:"task_policies"`
// Global policy settings
GlobalMaxConcurrent int `json:"global_max_concurrent"` // Overall limit across all task types
DefaultRepeatInterval int `json:"default_repeat_interval"` // Default hours if task doesn't specify
DefaultCheckInterval int `json:"default_check_interval"` // Default hours for periodic checks
}
// GetTaskPolicy returns the policy for a specific task type, creating generic defaults if needed
func (mp *MaintenancePolicy) GetTaskPolicy(taskType MaintenanceTaskType) *TaskPolicy {
if mp.TaskPolicies == nil {
mp.TaskPolicies = make(map[MaintenanceTaskType]*TaskPolicy)
}
policy, exists := mp.TaskPolicies[taskType]
if !exists {
// Create generic default policy using global settings - no hardcoded fallbacks
policy = &TaskPolicy{
Enabled: false, // Conservative default - require explicit enabling
MaxConcurrent: 1, // Conservative default concurrency
RepeatInterval: mp.DefaultRepeatInterval, // Use configured default, 0 if not set
CheckInterval: mp.DefaultCheckInterval, // Use configured default, 0 if not set
Configuration: make(map[string]interface{}),
}
mp.TaskPolicies[taskType] = policy
}
return policy
}
// SetTaskPolicy sets the policy for a specific task type
func (mp *MaintenancePolicy) SetTaskPolicy(taskType MaintenanceTaskType, policy *TaskPolicy) {
if mp.TaskPolicies == nil {
mp.TaskPolicies = make(map[MaintenanceTaskType]*TaskPolicy)
}
mp.TaskPolicies[taskType] = policy
}
// IsTaskEnabled returns whether a task type is enabled
func (mp *MaintenancePolicy) IsTaskEnabled(taskType MaintenanceTaskType) bool {
policy := mp.GetTaskPolicy(taskType)
return policy.Enabled
}
// GetMaxConcurrent returns the max concurrent limit for a task type
func (mp *MaintenancePolicy) GetMaxConcurrent(taskType MaintenanceTaskType) int {
policy := mp.GetTaskPolicy(taskType)
return policy.MaxConcurrent
}
// GetRepeatInterval returns the repeat interval for a task type
func (mp *MaintenancePolicy) GetRepeatInterval(taskType MaintenanceTaskType) int {
policy := mp.GetTaskPolicy(taskType)
return policy.RepeatInterval
}
// GetTaskConfig returns a configuration value for a task type
func (mp *MaintenancePolicy) GetTaskConfig(taskType MaintenanceTaskType, key string) (interface{}, bool) {
policy := mp.GetTaskPolicy(taskType)
value, exists := policy.Configuration[key]
return value, exists
}
// SetTaskConfig sets a configuration value for a task type
func (mp *MaintenancePolicy) SetTaskConfig(taskType MaintenanceTaskType, key string, value interface{}) {
policy := mp.GetTaskPolicy(taskType)
if policy.Configuration == nil {
policy.Configuration = make(map[string]interface{})
}
policy.Configuration[key] = value
}
// MaintenanceWorker represents a worker instance
type MaintenanceWorker struct {
ID string `json:"id"`
Address string `json:"address"`
LastHeartbeat time.Time `json:"last_heartbeat"`
Status string `json:"status"` // active, inactive, busy
CurrentTask *MaintenanceTask `json:"current_task,omitempty"`
Capabilities []MaintenanceTaskType `json:"capabilities"`
MaxConcurrent int `json:"max_concurrent"`
CurrentLoad int `json:"current_load"`
}
// MaintenanceQueue manages the task queue and worker coordination
type MaintenanceQueue struct {
tasks map[string]*MaintenanceTask
workers map[string]*MaintenanceWorker
pendingTasks []*MaintenanceTask
mutex sync.RWMutex
policy *MaintenancePolicy
integration *MaintenanceIntegration
}
// MaintenanceScanner analyzes the cluster and generates maintenance tasks
type MaintenanceScanner struct {
adminClient AdminClient
policy *MaintenancePolicy
queue *MaintenanceQueue
lastScan map[MaintenanceTaskType]time.Time
integration *MaintenanceIntegration
}
// TaskDetectionResult represents the result of scanning for maintenance needs
type TaskDetectionResult struct {
TaskType MaintenanceTaskType `json:"task_type"`
VolumeID uint32 `json:"volume_id,omitempty"`
Server string `json:"server,omitempty"`
Collection string `json:"collection,omitempty"`
Priority MaintenanceTaskPriority `json:"priority"`
Reason string `json:"reason"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
ScheduleAt time.Time `json:"schedule_at"`
}
// VolumeHealthMetrics contains health information about a volume
type VolumeHealthMetrics struct {
VolumeID uint32 `json:"volume_id"`
Server string `json:"server"`
Collection string `json:"collection"`
Size uint64 `json:"size"`
DeletedBytes uint64 `json:"deleted_bytes"`
GarbageRatio float64 `json:"garbage_ratio"`
LastModified time.Time `json:"last_modified"`
Age time.Duration `json:"age"`
ReplicaCount int `json:"replica_count"`
ExpectedReplicas int `json:"expected_replicas"`
IsReadOnly bool `json:"is_read_only"`
HasRemoteCopy bool `json:"has_remote_copy"`
IsECVolume bool `json:"is_ec_volume"`
FullnessRatio float64 `json:"fullness_ratio"`
}
// MaintenanceStats provides statistics about maintenance operations
type MaintenanceStats struct {
TotalTasks int `json:"total_tasks"`
TasksByStatus map[MaintenanceTaskStatus]int `json:"tasks_by_status"`
TasksByType map[MaintenanceTaskType]int `json:"tasks_by_type"`
ActiveWorkers int `json:"active_workers"`
CompletedToday int `json:"completed_today"`
FailedToday int `json:"failed_today"`
AverageTaskTime time.Duration `json:"average_task_time"`
LastScanTime time.Time `json:"last_scan_time"`
NextScanTime time.Time `json:"next_scan_time"`
}
// MaintenanceConfig holds configuration for the maintenance system
type MaintenanceConfig struct {
Enabled bool `json:"enabled"`
ScanIntervalSeconds int `json:"scan_interval_seconds"` // How often to scan for maintenance needs (in seconds)
WorkerTimeoutSeconds int `json:"worker_timeout_seconds"` // Worker heartbeat timeout (in seconds)
TaskTimeoutSeconds int `json:"task_timeout_seconds"` // Individual task timeout (in seconds)
RetryDelaySeconds int `json:"retry_delay_seconds"` // Delay between retries (in seconds)
MaxRetries int `json:"max_retries"` // Default max retries for tasks
CleanupIntervalSeconds int `json:"cleanup_interval_seconds"` // How often to clean up old tasks (in seconds)
TaskRetentionSeconds int `json:"task_retention_seconds"` // How long to keep completed/failed tasks (in seconds)
Policy *MaintenancePolicy `json:"policy"`
}
// DefaultMaintenanceConfig returns the default configuration for the maintenance system
func DefaultMaintenanceConfig() *MaintenanceConfig {
return &MaintenanceConfig{
Enabled: false, // Disabled by default for safety
ScanIntervalSeconds: 30 * 60, // 30 minutes
WorkerTimeoutSeconds: 5 * 60, // 5 minutes
TaskTimeoutSeconds: 2 * 60 * 60, // 2 hours
RetryDelaySeconds: 15 * 60, // 15 minutes
MaxRetries: 3,
CleanupIntervalSeconds: 24 * 60 * 60, // 24 hours
TaskRetentionSeconds: 7 * 24 * 60 * 60, // 7 days
Policy: &MaintenancePolicy{
GlobalMaxConcurrent: 4,
DefaultRepeatInterval: 6,
DefaultCheckInterval: 12,
},
}
}
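// Illustrative sketch (hypothetical helpers, not part of the original change):
// the *_Seconds fields are plain integers, so code that needs time.Duration
// values could convert them along these lines.
func (mc *MaintenanceConfig) scanInterval() time.Duration {
	return time.Duration(mc.ScanIntervalSeconds) * time.Second
}

func (mc *MaintenanceConfig) workerTimeout() time.Duration {
	return time.Duration(mc.WorkerTimeoutSeconds) * time.Second
}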
// MaintenanceQueueData represents data for the queue visualization UI
type MaintenanceQueueData struct {
Tasks []*MaintenanceTask `json:"tasks"`
Workers []*MaintenanceWorker `json:"workers"`
Stats *QueueStats `json:"stats"`
LastUpdated time.Time `json:"last_updated"`
}
// QueueStats provides statistics for the queue UI
type QueueStats struct {
PendingTasks int `json:"pending_tasks"`
RunningTasks int `json:"running_tasks"`
CompletedToday int `json:"completed_today"`
FailedToday int `json:"failed_today"`
TotalTasks int `json:"total_tasks"`
}
// MaintenanceConfigData represents configuration data for the UI
type MaintenanceConfigData struct {
Config *MaintenanceConfig `json:"config"`
IsEnabled bool `json:"is_enabled"`
LastScanTime time.Time `json:"last_scan_time"`
NextScanTime time.Time `json:"next_scan_time"`
SystemStats *MaintenanceStats `json:"system_stats"`
MenuItems []*MaintenanceMenuItem `json:"menu_items"`
}
// MaintenanceMenuItem represents a menu item for task configuration
type MaintenanceMenuItem struct {
TaskType MaintenanceTaskType `json:"task_type"`
DisplayName string `json:"display_name"`
Description string `json:"description"`
Icon string `json:"icon"`
IsEnabled bool `json:"is_enabled"`
Path string `json:"path"`
}
// WorkerDetailsData represents detailed worker information
type WorkerDetailsData struct {
Worker *MaintenanceWorker `json:"worker"`
CurrentTasks []*MaintenanceTask `json:"current_tasks"`
RecentTasks []*MaintenanceTask `json:"recent_tasks"`
Performance *WorkerPerformance `json:"performance"`
LastUpdated time.Time `json:"last_updated"`
}
// WorkerPerformance tracks worker performance metrics
type WorkerPerformance struct {
TasksCompleted int `json:"tasks_completed"`
TasksFailed int `json:"tasks_failed"`
AverageTaskTime time.Duration `json:"average_task_time"`
Uptime time.Duration `json:"uptime"`
SuccessRate float64 `json:"success_rate"`
}
// TaskConfigData represents data for individual task configuration page
type TaskConfigData struct {
TaskType MaintenanceTaskType `json:"task_type"`
TaskName string `json:"task_name"`
TaskIcon string `json:"task_icon"`
Description string `json:"description"`
ConfigFormHTML template.HTML `json:"config_form_html"`
}
// ClusterReplicationTask represents the parameters of a cluster replication task
type ClusterReplicationTask struct {
SourcePath string `json:"source_path"`
TargetCluster string `json:"target_cluster"`
TargetPath string `json:"target_path"`
ReplicationMode string `json:"replication_mode"` // "sync", "async", "backup"
Priority int `json:"priority"`
Checksum string `json:"checksum,omitempty"`
FileSize int64 `json:"file_size"`
CreatedAt time.Time `json:"created_at"`
Metadata map[string]string `json:"metadata,omitempty"`
}
// BuildMaintenancePolicyFromTasks creates a maintenance policy with configurations
// from all registered tasks using their UI providers
func BuildMaintenancePolicyFromTasks() *MaintenancePolicy {
policy := &MaintenancePolicy{
TaskPolicies: make(map[MaintenanceTaskType]*TaskPolicy),
GlobalMaxConcurrent: 4,
DefaultRepeatInterval: 6,
DefaultCheckInterval: 12,
}
// Get all registered task types from the UI registry
uiRegistry := tasks.GetGlobalUIRegistry()
typesRegistry := tasks.GetGlobalTypesRegistry()
for taskType, provider := range uiRegistry.GetAllProviders() {
// Convert task type to maintenance task type
maintenanceTaskType := MaintenanceTaskType(string(taskType))
// Get the default configuration from the UI provider
defaultConfig := provider.GetCurrentConfig()
// Create task policy from UI configuration
taskPolicy := &TaskPolicy{
Enabled: true, // Default enabled
MaxConcurrent: 2, // Default concurrency
RepeatInterval: policy.DefaultRepeatInterval,
CheckInterval: policy.DefaultCheckInterval,
Configuration: make(map[string]interface{}),
}
// Extract configuration from UI provider's config
if configMap, ok := defaultConfig.(map[string]interface{}); ok {
// Copy all configuration values
for key, value := range configMap {
taskPolicy.Configuration[key] = value
}
// Extract common fields
if enabled, exists := configMap["enabled"]; exists {
if enabledBool, ok := enabled.(bool); ok {
taskPolicy.Enabled = enabledBool
}
}
if maxConcurrent, exists := configMap["max_concurrent"]; exists {
if maxConcurrentInt, ok := maxConcurrent.(int); ok {
taskPolicy.MaxConcurrent = maxConcurrentInt
} else if maxConcurrentFloat, ok := maxConcurrent.(float64); ok {
taskPolicy.MaxConcurrent = int(maxConcurrentFloat)
}
}
}
// Also get defaults from scheduler if available (using types.TaskScheduler explicitly)
var scheduler types.TaskScheduler = typesRegistry.GetScheduler(taskType)
if scheduler != nil {
if taskPolicy.MaxConcurrent <= 0 {
taskPolicy.MaxConcurrent = scheduler.GetMaxConcurrent()
}
// Convert default repeat interval to hours
if repeatInterval := scheduler.GetDefaultRepeatInterval(); repeatInterval > 0 {
taskPolicy.RepeatInterval = int(repeatInterval.Hours())
}
}
// Also get defaults from detector if available (using types.TaskDetector explicitly)
var detector types.TaskDetector = typesRegistry.GetDetector(taskType)
if detector != nil {
// Convert scan interval to check interval (hours)
if scanInterval := detector.ScanInterval(); scanInterval > 0 {
taskPolicy.CheckInterval = int(scanInterval.Hours())
}
}
policy.TaskPolicies[maintenanceTaskType] = taskPolicy
glog.V(3).Infof("Built policy for task type %s: enabled=%v, max_concurrent=%d",
maintenanceTaskType, taskPolicy.Enabled, taskPolicy.MaxConcurrent)
}
glog.V(2).Infof("Built maintenance policy with %d task configurations", len(policy.TaskPolicies))
return policy
}
// SetPolicyFromTasks sets the maintenance policy from registered tasks
func SetPolicyFromTasks(policy *MaintenancePolicy) {
if policy == nil {
return
}
// Build new policy from tasks
newPolicy := BuildMaintenancePolicyFromTasks()
// Copy task policies
policy.TaskPolicies = newPolicy.TaskPolicies
glog.V(1).Infof("Updated maintenance policy with %d task configurations from registered tasks", len(policy.TaskPolicies))
}
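// Usage sketch (hypothetical, for illustration only): a component that owns a
// MaintenanceConfig could refresh its policy from the registered tasks once all
// task packages have been imported and auto-registered.
func refreshPolicyFromTasksExample(config *MaintenanceConfig) {
	if config.Policy == nil {
		config.Policy = BuildMaintenancePolicyFromTasks()
		return
	}
	SetPolicyFromTasks(config.Policy)
}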
// GetTaskIcon returns the icon CSS class for a task type from its UI provider
func GetTaskIcon(taskType MaintenanceTaskType) string {
typesRegistry := tasks.GetGlobalTypesRegistry()
uiRegistry := tasks.GetGlobalUIRegistry()
// Convert MaintenanceTaskType to TaskType
for workerTaskType := range typesRegistry.GetAllDetectors() {
if string(workerTaskType) == string(taskType) {
// Get the UI provider for this task type
provider := uiRegistry.GetProvider(workerTaskType)
if provider != nil {
return provider.GetIcon()
}
break
}
}
// Default icon if no UI provider found
return "fas fa-cog text-muted"
}
// GetTaskDisplayName returns the display name for a task type from its UI provider
func GetTaskDisplayName(taskType MaintenanceTaskType) string {
typesRegistry := tasks.GetGlobalTypesRegistry()
uiRegistry := tasks.GetGlobalUIRegistry()
// Convert MaintenanceTaskType to TaskType
for workerTaskType := range typesRegistry.GetAllDetectors() {
if string(workerTaskType) == string(taskType) {
// Get the UI provider for this task type
provider := uiRegistry.GetProvider(workerTaskType)
if provider != nil {
return provider.GetDisplayName()
}
break
}
}
// Fallback to the task type string
return string(taskType)
}
// GetTaskDescription returns the description for a task type from its UI provider
func GetTaskDescription(taskType MaintenanceTaskType) string {
typesRegistry := tasks.GetGlobalTypesRegistry()
uiRegistry := tasks.GetGlobalUIRegistry()
// Convert MaintenanceTaskType to TaskType
for workerTaskType := range typesRegistry.GetAllDetectors() {
if string(workerTaskType) == string(taskType) {
// Get the UI provider for this task type
provider := uiRegistry.GetProvider(workerTaskType)
if provider != nil {
return provider.GetDescription()
}
break
}
}
// Fallback to a generic description
return "Configure detailed settings for " + string(taskType) + " tasks."
}
// BuildMaintenanceMenuItems creates menu items for all registered task types
func BuildMaintenanceMenuItems() []*MaintenanceMenuItem {
var menuItems []*MaintenanceMenuItem
// Get all registered task types
registeredTypes := GetRegisteredMaintenanceTaskTypes()
for _, taskType := range registeredTypes {
menuItem := &MaintenanceMenuItem{
TaskType: taskType,
DisplayName: GetTaskDisplayName(taskType),
Description: GetTaskDescription(taskType),
Icon: GetTaskIcon(taskType),
IsEnabled: IsMaintenanceTaskTypeRegistered(taskType),
Path: "/maintenance/config/" + string(taskType),
}
menuItems = append(menuItems, menuItem)
}
return menuItems
}
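// Usage sketch (hypothetical): a configuration page handler could assemble the
// MaintenanceConfigData shown to the UI roughly like this; the stats argument
// and scan times are placeholders.
func buildConfigDataExample(config *MaintenanceConfig, stats *MaintenanceStats, lastScan time.Time) *MaintenanceConfigData {
	return &MaintenanceConfigData{
		Config:       config,
		IsEnabled:    config.Enabled,
		LastScanTime: lastScan,
		NextScanTime: lastScan.Add(time.Duration(config.ScanIntervalSeconds) * time.Second),
		SystemStats:  stats,
		MenuItems:    BuildMaintenanceMenuItems(),
	}
}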

View File

@@ -0,0 +1,413 @@
package maintenance
import (
"fmt"
"os"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
// Import task packages to trigger their auto-registration
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
// TaskExecutor defines the function signature for task execution
type TaskExecutor func(*MaintenanceWorkerService, *MaintenanceTask) error
// TaskExecutorFactory creates a task executor for a given worker service
type TaskExecutorFactory func() TaskExecutor
// Global registry for task executor factories
var taskExecutorFactories = make(map[MaintenanceTaskType]TaskExecutorFactory)
var executorRegistryMutex sync.RWMutex
var executorRegistryInitOnce sync.Once
// initializeExecutorFactories dynamically registers executor factories for all auto-registered task types
func initializeExecutorFactories() {
executorRegistryInitOnce.Do(func() {
// Get all registered task types from the global registry
typesRegistry := tasks.GetGlobalTypesRegistry()
var taskTypes []MaintenanceTaskType
for workerTaskType := range typesRegistry.GetAllDetectors() {
// Convert types.TaskType to MaintenanceTaskType by string conversion
maintenanceTaskType := MaintenanceTaskType(string(workerTaskType))
taskTypes = append(taskTypes, maintenanceTaskType)
}
// Register generic executor for all task types
for _, taskType := range taskTypes {
RegisterTaskExecutorFactory(taskType, createGenericTaskExecutor)
}
glog.V(1).Infof("Dynamically registered generic task executor for %d task types: %v", len(taskTypes), taskTypes)
})
}
// RegisterTaskExecutorFactory registers a factory function for creating task executors
func RegisterTaskExecutorFactory(taskType MaintenanceTaskType, factory TaskExecutorFactory) {
executorRegistryMutex.Lock()
defer executorRegistryMutex.Unlock()
taskExecutorFactories[taskType] = factory
glog.V(2).Infof("Registered executor factory for task type: %s", taskType)
}
// GetTaskExecutorFactory returns the factory for a task type
func GetTaskExecutorFactory(taskType MaintenanceTaskType) (TaskExecutorFactory, bool) {
// Ensure executor factories are initialized
initializeExecutorFactories()
executorRegistryMutex.RLock()
defer executorRegistryMutex.RUnlock()
factory, exists := taskExecutorFactories[taskType]
return factory, exists
}
// GetSupportedExecutorTaskTypes returns all task types with registered executor factories
func GetSupportedExecutorTaskTypes() []MaintenanceTaskType {
// Ensure executor factories are initialized
initializeExecutorFactories()
executorRegistryMutex.RLock()
defer executorRegistryMutex.RUnlock()
taskTypes := make([]MaintenanceTaskType, 0, len(taskExecutorFactories))
for taskType := range taskExecutorFactories {
taskTypes = append(taskTypes, taskType)
}
return taskTypes
}
// createGenericTaskExecutor creates a generic task executor that uses the task registry
func createGenericTaskExecutor() TaskExecutor {
return func(mws *MaintenanceWorkerService, task *MaintenanceTask) error {
return mws.executeGenericTask(task)
}
}
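// Illustrative sketch (hypothetical): a caller could override the generic
// executor for a single task type by registering its own factory; the task
// type name below is made up.
func registerCustomExecutorExample() {
	RegisterTaskExecutorFactory(MaintenanceTaskType("example_task"), func() TaskExecutor {
		return func(mws *MaintenanceWorkerService, task *MaintenanceTask) error {
			glog.V(1).Infof("custom executor handling task %s on %s", task.ID, task.Server)
			return nil
		}
	})
}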
// init does minimal initialization - actual registration happens lazily
func init() {
// Executor factory registration will happen lazily when first accessed
glog.V(1).Infof("Maintenance worker initialized - executor factories will be registered on first access")
}
// MaintenanceWorkerService manages maintenance task execution
type MaintenanceWorkerService struct {
workerID string
address string
adminServer string
capabilities []MaintenanceTaskType
maxConcurrent int
currentTasks map[string]*MaintenanceTask
queue *MaintenanceQueue
adminClient AdminClient
running bool
stopChan chan struct{}
// Task execution registry
taskExecutors map[MaintenanceTaskType]TaskExecutor
// Task registry for creating task instances
taskRegistry *tasks.TaskRegistry
}
// NewMaintenanceWorkerService creates a new maintenance worker service
func NewMaintenanceWorkerService(workerID, address, adminServer string) *MaintenanceWorkerService {
// Get all registered maintenance task types dynamically
capabilities := GetRegisteredMaintenanceTaskTypes()
worker := &MaintenanceWorkerService{
workerID: workerID,
address: address,
adminServer: adminServer,
capabilities: capabilities,
maxConcurrent: 2, // Default concurrent task limit
currentTasks: make(map[string]*MaintenanceTask),
stopChan: make(chan struct{}),
taskExecutors: make(map[MaintenanceTaskType]TaskExecutor),
taskRegistry: tasks.GetGlobalRegistry(), // Use global registry with auto-registered tasks
}
// Initialize task executor registry
worker.initializeTaskExecutors()
glog.V(1).Infof("Created maintenance worker with %d registered task types", len(worker.taskRegistry.GetSupportedTypes()))
return worker
}
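// Usage sketch (hypothetical values): a freshly created worker is typically
// tuned through its setters and attached to a queue before being started.
func newTunedWorkerExample(queue *MaintenanceQueue) *MaintenanceWorkerService {
	worker := NewMaintenanceWorkerService("worker-1", "localhost:9334", "localhost:9333")
	worker.SetQueue(queue)
	worker.SetMaxConcurrent(4)
	return worker
}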
// executeGenericTask executes a task using the task registry instead of hardcoded methods
func (mws *MaintenanceWorkerService) executeGenericTask(task *MaintenanceTask) error {
glog.V(2).Infof("Executing generic task %s: %s for volume %d", task.ID, task.Type, task.VolumeID)
	// Convert the task's MaintenanceTaskType to types.TaskType
taskType := types.TaskType(string(task.Type))
// Create task parameters
taskParams := types.TaskParams{
VolumeID: task.VolumeID,
Server: task.Server,
Collection: task.Collection,
Parameters: task.Parameters,
}
// Create task instance using the registry
taskInstance, err := mws.taskRegistry.CreateTask(taskType, taskParams)
if err != nil {
return fmt.Errorf("failed to create task instance: %v", err)
}
// Update progress to show task has started
mws.updateTaskProgress(task.ID, 5)
// Execute the task
err = taskInstance.Execute(taskParams)
if err != nil {
return fmt.Errorf("task execution failed: %v", err)
}
// Update progress to show completion
mws.updateTaskProgress(task.ID, 100)
glog.V(2).Infof("Generic task %s completed successfully", task.ID)
return nil
}
// initializeTaskExecutors sets up the task execution registry dynamically
func (mws *MaintenanceWorkerService) initializeTaskExecutors() {
mws.taskExecutors = make(map[MaintenanceTaskType]TaskExecutor)
// Get all registered executor factories and create executors
executorRegistryMutex.RLock()
defer executorRegistryMutex.RUnlock()
for taskType, factory := range taskExecutorFactories {
executor := factory()
mws.taskExecutors[taskType] = executor
glog.V(3).Infof("Initialized executor for task type: %s", taskType)
}
glog.V(2).Infof("Initialized %d task executors", len(mws.taskExecutors))
}
// RegisterTaskExecutor allows dynamic registration of new task executors
func (mws *MaintenanceWorkerService) RegisterTaskExecutor(taskType MaintenanceTaskType, executor TaskExecutor) {
if mws.taskExecutors == nil {
mws.taskExecutors = make(map[MaintenanceTaskType]TaskExecutor)
}
mws.taskExecutors[taskType] = executor
glog.V(1).Infof("Registered executor for task type: %s", taskType)
}
// GetSupportedTaskTypes returns all task types that this worker can execute
func (mws *MaintenanceWorkerService) GetSupportedTaskTypes() []MaintenanceTaskType {
return GetSupportedExecutorTaskTypes()
}
// Start begins the worker service
func (mws *MaintenanceWorkerService) Start() error {
mws.running = true
// Register with admin server
worker := &MaintenanceWorker{
ID: mws.workerID,
Address: mws.address,
Capabilities: mws.capabilities,
MaxConcurrent: mws.maxConcurrent,
}
if mws.queue != nil {
mws.queue.RegisterWorker(worker)
}
// Start worker loop
go mws.workerLoop()
glog.Infof("Maintenance worker %s started at %s", mws.workerID, mws.address)
return nil
}
// Stop terminates the worker service
func (mws *MaintenanceWorkerService) Stop() {
mws.running = false
close(mws.stopChan)
// Wait for current tasks to complete or timeout
timeout := time.NewTimer(30 * time.Second)
defer timeout.Stop()
for len(mws.currentTasks) > 0 {
select {
case <-timeout.C:
glog.Warningf("Worker %s stopping with %d tasks still running", mws.workerID, len(mws.currentTasks))
return
case <-time.After(time.Second):
// Check again
}
}
glog.Infof("Maintenance worker %s stopped", mws.workerID)
}
// workerLoop is the main worker event loop
func (mws *MaintenanceWorkerService) workerLoop() {
heartbeatTicker := time.NewTicker(30 * time.Second)
defer heartbeatTicker.Stop()
taskRequestTicker := time.NewTicker(5 * time.Second)
defer taskRequestTicker.Stop()
for mws.running {
select {
case <-mws.stopChan:
return
case <-heartbeatTicker.C:
mws.sendHeartbeat()
case <-taskRequestTicker.C:
mws.requestTasks()
}
}
}
// sendHeartbeat sends heartbeat to admin server
func (mws *MaintenanceWorkerService) sendHeartbeat() {
if mws.queue != nil {
mws.queue.UpdateWorkerHeartbeat(mws.workerID)
}
}
// requestTasks requests new tasks from the admin server
func (mws *MaintenanceWorkerService) requestTasks() {
if len(mws.currentTasks) >= mws.maxConcurrent {
return // Already at capacity
}
if mws.queue != nil {
task := mws.queue.GetNextTask(mws.workerID, mws.capabilities)
if task != nil {
mws.executeTask(task)
}
}
}
// executeTask executes a maintenance task
func (mws *MaintenanceWorkerService) executeTask(task *MaintenanceTask) {
mws.currentTasks[task.ID] = task
go func() {
defer func() {
delete(mws.currentTasks, task.ID)
}()
glog.Infof("Worker %s executing task %s: %s", mws.workerID, task.ID, task.Type)
// Execute task using dynamic executor registry
var err error
if executor, exists := mws.taskExecutors[task.Type]; exists {
err = executor(mws, task)
} else {
err = fmt.Errorf("unsupported task type: %s", task.Type)
glog.Errorf("No executor registered for task type: %s", task.Type)
}
// Report task completion
if mws.queue != nil {
errorMsg := ""
if err != nil {
errorMsg = err.Error()
}
mws.queue.CompleteTask(task.ID, errorMsg)
}
if err != nil {
glog.Errorf("Worker %s failed to execute task %s: %v", mws.workerID, task.ID, err)
} else {
glog.Infof("Worker %s completed task %s successfully", mws.workerID, task.ID)
}
}()
}
// updateTaskProgress updates the progress of a task
func (mws *MaintenanceWorkerService) updateTaskProgress(taskID string, progress float64) {
if mws.queue != nil {
mws.queue.UpdateTaskProgress(taskID, progress)
}
}
// GetStatus returns the current status of the worker
func (mws *MaintenanceWorkerService) GetStatus() map[string]interface{} {
return map[string]interface{}{
"worker_id": mws.workerID,
"address": mws.address,
"running": mws.running,
"capabilities": mws.capabilities,
"max_concurrent": mws.maxConcurrent,
"current_tasks": len(mws.currentTasks),
"task_details": mws.currentTasks,
}
}
// SetQueue sets the maintenance queue for the worker
func (mws *MaintenanceWorkerService) SetQueue(queue *MaintenanceQueue) {
mws.queue = queue
}
// SetAdminClient sets the admin client for the worker
func (mws *MaintenanceWorkerService) SetAdminClient(client AdminClient) {
mws.adminClient = client
}
// SetCapabilities sets the worker capabilities
func (mws *MaintenanceWorkerService) SetCapabilities(capabilities []MaintenanceTaskType) {
mws.capabilities = capabilities
}
// SetMaxConcurrent sets the maximum concurrent tasks
func (mws *MaintenanceWorkerService) SetMaxConcurrent(max int) {
mws.maxConcurrent = max
}
// SetHeartbeatInterval sets the heartbeat interval (placeholder for future use)
func (mws *MaintenanceWorkerService) SetHeartbeatInterval(interval time.Duration) {
// Future implementation for configurable heartbeat
}
// SetTaskRequestInterval sets the task request interval (placeholder for future use)
func (mws *MaintenanceWorkerService) SetTaskRequestInterval(interval time.Duration) {
// Future implementation for configurable task requests
}
// MaintenanceWorkerCommand represents a standalone maintenance worker command
type MaintenanceWorkerCommand struct {
workerService *MaintenanceWorkerService
}
// NewMaintenanceWorkerCommand creates a new worker command
func NewMaintenanceWorkerCommand(workerID, address, adminServer string) *MaintenanceWorkerCommand {
return &MaintenanceWorkerCommand{
workerService: NewMaintenanceWorkerService(workerID, address, adminServer),
}
}
// Run starts the maintenance worker as a standalone service
func (mwc *MaintenanceWorkerCommand) Run() error {
// Generate worker ID if not provided
if mwc.workerService.workerID == "" {
hostname, _ := os.Hostname()
mwc.workerService.workerID = fmt.Sprintf("worker-%s-%d", hostname, time.Now().Unix())
}
// Start the worker service
err := mwc.workerService.Start()
if err != nil {
return fmt.Errorf("failed to start maintenance worker: %v", err)
}
	// Block until the process is terminated externally; no signal handler is installed here
	select {}
}
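// Usage sketch (hypothetical wiring): a standalone entry point could create and
// run a worker roughly like this; the addresses are placeholders.
func runWorkerCommandExample() error {
	cmd := NewMaintenanceWorkerCommand("", "localhost:9334", "localhost:9333")
	return cmd.Run()
}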