Admin: misc improvements to the admin server and workers. EC now works. (#7055)

* initial design

* added simulation as tests

* reorganized the codebase to move the simulation framework and tests into their own dedicated package

* integration test. ec worker task

* remove "enhanced" reference

* start master, volume servers, filer

Current Status:
- Master: healthy and running (port 9333)
- Filer: healthy and running (port 8888)
- Volume Servers: all 6 servers running (ports 8080-8085)
- 🔄 Admin/Workers: will start when dependencies are ready

* generate write load

* tasks are assigned

* admin starts with a gRPC port; the worker has its own working directory

* Update .gitignore

* working worker and admin. Task detection is not working yet.

* compiles, detection uses volumeSizeLimitMB from master

* compiles

* worker retries connecting to admin

* build and restart

* rendering pending tasks

* skip task ID column

* sticky worker id

* test canScheduleTaskNow

* worker reconnect to admin

* clean up logs

* worker registers itself first

* worker can run ec work and report status

but:
1. one volume should not be repeatedly worked on (see the tracker sketch below).
2. EC shards need to be distributed and the source data should be deleted.
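
A simple way to address point 1 is to track volumes that already have queued or running work and skip them during detection. The tracker below is only an illustrative sketch; the type and method names are hypothetical and are not the PendingOperations implementation this PR adds later.

package main

import (
	"fmt"
	"sync"
)

// pendingVolumes is a hypothetical, minimal tracker: it records volume IDs that
// already have maintenance work queued so detectors can skip them.
type pendingVolumes struct {
	mu      sync.Mutex
	pending map[uint32]bool
}

func newPendingVolumes() *pendingVolumes {
	return &pendingVolumes{pending: make(map[uint32]bool)}
}

// tryReserve marks a volume as having pending work; it returns false if the
// volume is already reserved, so the caller should not schedule another task.
func (p *pendingVolumes) tryReserve(volumeID uint32) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.pending[volumeID] {
		return false
	}
	p.pending[volumeID] = true
	return true
}

// release clears the reservation once the task completes or permanently fails.
func (p *pendingVolumes) release(volumeID uint32) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.pending, volumeID)
}

func main() {
	tracker := newPendingVolumes()
	fmt.Println(tracker.tryReserve(42)) // true: first EC task for volume 42
	fmt.Println(tracker.tryReserve(42)) // false: volume 42 already has pending work
	tracker.release(42)
	fmt.Println(tracker.tryReserve(42)) // true again after the task finishes
}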

* move ec task logic

* listing ec shards

* local copy, ec. Need to distribute.

* ec is mostly working now

* distribution of ec shards needs improvement
* need configuration to enable ec

* show ec volumes

* interval field UI component

* rename

* integration test with vacuuming

* garbage percentage threshold

* fix warning

* display ec shard sizes

* fix ec volumes list

* Update ui.go

* show default values

* ensure correct default value

* MaintenanceConfig use ConfigField

* use schema defined defaults

* config

* reduce duplication

* refactor to use BaseUIProvider

* each task register its schema

* checkECEncodingCandidate use ecDetector

* use vacuumDetector

* use volumeSizeLimitMB

* remove

* remove unused

* refactor

* use new framework

* remove v2 reference

* refactor

* left menu can scroll now

* The maintenance manager was not being initialized when no data directory was configured for persistent storage.

* saving config

* Update task_config_schema_templ.go

* enable/disable tasks

* protobuf encoded task configurations

* fix system settings

* use ui component

* remove logs

* interface{} Reduction

* reduce interface{}

* reduce interface{}

* avoid from/to map

* reduce interface{}

* refactor

* keep it DRY

* added logging

* debug messages

* debug level

* debug

* show the log caller line

* use configured task policy

* log level

* handle admin heartbeat response

* Update worker.go

* fix EC rack and DC count

* Report task status to admin server

* fix task logging, simplify interface checking, use erasure_coding constants

* factor in empty volume server during task planning

* volume.list adds disk id

* track disk id also

* fix locking scheduled and manual scanning

* add active topology

* simplify task detector

* ec task completed, but shards are not showing up

* implement ec in ec_typed.go

* adjust log level

* dedup

* implementing ec copying shards and only ecx files

* use disk id when distributing ec shards

🎯 Planning: ActiveTopology creates DestinationPlan with specific TargetDisk
📦 Task Creation: maintenance_integration.go creates ECDestination with DiskId
🚀 Task Execution: EC task passes DiskId in VolumeEcShardsCopyRequest
💾 Volume Server: Receives disk_id and stores shards on specific disk (vs.store.Locations[req.DiskId])
📂 File System: EC shards and metadata land in the exact disk directory planned
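
A minimal illustrative sketch of that flow, using simplified stand-in structs rather than the real topology.DestinationPlan, worker_pb.ECDestination, and volume-server copy request types (the struct and field names here are assumptions for illustration only):

package main

import "fmt"

// Hypothetical, simplified mirrors of the types involved; the real ones live in
// weed/admin/topology and the worker/volume-server protobuf packages.
type destinationPlan struct {
	TargetNode string
	TargetDisk uint32
}

type ecDestination struct {
	Node   string
	DiskId uint32
}

type shardsCopyRequest struct {
	VolumeId uint32
	DiskId   uint32 // the volume server stores shards on store.Locations[DiskId]
}

func main() {
	// 1. Planning: ActiveTopology picks a target node and a specific disk.
	plan := destinationPlan{TargetNode: "volume-3:8080", TargetDisk: 1}

	// 2. Task creation: the plan becomes an ECDestination carried in the task params.
	dest := ecDestination{Node: plan.TargetNode, DiskId: plan.TargetDisk}

	// 3. Task execution: the EC task forwards the disk id in the copy request,
	//    so the receiving volume server writes the shards to that exact disk.
	req := shardsCopyRequest{VolumeId: 42, DiskId: dest.DiskId}
	fmt.Printf("copy volume %d shards to %s, disk %d\n", req.VolumeId, dest.Node, req.DiskId)
}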

* Delete original volume from all locations

* clean up existing shard locations

* local encoding and distributing

* Update docker/admin_integration/EC-TESTING-README.md

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* check volume id range

* simplify

* fix tests

* fix types

* clean up logs and tests

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Author: Chris Lu
Date: 2025-07-30 12:38:03 -07:00
Committed by: GitHub
Parent: 64198dad83
Commit: 891a2fb6eb
130 changed files with 27737 additions and 4429 deletions

@@ -0,0 +1,190 @@
package maintenance
import (
"github.com/seaweedfs/seaweedfs/weed/admin/config"
)
// Type aliases for backward compatibility
type ConfigFieldType = config.FieldType
type ConfigFieldUnit = config.FieldUnit
type ConfigField = config.Field
// Constant aliases for backward compatibility
const (
FieldTypeBool = config.FieldTypeBool
FieldTypeInt = config.FieldTypeInt
FieldTypeDuration = config.FieldTypeDuration
FieldTypeInterval = config.FieldTypeInterval
FieldTypeString = config.FieldTypeString
FieldTypeFloat = config.FieldTypeFloat
)
const (
UnitSeconds = config.UnitSeconds
UnitMinutes = config.UnitMinutes
UnitHours = config.UnitHours
UnitDays = config.UnitDays
UnitCount = config.UnitCount
UnitNone = config.UnitNone
)
// Function aliases for backward compatibility
var (
SecondsToIntervalValueUnit = config.SecondsToIntervalValueUnit
IntervalValueUnitToSeconds = config.IntervalValueUnitToSeconds
)
// MaintenanceConfigSchema defines the schema for maintenance configuration
type MaintenanceConfigSchema struct {
config.Schema // Embed common schema functionality
}
// GetMaintenanceConfigSchema returns the schema for maintenance configuration
func GetMaintenanceConfigSchema() *MaintenanceConfigSchema {
return &MaintenanceConfigSchema{
Schema: config.Schema{
Fields: []*config.Field{
{
Name: "enabled",
JSONName: "enabled",
Type: config.FieldTypeBool,
DefaultValue: true,
Required: false,
DisplayName: "Enable Maintenance System",
Description: "When enabled, the system will automatically scan for and execute maintenance tasks",
HelpText: "Toggle this to enable or disable the entire maintenance system",
InputType: "checkbox",
CSSClasses: "form-check-input",
},
{
Name: "scan_interval_seconds",
JSONName: "scan_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 30 * 60, // 30 minutes in seconds
MinValue: 1 * 60, // 1 minute
MaxValue: 24 * 60 * 60, // 24 hours
Required: true,
DisplayName: "Scan Interval",
Description: "How often to scan for maintenance tasks",
HelpText: "The system will check for new maintenance tasks at this interval",
Placeholder: "30",
Unit: config.UnitMinutes,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "worker_timeout_seconds",
JSONName: "worker_timeout_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 5 * 60, // 5 minutes
MinValue: 1 * 60, // 1 minute
MaxValue: 60 * 60, // 1 hour
Required: true,
DisplayName: "Worker Timeout",
Description: "How long to wait for worker heartbeat before considering it inactive",
HelpText: "Workers that don't send heartbeats within this time are considered offline",
Placeholder: "5",
Unit: config.UnitMinutes,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "task_timeout_seconds",
JSONName: "task_timeout_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 2 * 60 * 60, // 2 hours
MinValue: 10 * 60, // 10 minutes
MaxValue: 24 * 60 * 60, // 24 hours
Required: true,
DisplayName: "Task Timeout",
Description: "Maximum time allowed for a task to complete",
HelpText: "Tasks that exceed this duration will be marked as failed",
Placeholder: "2",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "retry_delay_seconds",
JSONName: "retry_delay_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 15 * 60, // 15 minutes
MinValue: 1 * 60, // 1 minute
MaxValue: 24 * 60 * 60, // 24 hours
Required: true,
DisplayName: "Retry Delay",
Description: "How long to wait before retrying a failed task",
HelpText: "Failed tasks will be retried after this delay",
Placeholder: "15",
Unit: config.UnitMinutes,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "max_retries",
JSONName: "max_retries",
Type: config.FieldTypeInt,
DefaultValue: 3,
MinValue: 0,
MaxValue: 10,
Required: true,
DisplayName: "Max Retries",
Description: "Maximum number of times to retry a failed task",
HelpText: "Tasks that fail more than this many times will be marked as permanently failed",
Placeholder: "3",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
{
Name: "cleanup_interval_seconds",
JSONName: "cleanup_interval_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 24 * 60 * 60, // 24 hours
MinValue: 1 * 60 * 60, // 1 hour
MaxValue: 7 * 24 * 60 * 60, // 7 days
Required: true,
DisplayName: "Cleanup Interval",
Description: "How often to run maintenance cleanup operations",
HelpText: "Removes old task records and temporary files at this interval",
Placeholder: "24",
Unit: config.UnitHours,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "task_retention_seconds",
JSONName: "task_retention_seconds",
Type: config.FieldTypeInterval,
DefaultValue: 7 * 24 * 60 * 60, // 7 days
MinValue: 1 * 24 * 60 * 60, // 1 day
MaxValue: 30 * 24 * 60 * 60, // 30 days
Required: true,
DisplayName: "Task Retention",
Description: "How long to keep completed task records",
HelpText: "Task history older than this duration will be automatically deleted",
Placeholder: "7",
Unit: config.UnitDays,
InputType: "interval",
CSSClasses: "form-control",
},
{
Name: "global_max_concurrent",
JSONName: "global_max_concurrent",
Type: config.FieldTypeInt,
DefaultValue: 10,
MinValue: 1,
MaxValue: 100,
Required: true,
DisplayName: "Global Max Concurrent Tasks",
Description: "Maximum number of maintenance tasks that can run simultaneously across all workers",
HelpText: "Limits the total number of maintenance operations to control system load",
Placeholder: "10",
Unit: config.UnitCount,
InputType: "number",
CSSClasses: "form-control",
},
},
},
}
}
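
A rough usage sketch for the schema above; it assumes this file lives in the weed/admin/maintenance package and simply lists each field with its schema-defined default:

package main

import (
	"fmt"

	// Assumed import path for the package defined above.
	"github.com/seaweedfs/seaweedfs/weed/admin/maintenance"
)

func main() {
	schema := maintenance.GetMaintenanceConfigSchema()
	for _, field := range schema.Fields {
		// Each config.Field carries display metadata plus its schema-defined default.
		fmt.Printf("%-26s default=%v unit=%v\n", field.Name, field.DefaultValue, field.Unit)
	}
}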

@@ -0,0 +1,124 @@
package maintenance
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
)
// VerifyProtobufConfig demonstrates that the protobuf configuration system is working
func VerifyProtobufConfig() error {
// Create configuration manager
configManager := NewMaintenanceConfigManager()
config := configManager.GetConfig()
// Verify basic configuration
if !config.Enabled {
return fmt.Errorf("expected config to be enabled by default")
}
if config.ScanIntervalSeconds != 30*60 {
return fmt.Errorf("expected scan interval to be 1800 seconds, got %d", config.ScanIntervalSeconds)
}
// Verify policy configuration
if config.Policy == nil {
return fmt.Errorf("expected policy to be configured")
}
if config.Policy.GlobalMaxConcurrent != 4 {
return fmt.Errorf("expected global max concurrent to be 4, got %d", config.Policy.GlobalMaxConcurrent)
}
// Verify task policies
vacuumPolicy := config.Policy.TaskPolicies["vacuum"]
if vacuumPolicy == nil {
return fmt.Errorf("expected vacuum policy to be configured")
}
if !vacuumPolicy.Enabled {
return fmt.Errorf("expected vacuum policy to be enabled")
}
// Verify typed configuration access
vacuumConfig := vacuumPolicy.GetVacuumConfig()
if vacuumConfig == nil {
return fmt.Errorf("expected vacuum config to be accessible")
}
if vacuumConfig.GarbageThreshold != 0.3 {
return fmt.Errorf("expected garbage threshold to be 0.3, got %f", vacuumConfig.GarbageThreshold)
}
// Verify helper functions work
if !IsTaskEnabled(config.Policy, "vacuum") {
return fmt.Errorf("expected vacuum task to be enabled via helper function")
}
maxConcurrent := GetMaxConcurrent(config.Policy, "vacuum")
if maxConcurrent != 2 {
return fmt.Errorf("expected vacuum max concurrent to be 2, got %d", maxConcurrent)
}
// Verify erasure coding configuration
ecPolicy := config.Policy.TaskPolicies["erasure_coding"]
if ecPolicy == nil {
return fmt.Errorf("expected EC policy to be configured")
}
ecConfig := ecPolicy.GetErasureCodingConfig()
if ecConfig == nil {
return fmt.Errorf("expected EC config to be accessible")
}
// Verify configurable EC fields only
if ecConfig.FullnessRatio <= 0 || ecConfig.FullnessRatio > 1 {
return fmt.Errorf("expected EC config to have valid fullness ratio (0-1), got %f", ecConfig.FullnessRatio)
}
return nil
}
// GetProtobufConfigSummary returns a summary of the current protobuf configuration
func GetProtobufConfigSummary() string {
configManager := NewMaintenanceConfigManager()
config := configManager.GetConfig()
summary := fmt.Sprintf("SeaweedFS Protobuf Maintenance Configuration:\n")
summary += fmt.Sprintf(" Enabled: %v\n", config.Enabled)
summary += fmt.Sprintf(" Scan Interval: %d seconds\n", config.ScanIntervalSeconds)
summary += fmt.Sprintf(" Max Retries: %d\n", config.MaxRetries)
summary += fmt.Sprintf(" Global Max Concurrent: %d\n", config.Policy.GlobalMaxConcurrent)
summary += fmt.Sprintf(" Task Policies: %d configured\n", len(config.Policy.TaskPolicies))
for taskType, policy := range config.Policy.TaskPolicies {
summary += fmt.Sprintf(" %s: enabled=%v, max_concurrent=%d\n",
taskType, policy.Enabled, policy.MaxConcurrent)
}
return summary
}
// CreateCustomConfig demonstrates creating a custom protobuf configuration
func CreateCustomConfig() *worker_pb.MaintenanceConfig {
return &worker_pb.MaintenanceConfig{
Enabled: true,
ScanIntervalSeconds: 60 * 60, // 1 hour
MaxRetries: 5,
Policy: &worker_pb.MaintenancePolicy{
GlobalMaxConcurrent: 8,
TaskPolicies: map[string]*worker_pb.TaskPolicy{
"custom_vacuum": {
Enabled: true,
MaxConcurrent: 4,
TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: 0.5,
MinVolumeAgeHours: 48,
},
},
},
},
},
}
}

@@ -0,0 +1,287 @@
package maintenance
import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
)
// MaintenanceConfigManager handles protobuf-based configuration
type MaintenanceConfigManager struct {
config *worker_pb.MaintenanceConfig
}
// NewMaintenanceConfigManager creates a new config manager with defaults
func NewMaintenanceConfigManager() *MaintenanceConfigManager {
return &MaintenanceConfigManager{
config: DefaultMaintenanceConfigProto(),
}
}
// DefaultMaintenanceConfigProto returns default configuration as protobuf
func DefaultMaintenanceConfigProto() *worker_pb.MaintenanceConfig {
return &worker_pb.MaintenanceConfig{
Enabled: true,
ScanIntervalSeconds: 30 * 60, // 30 minutes
WorkerTimeoutSeconds: 5 * 60, // 5 minutes
TaskTimeoutSeconds: 2 * 60 * 60, // 2 hours
RetryDelaySeconds: 15 * 60, // 15 minutes
MaxRetries: 3,
CleanupIntervalSeconds: 24 * 60 * 60, // 24 hours
TaskRetentionSeconds: 7 * 24 * 60 * 60, // 7 days
// Policy field will be populated dynamically from separate task configuration files
Policy: nil,
}
}
// GetConfig returns the current configuration
func (mcm *MaintenanceConfigManager) GetConfig() *worker_pb.MaintenanceConfig {
return mcm.config
}
// Type-safe configuration accessors
// GetVacuumConfig returns vacuum-specific configuration for a task type
func (mcm *MaintenanceConfigManager) GetVacuumConfig(taskType string) *worker_pb.VacuumTaskConfig {
if policy := mcm.getTaskPolicy(taskType); policy != nil {
if vacuumConfig := policy.GetVacuumConfig(); vacuumConfig != nil {
return vacuumConfig
}
}
// Return defaults if not configured
return &worker_pb.VacuumTaskConfig{
GarbageThreshold: 0.3,
MinVolumeAgeHours: 24,
MinIntervalSeconds: 7 * 24 * 60 * 60, // 7 days
}
}
// GetErasureCodingConfig returns EC-specific configuration for a task type
func (mcm *MaintenanceConfigManager) GetErasureCodingConfig(taskType string) *worker_pb.ErasureCodingTaskConfig {
if policy := mcm.getTaskPolicy(taskType); policy != nil {
if ecConfig := policy.GetErasureCodingConfig(); ecConfig != nil {
return ecConfig
}
}
// Return defaults if not configured
return &worker_pb.ErasureCodingTaskConfig{
FullnessRatio: 0.95,
QuietForSeconds: 3600,
MinVolumeSizeMb: 100,
CollectionFilter: "",
}
}
// GetBalanceConfig returns balance-specific configuration for a task type
func (mcm *MaintenanceConfigManager) GetBalanceConfig(taskType string) *worker_pb.BalanceTaskConfig {
if policy := mcm.getTaskPolicy(taskType); policy != nil {
if balanceConfig := policy.GetBalanceConfig(); balanceConfig != nil {
return balanceConfig
}
}
// Return defaults if not configured
return &worker_pb.BalanceTaskConfig{
ImbalanceThreshold: 0.2,
MinServerCount: 2,
}
}
// GetReplicationConfig returns replication-specific configuration for a task type
func (mcm *MaintenanceConfigManager) GetReplicationConfig(taskType string) *worker_pb.ReplicationTaskConfig {
if policy := mcm.getTaskPolicy(taskType); policy != nil {
if replicationConfig := policy.GetReplicationConfig(); replicationConfig != nil {
return replicationConfig
}
}
// Return defaults if not configured
return &worker_pb.ReplicationTaskConfig{
TargetReplicaCount: 2,
}
}
// Typed convenience methods for getting task configurations
// GetVacuumTaskConfigForType returns vacuum configuration for a specific task type
func (mcm *MaintenanceConfigManager) GetVacuumTaskConfigForType(taskType string) *worker_pb.VacuumTaskConfig {
return GetVacuumTaskConfig(mcm.config.Policy, MaintenanceTaskType(taskType))
}
// GetErasureCodingTaskConfigForType returns erasure coding configuration for a specific task type
func (mcm *MaintenanceConfigManager) GetErasureCodingTaskConfigForType(taskType string) *worker_pb.ErasureCodingTaskConfig {
return GetErasureCodingTaskConfig(mcm.config.Policy, MaintenanceTaskType(taskType))
}
// GetBalanceTaskConfigForType returns balance configuration for a specific task type
func (mcm *MaintenanceConfigManager) GetBalanceTaskConfigForType(taskType string) *worker_pb.BalanceTaskConfig {
return GetBalanceTaskConfig(mcm.config.Policy, MaintenanceTaskType(taskType))
}
// GetReplicationTaskConfigForType returns replication configuration for a specific task type
func (mcm *MaintenanceConfigManager) GetReplicationTaskConfigForType(taskType string) *worker_pb.ReplicationTaskConfig {
return GetReplicationTaskConfig(mcm.config.Policy, MaintenanceTaskType(taskType))
}
// Helper methods
func (mcm *MaintenanceConfigManager) getTaskPolicy(taskType string) *worker_pb.TaskPolicy {
if mcm.config.Policy != nil && mcm.config.Policy.TaskPolicies != nil {
return mcm.config.Policy.TaskPolicies[taskType]
}
return nil
}
// IsTaskEnabled returns whether a task type is enabled
func (mcm *MaintenanceConfigManager) IsTaskEnabled(taskType string) bool {
if policy := mcm.getTaskPolicy(taskType); policy != nil {
return policy.Enabled
}
return false
}
// GetMaxConcurrent returns the max concurrent limit for a task type
func (mcm *MaintenanceConfigManager) GetMaxConcurrent(taskType string) int32 {
if policy := mcm.getTaskPolicy(taskType); policy != nil {
return policy.MaxConcurrent
}
return 1 // Default
}
// GetRepeatInterval returns the repeat interval for a task type in seconds
func (mcm *MaintenanceConfigManager) GetRepeatInterval(taskType string) int32 {
if policy := mcm.getTaskPolicy(taskType); policy != nil {
return policy.RepeatIntervalSeconds
}
return mcm.config.Policy.DefaultRepeatIntervalSeconds
}
// GetCheckInterval returns the check interval for a task type in seconds
func (mcm *MaintenanceConfigManager) GetCheckInterval(taskType string) int32 {
if policy := mcm.getTaskPolicy(taskType); policy != nil {
return policy.CheckIntervalSeconds
}
return mcm.config.Policy.DefaultCheckIntervalSeconds
}
// Duration accessor methods
// GetScanInterval returns the scan interval as a time.Duration
func (mcm *MaintenanceConfigManager) GetScanInterval() time.Duration {
return time.Duration(mcm.config.ScanIntervalSeconds) * time.Second
}
// GetWorkerTimeout returns the worker timeout as a time.Duration
func (mcm *MaintenanceConfigManager) GetWorkerTimeout() time.Duration {
return time.Duration(mcm.config.WorkerTimeoutSeconds) * time.Second
}
// GetTaskTimeout returns the task timeout as a time.Duration
func (mcm *MaintenanceConfigManager) GetTaskTimeout() time.Duration {
return time.Duration(mcm.config.TaskTimeoutSeconds) * time.Second
}
// GetRetryDelay returns the retry delay as a time.Duration
func (mcm *MaintenanceConfigManager) GetRetryDelay() time.Duration {
return time.Duration(mcm.config.RetryDelaySeconds) * time.Second
}
// GetCleanupInterval returns the cleanup interval as a time.Duration
func (mcm *MaintenanceConfigManager) GetCleanupInterval() time.Duration {
return time.Duration(mcm.config.CleanupIntervalSeconds) * time.Second
}
// GetTaskRetention returns the task retention period as a time.Duration
func (mcm *MaintenanceConfigManager) GetTaskRetention() time.Duration {
return time.Duration(mcm.config.TaskRetentionSeconds) * time.Second
}
// ValidateMaintenanceConfigWithSchema validates protobuf maintenance configuration using ConfigField rules
func ValidateMaintenanceConfigWithSchema(config *worker_pb.MaintenanceConfig) error {
if config == nil {
return fmt.Errorf("configuration cannot be nil")
}
// Get the schema to access field validation rules
schema := GetMaintenanceConfigSchema()
// Validate each field individually using the ConfigField rules
if err := validateFieldWithSchema(schema, "enabled", config.Enabled); err != nil {
return err
}
if err := validateFieldWithSchema(schema, "scan_interval_seconds", int(config.ScanIntervalSeconds)); err != nil {
return err
}
if err := validateFieldWithSchema(schema, "worker_timeout_seconds", int(config.WorkerTimeoutSeconds)); err != nil {
return err
}
if err := validateFieldWithSchema(schema, "task_timeout_seconds", int(config.TaskTimeoutSeconds)); err != nil {
return err
}
if err := validateFieldWithSchema(schema, "retry_delay_seconds", int(config.RetryDelaySeconds)); err != nil {
return err
}
if err := validateFieldWithSchema(schema, "max_retries", int(config.MaxRetries)); err != nil {
return err
}
if err := validateFieldWithSchema(schema, "cleanup_interval_seconds", int(config.CleanupIntervalSeconds)); err != nil {
return err
}
if err := validateFieldWithSchema(schema, "task_retention_seconds", int(config.TaskRetentionSeconds)); err != nil {
return err
}
// Validate policy fields if present
if config.Policy != nil {
// Note: These field names might need to be adjusted based on the actual schema
if err := validatePolicyField("global_max_concurrent", int(config.Policy.GlobalMaxConcurrent)); err != nil {
return err
}
if err := validatePolicyField("default_repeat_interval_seconds", int(config.Policy.DefaultRepeatIntervalSeconds)); err != nil {
return err
}
if err := validatePolicyField("default_check_interval_seconds", int(config.Policy.DefaultCheckIntervalSeconds)); err != nil {
return err
}
}
return nil
}
// validateFieldWithSchema validates a single field using its ConfigField definition
func validateFieldWithSchema(schema *MaintenanceConfigSchema, fieldName string, value interface{}) error {
field := schema.GetFieldByName(fieldName)
if field == nil {
// Field not in schema, skip validation
return nil
}
return field.ValidateValue(value)
}
// validatePolicyField validates policy fields (simplified validation for now)
func validatePolicyField(fieldName string, value int) error {
switch fieldName {
case "global_max_concurrent":
if value < 1 || value > 20 {
return fmt.Errorf("Global Max Concurrent must be between 1 and 20, got %d", value)
}
case "default_repeat_interval":
if value < 1 || value > 168 {
return fmt.Errorf("Default Repeat Interval must be between 1 and 168 hours, got %d", value)
}
case "default_check_interval":
if value < 1 || value > 168 {
return fmt.Errorf("Default Check Interval must be between 1 and 168 hours, got %d", value)
}
}
return nil
}
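
A minimal usage sketch for the config manager (assuming the same weed/admin/maintenance import path). With the defaults shown above, Policy is left nil, so the typed accessors fall back to their built-in defaults:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/admin/maintenance" // assumed package path
)

func main() {
	mcm := maintenance.NewMaintenanceConfigManager()

	// Duration accessors convert the *Seconds fields into time.Duration values.
	fmt.Println("scan interval:", mcm.GetScanInterval()) // 30m0s with the defaults above
	fmt.Println("task timeout:", mcm.GetTaskTimeout())   // 2h0m0s

	// With Policy left nil, per-task accessors return their documented fallbacks.
	fmt.Println("vacuum enabled:", mcm.IsTaskEnabled("vacuum"))                       // false until a policy is loaded
	fmt.Println("vacuum threshold:", mcm.GetVacuumConfig("vacuum").GarbageThreshold) // 0.3 default
}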

@@ -1,11 +1,20 @@
package maintenance
import (
"context"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// MaintenanceIntegration bridges the task system with existing maintenance
@@ -17,6 +26,12 @@ type MaintenanceIntegration struct {
maintenanceQueue *MaintenanceQueue
maintenancePolicy *MaintenancePolicy
// Pending operations tracker
pendingOperations *PendingOperations
// Active topology for task detection and target selection
activeTopology *topology.ActiveTopology
// Type conversion maps
taskTypeMap map[types.TaskType]MaintenanceTaskType
revTaskTypeMap map[MaintenanceTaskType]types.TaskType
@@ -31,8 +46,12 @@ func NewMaintenanceIntegration(queue *MaintenanceQueue, policy *MaintenancePolic
uiRegistry: tasks.GetGlobalUIRegistry(), // Use global UI registry with auto-registered UI providers
maintenanceQueue: queue,
maintenancePolicy: policy,
pendingOperations: NewPendingOperations(),
}
// Initialize active topology with 10 second recent task window
integration.activeTopology = topology.NewActiveTopology(10)
// Initialize type conversion maps
integration.initializeTypeMaps()
@@ -96,7 +115,7 @@ func (s *MaintenanceIntegration) registerAllTasks() {
s.buildTaskTypeMappings()
// Configure tasks from policy
s.configureTasksFromPolicy()
s.ConfigureTasksFromPolicy()
registeredTaskTypes := make([]string, 0, len(s.taskTypeMap))
for _, maintenanceTaskType := range s.taskTypeMap {
@@ -105,8 +124,8 @@ func (s *MaintenanceIntegration) registerAllTasks() {
glog.V(1).Infof("Registered tasks: %v", registeredTaskTypes)
}
// configureTasksFromPolicy dynamically configures all registered tasks based on the maintenance policy
func (s *MaintenanceIntegration) configureTasksFromPolicy() {
// ConfigureTasksFromPolicy dynamically configures all registered tasks based on the maintenance policy
func (s *MaintenanceIntegration) ConfigureTasksFromPolicy() {
if s.maintenancePolicy == nil {
return
}
@@ -143,7 +162,7 @@ func (s *MaintenanceIntegration) configureDetectorFromPolicy(taskType types.Task
// Convert task system type to maintenance task type for policy lookup
maintenanceTaskType, exists := s.taskTypeMap[taskType]
if exists {
enabled := s.maintenancePolicy.IsTaskEnabled(maintenanceTaskType)
enabled := IsTaskEnabled(s.maintenancePolicy, maintenanceTaskType)
basicDetector.SetEnabled(enabled)
glog.V(3).Infof("Set enabled=%v for detector %s", enabled, taskType)
}
@@ -172,14 +191,14 @@ func (s *MaintenanceIntegration) configureSchedulerFromPolicy(taskType types.Tas
// Set enabled status if scheduler supports it
if enableableScheduler, ok := scheduler.(interface{ SetEnabled(bool) }); ok {
enabled := s.maintenancePolicy.IsTaskEnabled(maintenanceTaskType)
enabled := IsTaskEnabled(s.maintenancePolicy, maintenanceTaskType)
enableableScheduler.SetEnabled(enabled)
glog.V(3).Infof("Set enabled=%v for scheduler %s", enabled, taskType)
}
// Set max concurrent if scheduler supports it
if concurrentScheduler, ok := scheduler.(interface{ SetMaxConcurrent(int) }); ok {
maxConcurrent := s.maintenancePolicy.GetMaxConcurrent(maintenanceTaskType)
maxConcurrent := GetMaxConcurrent(s.maintenancePolicy, maintenanceTaskType)
if maxConcurrent > 0 {
concurrentScheduler.SetMaxConcurrent(maxConcurrent)
glog.V(3).Infof("Set max concurrent=%d for scheduler %s", maxConcurrent, taskType)
@@ -193,11 +212,20 @@ func (s *MaintenanceIntegration) configureSchedulerFromPolicy(taskType types.Tas
// ScanWithTaskDetectors performs a scan using the task system
func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.VolumeHealthMetrics) ([]*TaskDetectionResult, error) {
// Note: ActiveTopology gets updated from topology info instead of volume metrics
glog.V(2).Infof("Processed %d volume metrics for task detection", len(volumeMetrics))
// Filter out volumes with pending operations to avoid duplicates
filteredMetrics := s.pendingOperations.FilterVolumeMetricsExcludingPending(volumeMetrics)
glog.V(1).Infof("Scanning %d volumes (filtered from %d) excluding pending operations",
len(filteredMetrics), len(volumeMetrics))
var allResults []*TaskDetectionResult
// Create cluster info
clusterInfo := &types.ClusterInfo{
TotalVolumes: len(volumeMetrics),
TotalVolumes: len(filteredMetrics),
LastUpdated: time.Now(),
}
@@ -209,17 +237,26 @@ func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.Vo
glog.V(2).Infof("Running detection for task type: %s", taskType)
results, err := detector.ScanForTasks(volumeMetrics, clusterInfo)
results, err := detector.ScanForTasks(filteredMetrics, clusterInfo)
if err != nil {
glog.Errorf("Failed to scan for %s tasks: %v", taskType, err)
continue
}
// Convert results to existing system format
// Convert results to existing system format and check for conflicts
for _, result := range results {
existingResult := s.convertToExistingFormat(result)
if existingResult != nil {
allResults = append(allResults, existingResult)
// Double-check for conflicts with pending operations
opType := s.mapMaintenanceTaskTypeToPendingOperationType(existingResult.TaskType)
if !s.pendingOperations.WouldConflictWithPending(existingResult.VolumeID, opType) {
// Plan destination for operations that need it
s.planDestinationForTask(existingResult, opType)
allResults = append(allResults, existingResult)
} else {
glog.V(2).Infof("Skipping task %s for volume %d due to conflict with pending operation",
existingResult.TaskType, existingResult.VolumeID)
}
}
}
@@ -229,6 +266,11 @@ func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.Vo
return allResults, nil
}
// UpdateTopologyInfo updates the volume shard tracker with topology information for empty servers
func (s *MaintenanceIntegration) UpdateTopologyInfo(topologyInfo *master_pb.TopologyInfo) error {
return s.activeTopology.UpdateTopology(topologyInfo)
}
// convertToExistingFormat converts task results to existing system format using dynamic mapping
func (s *MaintenanceIntegration) convertToExistingFormat(result *types.TaskDetectionResult) *TaskDetectionResult {
// Convert types using mapping tables
@@ -241,49 +283,62 @@ func (s *MaintenanceIntegration) convertToExistingFormat(result *types.TaskDetec
existingPriority, exists := s.priorityMap[result.Priority]
if !exists {
glog.Warningf("Unknown priority %d, defaulting to normal", result.Priority)
glog.Warningf("Unknown priority %s, defaulting to normal", result.Priority)
existingPriority = PriorityNormal
}
return &TaskDetectionResult{
TaskType: existingType,
VolumeID: result.VolumeID,
Server: result.Server,
Collection: result.Collection,
Priority: existingPriority,
Reason: result.Reason,
Parameters: result.Parameters,
ScheduleAt: result.ScheduleAt,
TaskType: existingType,
VolumeID: result.VolumeID,
Server: result.Server,
Collection: result.Collection,
Priority: existingPriority,
Reason: result.Reason,
TypedParams: result.TypedParams,
ScheduleAt: result.ScheduleAt,
}
}
// CanScheduleWithTaskSchedulers determines if a task can be scheduled using task schedulers with dynamic type conversion
func (s *MaintenanceIntegration) CanScheduleWithTaskSchedulers(task *MaintenanceTask, runningTasks []*MaintenanceTask, availableWorkers []*MaintenanceWorker) bool {
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Checking task %s (type: %s)", task.ID, task.Type)
// Convert existing types to task types using mapping
taskType, exists := s.revTaskTypeMap[task.Type]
if !exists {
glog.V(2).Infof("Unknown task type %s for scheduling, falling back to existing logic", task.Type)
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Unknown task type %s for scheduling, falling back to existing logic", task.Type)
return false // Fallback to existing logic for unknown types
}
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Mapped task type %s to %s", task.Type, taskType)
// Convert task objects
taskObject := s.convertTaskToTaskSystem(task)
if taskObject == nil {
glog.V(2).Infof("Failed to convert task %s for scheduling", task.ID)
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Failed to convert task %s for scheduling", task.ID)
return false
}
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Successfully converted task %s", task.ID)
runningTaskObjects := s.convertTasksToTaskSystem(runningTasks)
workerObjects := s.convertWorkersToTaskSystem(availableWorkers)
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Converted %d running tasks and %d workers", len(runningTaskObjects), len(workerObjects))
// Get the appropriate scheduler
scheduler := s.taskRegistry.GetScheduler(taskType)
if scheduler == nil {
glog.V(2).Infof("No scheduler found for task type %s", taskType)
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: No scheduler found for task type %s", taskType)
return false
}
return scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects)
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Found scheduler for task type %s", taskType)
canSchedule := scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects)
glog.Infof("DEBUG CanScheduleWithTaskSchedulers: Scheduler decision for task %s: %v", task.ID, canSchedule)
return canSchedule
}
// convertTaskToTaskSystem converts existing task to task system format using dynamic mapping
@@ -304,14 +359,14 @@ func (s *MaintenanceIntegration) convertTaskToTaskSystem(task *MaintenanceTask)
}
return &types.Task{
ID: task.ID,
Type: taskType,
Priority: priority,
VolumeID: task.VolumeID,
Server: task.Server,
Collection: task.Collection,
Parameters: task.Parameters,
CreatedAt: task.CreatedAt,
ID: task.ID,
Type: taskType,
Priority: priority,
VolumeID: task.VolumeID,
Server: task.Server,
Collection: task.Collection,
TypedParams: task.TypedParams,
CreatedAt: task.CreatedAt,
}
}
@@ -407,3 +462,463 @@ func (s *MaintenanceIntegration) GetAllTaskStats() []*types.TaskStats {
return stats
}
// mapMaintenanceTaskTypeToPendingOperationType converts a maintenance task type to a pending operation type
func (s *MaintenanceIntegration) mapMaintenanceTaskTypeToPendingOperationType(taskType MaintenanceTaskType) PendingOperationType {
switch taskType {
case MaintenanceTaskType("balance"):
return OpTypeVolumeBalance
case MaintenanceTaskType("erasure_coding"):
return OpTypeErasureCoding
case MaintenanceTaskType("vacuum"):
return OpTypeVacuum
case MaintenanceTaskType("replication"):
return OpTypeReplication
default:
// For other task types, assume they're volume operations
return OpTypeVolumeMove
}
}
// GetPendingOperations returns the pending operations tracker
func (s *MaintenanceIntegration) GetPendingOperations() *PendingOperations {
return s.pendingOperations
}
// GetActiveTopology returns the active topology for task detection
func (s *MaintenanceIntegration) GetActiveTopology() *topology.ActiveTopology {
return s.activeTopology
}
// planDestinationForTask plans the destination for a task that requires it and creates typed protobuf parameters
func (s *MaintenanceIntegration) planDestinationForTask(task *TaskDetectionResult, opType PendingOperationType) {
// Only plan destinations for operations that move volumes/shards
if opType == OpTypeVacuum {
// For vacuum tasks, create VacuumTaskParams
s.createVacuumTaskParams(task)
return
}
glog.V(1).Infof("Planning destination for %s task on volume %d (server: %s)", task.TaskType, task.VolumeID, task.Server)
// Use ActiveTopology for destination planning
destinationPlan, err := s.planDestinationWithActiveTopology(task, opType)
if err != nil {
glog.Warningf("Failed to plan primary destination for %s task volume %d: %v",
task.TaskType, task.VolumeID, err)
// Don't return here - still try to create task params which might work with multiple destinations
}
// Create typed protobuf parameters based on operation type
switch opType {
case OpTypeErasureCoding:
if destinationPlan == nil {
glog.Warningf("Cannot create EC task for volume %d: destination planning failed", task.VolumeID)
return
}
s.createErasureCodingTaskParams(task, destinationPlan)
case OpTypeVolumeMove, OpTypeVolumeBalance:
if destinationPlan == nil {
glog.Warningf("Cannot create balance task for volume %d: destination planning failed", task.VolumeID)
return
}
s.createBalanceTaskParams(task, destinationPlan.(*topology.DestinationPlan))
case OpTypeReplication:
if destinationPlan == nil {
glog.Warningf("Cannot create replication task for volume %d: destination planning failed", task.VolumeID)
return
}
s.createReplicationTaskParams(task, destinationPlan.(*topology.DestinationPlan))
default:
glog.V(2).Infof("Unknown operation type for task %s: %v", task.TaskType, opType)
}
if destinationPlan != nil {
switch plan := destinationPlan.(type) {
case *topology.DestinationPlan:
glog.V(1).Infof("Completed destination planning for %s task on volume %d: %s -> %s",
task.TaskType, task.VolumeID, task.Server, plan.TargetNode)
case *topology.MultiDestinationPlan:
glog.V(1).Infof("Completed EC destination planning for volume %d: %s -> %d destinations (racks: %d, DCs: %d)",
task.VolumeID, task.Server, len(plan.Plans), plan.SuccessfulRack, plan.SuccessfulDCs)
}
} else {
glog.V(1).Infof("Completed destination planning for %s task on volume %d: no destination planned",
task.TaskType, task.VolumeID)
}
}
// createVacuumTaskParams creates typed parameters for vacuum tasks
func (s *MaintenanceIntegration) createVacuumTaskParams(task *TaskDetectionResult) {
// Get configuration from policy instead of using hard-coded values
vacuumConfig := GetVacuumTaskConfig(s.maintenancePolicy, MaintenanceTaskType("vacuum"))
// Use configured values or defaults if config is not available
garbageThreshold := 0.3 // Default 30%
verifyChecksum := true // Default to verify
batchSize := int32(1000) // Default batch size
workingDir := "/tmp/seaweedfs_vacuum_work" // Default working directory
if vacuumConfig != nil {
garbageThreshold = vacuumConfig.GarbageThreshold
// Note: VacuumTaskConfig has GarbageThreshold, MinVolumeAgeHours, MinIntervalSeconds
// Other fields like VerifyChecksum, BatchSize, WorkingDir would need to be added
// to the protobuf definition if they should be configurable
}
// Create typed protobuf parameters
task.TypedParams = &worker_pb.TaskParams{
VolumeId: task.VolumeID,
Server: task.Server,
Collection: task.Collection,
TaskParams: &worker_pb.TaskParams_VacuumParams{
VacuumParams: &worker_pb.VacuumTaskParams{
GarbageThreshold: garbageThreshold,
ForceVacuum: false,
BatchSize: batchSize,
WorkingDir: workingDir,
VerifyChecksum: verifyChecksum,
},
},
}
}
// planDestinationWithActiveTopology uses ActiveTopology to plan destinations
func (s *MaintenanceIntegration) planDestinationWithActiveTopology(task *TaskDetectionResult, opType PendingOperationType) (interface{}, error) {
// Get source node information from topology
var sourceRack, sourceDC string
// Extract rack and DC from topology info
topologyInfo := s.activeTopology.GetTopologyInfo()
if topologyInfo != nil {
for _, dc := range topologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, dataNodeInfo := range rack.DataNodeInfos {
if dataNodeInfo.Id == task.Server {
sourceDC = dc.Id
sourceRack = rack.Id
break
}
}
if sourceRack != "" {
break
}
}
if sourceDC != "" {
break
}
}
}
switch opType {
case OpTypeVolumeBalance, OpTypeVolumeMove:
// Plan single destination for balance operation
return s.activeTopology.PlanBalanceDestination(task.VolumeID, task.Server, sourceRack, sourceDC, 0)
case OpTypeErasureCoding:
// Plan multiple destinations for EC operation using adaptive shard counts
// Start with the default configuration, but fall back to smaller configurations if insufficient disks
totalShards := s.getOptimalECShardCount()
multiPlan, err := s.activeTopology.PlanECDestinations(task.VolumeID, task.Server, sourceRack, sourceDC, totalShards)
if err != nil {
return nil, err
}
if multiPlan != nil && len(multiPlan.Plans) > 0 {
// Return the multi-destination plan for EC
return multiPlan, nil
}
return nil, fmt.Errorf("no EC destinations found")
default:
return nil, fmt.Errorf("unsupported operation type for destination planning: %v", opType)
}
}
// createErasureCodingTaskParams creates typed parameters for EC tasks
func (s *MaintenanceIntegration) createErasureCodingTaskParams(task *TaskDetectionResult, destinationPlan interface{}) {
// Determine EC shard counts based on the number of planned destinations
multiPlan, ok := destinationPlan.(*topology.MultiDestinationPlan)
if !ok {
glog.Warningf("EC task for volume %d received unexpected destination plan type", task.VolumeID)
task.TypedParams = nil
return
}
// Use adaptive shard configuration based on actual planned destinations
totalShards := len(multiPlan.Plans)
dataShards, parityShards := s.getECShardCounts(totalShards)
// Extract disk-aware destinations from the multi-destination plan
var destinations []*worker_pb.ECDestination
var allConflicts []string
for _, plan := range multiPlan.Plans {
allConflicts = append(allConflicts, plan.Conflicts...)
// Create disk-aware destination
destinations = append(destinations, &worker_pb.ECDestination{
Node: plan.TargetNode,
DiskId: plan.TargetDisk,
Rack: plan.TargetRack,
DataCenter: plan.TargetDC,
PlacementScore: plan.PlacementScore,
})
}
glog.V(1).Infof("EC destination planning for volume %d: got %d destinations (%d+%d shards) across %d racks and %d DCs",
task.VolumeID, len(destinations), dataShards, parityShards, multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs)
if len(destinations) == 0 {
glog.Warningf("No destinations available for EC task volume %d - rejecting task", task.VolumeID)
task.TypedParams = nil
return
}
// Collect existing EC shard locations for cleanup
existingShardLocations := s.collectExistingEcShardLocations(task.VolumeID)
// Create EC task parameters
ecParams := &worker_pb.ErasureCodingTaskParams{
Destinations: destinations, // Disk-aware destinations
DataShards: dataShards,
ParityShards: parityShards,
WorkingDir: "/tmp/seaweedfs_ec_work",
MasterClient: "localhost:9333",
CleanupSource: true,
ExistingShardLocations: existingShardLocations, // Pass existing shards for cleanup
}
// Add placement conflicts if any
if len(allConflicts) > 0 {
// Remove duplicates
conflictMap := make(map[string]bool)
var uniqueConflicts []string
for _, conflict := range allConflicts {
if !conflictMap[conflict] {
conflictMap[conflict] = true
uniqueConflicts = append(uniqueConflicts, conflict)
}
}
ecParams.PlacementConflicts = uniqueConflicts
}
// Wrap in TaskParams
task.TypedParams = &worker_pb.TaskParams{
VolumeId: task.VolumeID,
Server: task.Server,
Collection: task.Collection,
TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
ErasureCodingParams: ecParams,
},
}
glog.V(1).Infof("Created EC task params with %d destinations for volume %d",
len(destinations), task.VolumeID)
}
// createBalanceTaskParams creates typed parameters for balance/move tasks
func (s *MaintenanceIntegration) createBalanceTaskParams(task *TaskDetectionResult, destinationPlan *topology.DestinationPlan) {
// balanceConfig could be used for future config options like ImbalanceThreshold, MinServerCount
// Create balance task parameters
balanceParams := &worker_pb.BalanceTaskParams{
DestNode: destinationPlan.TargetNode,
EstimatedSize: destinationPlan.ExpectedSize,
DestRack: destinationPlan.TargetRack,
DestDc: destinationPlan.TargetDC,
PlacementScore: destinationPlan.PlacementScore,
ForceMove: false, // Default to false
TimeoutSeconds: 300, // Default 5 minutes
}
// Add placement conflicts if any
if len(destinationPlan.Conflicts) > 0 {
balanceParams.PlacementConflicts = destinationPlan.Conflicts
}
// Note: balanceConfig would have ImbalanceThreshold, MinServerCount if needed for future enhancements
// Wrap in TaskParams
task.TypedParams = &worker_pb.TaskParams{
VolumeId: task.VolumeID,
Server: task.Server,
Collection: task.Collection,
TaskParams: &worker_pb.TaskParams_BalanceParams{
BalanceParams: balanceParams,
},
}
glog.V(1).Infof("Created balance task params for volume %d: %s -> %s (score: %.2f)",
task.VolumeID, task.Server, destinationPlan.TargetNode, destinationPlan.PlacementScore)
}
// createReplicationTaskParams creates typed parameters for replication tasks
func (s *MaintenanceIntegration) createReplicationTaskParams(task *TaskDetectionResult, destinationPlan *topology.DestinationPlan) {
// replicationConfig could be used for future config options like TargetReplicaCount
// Create replication task parameters
replicationParams := &worker_pb.ReplicationTaskParams{
DestNode: destinationPlan.TargetNode,
DestRack: destinationPlan.TargetRack,
DestDc: destinationPlan.TargetDC,
PlacementScore: destinationPlan.PlacementScore,
}
// Add placement conflicts if any
if len(destinationPlan.Conflicts) > 0 {
replicationParams.PlacementConflicts = destinationPlan.Conflicts
}
// Note: replicationConfig would have TargetReplicaCount if needed for future enhancements
// Wrap in TaskParams
task.TypedParams = &worker_pb.TaskParams{
VolumeId: task.VolumeID,
Server: task.Server,
Collection: task.Collection,
TaskParams: &worker_pb.TaskParams_ReplicationParams{
ReplicationParams: replicationParams,
},
}
glog.V(1).Infof("Created replication task params for volume %d: %s -> %s",
task.VolumeID, task.Server, destinationPlan.TargetNode)
}
// getOptimalECShardCount returns the optimal number of EC shards based on available disks
// Uses a simplified approach to avoid blocking during UI access
func (s *MaintenanceIntegration) getOptimalECShardCount() int {
// Try to get available disks quickly, but don't block if topology is busy
availableDisks := s.getAvailableDisksQuickly()
// EC configurations in order of preference: (data+parity=total)
// Use smaller configurations for smaller clusters
if availableDisks >= 14 {
glog.V(1).Infof("Using default EC configuration: 10+4=14 shards for %d available disks", availableDisks)
return 14 // Default: 10+4
} else if availableDisks >= 6 {
glog.V(1).Infof("Using small cluster EC configuration: 4+2=6 shards for %d available disks", availableDisks)
return 6 // Small cluster: 4+2
} else if availableDisks >= 4 {
glog.V(1).Infof("Using minimal EC configuration: 3+1=4 shards for %d available disks", availableDisks)
return 4 // Minimal: 3+1
} else {
glog.V(1).Infof("Using very small cluster EC configuration: 2+1=3 shards for %d available disks", availableDisks)
return 3 // Very small: 2+1
}
}
// getAvailableDisksQuickly returns available disk count with a fast path to avoid UI blocking
func (s *MaintenanceIntegration) getAvailableDisksQuickly() int {
// Use ActiveTopology's optimized disk counting if available
// Use empty task type and node filter for general availability check
allDisks := s.activeTopology.GetAvailableDisks(topology.TaskTypeErasureCoding, "")
if len(allDisks) > 0 {
return len(allDisks)
}
// Fallback: try to count from topology but don't hold locks for too long
topologyInfo := s.activeTopology.GetTopologyInfo()
return s.countAvailableDisks(topologyInfo)
}
// countAvailableDisks counts the total number of available disks in the topology
func (s *MaintenanceIntegration) countAvailableDisks(topologyInfo *master_pb.TopologyInfo) int {
if topologyInfo == nil {
return 0
}
diskCount := 0
for _, dc := range topologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
diskCount += len(node.DiskInfos)
}
}
}
return diskCount
}
// getECShardCounts determines data and parity shard counts for a given total
func (s *MaintenanceIntegration) getECShardCounts(totalShards int) (int32, int32) {
// Map total shards to (data, parity) configurations
switch totalShards {
case 14:
return 10, 4 // Default: 10+4
case 9:
return 6, 3 // Medium: 6+3
case 6:
return 4, 2 // Small: 4+2
case 4:
return 3, 1 // Minimal: 3+1
case 3:
return 2, 1 // Very small: 2+1
default:
// For any other total, try to maintain roughly 3:1 or 4:1 ratio
if totalShards >= 4 {
parityShards := totalShards / 4
if parityShards < 1 {
parityShards = 1
}
dataShards := totalShards - parityShards
return int32(dataShards), int32(parityShards)
}
// Fallback for very small clusters
return int32(totalShards - 1), 1
}
}
// collectExistingEcShardLocations queries the master for existing EC shard locations during planning
func (s *MaintenanceIntegration) collectExistingEcShardLocations(volumeId uint32) []*worker_pb.ExistingECShardLocation {
var existingShardLocations []*worker_pb.ExistingECShardLocation
// Use insecure connection for simplicity - in production this might be configurable
grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
err := operation.WithMasterServerClient(false, pb.ServerAddress("localhost:9333"), grpcDialOption,
func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupEcVolumeRequest{
VolumeId: volumeId,
}
resp, err := masterClient.LookupEcVolume(context.Background(), req)
if err != nil {
// If volume doesn't exist as EC volume, that's fine - just no existing shards
glog.V(1).Infof("LookupEcVolume for volume %d returned: %v (this is normal if no existing EC shards)", volumeId, err)
return nil
}
// Group shard locations by server
serverShardMap := make(map[string][]uint32)
for _, shardIdLocation := range resp.ShardIdLocations {
shardId := uint32(shardIdLocation.ShardId)
for _, location := range shardIdLocation.Locations {
serverAddr := pb.NewServerAddressFromLocation(location)
serverShardMap[string(serverAddr)] = append(serverShardMap[string(serverAddr)], shardId)
}
}
// Convert to protobuf format
for serverAddr, shardIds := range serverShardMap {
existingShardLocations = append(existingShardLocations, &worker_pb.ExistingECShardLocation{
Node: serverAddr,
ShardIds: shardIds,
})
}
return nil
})
if err != nil {
glog.Errorf("Failed to lookup existing EC shards from master for volume %d: %v", volumeId, err)
// Return empty list - cleanup will be skipped but task can continue
return []*worker_pb.ExistingECShardLocation{}
}
if len(existingShardLocations) > 0 {
glog.V(1).Infof("Found existing EC shards for volume %d on %d servers during planning", volumeId, len(existingShardLocations))
}
return existingShardLocations
}
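
The adaptive EC sizing above maps the available disk count to a data+parity layout; below is a standalone illustrative copy of that selection with the same thresholds (not the code the admin server actually calls):

package main

import "fmt"

// chooseECLayout mirrors the selection in getOptimalECShardCount / getECShardCounts.
func chooseECLayout(availableDisks int) (data, parity int) {
	switch {
	case availableDisks >= 14:
		return 10, 4 // default layout
	case availableDisks >= 6:
		return 4, 2 // small cluster
	case availableDisks >= 4:
		return 3, 1 // minimal
	default:
		return 2, 1 // very small cluster
	}
}

func main() {
	for _, disks := range []int{3, 5, 8, 20} {
		d, p := chooseECLayout(disks)
		fmt.Printf("%2d disks -> %d+%d = %d shards\n", disks, d, p, d+p)
	}
}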

@@ -7,8 +7,76 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy
func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
policy := &worker_pb.MaintenancePolicy{
GlobalMaxConcurrent: 4,
DefaultRepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
DefaultCheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
TaskPolicies: make(map[string]*worker_pb.TaskPolicy),
}
// Load vacuum task configuration
if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil {
policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{
Enabled: vacuumConfig.Enabled,
MaxConcurrent: int32(vacuumConfig.MaxConcurrent),
RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: &worker_pb.VacuumTaskConfig{
GarbageThreshold: float64(vacuumConfig.GarbageThreshold),
MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds),
},
},
}
}
// Load erasure coding task configuration
if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil {
policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{
Enabled: ecConfig.Enabled,
MaxConcurrent: int32(ecConfig.MaxConcurrent),
RepeatIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
CheckIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
FullnessRatio: float64(ecConfig.FullnessRatio),
QuietForSeconds: int32(ecConfig.QuietForSeconds),
MinVolumeSizeMb: int32(ecConfig.MinSizeMB),
CollectionFilter: ecConfig.CollectionFilter,
},
},
}
}
// Load balance task configuration
if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil {
policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{
Enabled: balanceConfig.Enabled,
MaxConcurrent: int32(balanceConfig.MaxConcurrent),
RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
CheckIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
BalanceConfig: &worker_pb.BalanceTaskConfig{
ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold),
MinServerCount: int32(balanceConfig.MinServerCount),
},
},
}
}
glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies))
return policy
}
// MaintenanceManager coordinates the maintenance system
type MaintenanceManager struct {
config *MaintenanceConfig
@@ -18,11 +86,12 @@ type MaintenanceManager struct {
running bool
stopChan chan struct{}
// Error handling and backoff
errorCount int
lastError error
lastErrorTime time.Time
backoffDelay time.Duration
mutex sync.RWMutex
errorCount int
lastError error
lastErrorTime time.Time
backoffDelay time.Duration
mutex sync.RWMutex
scanInProgress bool
}
// NewMaintenanceManager creates a new maintenance manager
@@ -31,8 +100,15 @@ func NewMaintenanceManager(adminClient AdminClient, config *MaintenanceConfig) *
config = DefaultMaintenanceConfig()
}
queue := NewMaintenanceQueue(config.Policy)
scanner := NewMaintenanceScanner(adminClient, config.Policy, queue)
// Use the policy from the config (which is populated from separate task files in LoadMaintenanceConfig)
policy := config.Policy
if policy == nil {
// Fallback: build policy from separate task configuration files if not already populated
policy = buildPolicyFromTaskConfigs()
}
queue := NewMaintenanceQueue(policy)
scanner := NewMaintenanceScanner(adminClient, policy, queue)
return &MaintenanceManager{
config: config,
@@ -125,23 +201,14 @@ func (mm *MaintenanceManager) scanLoop() {
return
case <-ticker.C:
glog.V(1).Infof("Performing maintenance scan every %v", scanInterval)
mm.performScan()
// Adjust ticker interval based on error state
mm.mutex.RLock()
currentInterval := scanInterval
if mm.errorCount > 0 {
// Use backoff delay when there are errors
currentInterval = mm.backoffDelay
if currentInterval > scanInterval {
// Don't make it longer than the configured interval * 10
maxInterval := scanInterval * 10
if currentInterval > maxInterval {
currentInterval = maxInterval
}
}
// Use the same synchronization as TriggerScan to prevent concurrent scans
if err := mm.triggerScanInternal(false); err != nil {
glog.V(1).Infof("Scheduled scan skipped: %v", err)
}
mm.mutex.RUnlock()
// Adjust ticker interval based on error state (read error state safely)
currentInterval := mm.getScanInterval(scanInterval)
// Reset ticker with new interval if needed
if currentInterval != scanInterval {
@@ -152,6 +219,26 @@ func (mm *MaintenanceManager) scanLoop() {
}
}
// getScanInterval safely reads the current scan interval with error backoff
func (mm *MaintenanceManager) getScanInterval(baseInterval time.Duration) time.Duration {
mm.mutex.RLock()
defer mm.mutex.RUnlock()
if mm.errorCount > 0 {
// Use backoff delay when there are errors
currentInterval := mm.backoffDelay
if currentInterval > baseInterval {
// Don't make it longer than the configured interval * 10
maxInterval := baseInterval * 10
if currentInterval > maxInterval {
currentInterval = maxInterval
}
}
return currentInterval
}
return baseInterval
}
// cleanupLoop periodically cleans up old tasks and stale workers
func (mm *MaintenanceManager) cleanupLoop() {
cleanupInterval := time.Duration(mm.config.CleanupIntervalSeconds) * time.Second
@@ -170,25 +257,54 @@ func (mm *MaintenanceManager) cleanupLoop() {
// performScan executes a maintenance scan with error handling and backoff
func (mm *MaintenanceManager) performScan() {
mm.mutex.Lock()
defer mm.mutex.Unlock()
defer func() {
// Always reset scan in progress flag when done
mm.mutex.Lock()
mm.scanInProgress = false
mm.mutex.Unlock()
}()
glog.V(2).Infof("Starting maintenance scan")
glog.Infof("Starting maintenance scan...")
results, err := mm.scanner.ScanForMaintenanceTasks()
if err != nil {
// Handle scan error
mm.mutex.Lock()
mm.handleScanError(err)
mm.mutex.Unlock()
glog.Warningf("Maintenance scan failed: %v", err)
return
}
// Scan succeeded, reset error tracking
mm.resetErrorTracking()
// Scan succeeded - update state and process results
mm.handleScanSuccess(results)
}
if len(results) > 0 {
// handleScanSuccess processes successful scan results with proper lock management
func (mm *MaintenanceManager) handleScanSuccess(results []*TaskDetectionResult) {
// Update manager state first
mm.mutex.Lock()
mm.resetErrorTracking()
taskCount := len(results)
mm.mutex.Unlock()
if taskCount > 0 {
// Count tasks by type for logging (outside of lock)
taskCounts := make(map[MaintenanceTaskType]int)
for _, result := range results {
taskCounts[result.TaskType]++
}
// Add tasks to queue (no manager lock held)
mm.queue.AddTasksFromResults(results)
glog.V(1).Infof("Maintenance scan completed: added %d tasks", len(results))
// Log detailed scan results
glog.Infof("Maintenance scan completed: found %d tasks", taskCount)
for taskType, count := range taskCounts {
glog.Infof(" - %s: %d tasks", taskType, count)
}
} else {
glog.V(2).Infof("Maintenance scan completed: no tasks needed")
glog.Infof("Maintenance scan completed: no maintenance tasks needed")
}
}
@@ -272,8 +388,19 @@ func (mm *MaintenanceManager) performCleanup() {
removedTasks := mm.queue.CleanupOldTasks(taskRetention)
removedWorkers := mm.queue.RemoveStaleWorkers(workerTimeout)
// Clean up stale pending operations (operations running for more than 4 hours)
staleOperationTimeout := 4 * time.Hour
removedOperations := 0
if mm.scanner != nil && mm.scanner.integration != nil {
pendingOps := mm.scanner.integration.GetPendingOperations()
if pendingOps != nil {
removedOperations = pendingOps.CleanupStaleOperations(staleOperationTimeout)
}
}
if removedTasks > 0 || removedWorkers > 0 || removedOperations > 0 {
glog.V(1).Infof("Cleanup completed: removed %d old tasks, %d stale workers, and %d stale operations",
removedTasks, removedWorkers, removedOperations)
}
}
@@ -311,6 +438,21 @@ func (mm *MaintenanceManager) GetStats() *MaintenanceStats {
return stats
}
// ReloadTaskConfigurations reloads task configurations from the current policy
func (mm *MaintenanceManager) ReloadTaskConfigurations() error {
mm.mutex.Lock()
defer mm.mutex.Unlock()
// Trigger configuration reload in the integration layer
if mm.scanner != nil && mm.scanner.integration != nil {
mm.scanner.integration.ConfigureTasksFromPolicy()
glog.V(1).Infof("Task configurations reloaded from policy")
return nil
}
return fmt.Errorf("integration not available for configuration reload")
}
// GetErrorState returns the current error state for monitoring
func (mm *MaintenanceManager) GetErrorState() (errorCount int, lastError error, backoffDelay time.Duration) {
mm.mutex.RLock()
@@ -330,10 +472,29 @@ func (mm *MaintenanceManager) GetWorkers() []*MaintenanceWorker {
// TriggerScan manually triggers a maintenance scan
func (mm *MaintenanceManager) TriggerScan() error {
return mm.triggerScanInternal(true)
}
// triggerScanInternal handles both manual and automatic scan triggers
func (mm *MaintenanceManager) triggerScanInternal(isManual bool) error {
if !mm.running {
return fmt.Errorf("maintenance manager is not running")
}
// Prevent multiple concurrent scans
mm.mutex.Lock()
if mm.scanInProgress {
mm.mutex.Unlock()
if isManual {
glog.V(1).Infof("Manual scan already in progress, ignoring trigger request")
} else {
glog.V(2).Infof("Automatic scan already in progress, ignoring scheduled scan")
}
return fmt.Errorf("scan already in progress")
}
mm.scanInProgress = true
mm.mutex.Unlock()
go mm.performScan()
return nil
}
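
A usage sketch (the HTTP handler below is hypothetical, not part of this change): callers of TriggerScan can treat the "scan already in progress" error as a benign conflict rather than a failure:

```go
// Hypothetical admin endpoint wiring around TriggerScan (requires "net/http").
func (mm *MaintenanceManager) handleTriggerScan(w http.ResponseWriter, r *http.Request) {
	if err := mm.TriggerScan(); err != nil {
		// Covers both "not running" and "scan already in progress".
		http.Error(w, err.Error(), http.StatusConflict)
		return
	}
	w.WriteHeader(http.StatusAccepted)
}
```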

View File

@@ -1,10 +1,13 @@
package maintenance
import (
"crypto/rand"
"fmt"
"sort"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
)
// NewMaintenanceQueue creates a new maintenance queue
@@ -24,11 +27,18 @@ func (mq *MaintenanceQueue) SetIntegration(integration *MaintenanceIntegration)
glog.V(1).Infof("Maintenance queue configured with integration")
}
// AddTask adds a new maintenance task to the queue with deduplication
func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
mq.mutex.Lock()
defer mq.mutex.Unlock()
// Check for duplicate tasks (same type + volume + not completed)
if mq.hasDuplicateTask(task) {
glog.V(1).Infof("Task skipped (duplicate): %s for volume %d on %s (already queued or running)",
task.Type, task.VolumeID, task.Server)
return
}
task.ID = generateTaskID()
task.Status = TaskStatusPending
task.CreatedAt = time.Now()
@@ -45,19 +55,48 @@ func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
})
glog.V(2).Infof("Added maintenance task %s: %s for volume %d", task.ID, task.Type, task.VolumeID)
scheduleInfo := ""
if !task.ScheduledAt.IsZero() && time.Until(task.ScheduledAt) > time.Minute {
scheduleInfo = fmt.Sprintf(", scheduled for %v", task.ScheduledAt.Format("15:04:05"))
}
glog.Infof("Task queued: %s (%s) volume %d on %s, priority %d%s, reason: %s",
task.ID, task.Type, task.VolumeID, task.Server, task.Priority, scheduleInfo, task.Reason)
}
// hasDuplicateTask checks if a similar task already exists (same type, volume, and not completed)
func (mq *MaintenanceQueue) hasDuplicateTask(newTask *MaintenanceTask) bool {
for _, existingTask := range mq.tasks {
if existingTask.Type == newTask.Type &&
existingTask.VolumeID == newTask.VolumeID &&
existingTask.Server == newTask.Server &&
(existingTask.Status == TaskStatusPending ||
existingTask.Status == TaskStatusAssigned ||
existingTask.Status == TaskStatusInProgress) {
return true
}
}
return false
}
// AddTasksFromResults converts detection results to tasks and adds them to the queue
func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) {
for _, result := range results {
// Validate that task has proper typed parameters
if result.TypedParams == nil {
glog.Warningf("Rejecting invalid task: %s for volume %d on %s - no typed parameters (insufficient destinations or planning failed)",
result.TaskType, result.VolumeID, result.Server)
continue
}
task := &MaintenanceTask{
Type: result.TaskType,
Priority: result.Priority,
VolumeID: result.VolumeID,
Server: result.Server,
Collection: result.Collection,
// Copy typed protobuf parameters
TypedParams: result.TypedParams,
Reason: result.Reason,
ScheduledAt: result.ScheduleAt,
}
@@ -67,57 +106,92 @@ func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult)
// GetNextTask returns the next available task for a worker
func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
// Use read lock for initial checks and search
mq.mutex.RLock()
worker, exists := mq.workers[workerID]
if !exists {
mq.mutex.RUnlock()
glog.V(2).Infof("Task assignment failed for worker %s: worker not registered", workerID)
return nil
}
// Check if worker has capacity
if worker.CurrentLoad >= worker.MaxConcurrent {
mq.mutex.RUnlock()
glog.V(2).Infof("Task assignment failed for worker %s: at capacity (%d/%d)", workerID, worker.CurrentLoad, worker.MaxConcurrent)
return nil
}
now := time.Now()
var selectedTask *MaintenanceTask
var selectedIndex int = -1
// Find the next suitable task (using read lock)
for i, task := range mq.pendingTasks {
// Check if it's time to execute the task
if task.ScheduledAt.After(now) {
glog.V(3).Infof("Task %s skipped for worker %s: scheduled for future (%v)", task.ID, workerID, task.ScheduledAt)
continue
}
// Check if worker can handle this task type
if !mq.workerCanHandle(task.Type, capabilities) {
glog.V(3).Infof("Task %s (%s) skipped for worker %s: capability mismatch (worker has: %v)", task.ID, task.Type, workerID, capabilities)
continue
}
// Check if this task type needs a cooldown period
if !mq.canScheduleTaskNow(task) {
glog.V(3).Infof("Task %s (%s) skipped for worker %s: scheduling constraints not met", task.ID, task.Type, workerID)
continue
}
// Found a suitable task
selectedTask = task
selectedIndex = i
break
}
// Release read lock
mq.mutex.RUnlock()
// If no task found, return nil
if selectedTask == nil {
glog.V(2).Infof("No suitable tasks available for worker %s (checked %d pending tasks)", workerID, len(mq.pendingTasks))
return nil
}
// Now acquire write lock to actually assign the task
mq.mutex.Lock()
defer mq.mutex.Unlock()
// Re-check that the task is still available (it might have been assigned to another worker)
if selectedIndex >= len(mq.pendingTasks) || mq.pendingTasks[selectedIndex].ID != selectedTask.ID {
glog.V(2).Infof("Task %s no longer available for worker %s: assigned to another worker", selectedTask.ID, workerID)
return nil
}
// Assign the task
selectedTask.Status = TaskStatusAssigned
selectedTask.WorkerID = workerID
selectedTask.StartedAt = &now
// Remove from pending tasks
mq.pendingTasks = append(mq.pendingTasks[:selectedIndex], mq.pendingTasks[selectedIndex+1:]...)
// Update worker load
if worker, exists := mq.workers[workerID]; exists {
worker.CurrentLoad++
}
// Track pending operation
mq.trackPendingOperation(selectedTask)
glog.Infof("Task assigned: %s (%s) → worker %s (volume %d, server %s)",
selectedTask.ID, selectedTask.Type, workerID, selectedTask.VolumeID, selectedTask.Server)
return selectedTask
}
// CompleteTask marks a task as completed
@@ -127,12 +201,19 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
task, exists := mq.tasks[taskID]
if !exists {
glog.Warningf("Attempted to complete non-existent task: %s", taskID)
return
}
completedTime := time.Now()
task.CompletedAt = &completedTime
// Calculate task duration
var duration time.Duration
if task.StartedAt != nil {
duration = completedTime.Sub(*task.StartedAt)
}
if error != "" {
task.Status = TaskStatusFailed
task.Error = error
@@ -148,14 +229,17 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
task.ScheduledAt = time.Now().Add(15 * time.Minute) // Retry delay
mq.pendingTasks = append(mq.pendingTasks, task)
glog.V(2).Infof("Retrying task %s (attempt %d/%d)", taskID, task.RetryCount, task.MaxRetries)
glog.Warningf("Task failed, scheduling retry: %s (%s) attempt %d/%d, worker %s, duration %v, error: %s",
taskID, task.Type, task.RetryCount, task.MaxRetries, task.WorkerID, duration, error)
} else {
glog.Errorf("Task %s failed permanently after %d retries: %s", taskID, task.MaxRetries, error)
glog.Errorf("Task failed permanently: %s (%s) worker %s, duration %v, after %d retries: %s",
taskID, task.Type, task.WorkerID, duration, task.MaxRetries, error)
}
} else {
task.Status = TaskStatusCompleted
task.Progress = 100
glog.V(2).Infof("Task %s completed successfully", taskID)
glog.Infof("Task completed: %s (%s) worker %s, duration %v, volume %d",
taskID, task.Type, task.WorkerID, duration, task.VolumeID)
}
// Update worker
@@ -168,6 +252,11 @@ func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
}
}
}
// Remove pending operation (unless it's being retried)
if task.Status != TaskStatusPending {
mq.removePendingOperation(taskID)
}
}
// UpdateTaskProgress updates the progress of a running task
@@ -176,8 +265,26 @@ func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64)
defer mq.mutex.RUnlock()
if task, exists := mq.tasks[taskID]; exists {
oldProgress := task.Progress
task.Progress = progress
task.Status = TaskStatusInProgress
// Update pending operation status
mq.updatePendingOperationStatus(taskID, "in_progress")
// Log progress at significant milestones or changes
if progress == 0 {
glog.V(1).Infof("Task started: %s (%s) worker %s, volume %d",
taskID, task.Type, task.WorkerID, task.VolumeID)
} else if progress >= 100 {
glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
taskID, task.Type, task.WorkerID, progress)
} else if progress-oldProgress >= 25 { // Log every 25% increment
glog.V(1).Infof("Task progress: %s (%s) worker %s, %.1f%% complete",
taskID, task.Type, task.WorkerID, progress)
}
} else {
glog.V(2).Infof("Progress update for unknown task: %s (%.1f%%)", taskID, progress)
}
}
@@ -186,12 +293,25 @@ func (mq *MaintenanceQueue) RegisterWorker(worker *MaintenanceWorker) {
mq.mutex.Lock()
defer mq.mutex.Unlock()
isNewWorker := true
if existingWorker, exists := mq.workers[worker.ID]; exists {
isNewWorker = false
glog.Infof("Worker reconnected: %s at %s (capabilities: %v, max concurrent: %d)",
worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
// Preserve current load when reconnecting
worker.CurrentLoad = existingWorker.CurrentLoad
} else {
glog.Infof("Worker registered: %s at %s (capabilities: %v, max concurrent: %d)",
worker.ID, worker.Address, worker.Capabilities, worker.MaxConcurrent)
}
worker.LastHeartbeat = time.Now()
worker.Status = "active"
if isNewWorker {
worker.CurrentLoad = 0
}
mq.workers[worker.ID] = worker
glog.V(1).Infof("Registered maintenance worker %s at %s", worker.ID, worker.Address)
}
// UpdateWorkerHeartbeat updates worker heartbeat
@@ -200,7 +320,15 @@ func (mq *MaintenanceQueue) UpdateWorkerHeartbeat(workerID string) {
defer mq.mutex.Unlock()
if worker, exists := mq.workers[workerID]; exists {
lastSeen := worker.LastHeartbeat
worker.LastHeartbeat = time.Now()
// Log if worker was offline for a while
if time.Since(lastSeen) > 2*time.Minute {
glog.Infof("Worker %s heartbeat resumed after %v", workerID, time.Since(lastSeen))
}
} else {
glog.V(2).Infof("Heartbeat from unknown worker: %s", workerID)
}
}
@@ -255,7 +383,7 @@ func (mq *MaintenanceQueue) getRepeatPreventionInterval(taskType MaintenanceTask
// Fallback to policy configuration if no scheduler available or scheduler doesn't provide default
if mq.policy != nil {
repeatIntervalHours := GetRepeatInterval(mq.policy, taskType)
if repeatIntervalHours > 0 {
interval := time.Duration(repeatIntervalHours) * time.Hour
glog.V(3).Infof("Using policy configuration repeat interval for %s: %v", taskType, interval)
@@ -311,10 +439,23 @@ func (mq *MaintenanceQueue) GetWorkers() []*MaintenanceWorker {
func generateTaskID() string {
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
b := make([]byte, 8)
randBytes := make([]byte, 8)
// Generate random bytes
if _, err := rand.Read(randBytes); err != nil {
// Fallback to timestamp-based ID if crypto/rand fails
timestamp := time.Now().UnixNano()
return fmt.Sprintf("task-%d", timestamp)
}
// Convert random bytes to charset
for i := range b {
b[i] = charset[int(randBytes[i])%len(charset)]
}
// Add timestamp suffix to ensure uniqueness
timestamp := time.Now().Unix() % 10000 // last 4 digits of timestamp
return fmt.Sprintf("%s-%04d", string(b), timestamp)
}
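
For reference, a small illustration of the resulting ID shapes (values invented):

```go
// Illustrative only: 8 random charset characters plus a 4-digit timestamp suffix.
id := generateTaskID()
fmt.Println(id) // e.g. "k3xz91ab-0042"; the fallback form is "task-<unix-nanos>" if crypto/rand fails
```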
// CleanupOldTasks removes old completed and failed tasks
@@ -427,19 +568,31 @@ func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabi
// canScheduleTaskNow determines if a task can be scheduled using task schedulers or fallback logic
func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
glog.V(2).Infof("Checking if task %s (type: %s) can be scheduled", task.ID, task.Type)
// TEMPORARY FIX: Skip integration task scheduler which is being overly restrictive
// Use fallback logic directly for now
glog.V(2).Infof("Using fallback logic for task scheduling")
canExecute := mq.canExecuteTaskType(task.Type)
glog.V(2).Infof("Fallback decision for task %s: %v", task.ID, canExecute)
return canExecute
// NOTE: Original integration code disabled temporarily
/*
if mq.integration != nil {
glog.Infof("DEBUG canScheduleTaskNow: Using integration task scheduler")
// Get all running tasks and available workers
runningTasks := mq.getRunningTasks()
availableWorkers := mq.getAvailableWorkers()
glog.Infof("DEBUG canScheduleTaskNow: Running tasks: %d, Available workers: %d", len(runningTasks), len(availableWorkers))
canSchedule := mq.integration.CanScheduleWithTaskSchedulers(task, runningTasks, availableWorkers)
glog.Infof("DEBUG canScheduleTaskNow: Task scheduler decision for task %s (%s): %v", task.ID, task.Type, canSchedule)
return canSchedule
}
*/
}
// canExecuteTaskType checks if we can execute more tasks of this type (concurrency limits) - fallback logic
@@ -465,7 +618,7 @@ func (mq *MaintenanceQueue) getMaxConcurrentForTaskType(taskType MaintenanceTask
// Fallback to policy configuration if no scheduler available or scheduler doesn't provide default
if mq.policy != nil {
maxConcurrent := GetMaxConcurrent(mq.policy, taskType)
if maxConcurrent > 0 {
glog.V(3).Infof("Using policy configuration max concurrent for %s: %d", taskType, maxConcurrent)
return maxConcurrent
@@ -498,3 +651,108 @@ func (mq *MaintenanceQueue) getAvailableWorkers() []*MaintenanceWorker {
}
return availableWorkers
}
// trackPendingOperation adds a task to the pending operations tracker
func (mq *MaintenanceQueue) trackPendingOperation(task *MaintenanceTask) {
if mq.integration == nil {
return
}
pendingOps := mq.integration.GetPendingOperations()
if pendingOps == nil {
return
}
// Skip tracking for tasks without proper typed parameters
if task.TypedParams == nil {
glog.V(2).Infof("Skipping pending operation tracking for task %s - no typed parameters", task.ID)
return
}
// Map maintenance task type to pending operation type
var opType PendingOperationType
switch task.Type {
case MaintenanceTaskType("balance"):
opType = OpTypeVolumeBalance
case MaintenanceTaskType("erasure_coding"):
opType = OpTypeErasureCoding
case MaintenanceTaskType("vacuum"):
opType = OpTypeVacuum
case MaintenanceTaskType("replication"):
opType = OpTypeReplication
default:
opType = OpTypeVolumeMove
}
// Determine destination node and estimated size from typed parameters
destNode := ""
estimatedSize := uint64(1024 * 1024 * 1024) // Default 1GB estimate
switch params := task.TypedParams.TaskParams.(type) {
case *worker_pb.TaskParams_ErasureCodingParams:
if params.ErasureCodingParams != nil {
if len(params.ErasureCodingParams.Destinations) > 0 {
destNode = params.ErasureCodingParams.Destinations[0].Node
}
if params.ErasureCodingParams.EstimatedShardSize > 0 {
estimatedSize = params.ErasureCodingParams.EstimatedShardSize
}
}
case *worker_pb.TaskParams_BalanceParams:
if params.BalanceParams != nil {
destNode = params.BalanceParams.DestNode
if params.BalanceParams.EstimatedSize > 0 {
estimatedSize = params.BalanceParams.EstimatedSize
}
}
case *worker_pb.TaskParams_ReplicationParams:
if params.ReplicationParams != nil {
destNode = params.ReplicationParams.DestNode
if params.ReplicationParams.EstimatedSize > 0 {
estimatedSize = params.ReplicationParams.EstimatedSize
}
}
}
operation := &PendingOperation{
VolumeID: task.VolumeID,
OperationType: opType,
SourceNode: task.Server,
DestNode: destNode,
TaskID: task.ID,
StartTime: time.Now(),
EstimatedSize: estimatedSize,
Collection: task.Collection,
Status: "assigned",
}
pendingOps.AddOperation(operation)
}
// removePendingOperation removes a task from the pending operations tracker
func (mq *MaintenanceQueue) removePendingOperation(taskID string) {
if mq.integration == nil {
return
}
pendingOps := mq.integration.GetPendingOperations()
if pendingOps == nil {
return
}
pendingOps.RemoveOperation(taskID)
}
// updatePendingOperationStatus updates the status of a pending operation
func (mq *MaintenanceQueue) updatePendingOperationStatus(taskID string, status string) {
if mq.integration == nil {
return
}
pendingOps := mq.integration.GetPendingOperations()
if pendingOps == nil {
return
}
pendingOps.UpdateOperationStatus(taskID, status)
}
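
Taken together, these helpers give each assigned task a pending-operation lifecycle. A rough walk-through using the queue methods above (worker and task values invented):

```go
// Sketch of the pending-operation lifecycle for one task.
task := mq.GetNextTask("worker-1", []MaintenanceTaskType{MaintenanceTaskType("erasure_coding")})
if task != nil {
	// GetNextTask has already called trackPendingOperation (status "assigned").
	mq.UpdateTaskProgress(task.ID, 10) // updatePendingOperationStatus -> "in_progress"
	mq.CompleteTask(task.ID, "")       // removePendingOperation clears it on success
}
```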

View File

@@ -0,0 +1,353 @@
package maintenance
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
)
// Test suite for canScheduleTaskNow() function and related scheduling logic
//
// This test suite ensures that:
// 1. The fallback scheduling logic works correctly when no integration is present
// 2. Task concurrency limits are properly enforced per task type
// 3. Different task types don't interfere with each other's concurrency limits
// 4. Custom policies with higher concurrency limits work correctly
// 5. Edge cases (nil tasks, empty task types) are handled gracefully
// 6. Helper functions (GetRunningTaskCount, canExecuteTaskType, etc.) work correctly
//
// Background: The canScheduleTaskNow() function is critical for task assignment.
// It was previously failing due to an overly restrictive integration scheduler,
// so we implemented a temporary fix that bypasses the integration and uses
// fallback logic based on simple concurrency limits per task type.
func TestCanScheduleTaskNow_FallbackLogic(t *testing.T) {
// Test the current implementation which uses fallback logic
mq := &MaintenanceQueue{
tasks: make(map[string]*MaintenanceTask),
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil, // No policy for default behavior
integration: nil, // No integration to force fallback
}
task := &MaintenanceTask{
ID: "test-task-1",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusPending,
}
// Should return true with fallback logic (no running tasks, default max concurrent = 1)
result := mq.canScheduleTaskNow(task)
if !result {
t.Errorf("Expected canScheduleTaskNow to return true with fallback logic, got false")
}
}
func TestCanScheduleTaskNow_FallbackWithRunningTasks(t *testing.T) {
// Test fallback logic when there are already running tasks
mq := &MaintenanceQueue{
tasks: map[string]*MaintenanceTask{
"running-task": {
ID: "running-task",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusInProgress,
},
},
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil,
integration: nil,
}
task := &MaintenanceTask{
ID: "test-task-2",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusPending,
}
// Should return false because max concurrent is 1 and we have 1 running task
result := mq.canScheduleTaskNow(task)
if result {
t.Errorf("Expected canScheduleTaskNow to return false when at capacity, got true")
}
}
func TestCanScheduleTaskNow_DifferentTaskTypes(t *testing.T) {
// Test that different task types don't interfere with each other
mq := &MaintenanceQueue{
tasks: map[string]*MaintenanceTask{
"running-ec-task": {
ID: "running-ec-task",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusInProgress,
},
},
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil,
integration: nil,
}
// Test vacuum task when EC task is running
vacuumTask := &MaintenanceTask{
ID: "vacuum-task",
Type: MaintenanceTaskType("vacuum"),
Status: TaskStatusPending,
}
// Should return true because vacuum and erasure_coding are different task types
result := mq.canScheduleTaskNow(vacuumTask)
if !result {
t.Errorf("Expected canScheduleTaskNow to return true for different task type, got false")
}
// Test another EC task when one is already running
ecTask := &MaintenanceTask{
ID: "ec-task",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusPending,
}
// Should return false because max concurrent for EC is 1 and we have 1 running
result = mq.canScheduleTaskNow(ecTask)
if result {
t.Errorf("Expected canScheduleTaskNow to return false for same task type at capacity, got true")
}
}
func TestCanScheduleTaskNow_WithIntegration(t *testing.T) {
// Test with a real MaintenanceIntegration (will use fallback logic in current implementation)
policy := &MaintenancePolicy{
TaskPolicies: make(map[string]*worker_pb.TaskPolicy),
GlobalMaxConcurrent: 10,
DefaultRepeatIntervalSeconds: 24 * 60 * 60, // 24 hours in seconds
DefaultCheckIntervalSeconds: 60 * 60, // 1 hour in seconds
}
mq := NewMaintenanceQueue(policy)
// Create a basic integration (this would normally be more complex)
integration := NewMaintenanceIntegration(mq, policy)
mq.SetIntegration(integration)
task := &MaintenanceTask{
ID: "test-task-3",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusPending,
}
// With our current implementation (fallback logic), this should return true
result := mq.canScheduleTaskNow(task)
if !result {
t.Errorf("Expected canScheduleTaskNow to return true with fallback logic, got false")
}
}
func TestGetRunningTaskCount(t *testing.T) {
// Test the helper function used by fallback logic
mq := &MaintenanceQueue{
tasks: map[string]*MaintenanceTask{
"task1": {
ID: "task1",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusInProgress,
},
"task2": {
ID: "task2",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusAssigned,
},
"task3": {
ID: "task3",
Type: MaintenanceTaskType("vacuum"),
Status: TaskStatusInProgress,
},
"task4": {
ID: "task4",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusCompleted,
},
},
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
}
// Should count 2 running EC tasks (in_progress + assigned)
ecCount := mq.GetRunningTaskCount(MaintenanceTaskType("erasure_coding"))
if ecCount != 2 {
t.Errorf("Expected 2 running EC tasks, got %d", ecCount)
}
// Should count 1 running vacuum task
vacuumCount := mq.GetRunningTaskCount(MaintenanceTaskType("vacuum"))
if vacuumCount != 1 {
t.Errorf("Expected 1 running vacuum task, got %d", vacuumCount)
}
// Should count 0 running balance tasks
balanceCount := mq.GetRunningTaskCount(MaintenanceTaskType("balance"))
if balanceCount != 0 {
t.Errorf("Expected 0 running balance tasks, got %d", balanceCount)
}
}
func TestCanExecuteTaskType(t *testing.T) {
// Test the fallback logic helper function
mq := &MaintenanceQueue{
tasks: map[string]*MaintenanceTask{
"running-task": {
ID: "running-task",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusInProgress,
},
},
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil, // Will use default max concurrent = 1
integration: nil,
}
// Should return false for EC (1 running, max = 1)
result := mq.canExecuteTaskType(MaintenanceTaskType("erasure_coding"))
if result {
t.Errorf("Expected canExecuteTaskType to return false for EC at capacity, got true")
}
// Should return true for vacuum (0 running, max = 1)
result = mq.canExecuteTaskType(MaintenanceTaskType("vacuum"))
if !result {
t.Errorf("Expected canExecuteTaskType to return true for vacuum, got false")
}
}
func TestGetMaxConcurrentForTaskType_DefaultBehavior(t *testing.T) {
// Test the default behavior when no policy or integration is set
mq := &MaintenanceQueue{
tasks: make(map[string]*MaintenanceTask),
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil,
integration: nil,
}
// Should return default value of 1
maxConcurrent := mq.getMaxConcurrentForTaskType(MaintenanceTaskType("erasure_coding"))
if maxConcurrent != 1 {
t.Errorf("Expected default max concurrent to be 1, got %d", maxConcurrent)
}
maxConcurrent = mq.getMaxConcurrentForTaskType(MaintenanceTaskType("vacuum"))
if maxConcurrent != 1 {
t.Errorf("Expected default max concurrent to be 1, got %d", maxConcurrent)
}
}
// Test edge cases and error conditions
func TestCanScheduleTaskNow_NilTask(t *testing.T) {
mq := &MaintenanceQueue{
tasks: make(map[string]*MaintenanceTask),
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil,
integration: nil,
}
// This should panic with a nil task, so we expect and catch the panic
defer func() {
if r := recover(); r == nil {
t.Errorf("Expected canScheduleTaskNow to panic with nil task, but it didn't")
}
}()
// This should panic
mq.canScheduleTaskNow(nil)
}
func TestCanScheduleTaskNow_EmptyTaskType(t *testing.T) {
mq := &MaintenanceQueue{
tasks: make(map[string]*MaintenanceTask),
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: nil,
integration: nil,
}
task := &MaintenanceTask{
ID: "empty-type-task",
Type: MaintenanceTaskType(""), // Empty task type
Status: TaskStatusPending,
}
// Should handle empty task type gracefully
result := mq.canScheduleTaskNow(task)
if !result {
t.Errorf("Expected canScheduleTaskNow to handle empty task type, got false")
}
}
func TestCanScheduleTaskNow_WithPolicy(t *testing.T) {
// Test with a policy that allows higher concurrency
policy := &MaintenancePolicy{
TaskPolicies: map[string]*worker_pb.TaskPolicy{
string(MaintenanceTaskType("erasure_coding")): {
Enabled: true,
MaxConcurrent: 3,
RepeatIntervalSeconds: 60 * 60, // 1 hour
CheckIntervalSeconds: 60 * 60, // 1 hour
},
string(MaintenanceTaskType("vacuum")): {
Enabled: true,
MaxConcurrent: 2,
RepeatIntervalSeconds: 60 * 60, // 1 hour
CheckIntervalSeconds: 60 * 60, // 1 hour
},
},
GlobalMaxConcurrent: 10,
DefaultRepeatIntervalSeconds: 24 * 60 * 60, // 24 hours in seconds
DefaultCheckIntervalSeconds: 60 * 60, // 1 hour in seconds
}
mq := &MaintenanceQueue{
tasks: map[string]*MaintenanceTask{
"running-task-1": {
ID: "running-task-1",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusInProgress,
},
"running-task-2": {
ID: "running-task-2",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusAssigned,
},
},
pendingTasks: []*MaintenanceTask{},
workers: make(map[string]*MaintenanceWorker),
policy: policy,
integration: nil,
}
task := &MaintenanceTask{
ID: "test-task-policy",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusPending,
}
// Should return true because we have 2 running EC tasks but max is 3
result := mq.canScheduleTaskNow(task)
if !result {
t.Errorf("Expected canScheduleTaskNow to return true with policy allowing 3 concurrent, got false")
}
// Add one more running task to reach the limit
mq.tasks["running-task-3"] = &MaintenanceTask{
ID: "running-task-3",
Type: MaintenanceTaskType("erasure_coding"),
Status: TaskStatusInProgress,
}
// Should return false because we now have 3 running EC tasks (at limit)
result = mq.canScheduleTaskNow(task)
if result {
t.Errorf("Expected canScheduleTaskNow to return false when at policy limit, got true")
}
}

View File

@@ -43,7 +43,18 @@ func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult,
// Convert metrics to task system format
taskMetrics := ms.convertToTaskMetrics(volumeMetrics)
// Update topology information for complete cluster view (including empty servers)
// This must happen before task detection to ensure EC placement can consider all servers
if ms.lastTopologyInfo != nil {
if err := ms.integration.UpdateTopologyInfo(ms.lastTopologyInfo); err != nil {
glog.Errorf("Failed to update topology info for empty servers: %v", err)
// Don't fail the scan - continue with just volume-bearing servers
} else {
glog.V(1).Infof("Updated topology info for complete cluster view including empty servers")
}
}
// Use task detection system with complete cluster information
results, err := ms.integration.ScanWithTaskDetectors(taskMetrics)
if err != nil {
glog.Errorf("Task scanning failed: %v", err)
@@ -62,25 +73,60 @@ func (ms *MaintenanceScanner) ScanForMaintenanceTasks() ([]*TaskDetectionResult,
// getVolumeHealthMetrics collects health information for all volumes
func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics, error) {
var metrics []*VolumeHealthMetrics
var volumeSizeLimitMB uint64
glog.V(1).Infof("Collecting volume health metrics from master")
err := ms.adminClient.WithMasterClient(func(client master_pb.SeaweedClient) error {
// First, get volume size limit from master configuration
configResp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
glog.Warningf("Failed to get volume size limit from master: %v", err)
volumeSizeLimitMB = 30000 // Default to 30GB if we can't get from master
} else {
volumeSizeLimitMB = uint64(configResp.VolumeSizeLimitMB)
}
// Now get volume list
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil {
return err
}
if resp.TopologyInfo == nil {
glog.Warningf("No topology info received from master")
return nil
}
volumeSizeLimitBytes := volumeSizeLimitMB * 1024 * 1024 // Convert MB to bytes
// Track all nodes discovered in topology
var allNodesInTopology []string
var nodesWithVolumes []string
var nodesWithoutVolumes []string
for _, dc := range resp.TopologyInfo.DataCenterInfos {
glog.V(2).Infof("Processing datacenter: %s", dc.Id)
for _, rack := range dc.RackInfos {
glog.V(2).Infof("Processing rack: %s in datacenter: %s", rack.Id, dc.Id)
for _, node := range rack.DataNodeInfos {
allNodesInTopology = append(allNodesInTopology, node.Id)
glog.V(2).Infof("Found volume server in topology: %s (disks: %d)", node.Id, len(node.DiskInfos))
hasVolumes := false
// Process each disk on this node
for diskType, diskInfo := range node.DiskInfos {
if len(diskInfo.VolumeInfos) > 0 {
hasVolumes = true
glog.V(2).Infof("Volume server %s disk %s has %d volumes", node.Id, diskType, len(diskInfo.VolumeInfos))
}
// Process volumes on this specific disk
for _, volInfo := range diskInfo.VolumeInfos {
metric := &VolumeHealthMetrics{
VolumeID: volInfo.Id,
Server: node.Id,
DiskType: diskType, // Track which disk this volume is on
DiskId: volInfo.DiskId, // Use disk ID from volume info
Collection: volInfo.Collection,
Size: volInfo.Size,
DeletedBytes: volInfo.DeletedByteCount,
@@ -94,31 +140,58 @@ func (ms *MaintenanceScanner) getVolumeHealthMetrics() ([]*VolumeHealthMetrics,
// Calculate derived metrics
if metric.Size > 0 {
metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size)
// Calculate fullness ratio using actual volume size limit from master
metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimitBytes)
}
metric.Age = time.Since(metric.LastModified)
glog.V(3).Infof("Volume %d on %s:%s (ID %d): size=%d, limit=%d, fullness=%.2f",
metric.VolumeID, metric.Server, metric.DiskType, metric.DiskId, metric.Size, volumeSizeLimitBytes, metric.FullnessRatio)
metrics = append(metrics, metric)
}
}
if hasVolumes {
nodesWithVolumes = append(nodesWithVolumes, node.Id)
} else {
nodesWithoutVolumes = append(nodesWithoutVolumes, node.Id)
glog.V(1).Infof("Volume server %s found in topology but has no volumes", node.Id)
}
}
}
}
glog.Infof("Topology discovery complete:")
glog.Infof(" - Total volume servers in topology: %d (%v)", len(allNodesInTopology), allNodesInTopology)
glog.Infof(" - Volume servers with volumes: %d (%v)", len(nodesWithVolumes), nodesWithVolumes)
glog.Infof(" - Volume servers without volumes: %d (%v)", len(nodesWithoutVolumes), nodesWithoutVolumes)
glog.Infof("Note: Maintenance system will track empty servers separately from volume metrics.")
// Store topology info for volume shard tracker
ms.lastTopologyInfo = resp.TopologyInfo
return nil
})
if err != nil {
glog.Errorf("Failed to get volume health metrics: %v", err)
return nil, err
}
glog.V(1).Infof("Successfully collected metrics for %d actual volumes with disk ID information", len(metrics))
// Count actual replicas and identify EC volumes
ms.enrichVolumeMetrics(metrics)
return metrics, nil
}
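
A quick worked example of the fullness calculation above, using the 30000 MB fallback limit (numbers hypothetical):

```go
volumeSizeLimitMB := uint64(30000)
volumeSizeLimitBytes := volumeSizeLimitMB * 1024 * 1024 // 31,457,280,000 bytes
size := uint64(15) * 1024 * 1024 * 1024                 // a 15 GiB volume
fullness := float64(size) / float64(volumeSizeLimitBytes)
fmt.Printf("fullness=%.2f\n", fullness) // ≈ 0.51
```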
// getTopologyInfo returns the last collected topology information
func (ms *MaintenanceScanner) getTopologyInfo() *master_pb.TopologyInfo {
return ms.lastTopologyInfo
}
// enrichVolumeMetrics adds additional information like replica counts
func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics) {
// Group volumes by ID to count replicas
@@ -127,13 +200,17 @@ func (ms *MaintenanceScanner) enrichVolumeMetrics(metrics []*VolumeHealthMetrics
volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric)
}
// Update replica counts for actual volumes
for volumeID, replicas := range volumeGroups {
replicaCount := len(replicas)
for _, replica := range replicas {
replica.ReplicaCount = replicaCount
}
glog.V(3).Infof("Volume %d has %d replicas", volumeID, replicaCount)
}
// TODO: Identify EC volumes by checking volume structure
// This would require querying volume servers for EC shard information
}
// convertToTaskMetrics converts existing volume metrics to task system format
@@ -144,6 +221,8 @@ func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetric
simplified = append(simplified, &types.VolumeHealthMetrics{
VolumeID: metric.VolumeID,
Server: metric.Server,
DiskType: metric.DiskType,
DiskId: metric.DiskId,
Collection: metric.Collection,
Size: metric.Size,
DeletedBytes: metric.DeletedBytes,
@@ -159,5 +238,6 @@ func (ms *MaintenanceScanner) convertToTaskMetrics(metrics []*VolumeHealthMetric
})
}
glog.V(2).Infof("Converted %d volume metrics with disk ID information for task detection", len(simplified))
return simplified
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@@ -96,7 +97,7 @@ type MaintenanceTask struct {
VolumeID uint32 `json:"volume_id,omitempty"`
Server string `json:"server,omitempty"`
Collection string `json:"collection,omitempty"`
TypedParams *worker_pb.TaskParams `json:"typed_params,omitempty"`
Reason string `json:"reason"`
CreatedAt time.Time `json:"created_at"`
ScheduledAt time.Time `json:"scheduled_at"`
@@ -109,90 +110,149 @@ type MaintenanceTask struct {
MaxRetries int `json:"max_retries"`
}
// MaintenanceConfig holds configuration for the maintenance system
// DEPRECATED: Use worker_pb.MaintenanceConfig instead
type MaintenanceConfig = worker_pb.MaintenanceConfig
// MaintenancePolicy defines policies for maintenance operations
// DEPRECATED: Use worker_pb.MaintenancePolicy instead
type MaintenancePolicy = worker_pb.MaintenancePolicy
// TaskPolicy represents configuration for a specific task type
// DEPRECATED: Use worker_pb.TaskPolicy instead
type TaskPolicy = worker_pb.TaskPolicy
// Default configuration values
func DefaultMaintenanceConfig() *MaintenanceConfig {
return DefaultMaintenanceConfigProto()
}
// Policy helper functions (since we can't add methods to type aliases)
// GetTaskPolicy returns the policy for a specific task type
func GetTaskPolicy(mp *MaintenancePolicy, taskType MaintenanceTaskType) *TaskPolicy {
if mp.TaskPolicies == nil {
return nil
}
return mp.TaskPolicies[string(taskType)]
}
// SetTaskPolicy sets the policy for a specific task type
func SetTaskPolicy(mp *MaintenancePolicy, taskType MaintenanceTaskType, policy *TaskPolicy) {
if mp.TaskPolicies == nil {
mp.TaskPolicies = make(map[string]*TaskPolicy)
}
mp.TaskPolicies[string(taskType)] = policy
}
// IsTaskEnabled returns whether a task type is enabled
func IsTaskEnabled(mp *MaintenancePolicy, taskType MaintenanceTaskType) bool {
policy := GetTaskPolicy(mp, taskType)
if policy == nil {
return false
}
return policy.Enabled
}
// GetMaxConcurrent returns the max concurrent limit for a task type
func GetMaxConcurrent(mp *MaintenancePolicy, taskType MaintenanceTaskType) int {
policy := GetTaskPolicy(mp, taskType)
if policy == nil {
return 1
}
return int(policy.MaxConcurrent)
}
// GetRepeatInterval returns the repeat interval for a task type
func GetRepeatInterval(mp *MaintenancePolicy, taskType MaintenanceTaskType) int {
policy := GetTaskPolicy(mp, taskType)
if policy == nil {
return int(mp.DefaultRepeatIntervalSeconds)
}
return int(policy.RepeatIntervalSeconds)
}
// GetVacuumTaskConfig returns the vacuum task configuration
func GetVacuumTaskConfig(mp *MaintenancePolicy, taskType MaintenanceTaskType) *worker_pb.VacuumTaskConfig {
policy := GetTaskPolicy(mp, taskType)
if policy == nil {
return nil
}
return policy.GetVacuumConfig()
}
// GetErasureCodingTaskConfig returns the erasure coding task configuration
func GetErasureCodingTaskConfig(mp *MaintenancePolicy, taskType MaintenanceTaskType) *worker_pb.ErasureCodingTaskConfig {
policy := GetTaskPolicy(mp, taskType)
if policy == nil {
return nil
}
return policy.GetErasureCodingConfig()
}
// GetBalanceTaskConfig returns the balance task configuration
func GetBalanceTaskConfig(mp *MaintenancePolicy, taskType MaintenanceTaskType) *worker_pb.BalanceTaskConfig {
policy := GetTaskPolicy(mp, taskType)
if policy == nil {
return nil
}
return policy.GetBalanceConfig()
}
// GetReplicationTaskConfig returns the replication task configuration
func GetReplicationTaskConfig(mp *MaintenancePolicy, taskType MaintenanceTaskType) *worker_pb.ReplicationTaskConfig {
policy := GetTaskPolicy(mp, taskType)
if policy == nil {
return nil
}
return policy.GetReplicationConfig()
}
// Note: GetTaskConfig was removed - use typed getters: GetVacuumTaskConfig, GetErasureCodingTaskConfig, GetBalanceTaskConfig, or GetReplicationTaskConfig
// SetVacuumTaskConfig sets the vacuum task configuration
func SetVacuumTaskConfig(mp *MaintenancePolicy, taskType MaintenanceTaskType, config *worker_pb.VacuumTaskConfig) {
policy := GetTaskPolicy(mp, taskType)
if policy != nil {
policy.TaskConfig = &worker_pb.TaskPolicy_VacuumConfig{
VacuumConfig: config,
}
}
}
// SetErasureCodingTaskConfig sets the erasure coding task configuration
func SetErasureCodingTaskConfig(mp *MaintenancePolicy, taskType MaintenanceTaskType, config *worker_pb.ErasureCodingTaskConfig) {
policy := GetTaskPolicy(mp, taskType)
if policy != nil {
policy.TaskConfig = &worker_pb.TaskPolicy_ErasureCodingConfig{
ErasureCodingConfig: config,
}
}
}
// SetBalanceTaskConfig sets the balance task configuration
func SetBalanceTaskConfig(mp *MaintenancePolicy, taskType MaintenanceTaskType, config *worker_pb.BalanceTaskConfig) {
policy := GetTaskPolicy(mp, taskType)
if policy != nil {
policy.TaskConfig = &worker_pb.TaskPolicy_BalanceConfig{
BalanceConfig: config,
}
}
}
// SetReplicationTaskConfig sets the replication task configuration
func SetReplicationTaskConfig(mp *MaintenancePolicy, taskType MaintenanceTaskType, config *worker_pb.ReplicationTaskConfig) {
policy := GetTaskPolicy(mp, taskType)
if policy != nil {
policy.TaskConfig = &worker_pb.TaskPolicy_ReplicationConfig{
ReplicationConfig: config,
}
}
}
// Note: SetTaskConfig was removed - use typed setters: SetVacuumTaskConfig, SetErasureCodingTaskConfig, SetBalanceTaskConfig, or SetReplicationTaskConfig
// MaintenanceWorker represents a worker instance
type MaintenanceWorker struct {
ID string `json:"id"`
@@ -217,29 +277,32 @@ type MaintenanceQueue struct {
// MaintenanceScanner analyzes the cluster and generates maintenance tasks
type MaintenanceScanner struct {
adminClient AdminClient
policy *MaintenancePolicy
queue *MaintenanceQueue
lastScan map[MaintenanceTaskType]time.Time
integration *MaintenanceIntegration
lastTopologyInfo *master_pb.TopologyInfo
}
// TaskDetectionResult represents the result of scanning for maintenance needs
type TaskDetectionResult struct {
TaskType MaintenanceTaskType `json:"task_type"`
VolumeID uint32 `json:"volume_id,omitempty"`
Server string `json:"server,omitempty"`
Collection string `json:"collection,omitempty"`
Priority MaintenanceTaskPriority `json:"priority"`
Reason string `json:"reason"`
TypedParams *worker_pb.TaskParams `json:"typed_params,omitempty"`
ScheduleAt time.Time `json:"schedule_at"`
}
// VolumeHealthMetrics represents the health metrics for a volume
type VolumeHealthMetrics struct {
VolumeID uint32 `json:"volume_id"`
Server string `json:"server"`
DiskType string `json:"disk_type"` // Disk type (e.g., "hdd", "ssd") or disk path (e.g., "/data1")
DiskId uint32 `json:"disk_id"` // ID of the disk in Store.Locations array
Collection string `json:"collection"`
Size uint64 `json:"size"`
DeletedBytes uint64 `json:"deleted_bytes"`
@@ -267,38 +330,6 @@ type MaintenanceStats struct {
NextScanTime time.Time `json:"next_scan_time"`
}
// MaintenanceQueueData represents data for the queue visualization UI
type MaintenanceQueueData struct {
Tasks []*MaintenanceTask `json:"tasks"`
@@ -380,10 +411,10 @@ type ClusterReplicationTask struct {
// from all registered tasks using their UI providers
func BuildMaintenancePolicyFromTasks() *MaintenancePolicy {
policy := &MaintenancePolicy{
TaskPolicies: make(map[string]*TaskPolicy),
GlobalMaxConcurrent: 4,
DefaultRepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
DefaultCheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
}
// Get all registered task types from the UI registry
@@ -399,32 +430,23 @@ func BuildMaintenancePolicyFromTasks() *MaintenancePolicy {
// Create task policy from UI configuration
taskPolicy := &TaskPolicy{
Enabled: true, // Default enabled
MaxConcurrent: 2, // Default concurrency
RepeatIntervalSeconds: policy.DefaultRepeatIntervalSeconds,
CheckIntervalSeconds: policy.DefaultCheckIntervalSeconds,
}
// Extract configuration using TaskConfig interface - no more map conversions!
if taskConfig, ok := defaultConfig.(interface{ ToTaskPolicy() *worker_pb.TaskPolicy }); ok {
// Use protobuf directly for clean, type-safe config extraction
pbTaskPolicy := taskConfig.ToTaskPolicy()
taskPolicy.Enabled = pbTaskPolicy.Enabled
taskPolicy.MaxConcurrent = pbTaskPolicy.MaxConcurrent
if pbTaskPolicy.RepeatIntervalSeconds > 0 {
taskPolicy.RepeatIntervalSeconds = pbTaskPolicy.RepeatIntervalSeconds
}
if pbTaskPolicy.CheckIntervalSeconds > 0 {
taskPolicy.CheckIntervalSeconds = pbTaskPolicy.CheckIntervalSeconds
}
}
@@ -432,24 +454,24 @@ func BuildMaintenancePolicyFromTasks() *MaintenancePolicy {
var scheduler types.TaskScheduler = typesRegistry.GetScheduler(taskType)
if scheduler != nil {
if taskPolicy.MaxConcurrent <= 0 {
taskPolicy.MaxConcurrent = int32(scheduler.GetMaxConcurrent())
}
// Convert default repeat interval to seconds
if repeatInterval := scheduler.GetDefaultRepeatInterval(); repeatInterval > 0 {
taskPolicy.RepeatIntervalSeconds = int32(repeatInterval.Seconds())
}
}
// Also get defaults from detector if available (using types.TaskDetector explicitly)
var detector types.TaskDetector = typesRegistry.GetDetector(taskType)
if detector != nil {
// Convert scan interval to check interval (seconds)
if scanInterval := detector.ScanInterval(); scanInterval > 0 {
taskPolicy.CheckIntervalSeconds = int32(scanInterval.Seconds())
}
}
policy.TaskPolicies[string(maintenanceTaskType)] = taskPolicy
glog.V(3).Infof("Built policy for task type %s: enabled=%v, max_concurrent=%d",
maintenanceTaskType, taskPolicy.Enabled, taskPolicy.MaxConcurrent)
}
@@ -558,3 +580,8 @@ func BuildMaintenanceMenuItems() []*MaintenanceMenuItem {
return menuItems
}
// Helper functions to extract configuration fields
// Note: Removed getVacuumConfigField, getErasureCodingConfigField, getBalanceConfigField, getReplicationConfigField
// These were orphaned after removing GetTaskConfig - use typed getters instead

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
@@ -145,15 +146,20 @@ func NewMaintenanceWorkerService(workerID, address, adminServer string) *Mainten
func (mws *MaintenanceWorkerService) executeGenericTask(task *MaintenanceTask) error {
glog.V(2).Infof("Executing generic task %s: %s for volume %d", task.ID, task.Type, task.VolumeID)
// Validate that task has proper typed parameters
if task.TypedParams == nil {
return fmt.Errorf("task %s has no typed parameters - task was not properly planned (insufficient destinations)", task.ID)
}
// Convert MaintenanceTask to types.TaskType
taskType := types.TaskType(string(task.Type))
// Create task parameters
taskParams := types.TaskParams{
VolumeID: task.VolumeID,
Server: task.Server,
Collection: task.Collection,
TypedParams: task.TypedParams,
}
// Create task instance using the registry
@@ -396,10 +402,19 @@ func NewMaintenanceWorkerCommand(workerID, address, adminServer string) *Mainten
// Run starts the maintenance worker as a standalone service
func (mwc *MaintenanceWorkerCommand) Run() error {
// Generate or load persistent worker ID if not provided
if mwc.workerService.workerID == "" {
// Get current working directory for worker ID persistence
wd, err := os.Getwd()
if err != nil {
return fmt.Errorf("failed to get working directory: %w", err)
}
workerID, err := worker.GenerateOrLoadWorkerID(wd)
if err != nil {
return fmt.Errorf("failed to generate or load worker ID: %w", err)
}
mwc.workerService.workerID = workerID
}
// Start the worker service

View File

@@ -0,0 +1,311 @@
package maintenance
import (
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// PendingOperationType represents the type of pending operation
type PendingOperationType string
const (
OpTypeVolumeMove PendingOperationType = "volume_move"
OpTypeVolumeBalance PendingOperationType = "volume_balance"
OpTypeErasureCoding PendingOperationType = "erasure_coding"
OpTypeVacuum PendingOperationType = "vacuum"
OpTypeReplication PendingOperationType = "replication"
)
// PendingOperation represents a pending volume/shard operation
type PendingOperation struct {
VolumeID uint32 `json:"volume_id"`
OperationType PendingOperationType `json:"operation_type"`
SourceNode string `json:"source_node"`
DestNode string `json:"dest_node,omitempty"` // Empty for non-movement operations
TaskID string `json:"task_id"`
StartTime time.Time `json:"start_time"`
EstimatedSize uint64 `json:"estimated_size"` // Bytes
Collection string `json:"collection"`
Status string `json:"status"` // "assigned", "in_progress", "completing"
}
// PendingOperations tracks all pending volume/shard operations
type PendingOperations struct {
// Operations by volume ID for conflict detection
byVolumeID map[uint32]*PendingOperation
// Operations by task ID for updates
byTaskID map[string]*PendingOperation
// Operations by node for capacity calculations
bySourceNode map[string][]*PendingOperation
byDestNode map[string][]*PendingOperation
mutex sync.RWMutex
}
// NewPendingOperations creates a new pending operations tracker
func NewPendingOperations() *PendingOperations {
return &PendingOperations{
byVolumeID: make(map[uint32]*PendingOperation),
byTaskID: make(map[string]*PendingOperation),
bySourceNode: make(map[string][]*PendingOperation),
byDestNode: make(map[string][]*PendingOperation),
}
}
// AddOperation adds a pending operation
func (po *PendingOperations) AddOperation(op *PendingOperation) {
po.mutex.Lock()
defer po.mutex.Unlock()
// Check for existing operation on this volume
if existing, exists := po.byVolumeID[op.VolumeID]; exists {
glog.V(1).Infof("Replacing existing pending operation on volume %d: %s -> %s",
op.VolumeID, existing.TaskID, op.TaskID)
po.removeOperationUnlocked(existing)
}
// Add new operation
po.byVolumeID[op.VolumeID] = op
po.byTaskID[op.TaskID] = op
// Add to node indexes
po.bySourceNode[op.SourceNode] = append(po.bySourceNode[op.SourceNode], op)
if op.DestNode != "" {
po.byDestNode[op.DestNode] = append(po.byDestNode[op.DestNode], op)
}
glog.V(2).Infof("Added pending operation: volume %d, type %s, task %s, %s -> %s",
op.VolumeID, op.OperationType, op.TaskID, op.SourceNode, op.DestNode)
}
// RemoveOperation removes a completed operation
func (po *PendingOperations) RemoveOperation(taskID string) {
po.mutex.Lock()
defer po.mutex.Unlock()
if op, exists := po.byTaskID[taskID]; exists {
po.removeOperationUnlocked(op)
glog.V(2).Infof("Removed completed operation: volume %d, task %s", op.VolumeID, taskID)
}
}
// removeOperationUnlocked removes an operation (must hold lock)
func (po *PendingOperations) removeOperationUnlocked(op *PendingOperation) {
delete(po.byVolumeID, op.VolumeID)
delete(po.byTaskID, op.TaskID)
// Remove from source node list
if ops, exists := po.bySourceNode[op.SourceNode]; exists {
for i, other := range ops {
if other.TaskID == op.TaskID {
po.bySourceNode[op.SourceNode] = append(ops[:i], ops[i+1:]...)
break
}
}
}
// Remove from dest node list
if op.DestNode != "" {
if ops, exists := po.byDestNode[op.DestNode]; exists {
for i, other := range ops {
if other.TaskID == op.TaskID {
po.byDestNode[op.DestNode] = append(ops[:i], ops[i+1:]...)
break
}
}
}
}
}
// HasPendingOperationOnVolume checks if a volume has a pending operation
func (po *PendingOperations) HasPendingOperationOnVolume(volumeID uint32) bool {
po.mutex.RLock()
defer po.mutex.RUnlock()
_, exists := po.byVolumeID[volumeID]
return exists
}
// GetPendingOperationOnVolume returns the pending operation on a volume
func (po *PendingOperations) GetPendingOperationOnVolume(volumeID uint32) *PendingOperation {
po.mutex.RLock()
defer po.mutex.RUnlock()
return po.byVolumeID[volumeID]
}
// WouldConflictWithPending checks if a new operation would conflict with pending ones
func (po *PendingOperations) WouldConflictWithPending(volumeID uint32, opType PendingOperationType) bool {
po.mutex.RLock()
defer po.mutex.RUnlock()
if existing, exists := po.byVolumeID[volumeID]; exists {
// Volume already has a pending operation
glog.V(3).Infof("Volume %d conflict: already has %s operation (task %s)",
volumeID, existing.OperationType, existing.TaskID)
return true
}
return false
}
// GetPendingCapacityImpactForNode calculates pending capacity changes for a node
func (po *PendingOperations) GetPendingCapacityImpactForNode(nodeID string) (incoming uint64, outgoing uint64) {
po.mutex.RLock()
defer po.mutex.RUnlock()
// Calculate outgoing capacity (volumes leaving this node)
if ops, exists := po.bySourceNode[nodeID]; exists {
for _, op := range ops {
// Only count movement operations
if op.DestNode != "" {
outgoing += op.EstimatedSize
}
}
}
// Calculate incoming capacity (volumes coming to this node)
if ops, exists := po.byDestNode[nodeID]; exists {
for _, op := range ops {
incoming += op.EstimatedSize
}
}
return incoming, outgoing
}
// FilterVolumeMetricsExcludingPending filters out volumes with pending operations
func (po *PendingOperations) FilterVolumeMetricsExcludingPending(metrics []*types.VolumeHealthMetrics) []*types.VolumeHealthMetrics {
po.mutex.RLock()
defer po.mutex.RUnlock()
var filtered []*types.VolumeHealthMetrics
excludedCount := 0
for _, metric := range metrics {
if _, hasPending := po.byVolumeID[metric.VolumeID]; !hasPending {
filtered = append(filtered, metric)
} else {
excludedCount++
glog.V(3).Infof("Excluding volume %d from scan due to pending operation", metric.VolumeID)
}
}
if excludedCount > 0 {
glog.V(1).Infof("Filtered out %d volumes with pending operations from %d total volumes",
excludedCount, len(metrics))
}
return filtered
}
// GetNodeCapacityProjection calculates projected capacity for a node
func (po *PendingOperations) GetNodeCapacityProjection(nodeID string, currentUsed uint64, totalCapacity uint64) NodeCapacityProjection {
incoming, outgoing := po.GetPendingCapacityImpactForNode(nodeID)
projectedUsed := currentUsed + incoming - outgoing
projectedFree := totalCapacity - projectedUsed
return NodeCapacityProjection{
NodeID: nodeID,
CurrentUsed: currentUsed,
TotalCapacity: totalCapacity,
PendingIncoming: incoming,
PendingOutgoing: outgoing,
ProjectedUsed: projectedUsed,
ProjectedFree: projectedFree,
}
}
// GetAllPendingOperations returns all pending operations
func (po *PendingOperations) GetAllPendingOperations() []*PendingOperation {
po.mutex.RLock()
defer po.mutex.RUnlock()
var operations []*PendingOperation
for _, op := range po.byVolumeID {
operations = append(operations, op)
}
return operations
}
// UpdateOperationStatus updates the status of a pending operation
func (po *PendingOperations) UpdateOperationStatus(taskID string, status string) {
po.mutex.Lock()
defer po.mutex.Unlock()
if op, exists := po.byTaskID[taskID]; exists {
op.Status = status
glog.V(3).Infof("Updated operation status: task %s, volume %d -> %s", taskID, op.VolumeID, status)
}
}
// CleanupStaleOperations removes operations that have been running too long
func (po *PendingOperations) CleanupStaleOperations(maxAge time.Duration) int {
po.mutex.Lock()
defer po.mutex.Unlock()
cutoff := time.Now().Add(-maxAge)
var staleOps []*PendingOperation
for _, op := range po.byVolumeID {
if op.StartTime.Before(cutoff) {
staleOps = append(staleOps, op)
}
}
for _, op := range staleOps {
po.removeOperationUnlocked(op)
glog.Warningf("Removed stale pending operation: volume %d, task %s, age %v",
op.VolumeID, op.TaskID, time.Since(op.StartTime))
}
return len(staleOps)
}
// NodeCapacityProjection represents projected capacity for a node
type NodeCapacityProjection struct {
NodeID string `json:"node_id"`
CurrentUsed uint64 `json:"current_used"`
TotalCapacity uint64 `json:"total_capacity"`
PendingIncoming uint64 `json:"pending_incoming"`
PendingOutgoing uint64 `json:"pending_outgoing"`
ProjectedUsed uint64 `json:"projected_used"`
ProjectedFree uint64 `json:"projected_free"`
}
// GetStats returns statistics about pending operations
func (po *PendingOperations) GetStats() PendingOperationsStats {
po.mutex.RLock()
defer po.mutex.RUnlock()
stats := PendingOperationsStats{
TotalOperations: len(po.byVolumeID),
ByType: make(map[PendingOperationType]int),
ByStatus: make(map[string]int),
}
var totalSize uint64
for _, op := range po.byVolumeID {
stats.ByType[op.OperationType]++
stats.ByStatus[op.Status]++
totalSize += op.EstimatedSize
}
stats.TotalEstimatedSize = totalSize
return stats
}
// PendingOperationsStats provides statistics about pending operations
type PendingOperationsStats struct {
TotalOperations int `json:"total_operations"`
ByType map[PendingOperationType]int `json:"by_type"`
ByStatus map[string]int `json:"by_status"`
TotalEstimatedSize uint64 `json:"total_estimated_size"`
}
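The intended consumption pattern is for the detection/scheduling path to filter out volumes that already have work in flight, re-check for conflicts at assignment time, and then record the new operation. A usage sketch under those assumptions (the assign callback is a placeholder for whatever dispatches the task to a worker, and EstimatedSize is left to be filled from real volume metrics):

package maintenance

import (
	"time"

	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// scheduleWithPendingCheck is a sketch of how a scheduler might consult
// PendingOperations before handing out erasure coding work.
func scheduleWithPendingCheck(
	pending *PendingOperations,
	metrics []*types.VolumeHealthMetrics,
	assign func(*types.VolumeHealthMetrics) string,
) {
	// Drop volumes that already have an operation in flight.
	for _, m := range pending.FilterVolumeMetricsExcludingPending(metrics) {
		if pending.WouldConflictWithPending(m.VolumeID, OpTypeErasureCoding) {
			continue // claimed between filtering and scheduling
		}
		taskID := assign(m)
		pending.AddOperation(&PendingOperation{
			VolumeID:      m.VolumeID,
			OperationType: OpTypeErasureCoding,
			SourceNode:    m.Server,
			TaskID:        taskID,
			StartTime:     time.Now(),
			EstimatedSize: 0, // fill in from volume metrics when available
			Status:        "assigned",
		})
	}
}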

View File

@@ -0,0 +1,250 @@
package maintenance
import (
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
func TestPendingOperations_ConflictDetection(t *testing.T) {
pendingOps := NewPendingOperations()
// Add a pending erasure coding operation on volume 123
op := &PendingOperation{
VolumeID: 123,
OperationType: OpTypeErasureCoding,
SourceNode: "node1",
TaskID: "task-001",
StartTime: time.Now(),
EstimatedSize: 1024 * 1024 * 1024, // 1GB
Collection: "test",
Status: "assigned",
}
pendingOps.AddOperation(op)
// Test conflict detection
if !pendingOps.HasPendingOperationOnVolume(123) {
t.Errorf("Expected volume 123 to have pending operation")
}
if !pendingOps.WouldConflictWithPending(123, OpTypeVacuum) {
t.Errorf("Expected conflict when trying to add vacuum operation on volume 123")
}
if pendingOps.HasPendingOperationOnVolume(124) {
t.Errorf("Expected volume 124 to have no pending operation")
}
if pendingOps.WouldConflictWithPending(124, OpTypeVacuum) {
t.Errorf("Expected no conflict for volume 124")
}
}
func TestPendingOperations_CapacityProjection(t *testing.T) {
pendingOps := NewPendingOperations()
// Add operation moving volume from node1 to node2
op1 := &PendingOperation{
VolumeID: 100,
OperationType: OpTypeVolumeMove,
SourceNode: "node1",
DestNode: "node2",
TaskID: "task-001",
StartTime: time.Now(),
EstimatedSize: 2 * 1024 * 1024 * 1024, // 2GB
Collection: "test",
Status: "in_progress",
}
// Add operation moving volume from node3 to node1
op2 := &PendingOperation{
VolumeID: 101,
OperationType: OpTypeVolumeMove,
SourceNode: "node3",
DestNode: "node1",
TaskID: "task-002",
StartTime: time.Now(),
EstimatedSize: 1 * 1024 * 1024 * 1024, // 1GB
Collection: "test",
Status: "assigned",
}
pendingOps.AddOperation(op1)
pendingOps.AddOperation(op2)
// Test capacity impact for node1
incoming, outgoing := pendingOps.GetPendingCapacityImpactForNode("node1")
expectedIncoming := uint64(1 * 1024 * 1024 * 1024) // 1GB incoming
expectedOutgoing := uint64(2 * 1024 * 1024 * 1024) // 2GB outgoing
if incoming != expectedIncoming {
t.Errorf("Expected incoming capacity %d, got %d", expectedIncoming, incoming)
}
if outgoing != expectedOutgoing {
t.Errorf("Expected outgoing capacity %d, got %d", expectedOutgoing, outgoing)
}
// Test projection for node1
currentUsed := uint64(10 * 1024 * 1024 * 1024) // 10GB current
totalCapacity := uint64(50 * 1024 * 1024 * 1024) // 50GB total
projection := pendingOps.GetNodeCapacityProjection("node1", currentUsed, totalCapacity)
expectedProjectedUsed := currentUsed + incoming - outgoing // 10 + 1 - 2 = 9GB
expectedProjectedFree := totalCapacity - expectedProjectedUsed // 50 - 9 = 41GB
if projection.ProjectedUsed != expectedProjectedUsed {
t.Errorf("Expected projected used %d, got %d", expectedProjectedUsed, projection.ProjectedUsed)
}
if projection.ProjectedFree != expectedProjectedFree {
t.Errorf("Expected projected free %d, got %d", expectedProjectedFree, projection.ProjectedFree)
}
}
func TestPendingOperations_VolumeFiltering(t *testing.T) {
pendingOps := NewPendingOperations()
// Create volume metrics
metrics := []*types.VolumeHealthMetrics{
{VolumeID: 100, Server: "node1"},
{VolumeID: 101, Server: "node2"},
{VolumeID: 102, Server: "node3"},
{VolumeID: 103, Server: "node1"},
}
// Add pending operations on volumes 101 and 103
op1 := &PendingOperation{
VolumeID: 101,
OperationType: OpTypeVacuum,
SourceNode: "node2",
TaskID: "task-001",
StartTime: time.Now(),
EstimatedSize: 1024 * 1024 * 1024,
Status: "in_progress",
}
op2 := &PendingOperation{
VolumeID: 103,
OperationType: OpTypeErasureCoding,
SourceNode: "node1",
TaskID: "task-002",
StartTime: time.Now(),
EstimatedSize: 2 * 1024 * 1024 * 1024,
Status: "assigned",
}
pendingOps.AddOperation(op1)
pendingOps.AddOperation(op2)
// Filter metrics
filtered := pendingOps.FilterVolumeMetricsExcludingPending(metrics)
// Should only have volumes 100 and 102 (101 and 103 are filtered out)
if len(filtered) != 2 {
t.Errorf("Expected 2 filtered metrics, got %d", len(filtered))
}
// Check that correct volumes remain
foundVolumes := make(map[uint32]bool)
for _, metric := range filtered {
foundVolumes[metric.VolumeID] = true
}
if !foundVolumes[100] || !foundVolumes[102] {
t.Errorf("Expected volumes 100 and 102 to remain after filtering")
}
if foundVolumes[101] || foundVolumes[103] {
t.Errorf("Expected volumes 101 and 103 to be filtered out")
}
}
func TestPendingOperations_OperationLifecycle(t *testing.T) {
pendingOps := NewPendingOperations()
// Add operation
op := &PendingOperation{
VolumeID: 200,
OperationType: OpTypeVolumeBalance,
SourceNode: "node1",
DestNode: "node2",
TaskID: "task-balance-001",
StartTime: time.Now(),
EstimatedSize: 1024 * 1024 * 1024,
Status: "assigned",
}
pendingOps.AddOperation(op)
// Check it exists
if !pendingOps.HasPendingOperationOnVolume(200) {
t.Errorf("Expected volume 200 to have pending operation")
}
// Update status
pendingOps.UpdateOperationStatus("task-balance-001", "in_progress")
retrievedOp := pendingOps.GetPendingOperationOnVolume(200)
if retrievedOp == nil {
t.Errorf("Expected to retrieve pending operation for volume 200")
} else if retrievedOp.Status != "in_progress" {
t.Errorf("Expected operation status to be 'in_progress', got '%s'", retrievedOp.Status)
}
// Complete operation
pendingOps.RemoveOperation("task-balance-001")
if pendingOps.HasPendingOperationOnVolume(200) {
t.Errorf("Expected volume 200 to have no pending operation after removal")
}
}
func TestPendingOperations_StaleCleanup(t *testing.T) {
pendingOps := NewPendingOperations()
// Add recent operation
recentOp := &PendingOperation{
VolumeID: 300,
OperationType: OpTypeVacuum,
SourceNode: "node1",
TaskID: "task-recent",
StartTime: time.Now(),
EstimatedSize: 1024 * 1024 * 1024,
Status: "in_progress",
}
// Add stale operation (24 hours ago)
staleOp := &PendingOperation{
VolumeID: 301,
OperationType: OpTypeErasureCoding,
SourceNode: "node2",
TaskID: "task-stale",
StartTime: time.Now().Add(-24 * time.Hour),
EstimatedSize: 2 * 1024 * 1024 * 1024,
Status: "in_progress",
}
pendingOps.AddOperation(recentOp)
pendingOps.AddOperation(staleOp)
// Clean up operations older than 1 hour
removedCount := pendingOps.CleanupStaleOperations(1 * time.Hour)
if removedCount != 1 {
t.Errorf("Expected to remove 1 stale operation, removed %d", removedCount)
}
// Recent operation should still exist
if !pendingOps.HasPendingOperationOnVolume(300) {
t.Errorf("Expected recent operation on volume 300 to still exist")
}
// Stale operation should be removed
if pendingOps.HasPendingOperationOnVolume(301) {
t.Errorf("Expected stale operation on volume 301 to be removed")
}
}