diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index 5d49cc784..b37a19f54 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -24,10 +24,23 @@ import ( const ( defaultBalanceTimeoutSeconds = int32(10 * 60) maxProposalStringLength = 200 +) - // Collection filter mode constants. - collectionFilterAll = "ALL_COLLECTIONS" - collectionFilterEach = "EACH_COLLECTION" +// collectionFilterMode controls how collections are handled during balance detection. +type collectionFilterMode string + +const ( + collectionFilterAll collectionFilterMode = "ALL_COLLECTIONS" + collectionFilterEach collectionFilterMode = "EACH_COLLECTION" +) + +// volumeState controls which volumes participate in balance detection. +type volumeState string + +const ( + volumeStateAll volumeState = "ALL" + volumeStateActive volumeState = "ACTIVE" + volumeStateFull volumeState = "FULL" ) func init() { @@ -102,9 +115,9 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_ENUM, Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_SELECT, Options: []*plugin_pb.ConfigOption{ - {Value: "ALL", Label: "All Volumes"}, - {Value: "ACTIVE", Label: "Active (writable)"}, - {Value: "FULL", Label: "Full (read-only)"}, + {Value: string(volumeStateAll), Label: "All Volumes"}, + {Value: string(volumeStateActive), Label: "Active (writable)"}, + {Value: string(volumeStateFull), Label: "Full (read-only)"}, }, }, { @@ -139,7 +152,7 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}, }, "volume_state": { - Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "ALL"}, + Kind: &plugin_pb.ConfigValue_StringValue{StringValue: string(volumeStateAll)}, }, "data_center_filter": { Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}, @@ -319,8 +332,8 @@ func (h *VolumeBalanceHandler) Detect( return err } - volumeState := strings.ToUpper(strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "volume_state", "ALL"))) - metrics = filterMetricsByVolumeState(metrics, volumeState) + volState := volumeState(strings.ToUpper(strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "volume_state", string(volumeStateAll))))) + metrics = filterMetricsByVolumeState(metrics, volState) dataCenterFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "data_center_filter", "")) rackFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "rack_filter", "")) @@ -343,7 +356,7 @@ func (h *VolumeBalanceHandler) Detect( var results []*workertypes.TaskDetectionResult var hasMore bool - if collectionFilter == collectionFilterEach { + if collectionFilterMode(collectionFilter) == collectionFilterEach { // Group metrics by collection in a single pass (O(N) instead of O(C*N)) metricsByCollection := make(map[string][]*workertypes.VolumeHealthMetrics) for _, m := range metrics { @@ -664,16 +677,16 @@ func emitVolumeBalanceDetectionDecisionTrace( // "ACTIVE" keeps volumes with FullnessRatio < 1.01 (writable, below size limit). // "FULL" keeps volumes with FullnessRatio >= 1.01 (read-only, above size limit). // "ALL" or any other value returns all metrics unfiltered. -func filterMetricsByVolumeState(metrics []*workertypes.VolumeHealthMetrics, volumeState string) []*workertypes.VolumeHealthMetrics { +func filterMetricsByVolumeState(metrics []*workertypes.VolumeHealthMetrics, state volumeState) []*workertypes.VolumeHealthMetrics { const fullnessThreshold = 1.01 var predicate func(m *workertypes.VolumeHealthMetrics) bool - switch volumeState { - case "ACTIVE": + switch state { + case volumeStateActive: predicate = func(m *workertypes.VolumeHealthMetrics) bool { return m.FullnessRatio < fullnessThreshold } - case "FULL": + case volumeStateFull: predicate = func(m *workertypes.VolumeHealthMetrics) bool { return m.FullnessRatio >= fullnessThreshold } diff --git a/weed/plugin/worker/volume_balance_handler_test.go b/weed/plugin/worker/volume_balance_handler_test.go index f65fb164e..857a5edff 100644 --- a/weed/plugin/worker/volume_balance_handler_test.go +++ b/weed/plugin/worker/volume_balance_handler_test.go @@ -712,7 +712,7 @@ func TestFilterMetricsByVolumeState(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := filterMetricsByVolumeState(metrics, tt.state) + result := filterMetricsByVolumeState(metrics, volumeState(tt.state)) if len(result) != len(tt.expectedIDs) { t.Fatalf("expected %d metrics, got %d", len(tt.expectedIDs), len(result)) } @@ -732,23 +732,23 @@ func TestFilterMetricsByVolumeState_NilElement(t *testing.T) { nil, {VolumeID: 2, FullnessRatio: 1.5}, } - result := filterMetricsByVolumeState(metrics, "ACTIVE") + result := filterMetricsByVolumeState(metrics, volumeStateActive) if len(result) != 1 || result[0].VolumeID != 1 { t.Fatalf("expected [vol 1] for ACTIVE with nil elements, got %d results", len(result)) } - result = filterMetricsByVolumeState(metrics, "FULL") + result = filterMetricsByVolumeState(metrics, volumeStateFull) if len(result) != 1 || result[0].VolumeID != 2 { t.Fatalf("expected [vol 2] for FULL with nil elements, got %d results", len(result)) } } func TestFilterMetricsByVolumeState_EmptyInput(t *testing.T) { - result := filterMetricsByVolumeState(nil, "ACTIVE") + result := filterMetricsByVolumeState(nil, volumeStateActive) if len(result) != 0 { t.Fatalf("expected 0 metrics for nil input, got %d", len(result)) } - result = filterMetricsByVolumeState([]*workertypes.VolumeHealthMetrics{}, "FULL") + result = filterMetricsByVolumeState([]*workertypes.VolumeHealthMetrics{}, volumeStateFull) if len(result) != 0 { t.Fatalf("expected 0 metrics for empty input, got %d", len(result)) } diff --git a/weed/plugin/worker/volume_metrics.go b/weed/plugin/worker/volume_metrics.go index 7cc184119..aef4c0ff7 100644 --- a/weed/plugin/worker/volume_metrics.go +++ b/weed/plugin/worker/volume_metrics.go @@ -101,7 +101,8 @@ func buildVolumeMetrics( var collectionRegex *regexp.Regexp trimmedFilter := strings.TrimSpace(collectionFilter) - if trimmedFilter != "" && trimmedFilter != collectionFilterAll && trimmedFilter != collectionFilterEach && trimmedFilter != "*" { + filterMode := collectionFilterMode(trimmedFilter) + if trimmedFilter != "" && filterMode != collectionFilterAll && filterMode != collectionFilterEach && trimmedFilter != "*" { var err error collectionRegex, err = regexp.Compile(trimmedFilter) if err != nil { diff --git a/weed/plugin/worker/volume_metrics_test.go b/weed/plugin/worker/volume_metrics_test.go index 17710724e..01f5ea01f 100644 --- a/weed/plugin/worker/volume_metrics_test.go +++ b/weed/plugin/worker/volume_metrics_test.go @@ -53,7 +53,7 @@ func TestBuildVolumeMetricsAllCollections(t *testing.T) { &master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100}, &master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200}, ) - metrics, _, _, err := buildVolumeMetrics(resp, collectionFilterAll) + metrics, _, _, err := buildVolumeMetrics(resp, string(collectionFilterAll)) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -68,7 +68,7 @@ func TestBuildVolumeMetricsEachCollection(t *testing.T) { &master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200}, ) // EACH_COLLECTION passes all volumes through; filtering happens in the handler - metrics, _, _, err := buildVolumeMetrics(resp, collectionFilterEach) + metrics, _, _, err := buildVolumeMetrics(resp, string(collectionFilterEach)) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/weed/worker/tasks/balance/detection_integration_test.go b/weed/worker/tasks/balance/detection_integration_test.go index 94dcc0965..1230e82e3 100644 --- a/weed/worker/tasks/balance/detection_integration_test.go +++ b/weed/worker/tasks/balance/detection_integration_test.go @@ -7,11 +7,15 @@ import ( "github.com/seaweedfs/seaweedfs/weed/worker/types" ) -const fullnessThreshold = 1.01 +const ( + fullnessThreshold = 1.01 + stateActive = "ACTIVE" + stateFull = "FULL" +) // filterByState filters metrics by volume state for testing. func filterByState(metrics []*types.VolumeHealthMetrics, state string) []*types.VolumeHealthMetrics { - if state != "ACTIVE" && state != "FULL" { + if state != stateActive && state != stateFull { return metrics } var out []*types.VolumeHealthMetrics @@ -19,10 +23,10 @@ func filterByState(metrics []*types.VolumeHealthMetrics, state string) []*types. if m == nil { continue } - if state == "ACTIVE" && m.FullnessRatio < fullnessThreshold { + if state == stateActive && m.FullnessRatio < fullnessThreshold { out = append(out, m) } - if state == "FULL" && m.FullnessRatio >= fullnessThreshold { + if state == stateFull && m.FullnessRatio >= fullnessThreshold { out = append(out, m) } } @@ -87,7 +91,7 @@ func TestIntegration_DCFilterWithVolumeState(t *testing.T) { allMetrics = append(allMetrics, makeVolumesWith("node-c", "hdd", "dc2", "rack1", "c1", 300, 50, withFullness(0.5))...) // Apply volume state filter (ACTIVE only) first - activeMetrics := filterByState(allMetrics, "ACTIVE") + activeMetrics := filterByState(allMetrics, stateActive) // Then apply DC filter dcMetrics := make([]*types.VolumeHealthMetrics, 0) for _, m := range activeMetrics { @@ -394,7 +398,7 @@ func TestIntegration_AllFactors(t *testing.T) { } // Apply all filters: ACTIVE state, dc1, rack1 - filtered := filterByState(allMetrics, "ACTIVE") + filtered := filterByState(allMetrics, stateActive) var finalMetrics []*types.VolumeHealthMetrics for _, m := range filtered { if m.DataCenter == "dc1" && m.Rack == "rack1" { @@ -482,7 +486,7 @@ func TestIntegration_FullVolumesOnlyBalancing(t *testing.T) { allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "c1", 300, 5, withFullness(1.5))...) // Filter to FULL only - fullMetrics := filterByState(allMetrics, "FULL") + fullMetrics := filterByState(allMetrics, stateFull) at := buildTopology(servers, allMetrics) clusterInfo := &types.ClusterInfo{ActiveTopology: at}