* Fix issue #7880: Tasks use volume server IDs instead of ip:port

  When volume servers are registered with custom IDs, tasks tried to connect using the ID instead of the actual ip:port address, causing connection failures. Modified the task detection logic in the balance, erasure coding, and vacuum tasks to resolve volume server IDs to their actual ip:port addresses using ActiveTopology information.

* Use server addresses directly instead of translating from IDs

  Modified VolumeHealthMetrics to include a ServerAddress field populated directly from the topology's DataNodeInfo.Address. Updated the task detection logic to use addresses directly, without runtime lookups.

  Changes:
  - Added a ServerAddress field to VolumeHealthMetrics
  - Updated the maintenance scanner to populate ServerAddress
  - Modified task detection to use ServerAddress for Node fields
  - Updated DestinationPlan to include TargetAddress
  - Removed runtime address lookups in favor of direct address usage

* Address PR comments: add ServerAddress field, improve error handling
  - Add the missing ServerAddress field to the VolumeHealthMetrics struct
  - Add a warning in vacuum detection when the server is not found in the topology
  - Improve error handling in erasure coding to abort the task if sources are missing
  - Make the vacuum task stricter by skipping it if the server is not found in the topology

* Refactor: Extract common address resolution logic into a shared utility
  - Created weed/worker/tasks/util/address.go with a ResolveServerAddress function
  - Updated balance, erasure_coding, and vacuum detection to use the shared utility
  - Removed code duplication and improved maintainability
  - Made error handling consistent across all task types

* Fix critical issues in task address resolution
  - Vacuum: require topology availability and fail if the server is not found (no fallback to the ID)
  - Ensure all task types consistently fail early when the topology is incomplete
  - Prevent creation of tasks that would fail due to missing server addresses

* Address additional PR feedback
  - Add validation for empty addresses in ResolveServerAddress
  - Remove the redundant serverAddress variable in vacuum detection
  - Improve the robustness of address resolution

* Improve error logging in vacuum detection
  - Include the actual error details in the log message for better diagnostics
  - Make error messages consistent with other task types
