feat(plugin): EC shard balance handler for plugin worker (#8629)
* feat(ec_balance): add TaskTypeECBalance constant and protobuf definitions

  Add the ec_balance task type constant to both the topology and worker type systems. Define the EcBalanceTaskParams, EcShardMoveSpec, and EcBalanceTaskConfig protobuf messages for EC shard balance operations.

* feat(ec_balance): add configuration for EC shard balance task

  Config includes imbalance threshold, min server count, collection filter, disk type, and preferred tags for tag-aware placement.

* feat(ec_balance): add multi-phase EC shard balance detection algorithm

  Implements four detection phases adapted from the ec.balance shell command:

  1. Duplicate shard detection and removal proposals
  2. Cross-rack shard distribution balancing
  3. Within-rack node-level shard balancing
  4. Global shard count equalization across nodes

  Detection is side-effect-free: it builds an EC topology view from ActiveTopology and generates move proposals without executing them.

* feat(ec_balance): add EC shard move task execution

  Implements the shard move sequence using the same VolumeEcShardsCopy, VolumeEcShardsMount, VolumeEcShardsUnmount, and VolumeEcShardsDelete RPCs as the shell ec.balance command. Supports both regular shard moves and dedup-phase deletions (unmount+delete without copy).

* feat(ec_balance): add task registration and scheduling

  Register the EC balance task definition with auto-config update support. Scheduling respects max concurrent limits and worker capabilities.

* feat(ec_balance): add plugin handler for EC shard balance

  Implements the full plugin handler with detection, execution, admin and worker config forms, proposal building, and decision trace reporting. Supports collection/DC/disk type filtering, preferred tag placement, and configurable detection intervals. Auto-registered via init() with the handler registry.

* test(ec_balance): add tests for detection algorithm and plugin handler

  Detection tests cover: duplicate shard detection, cross-rack imbalance, within-rack imbalance, global rebalancing, topology building, collection filtering, and edge cases.

  Handler tests cover: config derivation with clamping, proposal building, protobuf encode/decode round-trip, fallback parameter decoding, capability, and config policy round-trip.

* fix(ec_balance): address PR review feedback and fix CI test failure

  - Update TestWorkerDefaultJobTypes to expect 6 handlers (was 5)
  - Extract threshold constants (ecBalanceMinImbalanceThreshold, etc.) to eliminate magic numbers in Descriptor and config derivation
  - Remove the duplicate ShardIdsToUint32 helper (use the erasure_coding package)
  - Add bounds checks for int64→int/uint32 conversions to fix CodeQL integer conversion warnings

* fix(ec_balance): address code review findings

  storage_impact.go:
  - Add a TaskTypeECBalance case returning a shard-level reservation (ShardSlots: -1/+1) instead of falling through to the default, which incorrectly reserves a full volume slot on the target.

  detection.go:
  - Use a dc:rack composite key to avoid cross-DC rack name collisions. Only create rack entries after confirming the node has matching disks.
  - Add an exceedsImbalanceThreshold check to the cross-rack, within-rack, and global phases so trivial skews below the configured threshold are ignored. The dedup phase always runs, since duplicates are errors.
  - Reserve destination capacity after each planned move (decrement destNode.freeSlots, update rackShardCount/nodeShardCount) to prevent overbooking the same destination.
  - Skip nodes with freeSlots <= 0 when selecting minNode in global balance, to avoid proposing moves to full nodes.
  - Include the loop index and source/target node IDs in TaskID to guarantee uniqueness across moves with the same volumeID/shardID.

  ec_balance_handler.go:
  - Fail fast with an error when shard_id is absent in fallback parameter decoding instead of silently defaulting to shard 0.

  ec_balance_task.go:
  - Delegate GetProgress() to BaseTask.GetProgress() so progress updates from ReportProgressWithStage are visible to callers.
  - Add a fail-fast guard rejecting multiple sources/targets until batch execution is implemented.

  Findings verified but not changed (both match the existing codebase pattern in the vacuum/balance/erasure_coding handlers):
  - register.go globalTaskDef.Config race: the same unsynchronized pattern appears in all 4 task packages.
  - CreateTask using a generated ID: the same fmt.Sprintf pattern appears in all 4 task packages.

* fix(ec_balance): harden parameter decoding, progress tracking, and validation

  ec_balance_handler.go (decodeECBalanceTaskParams):
  - Validate execution-critical fields (Sources[0].Node, ShardIds, Targets[0].Node, ShardIds) after protobuf deserialization.
  - Require source_disk_id and target_disk_id in the legacy fallback path so Targets[0].DiskId is populated for VolumeEcShardsCopyRequest.
  - All error messages reference decodeECBalanceTaskParams and the specific missing field (TaskParams, shard_id, Targets[0].DiskId, EcBalanceTaskParams) for debuggability.

  ec_balance_task.go:
  - Track progress in the ECBalanceTask.progress field, updated via a reportProgress() helper called before ReportProgressWithStage(), so GetProgress() returns real stage progress instead of a stale 0.
  - Validate: require exactly 1 source and 1 target (mirrors the Execute guard) and require ShardIds on both, with error messages referencing ECBalanceTask.Validate and the specific field.

* fix(ec_balance): fix dedup execution path, stale topology, collection filter, timeout, and dedupeKey

  detection.go:
  - Dedup moves now set target=source so isDedupPhase() triggers the unmount+delete-only execution path instead of attempting a copy.
  - Apply moves to the in-memory topology between phases via applyMovesToTopology() so subsequent phases see updated shard placement and don't conflict with already-planned moves.
  - detectGlobalImbalance now accepts allowedVids and filters both shard counting and shard selection to respect CollectionFilter.

  ec_balance_task.go:
  - Apply EcBalanceTaskParams.TimeoutSeconds to the context via context.WithTimeout so all RPC operations respect the configured timeout instead of hanging indefinitely.

  ec_balance_handler.go:
  - Include the source node ID in dedupeKey so dedup deletions from different source nodes for the same shard aren't collapsed.
  - Clamp the minServerCountRaw and minIntervalRaw lower bounds on int64 before narrowing to int, preventing undefined overflow on 32-bit platforms.

* fix(ec_balance): log a warning before cancelling on progress send failure

  Log the error, job ID, job type, progress percentage, and stage before calling execCancel() in the progress callback so failed progress sends are diagnosable instead of silently cancelling.
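
Note: the exceedsImbalanceThreshold check referenced above lives in detection.go, which is not part of this diff. A minimal sketch of the idea, assuming the imbalance ratio is computed as (max - min) / max over per-rack or per-node shard counts (the exact formula in detection.go may differ):

	// Hypothetical sketch (not the actual detection.go code): report imbalance
	// when the spread between the fullest and emptiest group exceeds threshold.
	func exceedsImbalanceThreshold(shardCounts []int, threshold float64) bool {
		if len(shardCounts) == 0 {
			return false
		}
		lo, hi := shardCounts[0], shardCounts[0]
		for _, c := range shardCounts[1:] {
			if c < lo {
				lo = c
			}
			if c > hi {
				hi = c
			}
		}
		if hi == 0 {
			return false // no shards anywhere, nothing to balance
		}
		return float64(hi-lo)/float64(hi) > threshold
	}

Under that assumed formula and the default threshold of 0.2, a rack with 10 shards next to one with 9 (ratio 0.1) is left alone, while 10 next to 5 (ratio 0.5) would trigger a move proposal.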

weed/plugin/worker/ec_balance_handler.go (new file, 692 lines)
@@ -0,0 +1,692 @@
package pluginworker

import (
	"context"
	"fmt"
	"math"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	ecbalancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/ec_balance"
	workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"
)

const (
	ecBalanceMinImbalanceThreshold = 0.05
	ecBalanceMaxImbalanceThreshold = 0.5
	ecBalanceMinServerCount        = 2
)

func init() {
	RegisterHandler(HandlerFactory{
		JobType:  "ec_balance",
		Category: CategoryDefault,
		Aliases:  []string{"ec-balance", "ec.balance", "ec_shard_balance"},
		Build: func(opts HandlerBuildOptions) (JobHandler, error) {
			return NewECBalanceHandler(opts.GrpcDialOption), nil
		},
	})
}

type ecBalanceWorkerConfig struct {
	TaskConfig         *ecbalancetask.Config
	MinIntervalSeconds int
}

// ECBalanceHandler is the plugin job handler for EC shard balancing.
type ECBalanceHandler struct {
	grpcDialOption grpc.DialOption
}

func NewECBalanceHandler(grpcDialOption grpc.DialOption) *ECBalanceHandler {
	return &ECBalanceHandler{grpcDialOption: grpcDialOption}
}
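
// Capability advertises detection and execution support for the ec_balance
// job type, including the per-worker concurrency limits enforced by the
// scheduler (1 concurrent detection, 3 concurrent executions).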
func (h *ECBalanceHandler) Capability() *plugin_pb.JobTypeCapability {
	return &plugin_pb.JobTypeCapability{
		JobType:                 "ec_balance",
		CanDetect:               true,
		CanExecute:              true,
		MaxDetectionConcurrency: 1,
		MaxExecutionConcurrency: 3,
		DisplayName:             "EC Shard Balance",
		Description:             "Balance EC shard distribution across racks and nodes",
		Weight:                  60,
	}
}
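
// Descriptor declares the admin and worker config forms rendered in the UI,
// their default values, and the admin runtime defaults (detection interval,
// concurrency, retries) applied when the task is first registered.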
func (h *ECBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
	return &plugin_pb.JobTypeDescriptor{
		JobType:           "ec_balance",
		DisplayName:       "EC Shard Balance",
		Description:       "Detect and execute EC shard rebalancing across the cluster",
		Icon:              "fas fa-balance-scale-left",
		DescriptorVersion: 1,
		AdminConfigForm: &plugin_pb.ConfigForm{
			FormId:      "ec-balance-admin",
			Title:       "EC Shard Balance Admin Config",
			Description: "Admin-side controls for EC shard balance detection scope.",
			Sections: []*plugin_pb.ConfigSection{
				{
					SectionId:   "scope",
					Title:       "Scope",
					Description: "Optional filters applied before EC shard balance detection.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name:        "collection_filter",
							Label:       "Collection Filter",
							Description: "Only balance EC shards in matching collections (wildcard supported).",
							Placeholder: "all collections",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
						{
							Name:        "data_center_filter",
							Label:       "Data Center Filter",
							Description: "Only balance within matching data centers (wildcard supported).",
							Placeholder: "all data centers",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
						{
							Name:        "disk_type",
							Label:       "Disk Type",
							Description: "Only balance EC shards on this disk type (hdd, ssd, or empty for all).",
							Placeholder: "all disk types",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
					},
				},
			},
			DefaultValues: map[string]*plugin_pb.ConfigValue{
				"collection_filter":  {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
				"data_center_filter": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
				"disk_type":          {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
			},
		},
		WorkerConfigForm: &plugin_pb.ConfigForm{
			FormId:      "ec-balance-worker",
			Title:       "EC Shard Balance Worker Config",
			Description: "Worker-side detection thresholds and execution controls.",
			Sections: []*plugin_pb.ConfigSection{
				{
					SectionId:   "thresholds",
					Title:       "Detection Thresholds",
					Description: "Controls for when EC shard balance jobs should be proposed.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name:        "imbalance_threshold",
							Label:       "Imbalance Threshold",
							Description: "Minimum shard count imbalance ratio to trigger balancing (0.0-1.0).",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_DOUBLE,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: ecBalanceMinImbalanceThreshold}},
							MaxValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: ecBalanceMaxImbalanceThreshold}},
						},
						{
							Name:        "min_server_count",
							Label:       "Minimum Server Count",
							Description: "Minimum servers required for EC shard balancing.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: ecBalanceMinServerCount}},
						},
						{
							Name:        "min_interval_seconds",
							Label:       "Minimum Detection Interval (s)",
							Description: "Skip detection if the last successful run is more recent than this interval.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
						},
						{
							Name:        "preferred_tags",
							Label:       "Preferred Tags",
							Description: "Comma-separated disk tags to prioritize for shard placement, ordered by preference.",
							Placeholder: "fast,ssd",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
					},
				},
			},
			DefaultValues: map[string]*plugin_pb.ConfigValue{
				"imbalance_threshold":  {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}},
				"min_server_count":     {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}},
				"min_interval_seconds": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 60 * 60}},
				"preferred_tags":       {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
			},
		},
		AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
			Enabled:                       true,
			DetectionIntervalSeconds:      60 * 30,
			DetectionTimeoutSeconds:       300,
			MaxJobsPerDetection:           500,
			GlobalExecutionConcurrency:    16,
			PerWorkerExecutionConcurrency: 4,
			RetryLimit:                    1,
			RetryBackoffSeconds:           30,
			JobTypeMaxRuntimeSeconds:      1800,
		},
		WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
			"imbalance_threshold":  {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.2}},
			"min_server_count":     {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3}},
			"min_interval_seconds": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 60 * 60}},
			"preferred_tags":       {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
		},
	}
}
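
// Detect derives the worker config, honors the minimum detection interval,
// applies admin-side scope filters, collects volume metrics and the active
// topology from the masters, runs the multi-phase detection, and streams the
// resulting move proposals followed by a completion event.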
func (h *ECBalanceHandler) Detect(
	ctx context.Context,
	request *plugin_pb.RunDetectionRequest,
	sender DetectionSender,
) error {
	if request == nil {
		return fmt.Errorf("run detection request is nil")
	}
	if sender == nil {
		return fmt.Errorf("detection sender is nil")
	}
	if request.JobType != "" && request.JobType != "ec_balance" {
		return fmt.Errorf("job type %q is not handled by ec_balance worker", request.JobType)
	}

	workerConfig := deriveECBalanceWorkerConfig(request.GetWorkerConfigValues())
	if ShouldSkipDetectionByInterval(request.GetLastSuccessfulRun(), workerConfig.MinIntervalSeconds) {
		minInterval := time.Duration(workerConfig.MinIntervalSeconds) * time.Second
		_ = sender.SendActivity(BuildDetectorActivity(
			"skipped_by_interval",
			fmt.Sprintf("EC BALANCE: Detection skipped due to min interval (%s)", minInterval),
			map[string]*plugin_pb.ConfigValue{
				"min_interval_seconds": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(workerConfig.MinIntervalSeconds)},
				},
			},
		))
		if err := sender.SendProposals(&plugin_pb.DetectionProposals{
			JobType:   "ec_balance",
			Proposals: []*plugin_pb.JobProposal{},
			HasMore:   false,
		}); err != nil {
			return err
		}
		return sender.SendComplete(&plugin_pb.DetectionComplete{
			JobType:        "ec_balance",
			Success:        true,
			TotalProposals: 0,
		})
	}

	// Apply admin-side scope filters
	collectionFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "collection_filter", ""))
	if collectionFilter != "" {
		workerConfig.TaskConfig.CollectionFilter = collectionFilter
	}
	dcFilter := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "data_center_filter", ""))
	if dcFilter != "" {
		workerConfig.TaskConfig.DataCenterFilter = dcFilter
	}
	diskType := strings.TrimSpace(readStringConfig(request.GetAdminConfigValues(), "disk_type", ""))
	if diskType != "" {
		workerConfig.TaskConfig.DiskType = diskType
	}

	masters := make([]string, 0)
	if request.ClusterContext != nil {
		masters = append(masters, request.ClusterContext.MasterGrpcAddresses...)
	}

	metrics, activeTopology, err := h.collectVolumeMetrics(ctx, masters, collectionFilter)
	if err != nil {
		return err
	}

	clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology}
	maxResults := int(request.MaxResults)
	if maxResults < 0 {
		maxResults = 0
	}

	results, hasMore, err := ecbalancetask.Detection(ctx, metrics, clusterInfo, workerConfig.TaskConfig, maxResults)
	if err != nil {
		return err
	}

	if traceErr := emitECBalanceDecisionTrace(sender, workerConfig.TaskConfig, results, maxResults, hasMore); traceErr != nil {
		glog.Warningf("Plugin worker failed to emit ec_balance detection trace: %v", traceErr)
	}

	proposals := make([]*plugin_pb.JobProposal, 0, len(results))
	for _, result := range results {
		proposal, proposalErr := buildECBalanceProposal(result)
		if proposalErr != nil {
			glog.Warningf("Plugin worker skip invalid ec_balance proposal: %v", proposalErr)
			continue
		}
		proposals = append(proposals, proposal)
	}

	if err := sender.SendProposals(&plugin_pb.DetectionProposals{
		JobType:   "ec_balance",
		Proposals: proposals,
		HasMore:   hasMore,
	}); err != nil {
		return err
	}

	return sender.SendComplete(&plugin_pb.DetectionComplete{
		JobType:        "ec_balance",
		Success:        true,
		TotalProposals: int32(len(proposals)),
	})
}
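
// Execute decodes and validates the task parameters, forwards task progress
// to the plugin progress stream (cancelling execution if a progress send
// fails), runs the shard move, and reports completion or failure.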
func (h *ECBalanceHandler) Execute(
	ctx context.Context,
	request *plugin_pb.ExecuteJobRequest,
	sender ExecutionSender,
) error {
	if request == nil || request.Job == nil {
		return fmt.Errorf("execute request/job is nil")
	}
	if sender == nil {
		return fmt.Errorf("execution sender is nil")
	}
	if request.Job.JobType != "" && request.Job.JobType != "ec_balance" {
		return fmt.Errorf("job type %q is not handled by ec_balance worker", request.Job.JobType)
	}

	params, err := decodeECBalanceTaskParams(request.Job)
	if err != nil {
		return err
	}

	if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" {
		return fmt.Errorf("ec balance source node is required")
	}
	if len(params.Targets) == 0 || strings.TrimSpace(params.Targets[0].Node) == "" {
		return fmt.Errorf("ec balance target node is required")
	}

	task := ecbalancetask.NewECBalanceTask(
		request.Job.JobId,
		params.VolumeId,
		params.Collection,
		h.grpcDialOption,
	)

	execCtx, execCancel := context.WithCancel(ctx)
	defer execCancel()

	task.SetProgressCallback(func(progress float64, stage string) {
		message := fmt.Sprintf("ec balance progress %.0f%%", progress)
		if strings.TrimSpace(stage) != "" {
			message = stage
		}
		if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId:           request.Job.JobId,
			JobType:         request.Job.JobType,
			State:           plugin_pb.JobState_JOB_STATE_RUNNING,
			ProgressPercent: progress,
			Stage:           stage,
			Message:         message,
			Activities: []*plugin_pb.ActivityEvent{
				BuildExecutorActivity(stage, message),
			},
		}); err != nil {
			glog.Warningf("EC balance job %s (%s): failed to send progress (%.0f%%, stage=%q): %v, cancelling execution",
				request.Job.JobId, request.Job.JobType, progress, stage, err)
			execCancel()
		}
	})

	if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
		JobId:           request.Job.JobId,
		JobType:         request.Job.JobType,
		State:           plugin_pb.JobState_JOB_STATE_ASSIGNED,
		ProgressPercent: 0,
		Stage:           "assigned",
		Message:         "ec balance job accepted",
		Activities: []*plugin_pb.ActivityEvent{
			BuildExecutorActivity("assigned", "ec balance job accepted"),
		},
	}); err != nil {
		return err
	}

	if err := task.Execute(execCtx, params); err != nil {
		_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId:           request.Job.JobId,
			JobType:         request.Job.JobType,
			State:           plugin_pb.JobState_JOB_STATE_FAILED,
			ProgressPercent: 100,
			Stage:           "failed",
			Message:         err.Error(),
			Activities: []*plugin_pb.ActivityEvent{
				BuildExecutorActivity("failed", err.Error()),
			},
		})
		return err
	}

	sourceNode := params.Sources[0].Node
	targetNode := params.Targets[0].Node
	resultSummary := fmt.Sprintf("EC shard balance completed: volume %d shards moved from %s to %s",
		params.VolumeId, sourceNode, targetNode)

	return sender.SendCompleted(&plugin_pb.JobCompleted{
		JobId:   request.Job.JobId,
		JobType: request.Job.JobType,
		Success: true,
		Result: &plugin_pb.JobResult{
			Summary: resultSummary,
			OutputValues: map[string]*plugin_pb.ConfigValue{
				"volume_id": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(params.VolumeId)},
				},
				"source_server": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
				},
				"target_server": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: targetNode},
				},
			},
		},
		Activities: []*plugin_pb.ActivityEvent{
			BuildExecutorActivity("completed", resultSummary),
		},
	})
}
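
// collectVolumeMetrics gathers volume health metrics and the active topology
// from the given masters, filtered by collection.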
func (h *ECBalanceHandler) collectVolumeMetrics(
	ctx context.Context,
	masterAddresses []string,
	collectionFilter string,
) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) {
	metrics, activeTopology, _, err := collectVolumeMetricsFromMasters(ctx, masterAddresses, collectionFilter, h.grpcDialOption)
	return metrics, activeTopology, err
}
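
// deriveECBalanceWorkerConfig builds the task config from worker-supplied
// values, clamping imbalance_threshold to [0.05, 0.5], min_server_count to
// at least 2, and min_interval_seconds to [0, MaxInt32] before narrowing to
// int (safe on 32-bit platforms).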
func deriveECBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *ecBalanceWorkerConfig {
	taskConfig := ecbalancetask.NewDefaultConfig()

	imbalanceThreshold := readDoubleConfig(values, "imbalance_threshold", taskConfig.ImbalanceThreshold)
	if imbalanceThreshold < ecBalanceMinImbalanceThreshold {
		imbalanceThreshold = ecBalanceMinImbalanceThreshold
	}
	if imbalanceThreshold > ecBalanceMaxImbalanceThreshold {
		imbalanceThreshold = ecBalanceMaxImbalanceThreshold
	}
	taskConfig.ImbalanceThreshold = imbalanceThreshold

	minServerCountRaw := readInt64Config(values, "min_server_count", int64(taskConfig.MinServerCount))
	if minServerCountRaw < int64(ecBalanceMinServerCount) {
		minServerCountRaw = int64(ecBalanceMinServerCount)
	}
	if minServerCountRaw > math.MaxInt32 {
		minServerCountRaw = math.MaxInt32
	}
	taskConfig.MinServerCount = int(minServerCountRaw)

	minIntervalRaw := readInt64Config(values, "min_interval_seconds", 60*60)
	if minIntervalRaw < 0 {
		minIntervalRaw = 0
	}
	if minIntervalRaw > math.MaxInt32 {
		minIntervalRaw = math.MaxInt32
	}
	minIntervalSeconds := int(minIntervalRaw)

	taskConfig.PreferredTags = util.NormalizeTagList(readStringListConfig(values, "preferred_tags"))

	return &ecBalanceWorkerConfig{
		TaskConfig:         taskConfig,
		MinIntervalSeconds: minIntervalSeconds,
	}
}
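
// buildECBalanceProposal converts a detection result into a JobProposal,
// embedding the marshaled TaskParams as bytes alongside scalar parameters
// (volume_id, source_server, target_server, collection) for the legacy
// fallback decoding path.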
func buildECBalanceProposal(result *workertypes.TaskDetectionResult) (*plugin_pb.JobProposal, error) {
	if result == nil {
		return nil, fmt.Errorf("task detection result is nil")
	}
	if result.TypedParams == nil {
		return nil, fmt.Errorf("missing typed params for volume %d", result.VolumeID)
	}

	paramsPayload, err := proto.Marshal(result.TypedParams)
	if err != nil {
		return nil, fmt.Errorf("marshal task params: %w", err)
	}

	proposalID := strings.TrimSpace(result.TaskID)
	if proposalID == "" {
		proposalID = fmt.Sprintf("ec-balance-%d-%d", result.VolumeID, time.Now().UnixNano())
	}

	// Dedupe key includes volume ID, shard ID, source node, and collection
	// to distinguish moves of the same shard from different source nodes (e.g. dedup)
	dedupeKey := fmt.Sprintf("ec_balance:%d", result.VolumeID)
	if len(result.TypedParams.Sources) > 0 {
		src := result.TypedParams.Sources[0]
		if len(src.ShardIds) > 0 {
			dedupeKey = fmt.Sprintf("ec_balance:%d:%d:%s", result.VolumeID, src.ShardIds[0], src.Node)
		}
	}
	if result.Collection != "" {
		dedupeKey += ":" + result.Collection
	}

	sourceNode := ""
	targetNode := ""
	if len(result.TypedParams.Sources) > 0 {
		sourceNode = strings.TrimSpace(result.TypedParams.Sources[0].Node)
	}
	if len(result.TypedParams.Targets) > 0 {
		targetNode = strings.TrimSpace(result.TypedParams.Targets[0].Node)
	}

	summary := fmt.Sprintf("Balance EC shard of volume %d", result.VolumeID)
	if sourceNode != "" && targetNode != "" {
		summary = fmt.Sprintf("Move EC shard of volume %d: %s → %s", result.VolumeID, sourceNode, targetNode)
	}

	return &plugin_pb.JobProposal{
		ProposalId: proposalID,
		DedupeKey:  dedupeKey,
		JobType:    "ec_balance",
		Priority:   mapTaskPriority(result.Priority),
		Summary:    summary,
		Detail:     strings.TrimSpace(result.Reason),
		Parameters: map[string]*plugin_pb.ConfigValue{
			"task_params_pb": {
				Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: paramsPayload},
			},
			"volume_id": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(result.VolumeID)},
			},
			"source_server": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
			},
			"target_server": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: targetNode},
			},
			"collection": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection},
			},
		},
		Labels: map[string]string{
			"task_type":   "ec_balance",
			"volume_id":   fmt.Sprintf("%d", result.VolumeID),
			"collection":  result.Collection,
			"source_node": sourceNode,
			"target_node": targetNode,
		},
	}, nil
}
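
// decodeECBalanceTaskParams prefers the protobuf-encoded task_params_pb
// payload and falls back to legacy scalar parameters; both paths validate
// execution-critical fields and fail fast with field-specific errors.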
func decodeECBalanceTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) {
	if job == nil {
		return nil, fmt.Errorf("job spec is nil")
	}

	// Try protobuf-encoded params first (preferred path)
	if payload := readBytesConfig(job.Parameters, "task_params_pb"); len(payload) > 0 {
		params := &worker_pb.TaskParams{}
		if err := proto.Unmarshal(payload, params); err != nil {
			return nil, fmt.Errorf("decodeECBalanceTaskParams: unmarshal task_params_pb: %w", err)
		}
		if params.TaskId == "" {
			params.TaskId = job.JobId
		}
		// Validate execution-critical fields in the deserialized TaskParams
		if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" {
			return nil, fmt.Errorf("decodeECBalanceTaskParams: TaskParams missing Sources[0].Node")
		}
		if len(params.Sources[0].ShardIds) == 0 {
			return nil, fmt.Errorf("decodeECBalanceTaskParams: TaskParams missing Sources[0].ShardIds")
		}
		if len(params.Targets) == 0 || strings.TrimSpace(params.Targets[0].Node) == "" {
			return nil, fmt.Errorf("decodeECBalanceTaskParams: TaskParams missing Targets[0].Node")
		}
		if len(params.Targets[0].ShardIds) == 0 {
			return nil, fmt.Errorf("decodeECBalanceTaskParams: TaskParams missing Targets[0].ShardIds")
		}
		return params, nil
	}

	// Legacy fallback: construct TaskParams from individual scalar parameters.
	// All execution-critical fields are required.
	volumeID := readInt64Config(job.Parameters, "volume_id", 0)
	sourceNode := strings.TrimSpace(readStringConfig(job.Parameters, "source_server", ""))
	targetNode := strings.TrimSpace(readStringConfig(job.Parameters, "target_server", ""))
	collection := readStringConfig(job.Parameters, "collection", "")

	if volumeID <= 0 || volumeID > math.MaxUint32 {
		return nil, fmt.Errorf("decodeECBalanceTaskParams: invalid or missing volume_id: %d", volumeID)
	}
	if sourceNode == "" {
		return nil, fmt.Errorf("decodeECBalanceTaskParams: missing source_server")
	}
	if targetNode == "" {
		return nil, fmt.Errorf("decodeECBalanceTaskParams: missing target_server")
	}

	shardIDVal, hasShardID := job.Parameters["shard_id"]
	if !hasShardID || shardIDVal == nil {
		return nil, fmt.Errorf("decodeECBalanceTaskParams: missing shard_id (required for EcBalanceTaskParams)")
	}
	shardID := readInt64Config(job.Parameters, "shard_id", -1)
	if shardID < 0 || shardID > math.MaxUint32 {
		return nil, fmt.Errorf("decodeECBalanceTaskParams: invalid shard_id: %d", shardID)
	}

	sourceDiskID := readInt64Config(job.Parameters, "source_disk_id", 0)
	if sourceDiskID < 0 || sourceDiskID > math.MaxUint32 {
		return nil, fmt.Errorf("decodeECBalanceTaskParams: invalid source_disk_id: %d", sourceDiskID)
	}
	targetDiskID := readInt64Config(job.Parameters, "target_disk_id", 0)
	if targetDiskID < 0 || targetDiskID > math.MaxUint32 {
		return nil, fmt.Errorf("decodeECBalanceTaskParams: invalid target_disk_id: %d", targetDiskID)
	}

	return &worker_pb.TaskParams{
		TaskId:     job.JobId,
		VolumeId:   uint32(volumeID),
		Collection: collection,
		Sources: []*worker_pb.TaskSource{{
			Node:     sourceNode,
			DiskId:   uint32(sourceDiskID),
			ShardIds: []uint32{uint32(shardID)},
		}},
		Targets: []*worker_pb.TaskTarget{{
			Node:     targetNode,
			DiskId:   uint32(targetDiskID),
			ShardIds: []uint32{uint32(shardID)},
		}},
		TaskParams: &worker_pb.TaskParams_EcBalanceParams{
			EcBalanceParams: &worker_pb.EcBalanceTaskParams{
				TimeoutSeconds: 600,
			},
		},
	}, nil
}
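
// emitECBalanceDecisionTrace sends a decision_summary activity with per-phase
// move counts inferred from each result's Reason string.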
func emitECBalanceDecisionTrace(
	sender DetectionSender,
	taskConfig *ecbalancetask.Config,
	results []*workertypes.TaskDetectionResult,
	maxResults int,
	hasMore bool,
) error {
	if sender == nil || taskConfig == nil {
		return nil
	}

	// Count moves by phase
	phaseCounts := make(map[string]int)
	for _, result := range results {
		if result.Reason != "" {
			// Extract phase from reason string
			for _, phase := range []string{"dedup", "cross_rack", "within_rack", "global"} {
				if strings.Contains(result.Reason, phase) {
					phaseCounts[phase]++
					break
				}
			}
		}
	}

	summarySuffix := ""
	if hasMore {
		summarySuffix = fmt.Sprintf(" (max_results=%d reached)", maxResults)
	}

	summaryMessage := fmt.Sprintf(
		"EC balance detection: %d moves proposed%s (dedup=%d, cross_rack=%d, within_rack=%d, global=%d)",
		len(results),
		summarySuffix,
		phaseCounts["dedup"],
		phaseCounts["cross_rack"],
		phaseCounts["within_rack"],
		phaseCounts["global"],
	)

	return sender.SendActivity(BuildDetectorActivity("decision_summary", summaryMessage, map[string]*plugin_pb.ConfigValue{
		"total_moves": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(results))},
		},
		"has_more": {
			Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: hasMore},
		},
		"dedup_moves": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(phaseCounts["dedup"])},
		},
		"cross_rack_moves": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(phaseCounts["cross_rack"])},
		},
		"within_rack_moves": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(phaseCounts["within_rack"])},
		},
		"global_moves": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(phaseCounts["global"])},
		},
		"imbalance_threshold": {
			Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.ImbalanceThreshold},
		},
		"min_server_count": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinServerCount)},
		},
	}))
}

weed/plugin/worker/ec_balance_handler_test.go (new file, 325 lines)
@@ -0,0 +1,325 @@
package pluginworker

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	ecbalancetask "github.com/seaweedfs/seaweedfs/weed/worker/tasks/ec_balance"
	workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
	"google.golang.org/protobuf/proto"
)

func TestDeriveECBalanceWorkerConfig(t *testing.T) {
	tests := []struct {
		name   string
		values map[string]*plugin_pb.ConfigValue
		check  func(t *testing.T, config *ecBalanceWorkerConfig)
	}{
		{
			name:   "nil values uses defaults",
			values: nil,
			check: func(t *testing.T, config *ecBalanceWorkerConfig) {
				if config.TaskConfig.ImbalanceThreshold != 0.2 {
					t.Errorf("expected default threshold 0.2, got %f", config.TaskConfig.ImbalanceThreshold)
				}
				if config.TaskConfig.MinServerCount != 3 {
					t.Errorf("expected default min_server_count 3, got %d", config.TaskConfig.MinServerCount)
				}
			},
		},
		{
			name: "custom threshold",
			values: map[string]*plugin_pb.ConfigValue{
				"imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.3}},
				"min_server_count":    {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 5}},
			},
			check: func(t *testing.T, config *ecBalanceWorkerConfig) {
				if config.TaskConfig.ImbalanceThreshold != 0.3 {
					t.Errorf("expected threshold 0.3, got %f", config.TaskConfig.ImbalanceThreshold)
				}
				if config.TaskConfig.MinServerCount != 5 {
					t.Errorf("expected min_server_count 5, got %d", config.TaskConfig.MinServerCount)
				}
			},
		},
		{
			name: "threshold clamped to min",
			values: map[string]*plugin_pb.ConfigValue{
				"imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.01}},
			},
			check: func(t *testing.T, config *ecBalanceWorkerConfig) {
				if config.TaskConfig.ImbalanceThreshold != 0.05 {
					t.Errorf("expected clamped threshold 0.05, got %f", config.TaskConfig.ImbalanceThreshold)
				}
			},
		},
		{
			name: "threshold clamped to max",
			values: map[string]*plugin_pb.ConfigValue{
				"imbalance_threshold": {Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.9}},
			},
			check: func(t *testing.T, config *ecBalanceWorkerConfig) {
				if config.TaskConfig.ImbalanceThreshold != 0.5 {
					t.Errorf("expected clamped threshold 0.5, got %f", config.TaskConfig.ImbalanceThreshold)
				}
			},
		},
		{
			name: "min_server_count clamped to 2",
			values: map[string]*plugin_pb.ConfigValue{
				"min_server_count": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
			},
			check: func(t *testing.T, config *ecBalanceWorkerConfig) {
				if config.TaskConfig.MinServerCount != 2 {
					t.Errorf("expected min_server_count 2, got %d", config.TaskConfig.MinServerCount)
				}
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			config := deriveECBalanceWorkerConfig(tt.values)
			tt.check(t, config)
		})
	}
}

func TestBuildECBalanceProposal(t *testing.T) {
	result := &workertypes.TaskDetectionResult{
		TaskID:     "test-task-123",
		TaskType:   workertypes.TaskTypeECBalance,
		VolumeID:   42,
		Collection: "test-col",
		Priority:   workertypes.TaskPriorityMedium,
		Reason:     "cross_rack balance",
		TypedParams: &worker_pb.TaskParams{
			VolumeId:   42,
			Collection: "test-col",
			Sources: []*worker_pb.TaskSource{{
				Node:     "source:8080",
				ShardIds: []uint32{5},
			}},
			Targets: []*worker_pb.TaskTarget{{
				Node:     "target:8080",
				ShardIds: []uint32{5},
			}},
		},
	}

	proposal, err := buildECBalanceProposal(result)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if proposal.JobType != "ec_balance" {
		t.Errorf("expected job type ec_balance, got %s", proposal.JobType)
	}
	if proposal.ProposalId != "test-task-123" {
		t.Errorf("expected proposal ID test-task-123, got %s", proposal.ProposalId)
	}
	if proposal.DedupeKey != "ec_balance:42:5:source:8080:test-col" {
		t.Errorf("expected dedupe key ec_balance:42:5:source:8080:test-col, got %s", proposal.DedupeKey)
	}
	if proposal.Labels["source_node"] != "source:8080" {
		t.Errorf("expected source_node label source:8080, got %s", proposal.Labels["source_node"])
	}
	if proposal.Labels["target_node"] != "target:8080" {
		t.Errorf("expected target_node label target:8080, got %s", proposal.Labels["target_node"])
	}
}

func TestBuildECBalanceProposalNilResult(t *testing.T) {
	_, err := buildECBalanceProposal(nil)
	if err == nil {
		t.Fatal("expected error for nil result")
	}
}

func TestDecodeECBalanceTaskParamsFromProtobuf(t *testing.T) {
	originalParams := &worker_pb.TaskParams{
		TaskId:     "test-id",
		VolumeId:   100,
		Collection: "test-col",
		Sources: []*worker_pb.TaskSource{{
			Node:     "source:8080",
			ShardIds: []uint32{3},
		}},
		Targets: []*worker_pb.TaskTarget{{
			Node:     "target:8080",
			ShardIds: []uint32{3},
		}},
		TaskParams: &worker_pb.TaskParams_EcBalanceParams{
			EcBalanceParams: &worker_pb.EcBalanceTaskParams{
				DiskType:       "hdd",
				TimeoutSeconds: 300,
			},
		},
	}

	payload, err := proto.Marshal(originalParams)
	if err != nil {
		t.Fatalf("failed to marshal: %v", err)
	}

	job := &plugin_pb.JobSpec{
		JobId: "job-1",
		Parameters: map[string]*plugin_pb.ConfigValue{
			"task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}},
		},
	}

	decoded, err := decodeECBalanceTaskParams(job)
	if err != nil {
		t.Fatalf("failed to decode: %v", err)
	}

	if decoded.VolumeId != 100 {
		t.Errorf("expected volume_id 100, got %d", decoded.VolumeId)
	}
	if decoded.Collection != "test-col" {
		t.Errorf("expected collection test-col, got %s", decoded.Collection)
	}
	if len(decoded.Sources) != 1 || decoded.Sources[0].Node != "source:8080" {
		t.Error("source mismatch")
	}
	if len(decoded.Targets) != 1 || decoded.Targets[0].Node != "target:8080" {
		t.Error("target mismatch")
	}
	ecParams := decoded.GetEcBalanceParams()
	if ecParams == nil {
		t.Fatal("expected ec_balance_params")
	}
	if ecParams.DiskType != "hdd" {
		t.Errorf("expected disk_type hdd, got %s", ecParams.DiskType)
	}
}

func TestDecodeECBalanceTaskParamsFallback(t *testing.T) {
	job := &plugin_pb.JobSpec{
		JobId: "job-2",
		Parameters: map[string]*plugin_pb.ConfigValue{
			"volume_id":      {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 200}},
			"source_server":  {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "src:8080"}},
			"target_server":  {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "dst:8080"}},
			"collection":     {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "fallback-col"}},
			"shard_id":       {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 7}},
			"source_disk_id": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
			"target_disk_id": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2}},
		},
	}

	decoded, err := decodeECBalanceTaskParams(job)
	if err != nil {
		t.Fatalf("failed to decode fallback: %v", err)
	}

	if decoded.VolumeId != 200 {
		t.Errorf("expected volume_id 200, got %d", decoded.VolumeId)
	}
	if len(decoded.Sources) != 1 || decoded.Sources[0].Node != "src:8080" {
		t.Error("source mismatch")
	}
	if decoded.Sources[0].ShardIds[0] != 7 {
		t.Errorf("expected shard_id 7, got %d", decoded.Sources[0].ShardIds[0])
	}
	if decoded.Sources[0].DiskId != 1 {
		t.Errorf("expected source disk_id 1, got %d", decoded.Sources[0].DiskId)
	}
	if decoded.Targets[0].DiskId != 2 {
		t.Errorf("expected target disk_id 2, got %d", decoded.Targets[0].DiskId)
	}
}

func TestDecodeECBalanceTaskParamsProtobufValidation(t *testing.T) {
	// Protobuf payload with missing ShardIds should fail validation
	badParams := &worker_pb.TaskParams{
		TaskId:   "test-id",
		VolumeId: 100,
		Sources:  []*worker_pb.TaskSource{{Node: "source:8080"}}, // no ShardIds
		Targets:  []*worker_pb.TaskTarget{{Node: "target:8080", ShardIds: []uint32{3}}},
	}
	payload, _ := proto.Marshal(badParams)
	job := &plugin_pb.JobSpec{
		JobId: "job-validate",
		Parameters: map[string]*plugin_pb.ConfigValue{
			"task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}},
		},
	}
	_, err := decodeECBalanceTaskParams(job)
	if err == nil {
		t.Fatal("expected error for missing Sources[0].ShardIds in protobuf")
	}
}

func TestDecodeECBalanceTaskParamsMissingShardID(t *testing.T) {
	job := &plugin_pb.JobSpec{
		JobId: "job-3",
		Parameters: map[string]*plugin_pb.ConfigValue{
			"volume_id":     {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 300}},
			"source_server": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "src:8080"}},
			"target_server": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "dst:8080"}},
			"collection":    {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "test-col"}},
			// shard_id deliberately missing
		},
	}

	_, err := decodeECBalanceTaskParams(job)
	if err == nil {
		t.Fatal("expected error for missing shard_id")
	}
}

func TestECBalanceHandlerCapability(t *testing.T) {
	handler := NewECBalanceHandler(nil)
	cap := handler.Capability()

	if cap.JobType != "ec_balance" {
		t.Errorf("expected job type ec_balance, got %s", cap.JobType)
	}
	if !cap.CanDetect {
		t.Error("expected CanDetect=true")
	}
	if !cap.CanExecute {
		t.Error("expected CanExecute=true")
	}
	if cap.Weight != 60 {
		t.Errorf("expected weight 60, got %d", cap.Weight)
	}
}

func TestECBalanceConfigRoundTrip(t *testing.T) {
	config := ecbalancetask.NewDefaultConfig()
	config.ImbalanceThreshold = 0.3
	config.MinServerCount = 5
	config.CollectionFilter = "my_col"
	config.DiskType = "ssd"
	config.PreferredTags = []string{"fast", "ssd"}

	policy := config.ToTaskPolicy()
	if policy == nil {
		t.Fatal("expected non-nil policy")
	}

	config2 := ecbalancetask.NewDefaultConfig()
	if err := config2.FromTaskPolicy(policy); err != nil {
		t.Fatalf("failed to load from policy: %v", err)
	}

	if config2.ImbalanceThreshold != 0.3 {
		t.Errorf("expected threshold 0.3, got %f", config2.ImbalanceThreshold)
	}
	if config2.MinServerCount != 5 {
		t.Errorf("expected min_server_count 5, got %d", config2.MinServerCount)
	}
	if config2.CollectionFilter != "my_col" {
		t.Errorf("expected collection_filter my_col, got %s", config2.CollectionFilter)
	}
	if config2.DiskType != "ssd" {
		t.Errorf("expected disk_type ssd, got %s", config2.DiskType)
	}
	if len(config2.PreferredTags) != 2 || config2.PreferredTags[0] != "fast" {
		t.Errorf("expected preferred_tags [fast ssd], got %v", config2.PreferredTags)
	}
}