* fix(admin): reduce memory usage and verbose logging for large clusters (#8919) The admin server used excessive memory and produced thousands of log lines on clusters with many volumes (e.g., 33k volumes). Three root causes: 1. Scanner duplicated all volume metrics: getVolumeHealthMetrics() created VolumeHealthMetrics objects, then convertToTaskMetrics() copied them all into identical types.VolumeHealthMetrics. Now uses the task-system type directly, eliminating the duplicate allocation and removing convertToTaskMetrics. 2. All previous task states loaded at startup: LoadTasksFromPersistence read and deserialized every .pb file from disk, logging each one. With thousands of balance tasks persisted, this caused massive startup I/O, memory usage, and log noise (including unguarded DEBUG glog.Infof per task). Now starts with an empty queue — the scanner re-detects current needs from live cluster state. Terminal tasks are purged from memory and disk when new scan results arrive. 3. Verbose per-volume/per-node logging: V(2) and V(3) logs produced thousands of lines per scan. Per-volume logs bumped to V(4), per-node/rack/disk logs bumped to V(3). Topology summary now logs counts instead of full node ID arrays. Also removes lastTopologyInfo field from MaintenanceScanner — the raw protobuf topology is returned as a local value and not retained between 30-minute scans. * fix(admin): delete stale task files at startup, add DeleteAllTaskStates Old task .pb files from previous runs were left on disk. The periodic CleanupCompletedTasks still loads all files to find completed ones — the same expensive 4GB path from the pprof profile. Now at startup, DeleteAllTaskStates removes all .pb files by scanning the directory without reading or deserializing them. The scanner will re-detect any tasks still needed from live cluster state. * fix(admin): don't persist terminal tasks to disk CompleteTask was saving failed/completed tasks to disk where they'd accumulate. The periodic cleanup only triggered for completed tasks, not failed ones. Now terminal tasks are deleted from disk immediately and only kept in memory for the current session's UI. * fix(admin): cap in-memory tasks to 100 per job type Without a limit, the task map grows unbounded — balance could create thousands of pending tasks for a cluster with many imbalanced volumes. Now AddTask rejects new tasks when a job type already has 100 in the queue. The scanner will re-detect skipped volumes on the next scan. * fix(admin): address PR review - memory-only purge, active-only capacity - purgeTerminalTasks now only cleans in-memory map (terminal tasks are already deleted from disk by CompleteTask) - Per-type capacity limit counts only active tasks (pending/assigned/ in_progress), not terminal ones - When at capacity, purge terminal tasks first before rejecting * fix(admin): fix orphaned comment, add TaskStatusCancelled to terminal switch - Move hasQueuedOrActiveTaskForVolume comment to its function definition - Add TaskStatusCancelled to the terminal state switch in CompleteTask so cancelled task files are deleted from disk
315 lines
9.6 KiB
Go
315 lines
9.6 KiB
Go
package topology
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
)
|
|
|
|
// CountTopologyResources counts datacenters, nodes, and disks in topology info
|
|
func CountTopologyResources(topologyInfo *master_pb.TopologyInfo) (dcCount, nodeCount, diskCount int) {
|
|
if topologyInfo == nil {
|
|
return 0, 0, 0
|
|
}
|
|
dcCount = len(topologyInfo.DataCenterInfos)
|
|
for _, dc := range topologyInfo.DataCenterInfos {
|
|
for _, rack := range dc.RackInfos {
|
|
nodeCount += len(rack.DataNodeInfos)
|
|
for _, node := range rack.DataNodeInfos {
|
|
diskCount += len(node.DiskInfos)
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// UpdateTopology updates the topology information from master
|
|
func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
|
|
at.mutex.Lock()
|
|
defer at.mutex.Unlock()
|
|
|
|
// Validate topology updates to prevent clearing disk maps with invalid data
|
|
if topologyInfo == nil {
|
|
glog.Warningf("UpdateTopology received nil topologyInfo, preserving last-known-good topology")
|
|
return fmt.Errorf("rejected invalid topology update: nil topologyInfo")
|
|
}
|
|
|
|
if len(topologyInfo.DataCenterInfos) == 0 {
|
|
glog.Warningf("UpdateTopology received empty DataCenterInfos, preserving last-known-good topology (had %d nodes, %d disks)",
|
|
len(at.nodes), len(at.disks))
|
|
return fmt.Errorf("rejected invalid topology update: empty DataCenterInfos (had %d nodes, %d disks)", len(at.nodes), len(at.disks))
|
|
}
|
|
|
|
// Count incoming topology for validation logging
|
|
dcCount, incomingNodes, incomingDisks := CountTopologyResources(topologyInfo)
|
|
|
|
// Reject updates that would wipe out a valid topology with an empty one (e.g. during master restart)
|
|
if incomingNodes == 0 && len(at.nodes) > 0 {
|
|
glog.Warningf("UpdateTopology received topology with 0 nodes, preserving last-known-good topology (had %d nodes, %d disks)",
|
|
len(at.nodes), len(at.disks))
|
|
return fmt.Errorf("rejected invalid topology update: 0 nodes (had %d nodes, %d disks)", len(at.nodes), len(at.disks))
|
|
}
|
|
|
|
glog.V(2).Infof("UpdateTopology: validating update with %d datacenters, %d nodes, %d disks (current: %d nodes, %d disks)",
|
|
dcCount, incomingNodes, incomingDisks, len(at.nodes), len(at.disks))
|
|
|
|
at.topologyInfo = topologyInfo
|
|
at.lastUpdated = time.Now()
|
|
|
|
// Rebuild structured topology
|
|
at.nodes = make(map[string]*activeNode)
|
|
at.disks = make(map[string]*activeDisk)
|
|
|
|
for _, dc := range topologyInfo.DataCenterInfos {
|
|
for _, rack := range dc.RackInfos {
|
|
for _, nodeInfo := range rack.DataNodeInfos {
|
|
node := &activeNode{
|
|
nodeID: nodeInfo.Id,
|
|
dataCenter: dc.Id,
|
|
rack: rack.Id,
|
|
nodeInfo: nodeInfo,
|
|
disks: make(map[uint32]*activeDisk),
|
|
}
|
|
|
|
// Add disks for this node
|
|
for diskType, diskInfo := range nodeInfo.DiskInfos {
|
|
disk := &activeDisk{
|
|
DiskInfo: &DiskInfo{
|
|
NodeID: nodeInfo.Id,
|
|
DiskID: diskInfo.DiskId,
|
|
DiskType: diskType,
|
|
DataCenter: dc.Id,
|
|
Rack: rack.Id,
|
|
DiskInfo: diskInfo,
|
|
},
|
|
}
|
|
|
|
diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
|
|
glog.V(3).Infof("UpdateTopology: adding disk key=%q nodeId=%q diskId=%d diskType=%q address=%q grpcPort=%d volumes=%d maxVolumes=%d",
|
|
diskKey, nodeInfo.Id, diskInfo.DiskId, diskType, nodeInfo.Address, nodeInfo.GrpcPort, diskInfo.VolumeCount, diskInfo.MaxVolumeCount)
|
|
node.disks[diskInfo.DiskId] = disk
|
|
at.disks[diskKey] = disk
|
|
}
|
|
|
|
at.nodes[nodeInfo.Id] = node
|
|
}
|
|
}
|
|
}
|
|
|
|
// Rebuild performance indexes for O(1) lookups
|
|
at.rebuildIndexes()
|
|
|
|
// Reassign task states to updated topology
|
|
at.reassignTaskStates()
|
|
|
|
glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks, %d volume entries, %d EC shard entries",
|
|
len(at.nodes), len(at.disks), len(at.volumeIndex), len(at.ecShardIndex))
|
|
return nil
|
|
}
|
|
|
|
// GetAvailableDisks returns disks that can accept new tasks of the given type
|
|
// NOTE: For capacity-aware operations, prefer GetDisksWithEffectiveCapacity
|
|
func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo {
|
|
at.mutex.RLock()
|
|
defer at.mutex.RUnlock()
|
|
|
|
var available []*DiskInfo
|
|
|
|
for _, disk := range at.disks {
|
|
if disk.NodeID == excludeNodeID {
|
|
continue // Skip excluded node
|
|
}
|
|
|
|
if at.isDiskAvailable(disk, taskType) {
|
|
// Create a copy with current load count and effective capacity
|
|
diskCopy := *disk.DiskInfo
|
|
diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
|
|
available = append(available, &diskCopy)
|
|
}
|
|
}
|
|
|
|
return available
|
|
}
|
|
|
|
// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
|
|
func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
|
|
at.mutex.RLock()
|
|
defer at.mutex.RUnlock()
|
|
|
|
for _, task := range at.recentTasks {
|
|
if task.VolumeID == volumeID && task.TaskType == taskType {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// GetAllNodes returns information about all nodes (public interface)
|
|
func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo {
|
|
at.mutex.RLock()
|
|
defer at.mutex.RUnlock()
|
|
|
|
result := make(map[string]*master_pb.DataNodeInfo)
|
|
for nodeID, node := range at.nodes {
|
|
result[nodeID] = node.nodeInfo
|
|
}
|
|
return result
|
|
}
|
|
|
|
// GetTopologyInfo returns the current topology information (read-only access)
|
|
func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo {
|
|
at.mutex.RLock()
|
|
defer at.mutex.RUnlock()
|
|
return at.topologyInfo
|
|
}
|
|
|
|
// GetNodeDisks returns all disks for a specific node
|
|
func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
|
|
at.mutex.RLock()
|
|
defer at.mutex.RUnlock()
|
|
|
|
node, exists := at.nodes[nodeID]
|
|
if !exists {
|
|
return nil
|
|
}
|
|
|
|
var disks []*DiskInfo
|
|
for _, disk := range node.disks {
|
|
diskCopy := *disk.DiskInfo
|
|
diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
|
|
disks = append(disks, &diskCopy)
|
|
}
|
|
|
|
return disks
|
|
}
|
|
|
|
// GetDiskCount returns the total number of disks in the active topology
|
|
func (at *ActiveTopology) GetDiskCount() int {
|
|
at.mutex.RLock()
|
|
defer at.mutex.RUnlock()
|
|
return len(at.disks)
|
|
}
|
|
|
|
// rebuildIndexes rebuilds the volume and EC shard indexes for O(1) lookups
|
|
func (at *ActiveTopology) rebuildIndexes() {
|
|
// Nil-safety guard: return early if topology is not valid
|
|
if at.topologyInfo == nil || at.topologyInfo.DataCenterInfos == nil {
|
|
glog.V(1).Infof("rebuildIndexes: skipping rebuild due to nil topology or DataCenterInfos")
|
|
return
|
|
}
|
|
|
|
// Clear existing indexes
|
|
at.volumeIndex = make(map[uint32][]string)
|
|
at.ecShardIndex = make(map[uint32][]string)
|
|
|
|
// Rebuild indexes from current topology
|
|
for _, dc := range at.topologyInfo.DataCenterInfos {
|
|
for _, rack := range dc.RackInfos {
|
|
for _, nodeInfo := range rack.DataNodeInfos {
|
|
for _, diskInfo := range nodeInfo.DiskInfos {
|
|
diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
|
|
|
|
// Index volumes
|
|
for _, volumeInfo := range diskInfo.VolumeInfos {
|
|
volumeID := volumeInfo.Id
|
|
at.volumeIndex[volumeID] = append(at.volumeIndex[volumeID], diskKey)
|
|
}
|
|
|
|
// Index EC shards
|
|
for _, ecShardInfo := range diskInfo.EcShardInfos {
|
|
volumeID := ecShardInfo.Id
|
|
at.ecShardIndex[volumeID] = append(at.ecShardIndex[volumeID], diskKey)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetVolumeLocations returns the disk locations for a volume using O(1) lookup
|
|
func (at *ActiveTopology) GetVolumeLocations(volumeID uint32, collection string) []VolumeReplica {
|
|
at.mutex.RLock()
|
|
defer at.mutex.RUnlock()
|
|
|
|
diskKeys, exists := at.volumeIndex[volumeID]
|
|
if !exists {
|
|
return []VolumeReplica{}
|
|
}
|
|
|
|
var replicas []VolumeReplica
|
|
for _, diskKey := range diskKeys {
|
|
if disk, diskExists := at.disks[diskKey]; diskExists {
|
|
// Verify collection matches (since index doesn't include collection)
|
|
if at.volumeMatchesCollection(disk, volumeID, collection) {
|
|
replicas = append(replicas, VolumeReplica{
|
|
ServerID: disk.NodeID,
|
|
DiskID: disk.DiskID,
|
|
DataCenter: disk.DataCenter,
|
|
Rack: disk.Rack,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
return replicas
|
|
}
|
|
|
|
// GetECShardLocations returns the disk locations for EC shards using O(1) lookup
|
|
func (at *ActiveTopology) GetECShardLocations(volumeID uint32, collection string) []VolumeReplica {
|
|
at.mutex.RLock()
|
|
defer at.mutex.RUnlock()
|
|
|
|
diskKeys, exists := at.ecShardIndex[volumeID]
|
|
if !exists {
|
|
return []VolumeReplica{}
|
|
}
|
|
|
|
var ecShards []VolumeReplica
|
|
for _, diskKey := range diskKeys {
|
|
if disk, diskExists := at.disks[diskKey]; diskExists {
|
|
// Verify collection matches (since index doesn't include collection)
|
|
if at.ecShardMatchesCollection(disk, volumeID, collection) {
|
|
ecShards = append(ecShards, VolumeReplica{
|
|
ServerID: disk.NodeID,
|
|
DiskID: disk.DiskID,
|
|
DataCenter: disk.DataCenter,
|
|
Rack: disk.Rack,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
return ecShards
|
|
}
|
|
|
|
// volumeMatchesCollection checks if a volume on a disk matches the given collection
|
|
func (at *ActiveTopology) volumeMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
|
|
if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
|
|
return false
|
|
}
|
|
|
|
for _, volumeInfo := range disk.DiskInfo.DiskInfo.VolumeInfos {
|
|
if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// ecShardMatchesCollection checks if EC shards on a disk match the given collection
|
|
func (at *ActiveTopology) ecShardMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
|
|
if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
|
|
return false
|
|
}
|
|
|
|
for _, ecShardInfo := range disk.DiskInfo.DiskInfo.EcShardInfos {
|
|
if ecShardInfo.Id == volumeID && ecShardInfo.Collection == collection {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|