* proto: add BalanceMoveSpec and batch fields to BalanceTaskParams

Add BalanceMoveSpec message for encoding individual volume moves, and max_concurrent_moves + repeated moves fields to BalanceTaskParams to support batching multiple volume moves in a single job.

* balance handler: add batch execution with concurrent volume moves

Refactor Execute() into executeSingleMove() (backward compatible) and executeBatchMoves() which runs multiple volume moves concurrently using a semaphore-bounded goroutine pool. When BalanceTaskParams.Moves is populated, the batch path is taken; otherwise the single-move path. Includes aggregate progress reporting across concurrent moves, per-move error collection, and partial failure support.

* balance handler: add batch config fields to Descriptor and worker config

Add max_concurrent_moves and batch_size fields to the worker config form and deriveBalanceWorkerConfig(). These control how many volume moves run concurrently within a batch job and the maximum batch size.

* balance handler: group detection proposals into batch jobs

When batch_size > 1, the Detect method groups detection results into batch proposals where each proposal encodes multiple BalanceMoveSpec entries in BalanceTaskParams.Moves. Single-result batches fall back to the existing single-move proposal format for backward compatibility.

* admin UI: add volume balance execution plan and batch badge

Add renderBalanceExecutionPlan() for rich rendering of volume balance jobs in the job detail modal. Single-move jobs show source/target/volume info; batch jobs show a moves table with all volume moves. Add batch badge (e.g., "5 moves") next to job type in the execution jobs table when the job has batch=true label.

* Update plugin_templ.go

* fix: detection algorithm uses greedy target instead of divergent topology scores

The detection loop tracked effective volume counts via an adjustments map, but createBalanceTask independently called planBalanceDestination which used the topology's LoadCount, a separate, unadjusted source of truth. This divergence caused multiple moves to pile onto the same server.

Changes:
- Add resolveBalanceDestination to resolve the detection loop's greedy target (minServer) rather than independently picking a destination
- Add oscillation guard: stop when max-min <= 1 since no single move can improve the balance beyond that point
- Track unseeded destinations: if a target server wasn't in the initial serverVolumeCounts, add it so subsequent iterations include it
- Add TestDetection_UnseededDestinationDoesNotOverload

* fix: handler force_move propagation, partial failure, deterministic dedupe

- Propagate ForceMove from outer BalanceTaskParams to individual move TaskParams so batch moves respect the force_move flag
- Fix partial failure: mark job successful if at least one move succeeded (succeeded > 0 || failed == 0) to avoid re-running already-completed moves on retry
- Use SHA-256 hash for deterministic dedupe key fallback instead of time.Now().UnixNano() which is non-deterministic
- Remove unused successDetails variable
- Extract maxProposalStringLength constant to replace magic number 200

* admin UI: use template literals in balance execution plan rendering

* fix: integration test handles batch proposals from batched detection

With batch_size=20, all moves are grouped into a single proposal containing BalanceParams.Moves instead of top-level Sources/Targets. Update assertions to handle both batch and single-move proposal formats.

* fix: verify volume size on target before deleting source during balance

Add a pre-delete safety check that reads the volume file status on both source and target, then compares .dat file size and file count. If they don't match, the move is aborted, leaving the source intact rather than risking irreversible data loss. Also removes the redundant mountVolume call since VolumeCopy already mounts the volume on the target server.

* fix: clamp maxConcurrent, serialize progress sends, validate config as int64

- Clamp maxConcurrentMoves to defaultMaxConcurrentMoves before creating the semaphore so a stale or malicious job cannot request unbounded concurrent volume moves
- Extend progressMu to cover sender.SendProgress calls since the underlying gRPC stream is not safe for concurrent writes
- Perform bounds checks on max_concurrent_moves and batch_size in int64 space before casting to int, avoiding potential overflow on 32-bit

* fix: check disk capacity in resolveBalanceDestination

Skip disks where VolumeCount >= MaxVolumeCount so the detection loop does not propose moves to a full disk that would fail at execution time.

* test: rename unseeded destination test to match actual behavior

The test exercises a server with 0 volumes that IS seeded from topology (matching disk type), not an unseeded destination. Rename to TestDetection_ZeroVolumeServerIncludedInBalance and fix comments.

* test: tighten integration test to assert exactly one batch proposal

With default batch_size=20, all moves should be grouped into a single batch proposal. Assert len(proposals)==1 and require BalanceParams with Moves, removing the legacy single-move else branch.

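The int64 bounds check from the clamp entry above could be sketched as follows. The constant names and values (5 as the fallback when the field is unset, 50 as the upper bound) are the ones this commit settles on in the handler source; the helper name `clampConcurrentMoves` is an assumption made for illustration, not a function from the repository.

```go
package main

const (
	defaultMaxConcurrentMoves = 5  // fallback when the field is unset or non-positive
	maxConcurrentMovesLimit   = 50 // hard upper bound, matching the descriptor's MaxValue
)

// clampConcurrentMoves validates the raw config value while it is still an
// int64, so an oversized value is rejected before it is narrowed to int
// (which is only 32 bits wide on some platforms).
func clampConcurrentMoves(raw int64) int {
	if raw <= 0 {
		return defaultMaxConcurrentMoves
	}
	if raw > maxConcurrentMovesLimit {
		return maxConcurrentMovesLimit
	}
	return int(raw) // safe: 1 <= raw <= 50
}
```

Comparing before converting matters because a value such as `1 << 40` would wrap when cast to a 32-bit `int`, and the wrapped result could slip past a check done afterwards.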
* fix: propagate ctx to RPCs and restore source writability on abort

- All helper methods (markVolumeReadonly, copyVolume, tailVolume, readVolumeFileStatus, deleteVolume) now accept a context parameter instead of using context.Background(), so Execute's ctx propagates cancellation and timeouts into every volume server RPC
- Add deferred cleanup that restores the source volume to writable if any step after markVolumeReadonly fails, preventing the source from being left permanently readonly on abort
- Add markVolumeWritable helper using VolumeMarkWritableRequest

* fix: deep-copy protobuf messages in test recording sender

Use proto.Clone in recordingExecutionSender to store immutable snapshots of JobProgressUpdate and JobCompleted, preventing assertions from observing mutations if the handler reuses message pointers.

* fix: add VolumeMarkWritable and ReadVolumeFileStatus to fake volume server

The balance task now calls ReadVolumeFileStatus for pre-delete verification and VolumeMarkWritable to restore writability on abort. Add both RPCs to the test fake, and drop the mountCalls assertion since BalanceTask no longer calls VolumeMount directly (VolumeCopy handles it).

* fix: use maxConcurrentMovesLimit (50) for clamp, not defaultMaxConcurrentMoves

defaultMaxConcurrentMoves (5) is the fallback when the field is unset, not an upper bound. Clamping to it silently overrides valid config values like 10/20/50. Introduce maxConcurrentMovesLimit (50) matching the descriptor's MaxValue and clamp to that instead.

* fix: cancel batch moves on progress stream failure

Derive a cancellable batchCtx from the caller's ctx. If sender.SendProgress returns an error (client disconnect, context cancelled), capture it, skip further sends, and cancel batchCtx so in-flight moves abort via their propagated context rather than running blind to completion.

* fix: bound cleanup timeout and validate batch move fields

- Use a 30-second timeout for the deferred markVolumeWritable cleanup instead of context.Background() which can block indefinitely if the volume server is unreachable
- Validate required fields (VolumeID, SourceNode, TargetNode) before appending moves to a batch proposal, skipping invalid entries
- Fall back to a single-move proposal when filtering leaves only one valid move in a batch

* fix: cancel task execution on SendProgress stream failure

All handler progress callbacks previously ignored SendProgress errors, allowing tasks to continue executing after the client disconnected. Now each handler creates a derived cancellable context and cancels it on the first SendProgress error, stopping the in-flight task promptly. Handlers fixed: erasure_coding, vacuum, volume_balance (single-move), and admin_script (breaks command loop on send failure).

* fix: validate batch moves before scheduling in executeBatchMoves

Reject empty batches, enforce a hard upper bound (100 moves), and filter out nil or incomplete move specs (missing source/target/volume) before allocating progress tracking and launching goroutines.

* test: add batch balance execution integration test

Tests the batch move path with 3 volumes, max concurrency 2, using fake volume servers. Verifies all moves complete with correct readonly, copy, tail, and delete RPC counts.

* test: add MarkWritableCount and ReadFileStatusCount accessors

Expose the markWritableCalls and readFileStatusCalls counters on the fake volume server, following the existing MarkReadonlyCount pattern.

* fix: oscillation guard uses global effective counts for heterogeneous capacity

The oscillation guard (max-min <= 1) previously used maxServer/minServer which are determined by utilization ratio. With heterogeneous capacity, maxServer by utilization can have fewer raw volumes than minServer, producing a negative diff and incorrectly triggering the guard. Now scans all servers' effective counts to find the true global max/min volume counts, so the guard works correctly regardless of whether utilization-based or raw-count balancing is used.

* fix: admin script handler breaks outer loop on SendProgress failure

The break on SendProgress error inside the shell.Commands scan only exited the inner loop, letting the outer command loop continue executing commands on a broken stream. Use a sendBroken flag to propagate the break to the outer execCommands loop.

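A minimal sketch of the check described in the oscillation guard entry above, assuming the effective per-server volume counts are held in a map keyed by server ID. The function name `shouldStopBalancing` is hypothetical; the real detection loop lives in the balance task package and is not shown on this page.

```go
package main

// shouldStopBalancing reports whether the spread between the fullest and the
// emptiest server is at most one volume. It scans every server's effective
// count to find the global maximum and minimum, rather than trusting a
// max/min pair that was picked by utilization ratio.
func shouldStopBalancing(effectiveCounts map[string]int) bool {
	if len(effectiveCounts) == 0 {
		return true // nothing to balance
	}
	first := true
	var maxCount, minCount int
	for _, count := range effectiveCounts {
		if first {
			maxCount, minCount = count, count
			first = false
			continue
		}
		if count > maxCount {
			maxCount = count
		}
		if count < minCount {
			minCount = count
		}
	}
	// Moving one volume cannot shrink a spread that is already <= 1.
	return maxCount-minCount <= 1
}
```

Because both extremes come from the same scan over raw counts, the difference can never be negative, which is the failure mode the commit entry describes for heterogeneous capacities.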
1333 lines
41 KiB
Go
package pluginworker

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	balancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
	workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"
)

const (
	defaultBalanceTimeoutSeconds = int32(10 * 60)
	maxProposalStringLength      = 200
)

func init() {
	RegisterHandler(HandlerFactory{
		JobType:  "volume_balance",
		Category: CategoryDefault,
		Aliases:  []string{"balance", "volume.balance", "volume-balance"},
		Build: func(opts HandlerBuildOptions) (JobHandler, error) {
			return NewVolumeBalanceHandler(opts.GrpcDialOption), nil
		},
	})
}

type volumeBalanceWorkerConfig struct {
	TaskConfig         *balancetask.Config
	MinIntervalSeconds int
	MaxConcurrentMoves int
	BatchSize          int
}

// VolumeBalanceHandler is the plugin job handler for volume balancing.
type VolumeBalanceHandler struct {
	grpcDialOption grpc.DialOption
}

func NewVolumeBalanceHandler(grpcDialOption grpc.DialOption) *VolumeBalanceHandler {
	return &VolumeBalanceHandler{grpcDialOption: grpcDialOption}
}

func (h *VolumeBalanceHandler) Capability() *plugin_pb.JobTypeCapability {
	return &plugin_pb.JobTypeCapability{
		JobType:                 "volume_balance",
		CanDetect:               true,
		CanExecute:              true,
		MaxDetectionConcurrency: 1,
		MaxExecutionConcurrency: 1,
		DisplayName:             "Volume Balance",
		Description:             "Moves volumes between servers to reduce skew in volume distribution",
		Weight:                  50,
	}
}

func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
	return &plugin_pb.JobTypeDescriptor{
		JobType:           "volume_balance",
		DisplayName:       "Volume Balance",
		Description:       "Detect and execute volume moves to balance server load",
		Icon:              "fas fa-balance-scale",
		DescriptorVersion: 1,
		AdminConfigForm: &plugin_pb.ConfigForm{
			FormId:      "volume-balance-admin",
			Title:       "Volume Balance Admin Config",
			Description: "Admin-side controls for volume balance detection scope.",
			Sections: []*plugin_pb.ConfigSection{
				{
					SectionId:   "scope",
					Title:       "Scope",
					Description: "Optional filters applied before balance detection.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name:        "collection_filter",
							Label:       "Collection Filter",
							Description: "Only detect balance opportunities in this collection when set.",
							Placeholder: "all collections",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
					},
				},
			},
			DefaultValues: map[string]*plugin_pb.ConfigValue{
				"collection_filter": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
				},
			},
		},
		WorkerConfigForm: &plugin_pb.ConfigForm{
			FormId:      "volume-balance-worker",
			Title:       "Volume Balance Worker Config",
			Description: "Worker-side balance thresholds.",
			Sections: []*plugin_pb.ConfigSection{
				{
					SectionId:   "thresholds",
					Title:       "Detection Thresholds",
					Description: "Controls for when balance jobs should be proposed.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name:        "imbalance_threshold",
							Label:       "Imbalance Threshold",
							Description: "Detect when skew exceeds this ratio.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_DOUBLE,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0}},
							MaxValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 1}},
						},
						{
							Name:        "min_server_count",
							Label:       "Minimum Server Count",
							Description: "Require at least this many servers for balancing.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2}},
						},
						{
							Name:        "min_interval_seconds",
							Label:       "Minimum Detection Interval (s)",
							Description: "Skip detection if the last successful run is more recent than this interval.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
						},
					},
				},
				{
					SectionId:   "batch_execution",
					Title:       "Batch Execution",
					Description: "Controls for running multiple volume moves per job. The worker coordinates moves via gRPC and is not on the data path.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name:        "max_concurrent_moves",
							Label:       "Max Concurrent Moves",
							Description: "Maximum number of volume moves to run concurrently within a single batch job.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
							MaxValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 50}},
						},
						{
							Name:        "batch_size",
							Label:       "Batch Size",
							Description: "Maximum number of volume moves to group into a single job. Set to 1 to disable batching.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
							MaxValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 100}},
						},
					},
				},
			},
			DefaultValues: map[string]*plugin_pb.ConfigValue{
				"imbalance_threshold": {
					Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2},
				},
				"min_server_count": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2},
				},
				"min_interval_seconds": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60},
				},
				"max_concurrent_moves": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)},
				},
				"batch_size": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20},
				},
			},
		},
		AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
			Enabled:                       true,
			DetectionIntervalSeconds:      30 * 60,
			DetectionTimeoutSeconds:       120,
			MaxJobsPerDetection:           100,
			GlobalExecutionConcurrency:    16,
			PerWorkerExecutionConcurrency: 4,
			RetryLimit:                    1,
			RetryBackoffSeconds:           15,
			JobTypeMaxRuntimeSeconds:      1800,
		},
		WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
			"imbalance_threshold": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2},
			},
			"min_server_count": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2},
			},
			"min_interval_seconds": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60},
			},
			"max_concurrent_moves": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)},
			},
			"batch_size": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20},
			},
		},
	}
}

func (h *VolumeBalanceHandler) Detect(
	ctx context.Context,
	request *plugin_pb.RunDetectionRequest,
	sender DetectionSender,
) error {
	if request == nil {
		return fmt.Errorf("run detection request is nil")
	}
	if sender == nil {
		return fmt.Errorf("detection sender is nil")
	}
	if request.JobType != "" && request.JobType != "volume_balance" {
		return fmt.Errorf("job type %q is not handled by volume_balance worker", request.JobType)
	}

	workerConfig := deriveBalanceWorkerConfig(request.GetWorkerConfigValues())
	if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) {
		minInterval := time.Duration(workerConfig.MinIntervalSeconds) * time.Second
		_ = sender.SendActivity(BuildDetectorActivity(
			"skipped_by_interval",
			fmt.Sprintf("VOLUME BALANCE: Detection skipped due to min interval (%s)", minInterval),
			map[string]*plugin_pb.ConfigValue{
				"min_interval_seconds": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(workerConfig.MinIntervalSeconds)},
				},
			},
		))
		if err := sender.SendProposals(&plugin_pb.DetectionProposals{
			JobType:   "volume_balance",
			Proposals: []*plugin_pb.JobProposal{},
			HasMore:   false,
		}); err != nil {
			return err
		}
		return sender.SendComplete(&plugin_pb.DetectionComplete{
			JobType:        "volume_balance",
			Success:        true,
			TotalProposals: 0,
		})
	}

	collectionFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "collection_filter", ""))
	masters := make([]string, 0)
	if request.ClusterContext != nil {
		masters = append(masters, request.ClusterContext.MasterGrpcAddresses...)
	}

	metrics, activeTopology, err := h.collectVolumeMetrics(ctx, masters, collectionFilter)
	if err != nil {
		return err
	}

	clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology}
	maxResults := int(request.MaxResults)
	results, hasMore, err := balancetask.Detection(metrics, clusterInfo, workerConfig.TaskConfig, maxResults)
	if err != nil {
		return err
	}

	if traceErr := emitVolumeBalanceDetectionDecisionTrace(sender, metrics, activeTopology, workerConfig.TaskConfig, results); traceErr != nil {
		glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr)
	}

	var proposals []*plugin_pb.JobProposal
	if workerConfig.BatchSize > 1 && len(results) > 1 {
		proposals = buildBatchVolumeBalanceProposals(results, workerConfig.BatchSize, workerConfig.MaxConcurrentMoves)
	} else {
		proposals = make([]*plugin_pb.JobProposal, 0, len(results))
		for _, result := range results {
			proposal, proposalErr := buildVolumeBalanceProposal(result)
			if proposalErr != nil {
				glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", proposalErr)
				continue
			}
			proposals = append(proposals, proposal)
		}
	}

	if err := sender.SendProposals(&plugin_pb.DetectionProposals{
		JobType:   "volume_balance",
		Proposals: proposals,
		HasMore:   hasMore,
	}); err != nil {
		return err
	}

	return sender.SendComplete(&plugin_pb.DetectionComplete{
		JobType:        "volume_balance",
		Success:        true,
		TotalProposals: int32(len(proposals)),
	})
}

func emitVolumeBalanceDetectionDecisionTrace(
	sender DetectionSender,
	metrics []*workertypes.VolumeHealthMetrics,
	activeTopology *topology.ActiveTopology,
	taskConfig *balancetask.Config,
	results []*workertypes.TaskDetectionResult,
) error {
	if sender == nil || taskConfig == nil {
		return nil
	}

	totalVolumes := len(metrics)
	summaryMessage := ""
	if len(results) == 0 {
		summaryMessage = fmt.Sprintf(
			"BALANCE: No tasks created for %d volumes across %d disk type(s). Threshold=%.1f%%, MinServers=%d",
			totalVolumes,
			countBalanceDiskTypes(metrics),
			taskConfig.ImbalanceThreshold*100,
			taskConfig.MinServerCount,
		)
	} else {
		summaryMessage = fmt.Sprintf(
			"BALANCE: Created %d task(s) for %d volumes across %d disk type(s). Threshold=%.1f%%, MinServers=%d",
			len(results),
			totalVolumes,
			countBalanceDiskTypes(metrics),
			taskConfig.ImbalanceThreshold*100,
			taskConfig.MinServerCount,
		)
	}

	if err := sender.SendActivity(BuildDetectorActivity("decision_summary", summaryMessage, map[string]*plugin_pb.ConfigValue{
		"total_volumes": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalVolumes)},
		},
		"selected_tasks": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(results))},
		},
		"imbalance_threshold_percent": {
			Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.ImbalanceThreshold * 100},
		},
		"min_server_count": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinServerCount)},
		},
	})); err != nil {
		return err
	}

	volumesByDiskType := make(map[string][]*workertypes.VolumeHealthMetrics)
	for _, metric := range metrics {
		if metric == nil {
			continue
		}
		diskType := strings.TrimSpace(metric.DiskType)
		if diskType == "" {
			diskType = "unknown"
		}
		volumesByDiskType[diskType] = append(volumesByDiskType[diskType], metric)
	}

	diskTypes := make([]string, 0, len(volumesByDiskType))
	for diskType := range volumesByDiskType {
		diskTypes = append(diskTypes, diskType)
	}
	sort.Strings(diskTypes)

	const minVolumeCount = 2
	detailCount := 0
	for _, diskType := range diskTypes {
		diskMetrics := volumesByDiskType[diskType]
		volumeCount := len(diskMetrics)
		if volumeCount < minVolumeCount {
			message := fmt.Sprintf(
				"BALANCE [%s]: No tasks created - cluster too small (%d volumes, need ≥%d)",
				diskType,
				volumeCount,
				minVolumeCount,
			)
			if err := sender.SendActivity(BuildDetectorActivity("decision_disk_type", message, map[string]*plugin_pb.ConfigValue{
				"disk_type": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: diskType},
				},
				"volume_count": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(volumeCount)},
				},
				"required_min_volume_count": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: minVolumeCount},
				},
			})); err != nil {
				return err
			}
			detailCount++
			if detailCount >= 3 {
				break
			}
			continue
		}

		// Seed server counts from topology so zero-volume servers are included,
		// matching the same logic used in balancetask.Detection.
		serverVolumeCounts := make(map[string]int)
		if activeTopology != nil {
			topologyInfo := activeTopology.GetTopologyInfo()
			if topologyInfo != nil {
				for _, dc := range topologyInfo.DataCenterInfos {
					for _, rack := range dc.RackInfos {
						for _, node := range rack.DataNodeInfos {
							for diskTypeName := range node.DiskInfos {
								if diskTypeName == diskType {
									serverVolumeCounts[node.Id] = 0
								}
							}
						}
					}
				}
			}
		}
		for _, metric := range diskMetrics {
			serverVolumeCounts[metric.Server]++
		}
		if len(serverVolumeCounts) < taskConfig.MinServerCount {
			message := fmt.Sprintf(
				"BALANCE [%s]: No tasks created - too few servers (%d servers, need ≥%d)",
				diskType,
				len(serverVolumeCounts),
				taskConfig.MinServerCount,
			)
			if err := sender.SendActivity(BuildDetectorActivity("decision_disk_type", message, map[string]*plugin_pb.ConfigValue{
				"disk_type": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: diskType},
				},
				"server_count": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(serverVolumeCounts))},
				},
				"required_min_server_count": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinServerCount)},
				},
			})); err != nil {
				return err
			}
			detailCount++
			if detailCount >= 3 {
				break
			}
			continue
		}

		totalDiskTypeVolumes := len(diskMetrics)
		avgVolumesPerServer := float64(totalDiskTypeVolumes) / float64(len(serverVolumeCounts))
		maxVolumes := 0
		minVolumes := totalDiskTypeVolumes
		maxServer := ""
		minServer := ""
		for server, count := range serverVolumeCounts {
			if count > maxVolumes {
				maxVolumes = count
				maxServer = server
			}
			if count < minVolumes {
				minVolumes = count
				minServer = server
			}
		}

		imbalanceRatio := 0.0
		if avgVolumesPerServer > 0 {
			imbalanceRatio = float64(maxVolumes-minVolumes) / avgVolumesPerServer
		}

		stage := "decision_disk_type"
		message := ""
		if imbalanceRatio <= taskConfig.ImbalanceThreshold {
			message = fmt.Sprintf(
				"BALANCE [%s]: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f",
				diskType,
				imbalanceRatio*100,
				taskConfig.ImbalanceThreshold*100,
				maxVolumes,
				maxServer,
				minVolumes,
				minServer,
				avgVolumesPerServer,
			)
		} else {
			stage = "decision_candidate"
			message = fmt.Sprintf(
				"BALANCE [%s]: Candidate detected. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f",
				diskType,
				imbalanceRatio*100,
				taskConfig.ImbalanceThreshold*100,
				maxVolumes,
				maxServer,
				minVolumes,
				minServer,
				avgVolumesPerServer,
			)
		}

		if err := sender.SendActivity(BuildDetectorActivity(stage, message, map[string]*plugin_pb.ConfigValue{
			"disk_type": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: diskType},
			},
			"volume_count": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalDiskTypeVolumes)},
			},
			"server_count": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(serverVolumeCounts))},
			},
			"imbalance_percent": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: imbalanceRatio * 100},
			},
			"threshold_percent": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.ImbalanceThreshold * 100},
			},
			"max_volumes": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(maxVolumes)},
			},
			"min_volumes": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(minVolumes)},
			},
			"avg_volumes_per_server": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: avgVolumesPerServer},
			},
		})); err != nil {
			return err
		}

		detailCount++
		if detailCount >= 3 {
			break
		}
	}

	return nil
}

func countBalanceDiskTypes(metrics []*workertypes.VolumeHealthMetrics) int {
	diskTypes := make(map[string]struct{})
	for _, metric := range metrics {
		if metric == nil {
			continue
		}
		diskType := strings.TrimSpace(metric.DiskType)
		if diskType == "" {
			diskType = "unknown"
		}
		diskTypes[diskType] = struct{}{}
	}
	return len(diskTypes)
}

const (
	defaultMaxConcurrentMoves = 5
	maxConcurrentMovesLimit   = 50
	maxBatchMoves             = 100
)

func (h *VolumeBalanceHandler) Execute(
	ctx context.Context,
	request *plugin_pb.ExecuteJobRequest,
	sender ExecutionSender,
) error {
	if request == nil || request.Job == nil {
		return fmt.Errorf("execute request/job is nil")
	}
	if sender == nil {
		return fmt.Errorf("execution sender is nil")
	}
	if request.Job.JobType != "" && request.Job.JobType != "volume_balance" {
		return fmt.Errorf("job type %q is not handled by volume_balance worker", request.Job.JobType)
	}

	params, err := decodeVolumeBalanceTaskParams(request.Job)
	if err != nil {
		return err
	}

	applyBalanceExecutionDefaults(params)

	// Batch path: if BalanceTaskParams has moves, execute them concurrently
	if bp := params.GetBalanceParams(); bp != nil && len(bp.Moves) > 0 {
		return h.executeBatchMoves(ctx, request, params, sender)
	}

	// Single-move path (backward compatible)
	return h.executeSingleMove(ctx, request, params, sender)
}

func (h *VolumeBalanceHandler) executeSingleMove(
	ctx context.Context,
	request *plugin_pb.ExecuteJobRequest,
	params *worker_pb.TaskParams,
	sender ExecutionSender,
) error {
	if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" {
		return fmt.Errorf("volume balance source node is required")
	}
	if len(params.Targets) == 0 || strings.TrimSpace(params.Targets[0].Node) == "" {
		return fmt.Errorf("volume balance target node is required")
	}

	task := balancetask.NewBalanceTask(
		request.Job.JobId,
		params.Sources[0].Node,
		params.VolumeId,
		params.Collection,
		h.grpcDialOption,
	)
	execCtx, execCancel := context.WithCancel(ctx)
	defer execCancel()
	task.SetProgressCallback(func(progress float64, stage string) {
		message := fmt.Sprintf("balance progress %.0f%%", progress)
		if strings.TrimSpace(stage) != "" {
			message = stage
		}
		if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId:           request.Job.JobId,
			JobType:         request.Job.JobType,
			State:           plugin_pb.JobState_JOB_STATE_RUNNING,
			ProgressPercent: progress,
			Stage:           stage,
			Message:         message,
			Activities: []*plugin_pb.ActivityEvent{
				BuildExecutorActivity(stage, message),
			},
		}); err != nil {
			execCancel()
		}
	})

	if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
		JobId:           request.Job.JobId,
		JobType:         request.Job.JobType,
		State:           plugin_pb.JobState_JOB_STATE_ASSIGNED,
		ProgressPercent: 0,
		Stage:           "assigned",
		Message:         "volume balance job accepted",
		Activities: []*plugin_pb.ActivityEvent{
			BuildExecutorActivity("assigned", "volume balance job accepted"),
		},
	}); err != nil {
		return err
	}

	if err := task.Execute(execCtx, params); err != nil {
		_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId:           request.Job.JobId,
			JobType:         request.Job.JobType,
			State:           plugin_pb.JobState_JOB_STATE_FAILED,
			ProgressPercent: 100,
			Stage:           "failed",
			Message:         err.Error(),
			Activities: []*plugin_pb.ActivityEvent{
				BuildExecutorActivity("failed", err.Error()),
			},
		})
		return err
	}

	sourceNode := params.Sources[0].Node
	targetNode := params.Targets[0].Node
	resultSummary := fmt.Sprintf("volume %d moved from %s to %s", params.VolumeId, sourceNode, targetNode)

	return sender.SendCompleted(&plugin_pb.JobCompleted{
		JobId:   request.Job.JobId,
		JobType: request.Job.JobType,
		Success: true,
		Result: &plugin_pb.JobResult{
			Summary: resultSummary,
			OutputValues: map[string]*plugin_pb.ConfigValue{
				"volume_id": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(params.VolumeId)},
				},
				"source_server": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
				},
				"target_server": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: targetNode},
				},
			},
		},
		Activities: []*plugin_pb.ActivityEvent{
			BuildExecutorActivity("completed", resultSummary),
		},
	})
}

// executeBatchMoves runs multiple volume moves concurrently within a single job.
func (h *VolumeBalanceHandler) executeBatchMoves(
	ctx context.Context,
	request *plugin_pb.ExecuteJobRequest,
	params *worker_pb.TaskParams,
	sender ExecutionSender,
) error {
	bp := params.GetBalanceParams()
	// GetBalanceParams returns nil when the oneof is unset; guard before
	// dereferencing bp below.
	if bp == nil || len(bp.Moves) == 0 {
		return fmt.Errorf("batch balance job has no moves")
	}
	if len(bp.Moves) > maxBatchMoves {
		return fmt.Errorf("batch balance job has %d moves, exceeding limit of %d", len(bp.Moves), maxBatchMoves)
	}

	// Filter out nil or incomplete moves before scheduling.
	validMoves := make([]*worker_pb.BalanceMoveSpec, 0, len(bp.Moves))
	for _, m := range bp.Moves {
		if m == nil {
			continue
		}
		if strings.TrimSpace(m.SourceNode) == "" || strings.TrimSpace(m.TargetNode) == "" || m.VolumeId == 0 {
			glog.Warningf("batch balance: skipping invalid move (vol:%d src:%q tgt:%q)", m.VolumeId, m.SourceNode, m.TargetNode)
			continue
		}
		validMoves = append(validMoves, m)
	}
	if len(validMoves) == 0 {
		return fmt.Errorf("batch balance job has no valid moves after validation")
	}
	moves := validMoves

	maxConcurrent := int(bp.MaxConcurrentMoves)
	if maxConcurrent <= 0 {
		maxConcurrent = defaultMaxConcurrentMoves
	}
	// Clamp to the worker-side upper bound so a stale or malicious job
	// cannot request unbounded fan-out of concurrent volume moves.
	if maxConcurrent > maxConcurrentMovesLimit {
		maxConcurrent = maxConcurrentMovesLimit
	}

	totalMoves := len(moves)
	glog.Infof("batch volume balance: %d moves, max concurrent %d", totalMoves, maxConcurrent)

	if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
		JobId:           request.Job.JobId,
		JobType:         request.Job.JobType,
		State:           plugin_pb.JobState_JOB_STATE_ASSIGNED,
		ProgressPercent: 0,
		Stage:           "assigned",
		Message:         fmt.Sprintf("batch volume balance accepted: %d moves", totalMoves),
		Activities: []*plugin_pb.ActivityEvent{
			BuildExecutorActivity("assigned", fmt.Sprintf("batch volume balance: %d moves, concurrency %d", totalMoves, maxConcurrent)),
		},
	}); err != nil {
		return err
	}

	// Derive a cancellable context so we can abort remaining moves if the
	// progress stream breaks (client disconnect, context cancelled).
	batchCtx, batchCancel := context.WithCancel(ctx)
	defer batchCancel()

	// Per-move progress tracking. The mutex serializes both the progress
	// bookkeeping and the sender.SendProgress call, since the underlying
	// gRPC stream is not safe for concurrent writes.
	var progressMu sync.Mutex
	moveProgress := make([]float64, totalMoves)
	var sendErr error // first progress send error

	reportAggregate := func(moveIndex int, progress float64, stage string) {
		progressMu.Lock()
		defer progressMu.Unlock()

		if sendErr != nil {
			return // stream already broken, skip further sends
		}

		moveProgress[moveIndex] = progress
		total := 0.0
		for _, p := range moveProgress {
			total += p
		}

		aggregate := total / float64(totalMoves)
		move := moves[moveIndex]
		message := fmt.Sprintf("[Move %d/%d vol:%d] %s", moveIndex+1, totalMoves, move.VolumeId, stage)

		if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId:           request.Job.JobId,
			JobType:         request.Job.JobType,
			State:           plugin_pb.JobState_JOB_STATE_RUNNING,
			ProgressPercent: aggregate,
			Stage:           fmt.Sprintf("move %d/%d", moveIndex+1, totalMoves),
			Message:         message,
			Activities: []*plugin_pb.ActivityEvent{
				BuildExecutorActivity(fmt.Sprintf("move-%d", moveIndex+1), message),
			},
		}); err != nil {
			sendErr = err
			batchCancel() // cancel in-flight and pending moves
		}
	}

	type moveResult struct {
		index    int
		volumeID uint32
		source   string
		target   string
		err      error
	}

	sem := make(chan struct{}, maxConcurrent)
	results := make(chan moveResult, totalMoves)

	for i, move := range moves {
		sem <- struct{}{} // acquire slot
		go func(idx int, m *worker_pb.BalanceMoveSpec) {
			defer func() { <-sem }() // release slot

			task := balancetask.NewBalanceTask(
				fmt.Sprintf("%s-move-%d", request.Job.JobId, idx),
				m.SourceNode,
				m.VolumeId,
				m.Collection,
				h.grpcDialOption,
			)
			task.SetProgressCallback(func(progress float64, stage string) {
				reportAggregate(idx, progress, stage)
			})

			moveParams := buildMoveTaskParams(m, bp)
			err := task.Execute(batchCtx, moveParams)
			results <- moveResult{
				index:    idx,
				volumeID: m.VolumeId,
				source:   m.SourceNode,
				target:   m.TargetNode,
				err:      err,
			}
		}(i, move)
	}

	// Collect all results
	var succeeded, failed int
	var errMessages []string
	for range moves {
		r := <-results
		if r.err != nil {
			failed++
			errMessages = append(errMessages, fmt.Sprintf("volume %d (%s→%s): %v", r.volumeID, r.source, r.target, r.err))
			glog.Warningf("batch balance move %d failed: volume %d %s→%s: %v", r.index, r.volumeID, r.source, r.target, r.err)
		} else {
			succeeded++
		}
	}

	summary := fmt.Sprintf("%d/%d volumes moved successfully", succeeded, totalMoves)
	if failed > 0 {
		summary += fmt.Sprintf("; %d failed", failed)
	}

	// Mark the job as successful if at least one move succeeded. This avoids
	// the standard retry path re-running already-completed moves. The failed
	// move details are available in ErrorMessage and result metadata so a
	// retry mechanism can operate only on the failed items.
	success := succeeded > 0 || failed == 0
	var errMsg string
	if failed > 0 {
		errMsg = strings.Join(errMessages, "; ")
	}

	return sender.SendCompleted(&plugin_pb.JobCompleted{
		JobId:        request.Job.JobId,
		JobType:      request.Job.JobType,
		Success:      success,
		ErrorMessage: errMsg,
		Result: &plugin_pb.JobResult{
			Summary: summary,
			OutputValues: map[string]*plugin_pb.ConfigValue{
				"total_moves": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalMoves)},
				},
				"succeeded": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(succeeded)},
				},
				"failed": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(failed)},
				},
			},
		},
		Activities: []*plugin_pb.ActivityEvent{
			BuildExecutorActivity("completed", summary),
		},
	})
}

// buildMoveTaskParams constructs a TaskParams for a single move within a batch.
func buildMoveTaskParams(move *worker_pb.BalanceMoveSpec, outerParams *worker_pb.BalanceTaskParams) *worker_pb.TaskParams {
	timeoutSeconds := defaultBalanceTimeoutSeconds
	forceMove := false
	if outerParams != nil {
		if outerParams.TimeoutSeconds > 0 {
			timeoutSeconds = outerParams.TimeoutSeconds
		}
		forceMove = outerParams.ForceMove
	}
	return &worker_pb.TaskParams{
		VolumeId:   move.VolumeId,
		Collection: move.Collection,
		VolumeSize: move.VolumeSize,
		Sources: []*worker_pb.TaskSource{
			{Node: move.SourceNode, VolumeId: move.VolumeId},
		},
		Targets: []*worker_pb.TaskTarget{
			{Node: move.TargetNode, VolumeId: move.VolumeId},
		},
		TaskParams: &worker_pb.TaskParams_BalanceParams{
			BalanceParams: &worker_pb.BalanceTaskParams{
				ForceMove:      forceMove,
				TimeoutSeconds: timeoutSeconds,
			},
		},
	}
}

// collectVolumeMetrics fetches volume health metrics and the active topology
// from the given masters, restricted to collectionFilter.
func (h *VolumeBalanceHandler) collectVolumeMetrics(
	ctx context.Context,
	masterAddresses []string,
	collectionFilter string,
) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) {
	// Reuse the same master topology fetch/build flow used by the vacuum handler.
	helper := &VacuumHandler{grpcDialOption: h.grpcDialOption}
	return helper.collectVolumeMetrics(ctx, masterAddresses, collectionFilter)
}

// deriveBalanceWorkerConfig reads the worker config values and clamps each to
// its valid range: imbalance_threshold to [0, 1], min_server_count to at
// least 2, min_interval_seconds to at least 0, max_concurrent_moves to
// [1, 50], and batch_size to [1, 100].
func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volumeBalanceWorkerConfig {
	taskConfig := balancetask.NewDefaultConfig()

	imbalanceThreshold := readDoubleConfig(values, "imbalance_threshold", taskConfig.ImbalanceThreshold)
	if imbalanceThreshold < 0 {
		imbalanceThreshold = 0
	}
	if imbalanceThreshold > 1 {
		imbalanceThreshold = 1
	}
	taskConfig.ImbalanceThreshold = imbalanceThreshold

	minServerCount := int(readInt64Config(values, "min_server_count", int64(taskConfig.MinServerCount)))
	if minServerCount < 2 {
		minServerCount = 2
	}
	taskConfig.MinServerCount = minServerCount

	minIntervalSeconds := int(readInt64Config(values, "min_interval_seconds", 0))
	if minIntervalSeconds < 0 {
		minIntervalSeconds = 0
	}

	maxConcurrentMoves64 := readInt64Config(values, "max_concurrent_moves", int64(defaultMaxConcurrentMoves))
	if maxConcurrentMoves64 < 1 {
		maxConcurrentMoves64 = 1
	}
	if maxConcurrentMoves64 > 50 {
		maxConcurrentMoves64 = 50
	}
	maxConcurrentMoves := int(maxConcurrentMoves64)

	batchSize64 := readInt64Config(values, "batch_size", 20)
	if batchSize64 < 1 {
		batchSize64 = 1
	}
	if batchSize64 > 100 {
		batchSize64 = 100
	}
	batchSize := int(batchSize64)

	return &volumeBalanceWorkerConfig{
		TaskConfig:         taskConfig,
		MinIntervalSeconds: minIntervalSeconds,
		MaxConcurrentMoves: maxConcurrentMoves,
		BatchSize:          batchSize,
	}
}

// buildVolumeBalanceProposal converts one detection result into a single-move
// job proposal. It returns an error if the result or its typed params are missing.
func buildVolumeBalanceProposal(
	result *workertypes.TaskDetectionResult,
) (*plugin_pb.JobProposal, error) {
	if result == nil {
		return nil, fmt.Errorf("task detection result is nil")
	}
	if result.TypedParams == nil {
		return nil, fmt.Errorf("missing typed params for volume %d", result.VolumeID)
	}

	params := proto.Clone(result.TypedParams).(*worker_pb.TaskParams)
	applyBalanceExecutionDefaults(params)

	paramsPayload, err := proto.Marshal(params)
	if err != nil {
		return nil, fmt.Errorf("marshal task params: %w", err)
	}

	proposalID := strings.TrimSpace(result.TaskID)
	if proposalID == "" {
		proposalID = fmt.Sprintf("volume-balance-%d-%d", result.VolumeID, time.Now().UnixNano())
	}

	dedupeKey := fmt.Sprintf("volume_balance:%d", result.VolumeID)
	if result.Collection != "" {
		dedupeKey += ":" + result.Collection
	}

	sourceNode := ""
	if len(params.Sources) > 0 {
		sourceNode = strings.TrimSpace(params.Sources[0].Node)
	}
	targetNode := ""
	if len(params.Targets) > 0 {
		targetNode = strings.TrimSpace(params.Targets[0].Node)
	}

	summary := fmt.Sprintf("Balance volume %d", result.VolumeID)
	if sourceNode != "" && targetNode != "" {
		summary = fmt.Sprintf("Move volume %d from %s to %s", result.VolumeID, sourceNode, targetNode)
	}

	return &plugin_pb.JobProposal{
		ProposalId: proposalID,
		DedupeKey:  dedupeKey,
		JobType:    "volume_balance",
		Priority:   mapTaskPriority(result.Priority),
		Summary:    summary,
		Detail:     strings.TrimSpace(result.Reason),
		Parameters: map[string]*plugin_pb.ConfigValue{
			"task_params_pb": {
				Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: paramsPayload},
			},
			"volume_id": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(result.VolumeID)},
			},
			"source_server": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
			},
			"target_server": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: targetNode},
			},
			"collection": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection},
			},
		},
		Labels: map[string]string{
			"task_type":     "balance",
			"volume_id":     fmt.Sprintf("%d", result.VolumeID),
			"collection":    result.Collection,
			"source_node":   sourceNode,
			"target_node":   targetNode,
			"source_server": sourceNode,
			"target_server": targetNode,
		},
	}, nil
}

// buildBatchVolumeBalanceProposals groups detection results into batch proposals.
// Each batch proposal encodes multiple moves in BalanceTaskParams.Moves.
func buildBatchVolumeBalanceProposals(
	results []*workertypes.TaskDetectionResult,
	batchSize int,
	maxConcurrentMoves int,
) []*plugin_pb.JobProposal {
	if batchSize <= 0 {
		batchSize = 1
	}
	if maxConcurrentMoves <= 0 {
		maxConcurrentMoves = defaultMaxConcurrentMoves
	}

	var proposals []*plugin_pb.JobProposal

	for batchStart := 0; batchStart < len(results); batchStart += batchSize {
		batchEnd := batchStart + batchSize
		if batchEnd > len(results) {
			batchEnd = len(results)
		}
		batch := results[batchStart:batchEnd]

		// If only one result in this batch, emit a single-move proposal
		if len(batch) == 1 {
			proposal, err := buildVolumeBalanceProposal(batch[0])
			if err != nil {
				glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", err)
				continue
			}
			proposals = append(proposals, proposal)
			continue
		}

		// Build batch proposal with BalanceMoveSpec entries
		moves := make([]*worker_pb.BalanceMoveSpec, 0, len(batch))
		var volumeIDs []string
		var dedupeKeys []string
		highestPriority := workertypes.TaskPriorityLow

		for _, result := range batch {
			if result == nil || result.TypedParams == nil {
				continue
			}
			sourceNode := ""
			targetNode := ""
			if len(result.TypedParams.Sources) > 0 {
				sourceNode = result.TypedParams.Sources[0].Node
			}
			if len(result.TypedParams.Targets) > 0 {
				targetNode = result.TypedParams.Targets[0].Node
			}
			// Skip moves with missing required fields that would fail at execution time.
			if result.VolumeID == 0 || sourceNode == "" || targetNode == "" {
				glog.Warningf("Plugin worker skip invalid batch move: volume=%d source=%q target=%q", result.VolumeID, sourceNode, targetNode)
				continue
			}
			moves = append(moves, &worker_pb.BalanceMoveSpec{
				VolumeId:   uint32(result.VolumeID),
				SourceNode: sourceNode,
				TargetNode: targetNode,
				Collection: result.Collection,
				VolumeSize: result.TypedParams.VolumeSize,
			})
			volumeIDs = append(volumeIDs, fmt.Sprintf("%d", result.VolumeID))

			dedupeKey := fmt.Sprintf("volume_balance:%d", result.VolumeID)
			if result.Collection != "" {
				dedupeKey += ":" + result.Collection
			}
			dedupeKeys = append(dedupeKeys, dedupeKey)

			if result.Priority > highestPriority {
				highestPriority = result.Priority
			}
		}

		if len(moves) == 0 {
			continue
		}

		// After filtering, if only one valid move remains, emit a single-move
		// proposal instead of a batch to preserve the simpler execution path.
		if len(moves) == 1 {
			// Find the matching result for the single valid move. Results
			// without typed params were filtered out above, so skip them here
			// too rather than matching one by volume ID and failing.
			for _, result := range batch {
				if result != nil && result.TypedParams != nil && uint32(result.VolumeID) == moves[0].VolumeId {
					proposal, err := buildVolumeBalanceProposal(result)
					if err != nil {
						glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", err)
					} else {
						proposals = append(proposals, proposal)
					}
					break
				}
			}
			continue
		}

		// Serialize batch params
		taskParams := &worker_pb.TaskParams{
			TaskParams: &worker_pb.TaskParams_BalanceParams{
				BalanceParams: &worker_pb.BalanceTaskParams{
					TimeoutSeconds:     defaultBalanceTimeoutSeconds,
					MaxConcurrentMoves: int32(maxConcurrentMoves),
					Moves:              moves,
				},
			},
		}
		payload, err := proto.Marshal(taskParams)
		if err != nil {
			glog.Warningf("Plugin worker failed to marshal batch balance proposal: %v", err)
			continue
		}

		proposalID := fmt.Sprintf("volume-balance-batch-%d-%d", batchStart, time.Now().UnixNano())
		summary := fmt.Sprintf("Batch balance %d volumes (%s)", len(moves), strings.Join(volumeIDs, ","))
		if len(summary) > maxProposalStringLength {
			summary = fmt.Sprintf("Batch balance %d volumes", len(moves))
		}

		// Use composite dedupe key for the batch. When the full key exceeds
		// the length limit, fall back to a deterministic hash of the sorted
		// keys so the same batch always produces the same dedupe key.
		sort.Strings(dedupeKeys)
		compositeDedupeKey := fmt.Sprintf("volume_balance_batch:%s", strings.Join(dedupeKeys, "+"))
		if len(compositeDedupeKey) > maxProposalStringLength {
			h := sha256.Sum256([]byte(strings.Join(dedupeKeys, "+")))
			compositeDedupeKey = fmt.Sprintf("volume_balance_batch:%d-%s", batchStart, hex.EncodeToString(h[:12]))
		}

		proposals = append(proposals, &plugin_pb.JobProposal{
			ProposalId: proposalID,
			DedupeKey:  compositeDedupeKey,
			JobType:    "volume_balance",
			Priority:   mapTaskPriority(highestPriority),
			Summary:    summary,
			Detail:     fmt.Sprintf("Batch of %d volume moves with concurrency %d", len(moves), maxConcurrentMoves),
			Parameters: map[string]*plugin_pb.ConfigValue{
				"task_params_pb": {
					Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload},
				},
				"batch_size": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(moves))},
				},
			},
			Labels: map[string]string{
				"task_type":  "balance",
				"batch":      "true",
				"batch_size": fmt.Sprintf("%d", len(moves)),
			},
		})
	}

	return proposals
}

// decodeVolumeBalanceTaskParams extracts TaskParams from a job spec. It prefers
// the serialized task_params_pb payload and otherwise falls back to the
// individual volume_id, source_server, and target_server parameters.
func decodeVolumeBalanceTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) {
	if job == nil {
		return nil, fmt.Errorf("job spec is nil")
	}

	if payload := readBytesConfig(job.Parameters, "task_params_pb"); len(payload) > 0 {
		params := &worker_pb.TaskParams{}
		if err := proto.Unmarshal(payload, params); err != nil {
			return nil, fmt.Errorf("unmarshal task_params_pb: %w", err)
		}
		if params.TaskId == "" {
			params.TaskId = job.JobId
		}
		return params, nil
	}

	volumeID := readInt64Config(job.Parameters, "volume_id", 0)
	sourceNode := strings.TrimSpace(readStringConfig(job.Parameters, "source_server", ""))
	if sourceNode == "" {
		sourceNode = strings.TrimSpace(readStringConfig(job.Parameters, "server", ""))
	}
	targetNode := strings.TrimSpace(readStringConfig(job.Parameters, "target_server", ""))
	if targetNode == "" {
		targetNode = strings.TrimSpace(readStringConfig(job.Parameters, "target", ""))
	}
	collection := readStringConfig(job.Parameters, "collection", "")
	timeoutSeconds := int32(readInt64Config(job.Parameters, "timeout_seconds", int64(defaultBalanceTimeoutSeconds)))
	if timeoutSeconds <= 0 {
		timeoutSeconds = defaultBalanceTimeoutSeconds
	}
	forceMove := readBoolConfig(job.Parameters, "force_move", false)

	if volumeID <= 0 {
		return nil, fmt.Errorf("missing volume_id in job parameters")
	}
	if sourceNode == "" {
		return nil, fmt.Errorf("missing source_server in job parameters")
	}
	if targetNode == "" {
		return nil, fmt.Errorf("missing target_server in job parameters")
	}

	return &worker_pb.TaskParams{
		TaskId:     job.JobId,
		VolumeId:   uint32(volumeID),
		Collection: collection,
		Sources: []*worker_pb.TaskSource{
			{
				Node:     sourceNode,
				VolumeId: uint32(volumeID),
			},
		},
		Targets: []*worker_pb.TaskTarget{
			{
				Node:     targetNode,
				VolumeId: uint32(volumeID),
			},
		},
		TaskParams: &worker_pb.TaskParams_BalanceParams{
			BalanceParams: &worker_pb.BalanceTaskParams{
				ForceMove:      forceMove,
				TimeoutSeconds: timeoutSeconds,
			},
		},
	}, nil
}

// applyBalanceExecutionDefaults ensures params carries BalanceTaskParams with a
// positive timeout, filling in defaultBalanceTimeoutSeconds where needed.
func applyBalanceExecutionDefaults(params *worker_pb.TaskParams) {
	if params == nil {
		return
	}

	balanceParams := params.GetBalanceParams()
	if balanceParams == nil {
		params.TaskParams = &worker_pb.TaskParams_BalanceParams{
			BalanceParams: &worker_pb.BalanceTaskParams{
				ForceMove:      false,
				TimeoutSeconds: defaultBalanceTimeoutSeconds,
			},
		}
		return
	}

	if balanceParams.TimeoutSeconds <= 0 {
		balanceParams.TimeoutSeconds = defaultBalanceTimeoutSeconds
	}
}

// readBoolConfig reads field as a bool, coercing int64, double, and common
// string spellings ("1"/"true"/"yes"/"on" and "0"/"false"/"no"/"off"). It
// returns fallback when the field is missing or its value is not recognized.
func readBoolConfig(values map[string]*plugin_pb.ConfigValue, field string, fallback bool) bool {
	if values == nil {
		return fallback
	}
	value := values[field]
	if value == nil {
		return fallback
	}
	switch kind := value.Kind.(type) {
	case *plugin_pb.ConfigValue_BoolValue:
		return kind.BoolValue
	case *plugin_pb.ConfigValue_Int64Value:
		return kind.Int64Value != 0
	case *plugin_pb.ConfigValue_DoubleValue:
		return kind.DoubleValue != 0
	case *plugin_pb.ConfigValue_StringValue:
		text := strings.TrimSpace(strings.ToLower(kind.StringValue))
		switch text {
		case "1", "true", "yes", "on":
			return true
		case "0", "false", "no", "off":
			return false
		}
	}
	return fallback
}