diff --git a/weed/worker/tasks/balance/detection_integration_test.go b/weed/worker/tasks/balance/detection_integration_test.go new file mode 100644 index 000000000..94dcc0965 --- /dev/null +++ b/weed/worker/tasks/balance/detection_integration_test.go @@ -0,0 +1,509 @@ +package balance + +import ( + "strings" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +const fullnessThreshold = 1.01 + +// filterByState filters metrics by volume state for testing. +func filterByState(metrics []*types.VolumeHealthMetrics, state string) []*types.VolumeHealthMetrics { + if state != "ACTIVE" && state != "FULL" { + return metrics + } + var out []*types.VolumeHealthMetrics + for _, m := range metrics { + if m == nil { + continue + } + if state == "ACTIVE" && m.FullnessRatio < fullnessThreshold { + out = append(out, m) + } + if state == "FULL" && m.FullnessRatio >= fullnessThreshold { + out = append(out, m) + } + } + return out +} + +// Integration tests that exercise multiple features together: +// DC/rack/node filters, volume state filtering, replica placement validation, +// and collection scoping all interacting within a single detection run. + +// makeVolumesWithOptions generates metrics with additional options. +type volumeOption func(m *types.VolumeHealthMetrics) + +func withFullness(ratio float64) volumeOption { + return func(m *types.VolumeHealthMetrics) { m.FullnessRatio = ratio } +} + +func withReplicas(rp int) volumeOption { + return func(m *types.VolumeHealthMetrics) { m.ExpectedReplicas = rp } +} + +func makeVolumesWith(server, diskType, dc, rack, collection string, volumeIDBase uint32, n int, opts ...volumeOption) []*types.VolumeHealthMetrics { + vols := makeVolumes(server, diskType, dc, rack, collection, volumeIDBase, n) + for _, v := range vols { + for _, opt := range opts { + opt(v) + } + } + return vols +} + +// buildReplicaMap builds a VolumeReplicaMap from metrics (each metric is one replica location). +func buildReplicaMap(metrics []*types.VolumeHealthMetrics) map[uint32][]types.ReplicaLocation { + m := make(map[uint32][]types.ReplicaLocation) + for _, metric := range metrics { + m[metric.VolumeID] = append(m[metric.VolumeID], types.ReplicaLocation{ + DataCenter: metric.DataCenter, + Rack: metric.Rack, + NodeID: metric.Server, + }) + } + return m +} + +// TestIntegration_DCFilterWithVolumeState tests that DC filtering and volume +// state filtering compose correctly: only ACTIVE volumes in the specified DC +// participate in balancing. +func TestIntegration_DCFilterWithVolumeState(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc2", rack: "rack1"}, + } + + var allMetrics []*types.VolumeHealthMetrics + // dc1: node-a has 40 active volumes, node-b has 10 active volumes + allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 1, 40, withFullness(0.5))...) + allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "c1", 100, 10, withFullness(0.5))...) + // dc1: node-a also has 20 FULL volumes that should be excluded + allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 200, 20, withFullness(1.5))...) + // dc2: node-c has 50 active volumes (should be excluded by DC filter) + allMetrics = append(allMetrics, makeVolumesWith("node-c", "hdd", "dc2", "rack1", "c1", 300, 50, withFullness(0.5))...) + + // Apply volume state filter (ACTIVE only) first + activeMetrics := filterByState(allMetrics, "ACTIVE") + // Then apply DC filter + dcMetrics := make([]*types.VolumeHealthMetrics, 0) + for _, m := range activeMetrics { + if m.DataCenter == "dc1" { + dcMetrics = append(dcMetrics, m) + } + } + + at := buildTopology(servers, allMetrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + conf := defaultConf() + conf.DataCenterFilter = "dc1" + + tasks, _, err := Detection(dcMetrics, clusterInfo, conf, 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + if len(tasks) == 0 { + t.Fatal("Expected balance tasks for 40/10 active-only imbalance in dc1, got 0") + } + + for _, task := range tasks { + if task.Server == "node-c" { + t.Error("node-c (dc2) should not be a source") + } + if task.TypedParams != nil { + for _, tgt := range task.TypedParams.Targets { + if strings.Contains(tgt.Node, "node-c") { + t.Error("node-c (dc2) should not be a target") + } + } + } + } + + // Verify convergence uses only the 50 active dc1 volumes (40+10), + // not the 20 full volumes + effective := computeEffectiveCounts(servers[:2], dcMetrics, tasks) + total := 0 + maxC, minC := 0, len(dcMetrics) + for _, c := range effective { + total += c + if c > maxC { + maxC = c + } + if c < minC { + minC = c + } + } + if total != 50 { + t.Errorf("Expected 50 total active volumes in dc1, got %d", total) + } + t.Logf("DC+state filter: %d tasks, effective=%v", len(tasks), effective) +} + +// TestIntegration_NodeFilterWithCollections tests that node filtering works +// correctly when volumes span multiple collections. +func TestIntegration_NodeFilterWithCollections(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"}, + } + + var allMetrics []*types.VolumeHealthMetrics + // node-a: 30 "photos" + 20 "videos" = 50 total + allMetrics = append(allMetrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "photos", 1, 30)...) + allMetrics = append(allMetrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "videos", 100, 20)...) + // node-b: 5 "photos" + 5 "videos" = 10 total + allMetrics = append(allMetrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "photos", 200, 5)...) + allMetrics = append(allMetrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "videos", 300, 5)...) + // node-c: 40 volumes (should be excluded by node filter) + allMetrics = append(allMetrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "photos", 400, 40)...) + + // Apply node filter + filteredMetrics := make([]*types.VolumeHealthMetrics, 0) + for _, m := range allMetrics { + if m.Server == "node-a" || m.Server == "node-b" { + filteredMetrics = append(filteredMetrics, m) + } + } + + at := buildTopology(servers, allMetrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + conf := defaultConf() + conf.NodeFilter = "node-a,node-b" + + tasks, _, err := Detection(filteredMetrics, clusterInfo, conf, 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + if len(tasks) == 0 { + t.Fatal("Expected tasks for 50/10 imbalance within node-a,node-b") + } + + // All moves should be between node-a and node-b only + for _, task := range tasks { + if task.Server != "node-a" && task.Server != "node-b" { + t.Errorf("Source %s should be node-a or node-b", task.Server) + } + if task.TypedParams != nil { + for _, tgt := range task.TypedParams.Targets { + if !strings.Contains(tgt.Node, "node-a") && !strings.Contains(tgt.Node, "node-b") { + t.Errorf("Target %s should be node-a or node-b", tgt.Node) + } + } + } + } + + assertNoDuplicateVolumes(t, tasks) + t.Logf("Node filter with mixed collections: %d tasks", len(tasks)) +} + +// TestIntegration_ReplicaPlacementWithDCFilter tests that replica placement +// validation prevents moves that would violate replication policy even when +// DC filtering restricts the available servers. +func TestIntegration_ReplicaPlacementWithDCFilter(t *testing.T) { + // Setup: 2 DCs, volumes with rp=100 (1 replica in different DC) + // DC filter restricts to dc1 only. + // Replicas: each volume has one copy in dc1 and one in dc2. + // The balancer should NOT move volumes within dc1 to a node that would + // violate the cross-DC placement requirement. + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc2", rack: "rack1"}, + } + + // node-a: 30 volumes with rp=100, node-b: 5 volumes with rp=100 + // Each volume also has a replica on node-c (dc2) + var allMetrics []*types.VolumeHealthMetrics + allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 1, 30, withReplicas(100))...) + allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "c1", 100, 5, withReplicas(100))...) + // dc2 replicas (not part of filtered metrics, but in replica map) + dc2Replicas := makeVolumesWith("node-c", "hdd", "dc2", "rack1", "c1", 1, 30, withReplicas(100)) + + // Build replica map: volumes 1-30 have replicas on node-a AND node-c + replicaMap := buildReplicaMap(allMetrics) + for _, r := range dc2Replicas { + replicaMap[r.VolumeID] = append(replicaMap[r.VolumeID], types.ReplicaLocation{ + DataCenter: r.DataCenter, + Rack: r.Rack, + NodeID: r.Server, + }) + } + + // Filter to dc1 only + dc1Metrics := make([]*types.VolumeHealthMetrics, 0) + for _, m := range allMetrics { + if m.DataCenter == "dc1" { + dc1Metrics = append(dc1Metrics, m) + } + } + + at := buildTopology(servers, allMetrics) + clusterInfo := &types.ClusterInfo{ + ActiveTopology: at, + VolumeReplicaMap: replicaMap, + } + + conf := defaultConf() + conf.DataCenterFilter = "dc1" + + tasks, _, err := Detection(dc1Metrics, clusterInfo, conf, 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + // With rp=100, moving a volume from node-a to node-b is valid because + // the cross-DC replica on node-c is preserved. The balancer should + // produce moves that keep the dc2 replica intact. + if len(tasks) == 0 { + t.Fatal("Expected tasks: 30/5 imbalance with valid cross-DC replicas") + } + + for _, task := range tasks { + // All sources should be from dc1 + if task.Server != "node-a" && task.Server != "node-b" { + t.Errorf("Source %s should be in dc1", task.Server) + } + // All targets should be in dc1 + if task.TypedParams != nil { + for _, tgt := range task.TypedParams.Targets { + if strings.Contains(tgt.Node, "node-c") { + t.Errorf("Target should not be node-c (dc2) with dc1 filter") + } + } + } + } + + assertNoDuplicateVolumes(t, tasks) + t.Logf("Replica placement + DC filter: %d tasks", len(tasks)) +} + +// TestIntegration_RackFilterWithReplicaPlacement tests that rack filtering +// and replica placement validation (rp=010) work together. With rp=010, +// replicas must be on different racks. When rack filtering restricts to one +// rack, the balancer should still produce valid moves within that rack for +// volumes whose replicas satisfy the cross-rack requirement via nodes outside +// the filter. +func TestIntegration_RackFilterWithReplicaPlacement(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack2"}, + } + + // rack1: node-a has 30 volumes, node-b has 5 + // Each volume also has a replica on node-c (rack2), satisfying rp=010 + var allMetrics []*types.VolumeHealthMetrics + allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 1, 30, withReplicas(10))...) + allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "c1", 100, 5, withReplicas(10))...) + rack2Replicas := makeVolumesWith("node-c", "hdd", "dc1", "rack2", "c1", 1, 30, withReplicas(10)) + + replicaMap := buildReplicaMap(allMetrics) + for _, r := range rack2Replicas { + replicaMap[r.VolumeID] = append(replicaMap[r.VolumeID], types.ReplicaLocation{ + DataCenter: r.DataCenter, + Rack: r.Rack, + NodeID: r.Server, + }) + } + + // Filter to rack1 only + rack1Metrics := make([]*types.VolumeHealthMetrics, 0) + for _, m := range allMetrics { + if m.Rack == "rack1" { + rack1Metrics = append(rack1Metrics, m) + } + } + + at := buildTopology(servers, allMetrics) + clusterInfo := &types.ClusterInfo{ + ActiveTopology: at, + VolumeReplicaMap: replicaMap, + } + + conf := defaultConf() + conf.RackFilter = "rack1" + + tasks, _, err := Detection(rack1Metrics, clusterInfo, conf, 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + // Moving within rack1 (node-a → node-b) is valid because the cross-rack + // replica on node-c (rack2) is preserved. + if len(tasks) == 0 { + t.Fatal("Expected tasks for 30/5 imbalance within rack1") + } + + for _, task := range tasks { + if task.Server == "node-c" { + t.Error("node-c (rack2) should not be a source with rack1 filter") + } + if task.TypedParams != nil { + for _, tgt := range task.TypedParams.Targets { + if strings.Contains(tgt.Node, "node-c") { + t.Error("node-c (rack2) should not be a target with rack1 filter") + } + } + } + } + + assertNoDuplicateVolumes(t, tasks) + t.Logf("Rack filter + replica placement: %d tasks", len(tasks)) +} + +// TestIntegration_AllFactors exercises all filtering dimensions simultaneously: +// DC filter, rack filter, volume state filter, replica placement validation, +// and mixed collections. +func TestIntegration_AllFactors(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack2"}, // excluded by rack filter + {id: "node-d", diskType: "hdd", diskID: 4, dc: "dc2", rack: "rack1"}, // excluded by DC filter + } + + var allMetrics []*types.VolumeHealthMetrics + + // node-a: 25 active "photos" + 10 full "photos" (full excluded by state filter) + allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "photos", 1, 25, withFullness(0.5), withReplicas(100))...) + allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "photos", 200, 10, withFullness(1.5), withReplicas(100))...) + + // node-b: 5 active "photos" + allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "photos", 100, 5, withFullness(0.5), withReplicas(100))...) + + // node-c: 20 volumes in dc1/rack2 (excluded by rack filter) + allMetrics = append(allMetrics, makeVolumesWith("node-c", "hdd", "dc1", "rack2", "photos", 300, 20, withFullness(0.5))...) + + // node-d: 30 volumes in dc2 (excluded by DC filter, but provides cross-DC replicas) + dc2Replicas := makeVolumesWith("node-d", "hdd", "dc2", "rack1", "photos", 1, 25, withFullness(0.5), withReplicas(100)) + + // Build replica map: volumes 1-25 have cross-DC replicas on node-d + replicaMap := buildReplicaMap(allMetrics) + for _, r := range dc2Replicas { + replicaMap[r.VolumeID] = append(replicaMap[r.VolumeID], types.ReplicaLocation{ + DataCenter: r.DataCenter, + Rack: r.Rack, + NodeID: r.Server, + }) + } + + // Apply all filters: ACTIVE state, dc1, rack1 + filtered := filterByState(allMetrics, "ACTIVE") + var finalMetrics []*types.VolumeHealthMetrics + for _, m := range filtered { + if m.DataCenter == "dc1" && m.Rack == "rack1" { + finalMetrics = append(finalMetrics, m) + } + } + + at := buildTopology(servers, allMetrics) + clusterInfo := &types.ClusterInfo{ + ActiveTopology: at, + VolumeReplicaMap: replicaMap, + } + + conf := defaultConf() + conf.DataCenterFilter = "dc1" + conf.RackFilter = "rack1" + + tasks, _, err := Detection(finalMetrics, clusterInfo, conf, 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + if len(tasks) == 0 { + t.Fatal("Expected tasks for 25/5 active imbalance in dc1/rack1") + } + + // Verify all moves stay within dc1/rack1 scope + for i, task := range tasks { + if task.Server != "node-a" && task.Server != "node-b" { + t.Errorf("Task %d: source %s should be node-a or node-b", i, task.Server) + } + if task.TypedParams != nil { + for _, tgt := range task.TypedParams.Targets { + node := tgt.Node + if !strings.Contains(node, "node-a") && !strings.Contains(node, "node-b") { + t.Errorf("Task %d: target %s should be node-a or node-b", i, node) + } + } + } + } + + assertNoDuplicateVolumes(t, tasks) + + // Verify convergence + effective := computeEffectiveCounts(servers[:2], finalMetrics, tasks) + total := 0 + maxC, minC := 0, len(finalMetrics) + for _, c := range effective { + total += c + if c > maxC { + maxC = c + } + if c < minC { + minC = c + } + } + + // Should have balanced the 30 active dc1/rack1 volumes (25+5) + if total != 30 { + t.Errorf("Expected 30 total filtered volumes, got %d", total) + } + avg := float64(total) / float64(len(effective)) + imbalance := float64(maxC-minC) / avg + if imbalance > conf.ImbalanceThreshold { + t.Errorf("Still imbalanced: effective=%v, imbalance=%.1f%% (threshold=%.1f%%)", + effective, imbalance*100, conf.ImbalanceThreshold*100) + } + + t.Logf("All factors combined: %d tasks, effective=%v", len(tasks), effective) +} + +// TestIntegration_FullVolumesOnlyBalancing verifies that with FULL state filter, +// only full volumes participate in balancing. +func TestIntegration_FullVolumesOnlyBalancing(t *testing.T) { + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1"}, + } + + var allMetrics []*types.VolumeHealthMetrics + // node-a: 10 active + 30 full + allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 1, 10, withFullness(0.5))...) + allMetrics = append(allMetrics, makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 100, 30, withFullness(1.5))...) + // node-b: 10 active + 5 full + allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "c1", 200, 10, withFullness(0.5))...) + allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "c1", 300, 5, withFullness(1.5))...) + + // Filter to FULL only + fullMetrics := filterByState(allMetrics, "FULL") + + at := buildTopology(servers, allMetrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + tasks, _, err := Detection(fullMetrics, clusterInfo, defaultConf(), 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + if len(tasks) == 0 { + t.Fatal("Expected tasks for 30/5 full volume imbalance") + } + + // Verify only full volumes are moved (IDs 100-129 from node-a, 300-304 from node-b) + for _, task := range tasks { + vid := task.VolumeID + // Active volumes are IDs 1-10 (node-a) and 200-209 (node-b) + if (vid >= 1 && vid <= 10) || (vid >= 200 && vid <= 209) { + t.Errorf("Task moved active volume %d, should only move full volumes", vid) + } + } + + assertNoDuplicateVolumes(t, tasks) + t.Logf("Full-only balancing: %d tasks", len(tasks)) +}