* chore: remove unreachable dead code across the codebase

  Remove ~50,000 lines of unreachable code identified by static analysis.

  Major removals:
  - weed/filer/redis_lua: entire unused Redis Lua filer store implementation
  - weed/wdclient/net2, resource_pool: unused connection/resource pool packages
  - weed/plugin/worker/lifecycle: unused lifecycle plugin worker
  - weed/s3api: unused S3 policy templates, presigned URL IAM, streaming copy, multipart IAM, key rotation, and various SSE helper functions
  - weed/mq/kafka: unused partition mapping, compression, schema, and protocol functions
  - weed/mq/offset: unused SQL storage and migration code
  - weed/worker: unused registry, task, and monitoring functions
  - weed/query: unused SQL engine, parquet scanner, and type functions
  - weed/shell: unused EC proportional rebalance functions
  - weed/storage/erasure_coding/distribution: unused distribution analysis functions
  - Individual unreachable functions removed from 150+ files across admin, credential, filer, iam, kms, mount, mq, operation, pb, s3api, server, shell, storage, topology, and util packages

* fix(s3): reset shared memory store in IAM test to prevent flaky failure

  TestLoadIAMManagerFromConfig_EmptyConfigWithFallbackKey was flaky because the MemoryStore credential backend is a singleton registered via init(). Earlier tests that create anonymous identities pollute the shared store, causing LookupAnonymous() to unexpectedly return true. Fix by calling Reset() on the memory store before the test runs.

* style: run gofmt on changed files

* fix: restore KMS functions used by integration tests

* fix(plugin): prevent panic on send to closed worker session channel

  The Plugin.sendToWorker method could panic with "send on closed channel" when a worker disconnected while a message was being sent. The race was between streamSession.close() closing the outgoing channel and sendToWorker writing to it concurrently.

  Add a done channel to streamSession that is closed before the outgoing channel, and check it in sendToWorker's select to safely detect closed sessions without panicking.
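
The last fix above relies on a done-channel guard so that a send racing with session shutdown fails cleanly instead of panicking. Below is a minimal sketch of that pattern, not the actual weed/plugin code: the message type and field names are assumptions for illustration, the sketch simply abandons the outgoing channel rather than closing it, and in the real code sendToWorker is a method on Plugin rather than on the session.

package pluginsketch

import "errors"

var errSessionClosed = errors.New("worker session closed")

type message struct{ payload []byte }

// streamSession is a minimal sketch; the real struct in weed/plugin differs.
type streamSession struct {
	outgoing chan *message // messages queued for the worker stream
	done     chan struct{} // closed on shutdown, before outgoing is torn down
}

// close signals shutdown to any goroutine that may still be sending.
// Callers must ensure it runs only once. In this sketch the outgoing channel
// is abandoned rather than closed, which avoids the send-on-closed-channel
// panic altogether.
func (s *streamSession) close() {
	close(s.done)
}

// sendToWorker selects on done alongside the send, so a worker that
// disconnects while a message is in flight yields an error, not a panic.
func (s *streamSession) sendToWorker(msg *message) error {
	select {
	case <-s.done:
		return errSessionClosed
	case s.outgoing <- msg:
		return nil
	}
}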

// Package placement provides consolidated EC shard placement logic used by
// both shell commands and worker tasks.
//
// This package encapsulates the algorithms for:
// - Selecting destination nodes/disks for EC shards
// - Ensuring proper spread across racks, servers, and disks
// - Balancing shards across the cluster
package placement

import (
	"fmt"
	"sort"
)

// DiskCandidate represents a disk that can receive EC shards
type DiskCandidate struct {
	NodeID     string
	DiskID     uint32
	DataCenter string
	Rack       string

	// Capacity information
	VolumeCount    int64
	MaxVolumeCount int64
	ShardCount     int // Current number of EC shards on this disk
	FreeSlots      int // Available slots for new shards

	// Load information
	LoadCount int // Number of active tasks on this disk
}

// NodeCandidate represents a server node that can receive EC shards
type NodeCandidate struct {
	NodeID     string
	DataCenter string
	Rack       string
	FreeSlots  int
	ShardCount int              // Total shards across all disks
	Disks      []*DiskCandidate // All disks on this node
}

// PlacementRequest configures EC shard placement behavior
type PlacementRequest struct {
	// ShardsNeeded is the total number of shards to place
	ShardsNeeded int

	// MaxShardsPerServer limits how many shards can be placed on a single server
	// 0 means no limit (but prefer spreading when possible)
	MaxShardsPerServer int

	// MaxShardsPerRack limits how many shards can be placed in a single rack
	// 0 means no limit
	MaxShardsPerRack int

	// MaxTaskLoad is the maximum task load count for a disk to be considered
	MaxTaskLoad int

	// PreferDifferentServers when true, spreads shards across different servers
	// before using multiple disks on the same server
	PreferDifferentServers bool

	// PreferDifferentRacks when true, spreads shards across different racks
	// before using multiple servers in the same rack
	PreferDifferentRacks bool
}

// PlacementResult contains the selected destinations for EC shards
type PlacementResult struct {
	SelectedDisks []*DiskCandidate

	// Statistics
	ServersUsed int
	RacksUsed   int
	DCsUsed     int

	// Distribution maps
	ShardsPerServer map[string]int
	ShardsPerRack   map[string]int
	ShardsPerDC     map[string]int
}

// SelectDestinations selects the best disks for EC shard placement.
// This is the main entry point for EC placement logic.
//
// The algorithm works in multiple passes:
// 1. First pass: Select one disk from each rack (maximize rack diversity)
// 2. Second pass: Select one disk from each unused server in used racks (maximize server diversity)
// 3. Third pass: Select additional disks from servers already used (maximize disk diversity)
func SelectDestinations(disks []*DiskCandidate, config PlacementRequest) (*PlacementResult, error) {
	if len(disks) == 0 {
		return nil, fmt.Errorf("no disk candidates provided")
	}
	if config.ShardsNeeded <= 0 {
		return nil, fmt.Errorf("shardsNeeded must be positive, got %d", config.ShardsNeeded)
	}

	// Filter suitable disks
	suitable := filterSuitableDisks(disks, config)
	if len(suitable) == 0 {
		return nil, fmt.Errorf("no suitable disks found after filtering")
	}

	// Build indexes for efficient lookup
	rackToDisks := groupDisksByRack(suitable)

	result := &PlacementResult{
		SelectedDisks:   make([]*DiskCandidate, 0, config.ShardsNeeded),
		ShardsPerServer: make(map[string]int),
		ShardsPerRack:   make(map[string]int),
		ShardsPerDC:     make(map[string]int),
	}

	usedDisks := make(map[string]bool)   // "nodeID:diskID" -> bool
	usedServers := make(map[string]bool) // nodeID -> bool
	usedRacks := make(map[string]bool)   // "dc:rack" -> bool

	// Pass 1: Select one disk from each rack (maximize rack diversity)
	if config.PreferDifferentRacks {
		// Sort racks by number of available servers (descending) to prioritize racks with more options
		sortedRacks := sortRacksByServerCount(rackToDisks)
		for _, rackKey := range sortedRacks {
			if len(result.SelectedDisks) >= config.ShardsNeeded {
				break
			}
			rackDisks := rackToDisks[rackKey]
			// Select best disk from this rack, preferring a new server
			disk := selectBestDiskFromRack(rackDisks, usedServers, usedDisks, config)
			if disk != nil {
				addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
			}
		}
	}

	// Pass 2: Select disks from unused servers in already-used racks
	if config.PreferDifferentServers && len(result.SelectedDisks) < config.ShardsNeeded {
		for _, rackKey := range getSortedRackKeys(rackToDisks) {
			if len(result.SelectedDisks) >= config.ShardsNeeded {
				break
			}
			rackDisks := rackToDisks[rackKey]
			for _, disk := range sortDisksByScore(rackDisks) {
				if len(result.SelectedDisks) >= config.ShardsNeeded {
					break
				}
				diskKey := getDiskKey(disk)
				if usedDisks[diskKey] {
					continue
				}
				// Skip if server already used (we want different servers in this pass)
				if usedServers[disk.NodeID] {
					continue
				}
				// Check server limit
				if config.MaxShardsPerServer > 0 && result.ShardsPerServer[disk.NodeID] >= config.MaxShardsPerServer {
					continue
				}
				// Check rack limit
				if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
					continue
				}
				addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
			}
		}
	}

	// Pass 3: Fill remaining slots from already-used servers (different disks)
	// Use round-robin across servers to balance shards evenly
	if len(result.SelectedDisks) < config.ShardsNeeded {
		// Group remaining disks by server
		serverToRemainingDisks := make(map[string][]*DiskCandidate)
		for _, disk := range suitable {
			if !usedDisks[getDiskKey(disk)] {
				serverToRemainingDisks[disk.NodeID] = append(serverToRemainingDisks[disk.NodeID], disk)
			}
		}

		// Sort each server's disks by score
		for serverID := range serverToRemainingDisks {
			serverToRemainingDisks[serverID] = sortDisksByScore(serverToRemainingDisks[serverID])
		}

		// Round-robin: repeatedly select from the server with the fewest shards
		for len(result.SelectedDisks) < config.ShardsNeeded {
			// Find server with fewest shards that still has available disks
			var bestServer string
			minShards := -1
			for serverID, disks := range serverToRemainingDisks {
				if len(disks) == 0 {
					continue
				}
				// Check server limit
				if config.MaxShardsPerServer > 0 && result.ShardsPerServer[serverID] >= config.MaxShardsPerServer {
					continue
				}
				shardCount := result.ShardsPerServer[serverID]
				if minShards == -1 || shardCount < minShards {
					minShards = shardCount
					bestServer = serverID
				} else if shardCount == minShards && serverID < bestServer {
					// Tie-break by server name for determinism
					bestServer = serverID
				}
			}

			if bestServer == "" {
				// No more servers with available disks
				break
			}

			// Pop the best disk from this server
			disks := serverToRemainingDisks[bestServer]
			disk := disks[0]
			serverToRemainingDisks[bestServer] = disks[1:]

			// Check rack limit
			if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
				continue
			}

			addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
		}
	}

	// Calculate final statistics
	result.ServersUsed = len(usedServers)
	result.RacksUsed = len(usedRacks)
	dcSet := make(map[string]bool)
	for _, disk := range result.SelectedDisks {
		dcSet[disk.DataCenter] = true
	}
	result.DCsUsed = len(dcSet)

	return result, nil
}
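
// Usage sketch (illustrative only, with made-up topology values): a caller
// such as an EC rebalance or encode task would build one DiskCandidate per
// eligible disk from topology data and request one destination per shard:
//
//	candidates := []*DiskCandidate{
//		{NodeID: "srv1:8080", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, VolumeCount: 20, MaxVolumeCount: 100},
//		{NodeID: "srv2:8080", DiskID: 0, DataCenter: "dc1", Rack: "rack2", FreeSlots: 10, VolumeCount: 50, MaxVolumeCount: 100},
//		{NodeID: "srv2:8080", DiskID: 1, DataCenter: "dc1", Rack: "rack2", FreeSlots: 10, VolumeCount: 60, MaxVolumeCount: 100},
//	}
//	result, err := SelectDestinations(candidates, PlacementRequest{
//		ShardsNeeded:           3,
//		PreferDifferentRacks:   true,
//		PreferDifferentServers: true,
//	})
//
// With these inputs, pass 1 picks srv1's disk (rack1) and srv2's disk 0
// (rack2), and pass 3 falls back to srv2's disk 1 for the third shard, so
// result.ServersUsed == 2 and result.RacksUsed == 2.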

// filterSuitableDisks filters disks that are suitable for EC placement
func filterSuitableDisks(disks []*DiskCandidate, config PlacementRequest) []*DiskCandidate {
	var suitable []*DiskCandidate
	for _, disk := range disks {
		if disk.FreeSlots <= 0 {
			continue
		}
		if config.MaxTaskLoad > 0 && disk.LoadCount > config.MaxTaskLoad {
			continue
		}
		suitable = append(suitable, disk)
	}
	return suitable
}

// groupDisksByRack groups disks by their rack (dc:rack key)
func groupDisksByRack(disks []*DiskCandidate) map[string][]*DiskCandidate {
	result := make(map[string][]*DiskCandidate)
	for _, disk := range disks {
		key := getRackKey(disk)
		result[key] = append(result[key], disk)
	}
	return result
}

// getRackKey returns the unique key for a rack (dc:rack)
func getRackKey(disk *DiskCandidate) string {
	return fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
}

// getDiskKey returns the unique key for a disk (nodeID:diskID)
func getDiskKey(disk *DiskCandidate) string {
	return fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID)
}

// sortRacksByServerCount returns rack keys sorted by number of servers (descending)
func sortRacksByServerCount(rackToDisks map[string][]*DiskCandidate) []string {
	// Count unique servers per rack
	rackServerCount := make(map[string]int)
	for rackKey, disks := range rackToDisks {
		servers := make(map[string]bool)
		for _, disk := range disks {
			servers[disk.NodeID] = true
		}
		rackServerCount[rackKey] = len(servers)
	}

	keys := getSortedRackKeys(rackToDisks)
	sort.Slice(keys, func(i, j int) bool {
		// Sort by server count (descending) to pick from racks with more options first
		return rackServerCount[keys[i]] > rackServerCount[keys[j]]
	})
	return keys
}

// getSortedRackKeys returns rack keys in a deterministic order
func getSortedRackKeys(rackToDisks map[string][]*DiskCandidate) []string {
	keys := make([]string, 0, len(rackToDisks))
	for k := range rackToDisks {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// selectBestDiskFromRack selects the best disk from a rack for EC placement
// It prefers servers that haven't been used yet
func selectBestDiskFromRack(disks []*DiskCandidate, usedServers, usedDisks map[string]bool, config PlacementRequest) *DiskCandidate {
	var bestDisk *DiskCandidate
	bestScore := -1.0
	bestIsFromUnusedServer := false

	for _, disk := range disks {
		if usedDisks[getDiskKey(disk)] {
			continue
		}
		isFromUnusedServer := !usedServers[disk.NodeID]
		score := calculateDiskScore(disk)

		// Prefer unused servers
		if isFromUnusedServer && !bestIsFromUnusedServer {
			bestDisk = disk
			bestScore = score
			bestIsFromUnusedServer = true
		} else if isFromUnusedServer == bestIsFromUnusedServer && score > bestScore {
			bestDisk = disk
			bestScore = score
		}
	}

	return bestDisk
}

// sortDisksByScore returns disks sorted by score (best first)
func sortDisksByScore(disks []*DiskCandidate) []*DiskCandidate {
	sorted := make([]*DiskCandidate, len(disks))
	copy(sorted, disks)
	sort.Slice(sorted, func(i, j int) bool {
		return calculateDiskScore(sorted[i]) > calculateDiskScore(sorted[j])
	})
	return sorted
}

// calculateDiskScore calculates a score for a disk candidate
// Higher score is better
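// For example, under this scoring a disk with VolumeCount=20,
// MaxVolumeCount=100, ShardCount=3 and LoadCount=2 scores
// (1.0-0.2)*60 + (10-3)*2 + (10-2) = 48 + 14 + 8 = 70.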
func calculateDiskScore(disk *DiskCandidate) float64 {
	score := 0.0

	// Primary factor: available capacity (lower utilization is better)
	if disk.MaxVolumeCount > 0 {
		utilization := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount)
		score += (1.0 - utilization) * 60.0 // Up to 60 points
	} else {
		score += 30.0 // Default if no max count
	}

	// Secondary factor: fewer shards already on this disk is better
	score += float64(10-disk.ShardCount) * 2.0 // Up to 20 points

	// Tertiary factor: lower load is better
	score += float64(10 - disk.LoadCount) // Up to 10 points

	return score
}

// addDiskToResult adds a disk to the result and updates tracking maps
func addDiskToResult(result *PlacementResult, disk *DiskCandidate,
	usedDisks, usedServers, usedRacks map[string]bool) {
	diskKey := getDiskKey(disk)
	rackKey := getRackKey(disk)

	result.SelectedDisks = append(result.SelectedDisks, disk)
	usedDisks[diskKey] = true
	usedServers[disk.NodeID] = true
	usedRacks[rackKey] = true
	result.ShardsPerServer[disk.NodeID]++
	result.ShardsPerRack[rackKey]++
	result.ShardsPerDC[disk.DataCenter]++
}