use constants

This commit is contained in:
Chris Lu
2026-03-13 18:11:08 -07:00
parent f48725a31d
commit 89ccb6d825
5 changed files with 47 additions and 29 deletions

View File

@@ -24,10 +24,23 @@ import (
const ( const (
defaultBalanceTimeoutSeconds = int32(10 * 60) defaultBalanceTimeoutSeconds = int32(10 * 60)
maxProposalStringLength = 200 maxProposalStringLength = 200
)
// Collection filter mode constants. // collectionFilterMode controls how collections are handled during balance detection.
collectionFilterAll = "ALL_COLLECTIONS" type collectionFilterMode string
collectionFilterEach = "EACH_COLLECTION"
const (
collectionFilterAll collectionFilterMode = "ALL_COLLECTIONS"
collectionFilterEach collectionFilterMode = "EACH_COLLECTION"
)
// volumeState controls which volumes participate in balance detection.
type volumeState string
const (
volumeStateAll volumeState = "ALL"
volumeStateActive volumeState = "ACTIVE"
volumeStateFull volumeState = "FULL"
) )
func init() { func init() {
@@ -102,9 +115,9 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_ENUM, FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_ENUM,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_SELECT, Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_SELECT,
Options: []*plugin_pb.ConfigOption{ Options: []*plugin_pb.ConfigOption{
{Value: "ALL", Label: "All Volumes"}, {Value: string(volumeStateAll), Label: "All Volumes"},
{Value: "ACTIVE", Label: "Active (writable)"}, {Value: string(volumeStateActive), Label: "Active (writable)"},
{Value: "FULL", Label: "Full (read-only)"}, {Value: string(volumeStateFull), Label: "Full (read-only)"},
}, },
}, },
{ {
@@ -139,7 +152,7 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}, Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
}, },
"volume_state": { "volume_state": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "ALL"}, Kind: &plugin_pb.ConfigValue_StringValue{StringValue: string(volumeStateAll)},
}, },
"data_center_filter": { "data_center_filter": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}, Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
@@ -319,8 +332,8 @@ func (h *VolumeBalanceHandler) Detect(
return err return err
} }
volumeState := strings.ToUpper(strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "volume_state", "ALL"))) volState := volumeState(strings.ToUpper(strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "volume_state", string(volumeStateAll)))))
metrics = filterMetricsByVolumeState(metrics, volumeState) metrics = filterMetricsByVolumeState(metrics, volState)
dataCenterFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "data_center_filter", "")) dataCenterFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "data_center_filter", ""))
rackFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "rack_filter", "")) rackFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "rack_filter", ""))
@@ -343,7 +356,7 @@ func (h *VolumeBalanceHandler) Detect(
var results []*workertypes.TaskDetectionResult var results []*workertypes.TaskDetectionResult
var hasMore bool var hasMore bool
if collectionFilter == collectionFilterEach { if collectionFilterMode(collectionFilter) == collectionFilterEach {
// Group metrics by collection in a single pass (O(N) instead of O(C*N)) // Group metrics by collection in a single pass (O(N) instead of O(C*N))
metricsByCollection := make(map[string][]*workertypes.VolumeHealthMetrics) metricsByCollection := make(map[string][]*workertypes.VolumeHealthMetrics)
for _, m := range metrics { for _, m := range metrics {
@@ -664,16 +677,16 @@ func emitVolumeBalanceDetectionDecisionTrace(
// "ACTIVE" keeps volumes with FullnessRatio < 1.01 (writable, below size limit). // "ACTIVE" keeps volumes with FullnessRatio < 1.01 (writable, below size limit).
// "FULL" keeps volumes with FullnessRatio >= 1.01 (read-only, above size limit). // "FULL" keeps volumes with FullnessRatio >= 1.01 (read-only, above size limit).
// "ALL" or any other value returns all metrics unfiltered. // "ALL" or any other value returns all metrics unfiltered.
func filterMetricsByVolumeState(metrics []*workertypes.VolumeHealthMetrics, volumeState string) []*workertypes.VolumeHealthMetrics { func filterMetricsByVolumeState(metrics []*workertypes.VolumeHealthMetrics, state volumeState) []*workertypes.VolumeHealthMetrics {
const fullnessThreshold = 1.01 const fullnessThreshold = 1.01
var predicate func(m *workertypes.VolumeHealthMetrics) bool var predicate func(m *workertypes.VolumeHealthMetrics) bool
switch volumeState { switch state {
case "ACTIVE": case volumeStateActive:
predicate = func(m *workertypes.VolumeHealthMetrics) bool { predicate = func(m *workertypes.VolumeHealthMetrics) bool {
return m.FullnessRatio < fullnessThreshold return m.FullnessRatio < fullnessThreshold
} }
case "FULL": case volumeStateFull:
predicate = func(m *workertypes.VolumeHealthMetrics) bool { predicate = func(m *workertypes.VolumeHealthMetrics) bool {
return m.FullnessRatio >= fullnessThreshold return m.FullnessRatio >= fullnessThreshold
} }

View File

@@ -712,7 +712,7 @@ func TestFilterMetricsByVolumeState(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
result := filterMetricsByVolumeState(metrics, tt.state) result := filterMetricsByVolumeState(metrics, volumeState(tt.state))
if len(result) != len(tt.expectedIDs) { if len(result) != len(tt.expectedIDs) {
t.Fatalf("expected %d metrics, got %d", len(tt.expectedIDs), len(result)) t.Fatalf("expected %d metrics, got %d", len(tt.expectedIDs), len(result))
} }
@@ -732,23 +732,23 @@ func TestFilterMetricsByVolumeState_NilElement(t *testing.T) {
nil, nil,
{VolumeID: 2, FullnessRatio: 1.5}, {VolumeID: 2, FullnessRatio: 1.5},
} }
result := filterMetricsByVolumeState(metrics, "ACTIVE") result := filterMetricsByVolumeState(metrics, volumeStateActive)
if len(result) != 1 || result[0].VolumeID != 1 { if len(result) != 1 || result[0].VolumeID != 1 {
t.Fatalf("expected [vol 1] for ACTIVE with nil elements, got %d results", len(result)) t.Fatalf("expected [vol 1] for ACTIVE with nil elements, got %d results", len(result))
} }
result = filterMetricsByVolumeState(metrics, "FULL") result = filterMetricsByVolumeState(metrics, volumeStateFull)
if len(result) != 1 || result[0].VolumeID != 2 { if len(result) != 1 || result[0].VolumeID != 2 {
t.Fatalf("expected [vol 2] for FULL with nil elements, got %d results", len(result)) t.Fatalf("expected [vol 2] for FULL with nil elements, got %d results", len(result))
} }
} }
func TestFilterMetricsByVolumeState_EmptyInput(t *testing.T) { func TestFilterMetricsByVolumeState_EmptyInput(t *testing.T) {
result := filterMetricsByVolumeState(nil, "ACTIVE") result := filterMetricsByVolumeState(nil, volumeStateActive)
if len(result) != 0 { if len(result) != 0 {
t.Fatalf("expected 0 metrics for nil input, got %d", len(result)) t.Fatalf("expected 0 metrics for nil input, got %d", len(result))
} }
result = filterMetricsByVolumeState([]*workertypes.VolumeHealthMetrics{}, "FULL") result = filterMetricsByVolumeState([]*workertypes.VolumeHealthMetrics{}, volumeStateFull)
if len(result) != 0 { if len(result) != 0 {
t.Fatalf("expected 0 metrics for empty input, got %d", len(result)) t.Fatalf("expected 0 metrics for empty input, got %d", len(result))
} }

View File

@@ -101,7 +101,8 @@ func buildVolumeMetrics(
var collectionRegex *regexp.Regexp var collectionRegex *regexp.Regexp
trimmedFilter := strings.TrimSpace(collectionFilter) trimmedFilter := strings.TrimSpace(collectionFilter)
if trimmedFilter != "" && trimmedFilter != collectionFilterAll && trimmedFilter != collectionFilterEach && trimmedFilter != "*" { filterMode := collectionFilterMode(trimmedFilter)
if trimmedFilter != "" && filterMode != collectionFilterAll && filterMode != collectionFilterEach && trimmedFilter != "*" {
var err error var err error
collectionRegex, err = regexp.Compile(trimmedFilter) collectionRegex, err = regexp.Compile(trimmedFilter)
if err != nil { if err != nil {

View File

@@ -53,7 +53,7 @@ func TestBuildVolumeMetricsAllCollections(t *testing.T) {
&master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100}, &master_pb.VolumeInformationMessage{Id: 1, Collection: "photos", Size: 100},
&master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200}, &master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200},
) )
metrics, _, _, err := buildVolumeMetrics(resp, collectionFilterAll) metrics, _, _, err := buildVolumeMetrics(resp, string(collectionFilterAll))
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@@ -68,7 +68,7 @@ func TestBuildVolumeMetricsEachCollection(t *testing.T) {
&master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200}, &master_pb.VolumeInformationMessage{Id: 2, Collection: "videos", Size: 200},
) )
// EACH_COLLECTION passes all volumes through; filtering happens in the handler // EACH_COLLECTION passes all volumes through; filtering happens in the handler
metrics, _, _, err := buildVolumeMetrics(resp, collectionFilterEach) metrics, _, _, err := buildVolumeMetrics(resp, string(collectionFilterEach))
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }

View File

@@ -7,11 +7,15 @@ import (
"github.com/seaweedfs/seaweedfs/weed/worker/types" "github.com/seaweedfs/seaweedfs/weed/worker/types"
) )
const fullnessThreshold = 1.01 const (
fullnessThreshold = 1.01
stateActive = "ACTIVE"
stateFull = "FULL"
)
// filterByState filters metrics by volume state for testing. // filterByState filters metrics by volume state for testing.
func filterByState(metrics []*types.VolumeHealthMetrics, state string) []*types.VolumeHealthMetrics { func filterByState(metrics []*types.VolumeHealthMetrics, state string) []*types.VolumeHealthMetrics {
if state != "ACTIVE" && state != "FULL" { if state != stateActive && state != stateFull {
return metrics return metrics
} }
var out []*types.VolumeHealthMetrics var out []*types.VolumeHealthMetrics
@@ -19,10 +23,10 @@ func filterByState(metrics []*types.VolumeHealthMetrics, state string) []*types.
if m == nil { if m == nil {
continue continue
} }
if state == "ACTIVE" && m.FullnessRatio < fullnessThreshold { if state == stateActive && m.FullnessRatio < fullnessThreshold {
out = append(out, m) out = append(out, m)
} }
if state == "FULL" && m.FullnessRatio >= fullnessThreshold { if state == stateFull && m.FullnessRatio >= fullnessThreshold {
out = append(out, m) out = append(out, m)
} }
} }
@@ -87,7 +91,7 @@ func TestIntegration_DCFilterWithVolumeState(t *testing.T) {
allMetrics = append(allMetrics, makeVolumesWith("node-c", "hdd", "dc2", "rack1", "c1", 300, 50, withFullness(0.5))...) allMetrics = append(allMetrics, makeVolumesWith("node-c", "hdd", "dc2", "rack1", "c1", 300, 50, withFullness(0.5))...)
// Apply volume state filter (ACTIVE only) first // Apply volume state filter (ACTIVE only) first
activeMetrics := filterByState(allMetrics, "ACTIVE") activeMetrics := filterByState(allMetrics, stateActive)
// Then apply DC filter // Then apply DC filter
dcMetrics := make([]*types.VolumeHealthMetrics, 0) dcMetrics := make([]*types.VolumeHealthMetrics, 0)
for _, m := range activeMetrics { for _, m := range activeMetrics {
@@ -394,7 +398,7 @@ func TestIntegration_AllFactors(t *testing.T) {
} }
// Apply all filters: ACTIVE state, dc1, rack1 // Apply all filters: ACTIVE state, dc1, rack1
filtered := filterByState(allMetrics, "ACTIVE") filtered := filterByState(allMetrics, stateActive)
var finalMetrics []*types.VolumeHealthMetrics var finalMetrics []*types.VolumeHealthMetrics
for _, m := range filtered { for _, m := range filtered {
if m.DataCenter == "dc1" && m.Rack == "rack1" { if m.DataCenter == "dc1" && m.Rack == "rack1" {
@@ -482,7 +486,7 @@ func TestIntegration_FullVolumesOnlyBalancing(t *testing.T) {
allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "c1", 300, 5, withFullness(1.5))...) allMetrics = append(allMetrics, makeVolumesWith("node-b", "hdd", "dc1", "rack1", "c1", 300, 5, withFullness(1.5))...)
// Filter to FULL only // Filter to FULL only
fullMetrics := filterByState(allMetrics, "FULL") fullMetrics := filterByState(allMetrics, stateFull)
at := buildTopology(servers, allMetrics) at := buildTopology(servers, allMetrics)
clusterInfo := &types.ClusterInfo{ActiveTopology: at} clusterInfo := &types.ClusterInfo{ActiveTopology: at}