seaweedFS/weed/plugin/worker/volume_balance_handler.go
Chris Lu 78a3441b30 fix: volume balance detection returns multiple tasks per run (#8559)
* fix: volume balance detection now returns multiple tasks per run (#8551)

Previously, detectForDiskType() returned at most 1 balance task per disk
type, making the MaxJobsPerDetection setting ineffective. The detection
loop now iterates within each disk type, planning multiple moves until
the imbalance drops below threshold or maxResults is reached. Effective
volume counts are adjusted after each planned move so the algorithm
correctly re-evaluates which server is overloaded.
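The loop described above can be sketched as a minimal standalone model (illustrative names like `planMoves`, not the actual SeaweedFS API): each iteration re-finds the most and least loaded servers, stops once the imbalance ratio drops below threshold or the cap is hit, and adjusts the effective counts after every planned move.

```go
package main

import "fmt"

// planMoves is a hypothetical sketch of the multi-task detection loop.
// counts maps server -> effective volume count; a maxResults <= 0 means no cap.
func planMoves(counts map[string]int, threshold float64, maxResults int) []string {
	moves := []string{}
	for maxResults <= 0 || len(moves) < maxResults {
		total, maxS, minS := 0, "", ""
		maxC, minC := -1, int(^uint(0)>>1)
		for s, c := range counts {
			total += c
			if c > maxC {
				maxC, maxS = c, s
			}
			if c < minC {
				minC, minS = c, s
			}
		}
		avg := float64(total) / float64(len(counts))
		if avg == 0 || float64(maxC-minC)/avg <= threshold {
			break // balanced: stop before hitting maxResults
		}
		moves = append(moves, fmt.Sprintf("%s->%s", maxS, minS))
		counts[maxS]-- // adjust effective counts so the next iteration
		counts[minS]++ // re-evaluates which server is overloaded
	}
	return moves
}

func main() {
	moves := planMoves(map[string]int{"a": 8, "b": 2, "c": 2}, 0.2, 10)
	fmt.Println(len(moves))
}
```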

* fix: factor pending tasks into destination scoring and use UnixNano for task IDs

- Use UnixNano instead of Unix for task IDs to avoid collisions when
  multiple tasks are created within the same second
- Adjust calculateBalanceScore to include LoadCount (pending + assigned
  tasks) in the utilization estimate, so the destination picker avoids
  stacking multiple planned moves onto the same target disk

* test: add comprehensive balance detection tests for complex scenarios

Cover multi-server convergence, max-server shifting, destination
spreading, pre-existing pending task skipping, no-duplicate-volume
invariant, and parameterized convergence verification across different
cluster shapes and thresholds.

* fix: address PR review findings in balance detection

- hasMore flag: compute from len(results) >= maxResults so the scheduler
  knows more pages may exist, matching the vacuum/EC handler pattern
- Exhausted server fallthrough: when no eligible volumes remain on the
  current maxServer (all have pending tasks) or destination planning
  fails, mark the server as exhausted and continue to the next
  overloaded server instead of stopping the entire detection loop
- Return canonical destination server ID directly from createBalanceTask
  instead of resolving via findServerIDByAddress, eliminating the
  fragile address→ID lookup for adjustment tracking
- Fix bestScore sentinel: use math.Inf(-1) instead of -1.0 so disks
  with negative scores (high pending load, same rack/DC) are still
  selected as the best available destination
- Add TestDetection_ExhaustedServerFallsThrough covering the scenario
  where the top server's volumes are all blocked by pre-existing tasks
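The bestScore sentinel bug above is worth seeing in miniature: with a `-1.0` sentinel, every disk whose score is below -1 (heavy pending load, same rack/DC penalties) fails the `> bestScore` comparison and nothing is ever selected; starting from `math.Inf(-1)` guarantees the best of the available disks always wins. A hypothetical scorer, not the real selection code:

```go
package main

import (
	"fmt"
	"math"
)

// pickBest returns the highest-scoring disk. The sentinel must be -Inf:
// with bestScore := -1.0, all-negative score sets would return "".
func pickBest(scores map[string]float64) string {
	best, bestScore := "", math.Inf(-1)
	for disk, s := range scores {
		if s > bestScore {
			best, bestScore = disk, s
		}
	}
	return best
}

func main() {
	// Both candidates score below -1; -Inf still selects the better one.
	fmt.Println(pickBest(map[string]float64{"d1": -5.2, "d2": -3.7}))
}
```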

* test: fix computeEffectiveCounts and add len guard in no-duplicate test

- computeEffectiveCounts now takes a servers slice to seed counts for all
  known servers (including empty ones) and uses an address→ID map from
  the topology spec instead of scanning metrics, so destination servers
  with zero initial volumes are tracked correctly
- TestDetection_NoDuplicateVolumesAcrossIterations now asserts len > 1
  before checking duplicates, so the test actually fails if Detection
  regresses to returning a single task

* fix: remove redundant HasAnyTask check in createBalanceTask

The HasAnyTask check in createBalanceTask duplicated the same check
already performed in detectForDiskType's volume selection loop.
Since detection runs single-threaded (MaxDetectionConcurrency: 1),
no race can occur between the two points.

* fix: consistent hasMore pattern and remove double-counted LoadCount in scoring

- Adopt vacuum_handler's hasMore pattern: over-fetch by 1, check
  len > maxResults, and truncate — consistent truncation semantics
- Remove direct LoadCount penalty in calculateBalanceScore since
  LoadCount is already factored into effectiveVolumeCount for
  utilization scoring; bump utilization weight from 40 to 50 to
  compensate for the removed 10-point load penalty
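The over-fetch/truncate pattern adopted here can be sketched as one helper (illustrative name, not the handler's actual code): request `maxResults+1` items, and if more than `maxResults` come back, trim and report hasMore.

```go
package main

import "fmt"

// trimResults applies the vacuum_handler-style truncation: the caller
// over-fetched by one, so len > maxResults proves another page exists.
func trimResults(results []string, maxResults int) ([]string, bool) {
	if maxResults > 0 && len(results) > maxResults {
		return results[:maxResults], true
	}
	return results, false
}

func main() {
	kept, hasMore := trimResults([]string{"t1", "t2", "t3"}, 2)
	fmt.Println(len(kept), hasMore)
}
```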

* fix: handle zero maxResults as no-cap, emit trace after trim, seed empty servers

- When MaxResults is 0 (omitted), treat as no explicit cap instead of
  defaulting to 1; only apply the +1 over-fetch probe when caller
  supplies a positive limit
- Move decision trace emission after hasMore/trim so the trace
  accurately reflects the returned proposals
- Seed serverVolumeCounts from ActiveTopology so servers that have a
  matching disk type but zero volumes are included in the imbalance
  calculation and MinServerCount check
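The first bullet's limit handling reduces to a small rule (sketched below with an assumed `fetchLimit` helper): zero means no cap, and the +1 over-fetch probe only applies when a positive limit was supplied.

```go
package main

import "fmt"

// fetchLimit converts a caller-supplied MaxResults into an internal fetch
// size: 0 (omitted) means unbounded; otherwise probe one extra item so
// truncation can set hasMore.
func fetchLimit(maxResults int) int {
	if maxResults <= 0 {
		return 0 // no explicit cap
	}
	return maxResults + 1 // over-fetch probe
}

func main() {
	fmt.Println(fetchLimit(0), fetchLimit(5))
}
```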

* fix: nil-guard clusterInfo, uncap legacy DetectionFunc, deterministic disk type order

- Add early nil guard for clusterInfo in Detection to prevent panics
  in downstream helpers (detectForDiskType, createBalanceTask)
- Change register.go DetectionFunc wrapper from maxResults=1 to 0
  (no cap) so the legacy code path returns all detected tasks
- Sort disk type keys before iteration so results are deterministic
  when maxResults spans multiple disk types (HDD/SSD)

* fix: don't over-fetch in stateful detection to avoid orphaned pending tasks

Detection registers planned moves in ActiveTopology via AddPendingTask,
so requesting maxResults+1 would create an extra pending task that gets
discarded during trim. Use len(results) >= maxResults as the hasMore
signal instead, which is correct since Detection already caps internally.

* fix: return explicit truncated flag from Detection instead of approximating

Detection now returns (results, truncated, error) where truncated is true
only when the loop stopped because it hit maxResults, not when it ran out
of work naturally. This eliminates false hasMore signals when detection
happens to produce exactly maxResults results by resolving the imbalance.
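The distinction can be modeled with a toy loop (hypothetical `detect` function, not the real signature): `truncated` is only true when the loop exits at the cap with work remaining, never when it drains the work naturally — even if that happens at exactly `maxResults`.

```go
package main

import "fmt"

// detect consumes `work` units, emitting one task each, and reports
// truncated=true only when it stopped because of the cap.
func detect(work int, maxResults int) (tasks int, truncated bool) {
	for work > 0 {
		if maxResults > 0 && tasks >= maxResults {
			return tasks, true // hit the cap with work left over
		}
		tasks++
		work--
	}
	return tasks, false // ran out of work naturally
}

func main() {
	// Exactly maxResults tasks, but the cluster converged: no false hasMore.
	tasks, truncated := detect(5, 5)
	fmt.Println(tasks, truncated)
}
```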

* cleanup: simplify detection logic and remove redundancies

- Remove redundant clusterInfo nil check in detectForDiskType since
  Detection already guards against nil clusterInfo
- Remove adjustments loop for destination servers not in
  serverVolumeCounts — topology seeding ensures all servers with
  matching disk type are already present
- Merge two-loop min/max calculation into a single loop: min across
  all servers, max only among non-exhausted servers
- Replace magic number 100 with len(metrics) for minC initialization
  in convergence test

* fix: accurate truncation flag, deterministic server order, indexed volume lookup

- Track balanced flag to distinguish "hit maxResults cap" from "cluster
  balanced at exactly maxResults" — truncated is only true when there's
  genuinely more work to do
- Sort servers for deterministic iteration and tie-breaking when
  multiple servers have equal volume counts
- Pre-index volumes by server with per-server cursors to avoid
  O(maxResults * volumes) rescanning on each iteration
- Add truncation flag assertions to RespectsMaxResults test: true when
  capped, false when detection finishes naturally
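The pre-indexing idea in the third bullet looks roughly like this (a sketch under assumed types — the real code indexes `VolumeHealthMetrics`, not raw IDs): build a per-server slice once, sort it, then advance a cursor per server instead of rescanning all volumes each iteration.

```go
package main

import (
	"fmt"
	"sort"
)

// volumeIndex groups volume IDs by server with a per-server cursor, so
// each detection iteration is O(1) instead of rescanning every volume.
type volumeIndex struct {
	byServer map[string][]uint32
	cursor   map[string]int
}

func buildIndex(volumes map[uint32]string) *volumeIndex {
	ix := &volumeIndex{byServer: map[string][]uint32{}, cursor: map[string]int{}}
	for id, server := range volumes {
		ix.byServer[server] = append(ix.byServer[server], id)
	}
	// Sort by volume ID so selection order is deterministic.
	for server := range ix.byServer {
		s := ix.byServer[server]
		sort.Slice(s, func(i, j int) bool { return s[i] < s[j] })
	}
	return ix
}

// next returns the next unconsidered volume on server, advancing the cursor.
func (ix *volumeIndex) next(server string) (uint32, bool) {
	s := ix.byServer[server]
	if ix.cursor[server] >= len(s) {
		return 0, false
	}
	v := s[ix.cursor[server]]
	ix.cursor[server]++
	return v, true
}

func main() {
	ix := buildIndex(map[uint32]string{3: "a", 1: "a", 2: "b"})
	v, _ := ix.next("a")
	fmt.Println(v)
}
```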

* fix: seed trace server counts from ActiveTopology to match detection logic

The decision trace was building serverVolumeCounts only from metrics,
missing zero-volume servers seeded from ActiveTopology by Detection.
This could cause the trace to report wrong server counts, incorrect
imbalance ratios, or spurious "too few servers" messages. Pass
activeTopology into the trace function and seed server counts the
same way Detection does.

* fix: don't exhaust server on per-volume planning failure, sort volumes by ID

- When createBalanceTask returns nil, continue to the next volume on
  the same server instead of marking the entire server as exhausted.
  The failure may be volume-specific (not found in topology, pending
  task registration failed) and other volumes on the server may still
  be viable candidates.
- Sort each server's volume slice by VolumeID after pre-indexing so
  volume selection is fully deterministic regardless of input order.

* fix: use require instead of assert to prevent nil dereference panic in CORS test

The test used assert.NoError (non-fatal) for GetBucketCors, then
immediately accessed getResp.CORSRules. When the API returns an error,
getResp is nil causing a panic. Switch to require.NoError/NotNil/Len
so the test stops before dereferencing a nil response.

* fix: deterministic disk tie-breaking and stronger pre-existing task test

- Sort available disks by NodeID then DiskID before scoring so
  destination selection is deterministic when two disks score equally
- Add task count bounds assertion to SkipsPreExistingPendingTasks test:
  with 15 of 20 volumes already having pending tasks, at most 5 new
  tasks should be created and at least 1 (imbalance still exists)
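The tie-breaking sort from the first bullet is a standard two-key comparison; sketched here with an assumed minimal `disk` struct (the real type carries more fields):

```go
package main

import (
	"fmt"
	"sort"
)

type disk struct {
	NodeID string
	DiskID uint32
}

// sortDisks orders candidates by NodeID then DiskID, so when two disks
// score equally the first one scanned is always the same disk.
func sortDisks(disks []disk) {
	sort.Slice(disks, func(i, j int) bool {
		if disks[i].NodeID != disks[j].NodeID {
			return disks[i].NodeID < disks[j].NodeID
		}
		return disks[i].DiskID < disks[j].DiskID
	})
}

func main() {
	d := []disk{{"n2", 0}, {"n1", 1}, {"n1", 0}}
	sortDisks(d)
	fmt.Println(d[0].NodeID, d[0].DiskID)
}
```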

* fix: seed adjustments from existing pending/assigned tasks to prevent over-scheduling

Detection now calls ActiveTopology.GetTaskServerAdjustments() to
initialize the adjustments map with source/destination deltas from
existing pending and assigned balance tasks. This ensures
effectiveCounts reflects in-flight moves, preventing the algorithm
from planning additional moves in the same direction when prior
moves already address the imbalance.

Added GetTaskServerAdjustments(taskType) to ActiveTopology which
iterates pending and assigned tasks, decrementing source servers
and incrementing destination servers for the given task type.
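The adjustment seeding reduces to a simple fold over in-flight tasks — each matching task counts -1 against its source and +1 toward its destination. A sketch with an assumed flat `task` struct (the real `ActiveTopology` tracks richer state):

```go
package main

import "fmt"

type task struct {
	Type   string
	Source string
	Dest   string
}

// taskServerAdjustments mirrors the described helper: for every pending or
// assigned task of the given type, decrement the source server's effective
// count and increment the destination's.
func taskServerAdjustments(tasks []task, taskType string) map[string]int {
	adj := map[string]int{}
	for _, t := range tasks {
		if t.Type != taskType {
			continue
		}
		adj[t.Source]--
		adj[t.Dest]++
	}
	return adj
}

func main() {
	adj := taskServerAdjustments([]task{
		{"balance", "a", "b"},
		{"vacuum", "a", "c"}, // different task type: ignored
	}, "balance")
	fmt.Println(adj["a"], adj["b"])
}
```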
2026-03-08 21:34:03 -07:00

package pluginworker

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	balancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
	workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"
)

const (
	defaultBalanceTimeoutSeconds = int32(10 * 60)
)
func init() {
	RegisterHandler(HandlerFactory{
		JobType: "volume_balance",
		Category: CategoryDefault,
		Aliases: []string{"balance", "volume.balance", "volume-balance"},
		Build: func(opts HandlerBuildOptions) (JobHandler, error) {
			return NewVolumeBalanceHandler(opts.GrpcDialOption), nil
		},
	})
}

type volumeBalanceWorkerConfig struct {
	TaskConfig *balancetask.Config
	MinIntervalSeconds int
}

// VolumeBalanceHandler is the plugin job handler for volume balancing.
type VolumeBalanceHandler struct {
	grpcDialOption grpc.DialOption
}

func NewVolumeBalanceHandler(grpcDialOption grpc.DialOption) *VolumeBalanceHandler {
	return &VolumeBalanceHandler{grpcDialOption: grpcDialOption}
}

func (h *VolumeBalanceHandler) Capability() *plugin_pb.JobTypeCapability {
	return &plugin_pb.JobTypeCapability{
		JobType: "volume_balance",
		CanDetect: true,
		CanExecute: true,
		MaxDetectionConcurrency: 1,
		MaxExecutionConcurrency: 1,
		DisplayName: "Volume Balance",
		Description: "Moves volumes between servers to reduce skew in volume distribution",
		Weight: 50,
	}
}
func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
	return &plugin_pb.JobTypeDescriptor{
		JobType: "volume_balance",
		DisplayName: "Volume Balance",
		Description: "Detect and execute volume moves to balance server load",
		Icon: "fas fa-balance-scale",
		DescriptorVersion: 1,
		AdminConfigForm: &plugin_pb.ConfigForm{
			FormId: "volume-balance-admin",
			Title: "Volume Balance Admin Config",
			Description: "Admin-side controls for volume balance detection scope.",
			Sections: []*plugin_pb.ConfigSection{
				{
					SectionId: "scope",
					Title: "Scope",
					Description: "Optional filters applied before balance detection.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name: "collection_filter",
							Label: "Collection Filter",
							Description: "Only detect balance opportunities in this collection when set.",
							Placeholder: "all collections",
							FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
					},
				},
			},
			DefaultValues: map[string]*plugin_pb.ConfigValue{
				"collection_filter": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
				},
			},
		},
		WorkerConfigForm: &plugin_pb.ConfigForm{
			FormId: "volume-balance-worker",
			Title: "Volume Balance Worker Config",
			Description: "Worker-side balance thresholds.",
			Sections: []*plugin_pb.ConfigSection{
				{
					SectionId: "thresholds",
					Title: "Detection Thresholds",
					Description: "Controls for when balance jobs should be proposed.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name: "imbalance_threshold",
							Label: "Imbalance Threshold",
							Description: "Detect when skew exceeds this ratio.",
							FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_DOUBLE,
							Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required: true,
							MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0}},
							MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 1}},
						},
						{
							Name: "min_server_count",
							Label: "Minimum Server Count",
							Description: "Require at least this many servers for balancing.",
							FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required: true,
							MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2}},
						},
						{
							Name: "min_interval_seconds",
							Label: "Minimum Detection Interval (s)",
							Description: "Skip detection if the last successful run is more recent than this interval.",
							FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required: true,
							MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
						},
					},
				},
			},
			DefaultValues: map[string]*plugin_pb.ConfigValue{
				"imbalance_threshold": {
					Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2},
				},
				"min_server_count": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2},
				},
				"min_interval_seconds": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60},
				},
			},
		},
		AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
			Enabled: true,
			DetectionIntervalSeconds: 30 * 60,
			DetectionTimeoutSeconds: 120,
			MaxJobsPerDetection: 100,
			GlobalExecutionConcurrency: 16,
			PerWorkerExecutionConcurrency: 4,
			RetryLimit: 1,
			RetryBackoffSeconds: 15,
			JobTypeMaxRuntimeSeconds: 1800,
		},
		WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
			"imbalance_threshold": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2},
			},
			"min_server_count": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2},
			},
			"min_interval_seconds": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60},
			},
		},
	}
}
func (h *VolumeBalanceHandler) Detect(
	ctx context.Context,
	request *plugin_pb.RunDetectionRequest,
	sender DetectionSender,
) error {
	if request == nil {
		return fmt.Errorf("run detection request is nil")
	}
	if sender == nil {
		return fmt.Errorf("detection sender is nil")
	}
	if request.JobType != "" && request.JobType != "volume_balance" {
		return fmt.Errorf("job type %q is not handled by volume_balance worker", request.JobType)
	}
	workerConfig := deriveBalanceWorkerConfig(request.GetWorkerConfigValues())
	if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) {
		minInterval := time.Duration(workerConfig.MinIntervalSeconds) * time.Second
		_ = sender.SendActivity(BuildDetectorActivity(
			"skipped_by_interval",
			fmt.Sprintf("VOLUME BALANCE: Detection skipped due to min interval (%s)", minInterval),
			map[string]*plugin_pb.ConfigValue{
				"min_interval_seconds": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(workerConfig.MinIntervalSeconds)},
				},
			},
		))
		if err := sender.SendProposals(&plugin_pb.DetectionProposals{
			JobType: "volume_balance",
			Proposals: []*plugin_pb.JobProposal{},
			HasMore: false,
		}); err != nil {
			return err
		}
		return sender.SendComplete(&plugin_pb.DetectionComplete{
			JobType: "volume_balance",
			Success: true,
			TotalProposals: 0,
		})
	}
	collectionFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "collection_filter", ""))
	masters := make([]string, 0)
	if request.ClusterContext != nil {
		masters = append(masters, request.ClusterContext.MasterGrpcAddresses...)
	}
	metrics, activeTopology, err := h.collectVolumeMetrics(ctx, masters, collectionFilter)
	if err != nil {
		return err
	}
	clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology}
	maxResults := int(request.MaxResults)
	results, hasMore, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults)
	if err != nil {
		return err
	}
	if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, activeTopology, workerConfig.TaskConfig, results); traceErr != nil {
		glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr)
	}
	proposals := make([]*plugin_pb.JobProposal, 0, len(results))
	for _, result := range results {
		proposal, proposalErr := buildVolumeBalanceProposal(result)
		if proposalErr != nil {
			glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", proposalErr)
			continue
		}
		proposals = append(proposals, proposal)
	}
	if err := sender.SendProposals(&plugin_pb.DetectionProposals{
		JobType: "volume_balance",
		Proposals: proposals,
		HasMore: hasMore,
	}); err != nil {
		return err
	}
	return sender.SendComplete(&plugin_pb.DetectionComplete{
		JobType: "volume_balance",
		Success: true,
		TotalProposals: int32(len(proposals)),
	})
}
func emitVolumeBalanceDetectionDecisionTrace(
	sender DetectionSender,
	metrics []*workertypes.VolumeHealthMetrics,
	activeTopology *topology.ActiveTopology,
	taskConfig *balancetask.Config,
	results []*workertypes.TaskDetectionResult,
) error {
	if sender == nil || taskConfig == nil {
		return nil
	}
	totalVolumes := len(metrics)
	summaryMessage := ""
	if len(results) == 0 {
		summaryMessage = fmt.Sprintf(
			"BALANCE: No tasks created for %d volumes across %d disk type(s). Threshold=%.1f%%, MinServers=%d",
			totalVolumes,
			countBalanceDiskTypes(metrics),
			taskConfig.ImbalanceThreshold*100,
			taskConfig.MinServerCount,
		)
	} else {
		summaryMessage = fmt.Sprintf(
			"BALANCE: Created %d task(s) for %d volumes across %d disk type(s). Threshold=%.1f%%, MinServers=%d",
			len(results),
			totalVolumes,
			countBalanceDiskTypes(metrics),
			taskConfig.ImbalanceThreshold*100,
			taskConfig.MinServerCount,
		)
	}
	if err := sender.SendActivity(BuildDetectorActivity("decision_summary", summaryMessage, map[string]*plugin_pb.ConfigValue{
		"total_volumes": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalVolumes)},
		},
		"selected_tasks": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(results))},
		},
		"imbalance_threshold_percent": {
			Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.ImbalanceThreshold * 100},
		},
		"min_server_count": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinServerCount)},
		},
	})); err != nil {
		return err
	}
	volumesByDiskType := make(map[string][]*workertypes.VolumeHealthMetrics)
	for _, metric := range metrics {
		if metric == nil {
			continue
		}
		diskType := strings.TrimSpace(metric.DiskType)
		if diskType == "" {
			diskType = "unknown"
		}
		volumesByDiskType[diskType] = append(volumesByDiskType[diskType], metric)
	}
	diskTypes := make([]string, 0, len(volumesByDiskType))
	for diskType := range volumesByDiskType {
		diskTypes = append(diskTypes, diskType)
	}
	sort.Strings(diskTypes)
	const minVolumeCount = 2
	detailCount := 0
	for _, diskType := range diskTypes {
		diskMetrics := volumesByDiskType[diskType]
		volumeCount := len(diskMetrics)
		if volumeCount < minVolumeCount {
			message := fmt.Sprintf(
				"BALANCE [%s]: No tasks created - cluster too small (%d volumes, need ≥%d)",
				diskType,
				volumeCount,
				minVolumeCount,
			)
			if err := sender.SendActivity(BuildDetectorActivity("decision_disk_type", message, map[string]*plugin_pb.ConfigValue{
				"disk_type": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: diskType},
				},
				"volume_count": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(volumeCount)},
				},
				"required_min_volume_count": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: minVolumeCount},
				},
			})); err != nil {
				return err
			}
			detailCount++
			if detailCount >= 3 {
				break
			}
			continue
		}
		// Seed server counts from topology so zero-volume servers are included,
		// matching the same logic used in balancetask.Detection.
		serverVolumeCounts := make(map[string]int)
		if activeTopology != nil {
			topologyInfo := activeTopology.GetTopologyInfo()
			if topologyInfo != nil {
				for _, dc := range topologyInfo.DataCenterInfos {
					for _, rack := range dc.RackInfos {
						for _, node := range rack.DataNodeInfos {
							for diskTypeName := range node.DiskInfos {
								if diskTypeName == diskType {
									serverVolumeCounts[node.Id] = 0
								}
							}
						}
					}
				}
			}
		}
		for _, metric := range diskMetrics {
			serverVolumeCounts[metric.Server]++
		}
		if len(serverVolumeCounts) < taskConfig.MinServerCount {
			message := fmt.Sprintf(
				"BALANCE [%s]: No tasks created - too few servers (%d servers, need ≥%d)",
				diskType,
				len(serverVolumeCounts),
				taskConfig.MinServerCount,
			)
			if err := sender.SendActivity(BuildDetectorActivity("decision_disk_type", message, map[string]*plugin_pb.ConfigValue{
				"disk_type": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: diskType},
				},
				"server_count": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(serverVolumeCounts))},
				},
				"required_min_server_count": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinServerCount)},
				},
			})); err != nil {
				return err
			}
			detailCount++
			if detailCount >= 3 {
				break
			}
			continue
		}
		totalDiskTypeVolumes := len(diskMetrics)
		avgVolumesPerServer := float64(totalDiskTypeVolumes) / float64(len(serverVolumeCounts))
		maxVolumes := 0
		minVolumes := totalDiskTypeVolumes
		maxServer := ""
		minServer := ""
		for server, count := range serverVolumeCounts {
			if count > maxVolumes {
				maxVolumes = count
				maxServer = server
			}
			if count < minVolumes {
				minVolumes = count
				minServer = server
			}
		}
		imbalanceRatio := 0.0
		if avgVolumesPerServer > 0 {
			imbalanceRatio = float64(maxVolumes-minVolumes) / avgVolumesPerServer
		}
		stage := "decision_disk_type"
		message := ""
		if imbalanceRatio <= taskConfig.ImbalanceThreshold {
			message = fmt.Sprintf(
				"BALANCE [%s]: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f",
				diskType,
				imbalanceRatio*100,
				taskConfig.ImbalanceThreshold*100,
				maxVolumes,
				maxServer,
				minVolumes,
				minServer,
				avgVolumesPerServer,
			)
		} else {
			stage = "decision_candidate"
			message = fmt.Sprintf(
				"BALANCE [%s]: Candidate detected. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f",
				diskType,
				imbalanceRatio*100,
				taskConfig.ImbalanceThreshold*100,
				maxVolumes,
				maxServer,
				minVolumes,
				minServer,
				avgVolumesPerServer,
			)
		}
		if err := sender.SendActivity(BuildDetectorActivity(stage, message, map[string]*plugin_pb.ConfigValue{
			"disk_type": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: diskType},
			},
			"volume_count": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalDiskTypeVolumes)},
			},
			"server_count": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(serverVolumeCounts))},
			},
			"imbalance_percent": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: imbalanceRatio * 100},
			},
			"threshold_percent": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.ImbalanceThreshold * 100},
			},
			"max_volumes": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(maxVolumes)},
			},
			"min_volumes": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(minVolumes)},
			},
			"avg_volumes_per_server": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: avgVolumesPerServer},
			},
		})); err != nil {
			return err
		}
		detailCount++
		if detailCount >= 3 {
			break
		}
	}
	return nil
}
func countBalanceDiskTypes(metrics []*workertypes.VolumeHealthMetrics) int {
	diskTypes := make(map[string]struct{})
	for _, metric := range metrics {
		if metric == nil {
			continue
		}
		diskType := strings.TrimSpace(metric.DiskType)
		if diskType == "" {
			diskType = "unknown"
		}
		diskTypes[diskType] = struct{}{}
	}
	return len(diskTypes)
}

func (h *VolumeBalanceHandler) Execute(
	ctx context.Context,
	request *plugin_pb.ExecuteJobRequest,
	sender ExecutionSender,
) error {
	if request == nil || request.Job == nil {
		return fmt.Errorf("execute request/job is nil")
	}
	if sender == nil {
		return fmt.Errorf("execution sender is nil")
	}
	if request.Job.JobType != "" && request.Job.JobType != "volume_balance" {
		return fmt.Errorf("job type %q is not handled by volume_balance worker", request.Job.JobType)
	}
	params, err := decodeVolumeBalanceTaskParams(request.Job)
	if err != nil {
		return err
	}
	if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" {
		return fmt.Errorf("volume balance source node is required")
	}
	if len(params.Targets) == 0 || strings.TrimSpace(params.Targets[0].Node) == "" {
		return fmt.Errorf("volume balance target node is required")
	}
	applyBalanceExecutionDefaults(params)
	task := balancetask.NewBalanceTask(
		request.Job.JobId,
		params.Sources[0].Node,
		params.VolumeId,
		params.Collection,
		h.grpcDialOption,
	)
	task.SetProgressCallback(func(progress float64, stage string) {
		message := fmt.Sprintf("balance progress %.0f%%", progress)
		if strings.TrimSpace(stage) != "" {
			message = stage
		}
		_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId: request.Job.JobId,
			JobType: request.Job.JobType,
			State: plugin_pb.JobState_JOB_STATE_RUNNING,
			ProgressPercent: progress,
			Stage: stage,
			Message: message,
			Activities: []*plugin_pb.ActivityEvent{
				BuildExecutorActivity(stage, message),
			},
		})
	})
	if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
		JobId: request.Job.JobId,
		JobType: request.Job.JobType,
		State: plugin_pb.JobState_JOB_STATE_ASSIGNED,
		ProgressPercent: 0,
		Stage: "assigned",
		Message: "volume balance job accepted",
		Activities: []*plugin_pb.ActivityEvent{
			BuildExecutorActivity("assigned", "volume balance job accepted"),
		},
	}); err != nil {
		return err
	}
	if err := task.Execute(ctx, params); err != nil {
		_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId: request.Job.JobId,
			JobType: request.Job.JobType,
			State: plugin_pb.JobState_JOB_STATE_FAILED,
			ProgressPercent: 100,
			Stage: "failed",
			Message: err.Error(),
			Activities: []*plugin_pb.ActivityEvent{
				BuildExecutorActivity("failed", err.Error()),
			},
		})
		return err
	}
	sourceNode := params.Sources[0].Node
	targetNode := params.Targets[0].Node
	resultSummary := fmt.Sprintf("volume %d moved from %s to %s", params.VolumeId, sourceNode, targetNode)
	return sender.SendCompleted(&plugin_pb.JobCompleted{
		JobId: request.Job.JobId,
		JobType: request.Job.JobType,
		Success: true,
		Result: &plugin_pb.JobResult{
			Summary: resultSummary,
			OutputValues: map[string]*plugin_pb.ConfigValue{
				"volume_id": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(params.VolumeId)},
				},
				"source_server": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
				},
				"target_server": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: targetNode},
				},
			},
		},
		Activities: []*plugin_pb.ActivityEvent{
			BuildExecutorActivity("completed", resultSummary),
		},
	})
}
func (h *VolumeBalanceHandler) collectVolumeMetrics(
	ctx context.Context,
	masterAddresses []string,
	collectionFilter string,
) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) {
	// Reuse the same master topology fetch/build flow used by the vacuum handler.
	helper := &VacuumHandler{grpcDialOption: h.grpcDialOption}
	return helper.collectVolumeMetrics(ctx, masterAddresses, collectionFilter)
}

func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volumeBalanceWorkerConfig {
	taskConfig := balancetask.NewDefaultConfig()
	imbalanceThreshold := readDoubleConfig(values, "imbalance_threshold", taskConfig.ImbalanceThreshold)
	if imbalanceThreshold < 0 {
		imbalanceThreshold = 0
	}
	if imbalanceThreshold > 1 {
		imbalanceThreshold = 1
	}
	taskConfig.ImbalanceThreshold = imbalanceThreshold
	minServerCount := int(readInt64Config(values, "min_server_count", int64(taskConfig.MinServerCount)))
	if minServerCount < 2 {
		minServerCount = 2
	}
	taskConfig.MinServerCount = minServerCount
	minIntervalSeconds := int(readInt64Config(values, "min_interval_seconds", 0))
	if minIntervalSeconds < 0 {
		minIntervalSeconds = 0
	}
	return &volumeBalanceWorkerConfig{
		TaskConfig: taskConfig,
		MinIntervalSeconds: minIntervalSeconds,
	}
}
func buildVolumeBalanceProposal(
	result *workertypes.TaskDetectionResult,
) (*plugin_pb.JobProposal, error) {
	if result == nil {
		return nil, fmt.Errorf("task detection result is nil")
	}
	if result.TypedParams == nil {
		return nil, fmt.Errorf("missing typed params for volume %d", result.VolumeID)
	}
	params := proto.Clone(result.TypedParams).(*worker_pb.TaskParams)
	applyBalanceExecutionDefaults(params)
	paramsPayload, err := proto.Marshal(params)
	if err != nil {
		return nil, fmt.Errorf("marshal task params: %w", err)
	}
	proposalID := strings.TrimSpace(result.TaskID)
	if proposalID == "" {
		proposalID = fmt.Sprintf("volume-balance-%d-%d", result.VolumeID, time.Now().UnixNano())
	}
	dedupeKey := fmt.Sprintf("volume_balance:%d", result.VolumeID)
	if result.Collection != "" {
		dedupeKey += ":" + result.Collection
	}
	sourceNode := ""
	if len(params.Sources) > 0 {
		sourceNode = strings.TrimSpace(params.Sources[0].Node)
	}
	targetNode := ""
	if len(params.Targets) > 0 {
		targetNode = strings.TrimSpace(params.Targets[0].Node)
	}
	summary := fmt.Sprintf("Balance volume %d", result.VolumeID)
	if sourceNode != "" && targetNode != "" {
		summary = fmt.Sprintf("Move volume %d from %s to %s", result.VolumeID, sourceNode, targetNode)
	}
	return &plugin_pb.JobProposal{
		ProposalId: proposalID,
		DedupeKey: dedupeKey,
		JobType: "volume_balance",
		Priority: mapTaskPriority(result.Priority),
		Summary: summary,
		Detail: strings.TrimSpace(result.Reason),
		Parameters: map[string]*plugin_pb.ConfigValue{
			"task_params_pb": {
				Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: paramsPayload},
			},
			"volume_id": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(result.VolumeID)},
			},
			"source_server": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
			},
			"target_server": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: targetNode},
			},
			"collection": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection},
			},
		},
		Labels: map[string]string{
			"task_type": "balance",
			"volume_id": fmt.Sprintf("%d", result.VolumeID),
			"collection": result.Collection,
			"source_node": sourceNode,
			"target_node": targetNode,
			"source_server": sourceNode,
			"target_server": targetNode,
		},
	}, nil
}
func decodeVolumeBalanceTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) {
	if job == nil {
		return nil, fmt.Errorf("job spec is nil")
	}
	if payload := readBytesConfig(job.Parameters, "task_params_pb"); len(payload) > 0 {
		params := &worker_pb.TaskParams{}
		if err := proto.Unmarshal(payload, params); err != nil {
			return nil, fmt.Errorf("unmarshal task_params_pb: %w", err)
		}
		if params.TaskId == "" {
			params.TaskId = job.JobId
		}
		return params, nil
	}
	volumeID := readInt64Config(job.Parameters, "volume_id", 0)
	sourceNode := strings.TrimSpace(readStringConfig(job.Parameters, "source_server", ""))
	if sourceNode == "" {
		sourceNode = strings.TrimSpace(readStringConfig(job.Parameters, "server", ""))
	}
	targetNode := strings.TrimSpace(readStringConfig(job.Parameters, "target_server", ""))
	if targetNode == "" {
		targetNode = strings.TrimSpace(readStringConfig(job.Parameters, "target", ""))
	}
	collection := readStringConfig(job.Parameters, "collection", "")
	timeoutSeconds := int32(readInt64Config(job.Parameters, "timeout_seconds", int64(defaultBalanceTimeoutSeconds)))
	if timeoutSeconds <= 0 {
		timeoutSeconds = defaultBalanceTimeoutSeconds
	}
	forceMove := readBoolConfig(job.Parameters, "force_move", false)
	if volumeID <= 0 {
		return nil, fmt.Errorf("missing volume_id in job parameters")
	}
	if sourceNode == "" {
		return nil, fmt.Errorf("missing source_server in job parameters")
	}
	if targetNode == "" {
		return nil, fmt.Errorf("missing target_server in job parameters")
	}
	return &worker_pb.TaskParams{
		TaskId: job.JobId,
		VolumeId: uint32(volumeID),
		Collection: collection,
		Sources: []*worker_pb.TaskSource{
			{
				Node: sourceNode,
				VolumeId: uint32(volumeID),
			},
		},
		Targets: []*worker_pb.TaskTarget{
			{
				Node: targetNode,
				VolumeId: uint32(volumeID),
			},
		},
		TaskParams: &worker_pb.TaskParams_BalanceParams{
			BalanceParams: &worker_pb.BalanceTaskParams{
				ForceMove: forceMove,
				TimeoutSeconds: timeoutSeconds,
			},
		},
	}, nil
}
func applyBalanceExecutionDefaults(params *worker_pb.TaskParams) {
	if params == nil {
		return
	}
	balanceParams := params.GetBalanceParams()
	if balanceParams == nil {
		params.TaskParams = &worker_pb.TaskParams_BalanceParams{
			BalanceParams: &worker_pb.BalanceTaskParams{
				ForceMove: false,
				TimeoutSeconds: defaultBalanceTimeoutSeconds,
			},
		}
		return
	}
	if balanceParams.TimeoutSeconds <= 0 {
		balanceParams.TimeoutSeconds = defaultBalanceTimeoutSeconds
	}
}

func readBoolConfig(values map[string]*plugin_pb.ConfigValue, field string, fallback bool) bool {
	if values == nil {
		return fallback
	}
	value := values[field]
	if value == nil {
		return fallback
	}
	switch kind := value.Kind.(type) {
	case *plugin_pb.ConfigValue_BoolValue:
		return kind.BoolValue
	case *plugin_pb.ConfigValue_Int64Value:
		return kind.Int64Value != 0
	case *plugin_pb.ConfigValue_DoubleValue:
		return kind.DoubleValue != 0
	case *plugin_pb.ConfigValue_StringValue:
		text := strings.TrimSpace(strings.ToLower(kind.StringValue))
		switch text {
		case "1", "true", "yes", "on":
			return true
		case "0", "false", "no", "off":
			return false
		}
	}
	return fallback
}