123 lines
4.6 KiB
Go
123 lines
4.6 KiB
Go
package topology
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
)
|
|
|
|
// TaskSource represents a single source in a multi-source task (for replicated volume cleanup)
|
|
type TaskSource struct {
|
|
SourceServer string `json:"source_server"`
|
|
SourceDisk uint32 `json:"source_disk"`
|
|
StorageChange StorageSlotChange `json:"storage_change"` // Storage impact on this source
|
|
EstimatedSize int64 `json:"estimated_size"` // Estimated size for this source
|
|
}
|
|
|
|
// TaskDestination represents a single destination in a multi-destination task
|
|
type TaskDestination struct {
|
|
TargetServer string `json:"target_server"`
|
|
TargetDisk uint32 `json:"target_disk"`
|
|
StorageChange StorageSlotChange `json:"storage_change"` // Storage impact on this destination
|
|
EstimatedSize int64 `json:"estimated_size"` // Estimated size for this destination
|
|
}
|
|
|
|
// taskState represents the current state of tasks affecting the topology (internal)
|
|
// Uses unified multi-source/multi-destination design:
|
|
// - Single-source tasks (balance, vacuum, replication): 1 source, 1 destination
|
|
// - Multi-source EC tasks (replicated volumes): N sources, M destinations
|
|
type taskState struct {
|
|
VolumeID uint32 `json:"volume_id"`
|
|
TaskType TaskType `json:"task_type"`
|
|
Status TaskStatus `json:"status"`
|
|
StartedAt time.Time `json:"started_at"`
|
|
CompletedAt time.Time `json:"completed_at,omitempty"`
|
|
EstimatedSize int64 `json:"estimated_size"` // Total estimated size of task
|
|
|
|
// Unified source and destination arrays (always used)
|
|
Sources []TaskSource `json:"sources"` // Source locations (1+ for all task types)
|
|
Destinations []TaskDestination `json:"destinations"` // Destination locations (1+ for all task types)
|
|
}
|
|
|
|
// DiskInfo represents a disk with its current state and ongoing tasks (public for external access)
|
|
type DiskInfo struct {
|
|
NodeID string `json:"node_id"`
|
|
DiskID uint32 `json:"disk_id"`
|
|
DiskType string `json:"disk_type"`
|
|
DataCenter string `json:"data_center"`
|
|
Rack string `json:"rack"`
|
|
DiskInfo *master_pb.DiskInfo `json:"disk_info"`
|
|
LoadCount int `json:"load_count"` // Number of active tasks
|
|
}
|
|
|
|
// activeDisk represents internal disk state (private)
|
|
type activeDisk struct {
|
|
*DiskInfo
|
|
pendingTasks []*taskState
|
|
assignedTasks []*taskState
|
|
recentTasks []*taskState // Completed in last N seconds
|
|
}
|
|
|
|
// activeNode represents a node with its disks (private)
|
|
type activeNode struct {
|
|
nodeID string
|
|
dataCenter string
|
|
rack string
|
|
nodeInfo *master_pb.DataNodeInfo
|
|
disks map[uint32]*activeDisk // DiskID -> activeDisk
|
|
}
|
|
|
|
// ActiveTopology provides a real-time view of cluster state with task awareness
|
|
type ActiveTopology struct {
|
|
// Core topology from master
|
|
topologyInfo *master_pb.TopologyInfo
|
|
lastUpdated time.Time
|
|
|
|
// Structured topology for easy access (private)
|
|
nodes map[string]*activeNode // NodeID -> activeNode
|
|
disks map[string]*activeDisk // "NodeID:DiskID" -> activeDisk
|
|
|
|
// Performance indexes for O(1) lookups (private)
|
|
volumeIndex map[uint32][]string // VolumeID -> list of "NodeID:DiskID" where volume replicas exist
|
|
ecShardIndex map[uint32][]string // VolumeID -> list of "NodeID:DiskID" where EC shards exist
|
|
|
|
// Task states affecting the topology (private)
|
|
pendingTasks map[string]*taskState
|
|
assignedTasks map[string]*taskState
|
|
recentTasks map[string]*taskState
|
|
|
|
// Configuration
|
|
recentTaskWindowSeconds int
|
|
|
|
// Synchronization
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// DestinationPlan is a planned destination for a volume or shard operation.
//
// TargetNode identifies the node, which may be a custom volume-server ID,
// while TargetAddress is the ip:port used to actually reach it. Connecting by
// TargetNode fails whenever a server registered with a non-address ID.
type DestinationPlan struct {
	// Where the data should go.
	TargetNode    string `json:"target_node"`
	TargetAddress string `json:"target_address"`
	TargetDisk    uint32 `json:"target_disk"`
	TargetRack    string `json:"target_rack"`
	TargetDC      string `json:"target_dc"`

	// Expected size and placement score of this candidate.
	ExpectedSize   uint64  `json:"expected_size"`
	PlacementScore float64 `json:"placement_score"`
}
|
|
|
|
// MultiDestinationPlan represents multiple planned destinations for operations like EC
|
|
type MultiDestinationPlan struct {
|
|
Plans []*DestinationPlan `json:"plans"`
|
|
TotalShards int `json:"total_shards"`
|
|
SuccessfulRack int `json:"successful_racks"`
|
|
SuccessfulDCs int `json:"successful_dcs"`
|
|
}
|
|
|
|
// VolumeReplica locates one replica of a volume: the server and disk holding
// it, plus the data center and rack it belongs to.
//
// NOTE(review): ServerID is presumably a node ID, which need not be a
// connectable ip:port when servers register with custom IDs; resolve it to an
// address before dialing.
type VolumeReplica struct {
	ServerID   string `json:"server_id"`
	DiskID     uint32 `json:"disk_id"`
	DataCenter string `json:"data_center"`
	Rack       string `json:"rack"`
}
|