* refactoring
* add ec shard size
* address comments
* passing task id

  There seems to be a disconnect between the pending tasks created in ActiveTopology and the TaskDetectionResult returned by this function. A taskID is generated locally and used to create pending tasks via AddPendingECShardTask, but this taskID is not stored in the TaskDetectionResult or passed along in any way. That makes it impossible for the worker that eventually executes the task to know which pending task in ActiveTopology it corresponds to. Without the correct taskID, the worker cannot call AssignTask or CompleteTask on the master, breaking the task lifecycle and capacity management. A potential solution is to add a TaskID field to TaskDetectionResult and worker_pb.TaskParams, ensuring the ID is propagated from detection to execution (see the first sketch after this message).

* 1 source, multiple destinations
* task supports multiple sources and destinations
* ec needs to clean up previous shards
* use erasure coding constants
* getPlanningCapacityUnsafe and getEffectiveAvailableCapacityUnsafe should return StorageSlotChange for calculation
* use CanAccommodate to calculate
* remove dead code
* address comments
* fix Mutex Copying in Protobuf Structs
* use constants
* fix estimatedSize

  The calculation for estimatedSize only considers source.EstimatedSize and dest.StorageChange, but omits dest.EstimatedSize. The TaskDestination struct has an EstimatedSize field, which is ignored here. This could lead to an incorrect estimate of the total size of data involved in tasks on a disk. The loop should also include estimatedSize += dest.EstimatedSize (see the second sketch after this message).

* at.assignTaskToDisk(task)
* refactoring
* Update weed/admin/topology/internal.go (Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>)
* fail fast
* fix compilation
* Update weed/worker/tasks/erasure_coding/detection.go (Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>)
* indexes for volume and shard locations
* dedup with ToVolumeSlots
* return an additional boolean to indicate success, or an error
* Update abstract_sql_store.go
* fix
* Update weed/worker/tasks/erasure_coding/detection.go (Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>)
* Update weed/admin/topology/task_management.go (Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>)
* faster findVolumeDisk
* Update weed/worker/tasks/erasure_coding/detection.go (Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>)
* Update weed/admin/topology/storage_slot_test.go (Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>)
* refactor
* simplify
* remove unused GetDiskStorageImpact function
* refactor
* add comments
* Update weed/admin/topology/storage_impact.go (Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>)
* Update weed/admin/topology/storage_slot_test.go (Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>)
* Update storage_impact.go
* AddPendingTask

  The unified AddPendingTask function now serves as the single entry point for all task creation, consolidating the previously separate functions while maintaining full functionality and improving code organization.
---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
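Two illustrative sketches for the review comments above. First, the proposed taskID propagation: the stand-in types, the field names (TaskID, TaskId), and the helper toTaskParams below are assumptions for illustration only, not the actual SeaweedFS API; the real types live in the worker and worker_pb packages.

package sketch

// Minimal stand-ins for the real detection result and worker task parameters.
// Only the fields needed to show the ID hand-off are included.
type TaskDetectionResult struct {
	TaskID   string // proposed new field: the same ID passed to ActiveTopology when the pending task was created
	VolumeID uint32
}

type TaskParams struct {
	TaskId   string // proposed counterpart field on worker_pb.TaskParams
	VolumeId uint32
}

// toTaskParams shows the propagation step: the worker receives the detection-time
// task ID and can later use it for AssignTask / CompleteTask on the master.
func toTaskParams(r *TaskDetectionResult) *TaskParams {
	return &TaskParams{
		TaskId:   r.TaskID,
		VolumeId: r.VolumeID,
	}
}

Second, the estimatedSize fix, in the same hypothetical package: the type names and loop shape are assumptions based on the review comment, shown only to make the missing dest.EstimatedSize contribution concrete.

// Stand-ins for the source/destination descriptors; only the size field is shown.
type TaskSource struct{ EstimatedSize int64 }
type TaskDestination struct{ EstimatedSize int64 }

// estimateTaskSize sums the estimated bytes on every source and every destination;
// the original loop only counted the sources.
func estimateTaskSize(sources []TaskSource, dests []TaskDestination) int64 {
	var estimatedSize int64
	for _, src := range sources {
		estimatedSize += src.EstimatedSize
	}
	for _, dest := range dests {
		estimatedSize += dest.EstimatedSize // previously omitted contribution
	}
	return estimatedSize
}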
package topology

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

// UpdateTopology updates the topology information from master
func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
	at.mutex.Lock()
	defer at.mutex.Unlock()

	at.topologyInfo = topologyInfo
	at.lastUpdated = time.Now()

	// Rebuild structured topology
	at.nodes = make(map[string]*activeNode)
	at.disks = make(map[string]*activeDisk)

	for _, dc := range topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, nodeInfo := range rack.DataNodeInfos {
				node := &activeNode{
					nodeID:     nodeInfo.Id,
					dataCenter: dc.Id,
					rack:       rack.Id,
					nodeInfo:   nodeInfo,
					disks:      make(map[uint32]*activeDisk),
				}

				// Add disks for this node
				for diskType, diskInfo := range nodeInfo.DiskInfos {
					disk := &activeDisk{
						DiskInfo: &DiskInfo{
							NodeID:     nodeInfo.Id,
							DiskID:     diskInfo.DiskId,
							DiskType:   diskType,
							DataCenter: dc.Id,
							Rack:       rack.Id,
							DiskInfo:   diskInfo,
						},
					}

					diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
					node.disks[diskInfo.DiskId] = disk
					at.disks[diskKey] = disk
				}

				at.nodes[nodeInfo.Id] = node
			}
		}
	}

	// Rebuild performance indexes for O(1) lookups
	at.rebuildIndexes()

	// Reassign task states to updated topology
	at.reassignTaskStates()

	glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks, %d volume entries, %d EC shard entries",
		len(at.nodes), len(at.disks), len(at.volumeIndex), len(at.ecShardIndex))
	return nil
}

// GetAvailableDisks returns disks that can accept new tasks of the given type
// NOTE: For capacity-aware operations, prefer GetDisksWithEffectiveCapacity
func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo

	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // Skip excluded node
		}

		if at.isDiskAvailable(disk, taskType) {
			// Create a copy with current load count and effective capacity
			diskCopy := *disk.DiskInfo
			diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
			available = append(available, &diskCopy)
		}
	}

	return available
}

// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	for _, task := range at.recentTasks {
		if task.VolumeID == volumeID && task.TaskType == taskType {
			return true
		}
	}

	return false
}

// GetAllNodes returns information about all nodes (public interface)
func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	result := make(map[string]*master_pb.DataNodeInfo)
	for nodeID, node := range at.nodes {
		result[nodeID] = node.nodeInfo
	}
	return result
}

// GetTopologyInfo returns the current topology information (read-only access)
func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()
	return at.topologyInfo
}

// GetNodeDisks returns all disks for a specific node
func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	node, exists := at.nodes[nodeID]
	if !exists {
		return nil
	}

	var disks []*DiskInfo
	for _, disk := range node.disks {
		diskCopy := *disk.DiskInfo
		diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
		disks = append(disks, &diskCopy)
	}

	return disks
}

// rebuildIndexes rebuilds the volume and EC shard indexes for O(1) lookups
func (at *ActiveTopology) rebuildIndexes() {
	// Clear existing indexes
	at.volumeIndex = make(map[uint32][]string)
	at.ecShardIndex = make(map[uint32][]string)

	// Rebuild indexes from current topology
	for _, dc := range at.topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, nodeInfo := range rack.DataNodeInfos {
				for _, diskInfo := range nodeInfo.DiskInfos {
					diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)

					// Index volumes
					for _, volumeInfo := range diskInfo.VolumeInfos {
						volumeID := volumeInfo.Id
						at.volumeIndex[volumeID] = append(at.volumeIndex[volumeID], diskKey)
					}

					// Index EC shards
					for _, ecShardInfo := range diskInfo.EcShardInfos {
						volumeID := ecShardInfo.Id
						at.ecShardIndex[volumeID] = append(at.ecShardIndex[volumeID], diskKey)
					}
				}
			}
		}
	}
}

// GetVolumeLocations returns the disk locations for a volume using O(1) lookup
func (at *ActiveTopology) GetVolumeLocations(volumeID uint32, collection string) []VolumeReplica {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKeys, exists := at.volumeIndex[volumeID]
	if !exists {
		return []VolumeReplica{}
	}

	var replicas []VolumeReplica
	for _, diskKey := range diskKeys {
		if disk, diskExists := at.disks[diskKey]; diskExists {
			// Verify collection matches (since index doesn't include collection)
			if at.volumeMatchesCollection(disk, volumeID, collection) {
				replicas = append(replicas, VolumeReplica{
					ServerID: disk.NodeID,
					DiskID:   disk.DiskID,
				})
			}
		}
	}

	return replicas
}

// GetECShardLocations returns the disk locations for EC shards using O(1) lookup
func (at *ActiveTopology) GetECShardLocations(volumeID uint32, collection string) []VolumeReplica {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKeys, exists := at.ecShardIndex[volumeID]
	if !exists {
		return []VolumeReplica{}
	}

	var ecShards []VolumeReplica
	for _, diskKey := range diskKeys {
		if disk, diskExists := at.disks[diskKey]; diskExists {
			// Verify collection matches (since index doesn't include collection)
			if at.ecShardMatchesCollection(disk, volumeID, collection) {
				ecShards = append(ecShards, VolumeReplica{
					ServerID: disk.NodeID,
					DiskID:   disk.DiskID,
				})
			}
		}
	}

	return ecShards
}

// volumeMatchesCollection checks if a volume on a disk matches the given collection
func (at *ActiveTopology) volumeMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return false
	}

	for _, volumeInfo := range disk.DiskInfo.DiskInfo.VolumeInfos {
		if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
			return true
		}
	}
	return false
}

// ecShardMatchesCollection checks if EC shards on a disk match the given collection
func (at *ActiveTopology) ecShardMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return false
	}

	for _, ecShardInfo := range disk.DiskInfo.DiskInfo.EcShardInfos {
		if ecShardInfo.Id == volumeID && ecShardInfo.Collection == collection {
			return true
		}
	}
	return false
}