From 5f85bf5e8acd4c9a9f58ef93d35cd9c211d4615b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 9 Mar 2026 19:30:08 -0700 Subject: [PATCH] Batch volume balance: run multiple moves per job (#8561) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * proto: add BalanceMoveSpec and batch fields to BalanceTaskParams Add BalanceMoveSpec message for encoding individual volume moves, and max_concurrent_moves + repeated moves fields to BalanceTaskParams to support batching multiple volume moves in a single job. * balance handler: add batch execution with concurrent volume moves Refactor Execute() into executeSingleMove() (backward compatible) and executeBatchMoves() which runs multiple volume moves concurrently using a semaphore-bounded goroutine pool. When BalanceTaskParams.Moves is populated, the batch path is taken; otherwise the single-move path. Includes aggregate progress reporting across concurrent moves, per-move error collection, and partial failure support. * balance handler: add batch config fields to Descriptor and worker config Add max_concurrent_moves and batch_size fields to the worker config form and deriveBalanceWorkerConfig(). These control how many volume moves run concurrently within a batch job and the maximum batch size. * balance handler: group detection proposals into batch jobs When batch_size > 1, the Detect method groups detection results into batch proposals where each proposal encodes multiple BalanceMoveSpec entries in BalanceTaskParams.Moves. Single-result batches fall back to the existing single-move proposal format for backward compatibility. * admin UI: add volume balance execution plan and batch badge Add renderBalanceExecutionPlan() for rich rendering of volume balance jobs in the job detail modal. Single-move jobs show source/target/volume info; batch jobs show a moves table with all volume moves. 
Add batch badge (e.g., "5 moves") next to job type in the execution jobs table when the job has batch=true label. * Update plugin_templ.go * fix: detection algorithm uses greedy target instead of divergent topology scores The detection loop tracked effective volume counts via an adjustments map, but createBalanceTask independently called planBalanceDestination which used the topology's LoadCount — a separate, unadjusted source of truth. This divergence caused multiple moves to pile onto the same server. Changes: - Add resolveBalanceDestination to resolve the detection loop's greedy target (minServer) rather than independently picking a destination - Add oscillation guard: stop when max-min <= 1 since no single move can improve the balance beyond that point - Track unseeded destinations: if a target server wasn't in the initial serverVolumeCounts, add it so subsequent iterations include it - Add TestDetection_UnseededDestinationDoesNotOverload * fix: handler force_move propagation, partial failure, deterministic dedupe - Propagate ForceMove from outer BalanceTaskParams to individual move TaskParams so batch moves respect the force_move flag - Fix partial failure: mark job successful if at least one move succeeded (succeeded > 0 || failed == 0) to avoid re-running already-completed moves on retry - Use SHA-256 hash for deterministic dedupe key fallback instead of time.Now().UnixNano() which is non-deterministic - Remove unused successDetails variable - Extract maxProposalStringLength constant to replace magic number 200 * admin UI: use template literals in balance execution plan rendering * fix: integration test handles batch proposals from batched detection With batch_size=20, all moves are grouped into a single proposal containing BalanceParams.Moves instead of top-level Sources/Targets. Update assertions to handle both batch and single-move proposal formats. 
* fix: verify volume size on target before deleting source during balance Add a pre-delete safety check that reads the volume file status on both source and target, then compares .dat file size and file count. If they don't match, the move is aborted — leaving the source intact rather than risking irreversible data loss. Also removes the redundant mountVolume call since VolumeCopy already mounts the volume on the target server. * fix: clamp maxConcurrent, serialize progress sends, validate config as int64 - Clamp maxConcurrentMoves to defaultMaxConcurrentMoves before creating the semaphore so a stale or malicious job cannot request unbounded concurrent volume moves - Extend progressMu to cover sender.SendProgress calls since the underlying gRPC stream is not safe for concurrent writes - Perform bounds checks on max_concurrent_moves and batch_size in int64 space before casting to int, avoiding potential overflow on 32-bit * fix: check disk capacity in resolveBalanceDestination Skip disks where VolumeCount >= MaxVolumeCount so the detection loop does not propose moves to a full disk that would fail at execution time. * test: rename unseeded destination test to match actual behavior The test exercises a server with 0 volumes that IS seeded from topology (matching disk type), not an unseeded destination. Rename to TestDetection_ZeroVolumeServerIncludedInBalance and fix comments. * test: tighten integration test to assert exactly one batch proposal With default batch_size=20, all moves should be grouped into a single batch proposal. Assert len(proposals)==1 and require BalanceParams with Moves, removing the legacy single-move else branch. 
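The pre-delete safety check described above amounts to comparing a few fields from the volume file status on both servers before the source is deleted; `volumeStatus` is an illustrative stand-in for the fields read from `ReadVolumeFileStatus`, and the real response carries more than these two:

```go
package main

import "fmt"

// volumeStatus is an illustrative stand-in for the fields compared from
// a ReadVolumeFileStatus response.
type volumeStatus struct {
	datFileSize uint64
	fileCount   uint64
}

// verifyBeforeDelete mirrors the pre-delete safety check: the source is
// deleted only when the target reports a matching .dat file size and
// file count; otherwise the move aborts with the source left intact.
func verifyBeforeDelete(source, target volumeStatus) error {
	if source.datFileSize != target.datFileSize {
		return fmt.Errorf("dat size mismatch: source=%d target=%d",
			source.datFileSize, target.datFileSize)
	}
	if source.fileCount != target.fileCount {
		return fmt.Errorf("file count mismatch: source=%d target=%d",
			source.fileCount, target.fileCount)
	}
	return nil
}

func main() {
	src := volumeStatus{datFileSize: 1024, fileCount: 1}
	fmt.Println(verifyBeforeDelete(src, volumeStatus{1024, 1})) // match: ok to delete
	fmt.Println(verifyBeforeDelete(src, volumeStatus{512, 1}))  // mismatch: abort
}
```

The asymmetry is deliberate: a false negative only leaves an extra copy behind, while skipping the check risks deleting the sole good copy.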
* fix: propagate ctx to RPCs and restore source writability on abort - All helper methods (markVolumeReadonly, copyVolume, tailVolume, readVolumeFileStatus, deleteVolume) now accept a context parameter instead of using context.Background(), so Execute's ctx propagates cancellation and timeouts into every volume server RPC - Add deferred cleanup that restores the source volume to writable if any step after markVolumeReadonly fails, preventing the source from being left permanently readonly on abort - Add markVolumeWritable helper using VolumeMarkWritableRequest * fix: deep-copy protobuf messages in test recording sender Use proto.Clone in recordingExecutionSender to store immutable snapshots of JobProgressUpdate and JobCompleted, preventing assertions from observing mutations if the handler reuses message pointers. * fix: add VolumeMarkWritable and ReadVolumeFileStatus to fake volume server The balance task now calls ReadVolumeFileStatus for pre-delete verification and VolumeMarkWritable to restore writability on abort. Add both RPCs to the test fake, and drop the mountCalls assertion since BalanceTask no longer calls VolumeMount directly (VolumeCopy handles it). * fix: use maxConcurrentMovesLimit (50) for clamp, not defaultMaxConcurrentMoves defaultMaxConcurrentMoves (5) is the fallback when the field is unset, not an upper bound. Clamping to it silently overrides valid config values like 10/20/50. Introduce maxConcurrentMovesLimit (50) matching the descriptor's MaxValue and clamp to that instead. * fix: cancel batch moves on progress stream failure Derive a cancellable batchCtx from the caller's ctx. If sender.SendProgress returns an error (client disconnect, context cancelled), capture it, skip further sends, and cancel batchCtx so in-flight moves abort via their propagated context rather than running blind to completion. 
* fix: bound cleanup timeout and validate batch move fields - Use a 30-second timeout for the deferred markVolumeWritable cleanup instead of context.Background() which can block indefinitely if the volume server is unreachable - Validate required fields (VolumeID, SourceNode, TargetNode) before appending moves to a batch proposal, skipping invalid entries - Fall back to a single-move proposal when filtering leaves only one valid move in a batch * fix: cancel task execution on SendProgress stream failure All handler progress callbacks previously ignored SendProgress errors, allowing tasks to continue executing after the client disconnected. Now each handler creates a derived cancellable context and cancels it on the first SendProgress error, stopping the in-flight task promptly. Handlers fixed: erasure_coding, vacuum, volume_balance (single-move), and admin_script (breaks command loop on send failure). * fix: validate batch moves before scheduling in executeBatchMoves Reject empty batches, enforce a hard upper bound (100 moves), and filter out nil or incomplete move specs (missing source/target/volume) before allocating progress tracking and launching goroutines. * test: add batch balance execution integration test Tests the batch move path with 3 volumes, max concurrency 2, using fake volume servers. Verifies all moves complete with correct readonly, copy, tail, and delete RPC counts. * test: add MarkWritableCount and ReadFileStatusCount accessors Expose the markWritableCalls and readFileStatusCalls counters on the fake volume server, following the existing MarkReadonlyCount pattern. * fix: oscillation guard uses global effective counts for heterogeneous capacity The oscillation guard (max-min <= 1) previously used maxServer/minServer which are determined by utilization ratio. With heterogeneous capacity, maxServer by utilization can have fewer raw volumes than minServer, producing a negative diff and incorrectly triggering the guard. 
Now scans all servers' effective counts to find the true global max/min volume counts, so the guard works correctly regardless of whether utilization-based or raw-count balancing is used. * fix: admin script handler breaks outer loop on SendProgress failure The break on SendProgress error inside the shell.Commands scan only exited the inner loop, letting the outer command loop continue executing commands on a broken stream. Use a sendBroken flag to propagate the break to the outer execCommands loop. --- test/plugin_workers/fake_volume_server.go | 61 ++- .../volume_balance/detection_test.go | 28 +- .../volume_balance/execution_test.go | 83 ++- weed/admin/view/app/plugin.templ | 49 +- weed/admin/view/app/plugin_templ.go | 2 +- weed/pb/worker.proto | 11 + weed/pb/worker_pb/worker.pb.go | 406 ++++++++------ weed/plugin/worker/admin_script_handler.go | 22 +- weed/plugin/worker/erasure_coding_handler.go | 10 +- weed/plugin/worker/vacuum_handler.go | 10 +- weed/plugin/worker/volume_balance_handler.go | 502 +++++++++++++++++- .../worker/volume_balance_handler_test.go | 364 +++++++++++++ weed/worker/tasks/balance/balance_task.go | 121 +++-- weed/worker/tasks/balance/detection.go | 117 +++- weed/worker/tasks/balance/detection_test.go | 83 +++ 15 files changed, 1619 insertions(+), 250 deletions(-) diff --git a/test/plugin_workers/fake_volume_server.go b/test/plugin_workers/fake_volume_server.go index 922953759..5ad35edff 100644 --- a/test/plugin_workers/fake_volume_server.go +++ b/test/plugin_workers/fake_volume_server.go @@ -27,19 +27,21 @@ type VolumeServer struct { address string baseDir string - mu sync.Mutex - receivedFiles map[string]uint64 - mountRequests []*volume_server_pb.VolumeEcShardsMountRequest - deleteRequests []*volume_server_pb.VolumeDeleteRequest - markReadonlyCalls int - vacuumGarbageRatio float64 - vacuumCheckCalls int - vacuumCompactCalls int - vacuumCommitCalls int - vacuumCleanupCalls int - volumeCopyCalls int - volumeMountCalls int - tailReceiverCalls 
int + mu sync.Mutex + receivedFiles map[string]uint64 + mountRequests []*volume_server_pb.VolumeEcShardsMountRequest + deleteRequests []*volume_server_pb.VolumeDeleteRequest + markReadonlyCalls int + markWritableCalls int + readFileStatusCalls int + vacuumGarbageRatio float64 + vacuumCheckCalls int + vacuumCompactCalls int + vacuumCommitCalls int + vacuumCleanupCalls int + volumeCopyCalls int + volumeMountCalls int + tailReceiverCalls int } // NewVolumeServer starts a test volume server using the provided base directory. @@ -151,6 +153,20 @@ func (v *VolumeServer) MarkReadonlyCount() int { return v.markReadonlyCalls } +// MarkWritableCount returns the number of writable calls. +func (v *VolumeServer) MarkWritableCount() int { + v.mu.Lock() + defer v.mu.Unlock() + return v.markWritableCalls +} + +// ReadFileStatusCount returns the number of ReadVolumeFileStatus calls. +func (v *VolumeServer) ReadFileStatusCount() int { + v.mu.Lock() + defer v.mu.Unlock() + return v.readFileStatusCalls +} + // Shutdown stops the volume server. 
func (v *VolumeServer) Shutdown() { if v.server != nil { @@ -280,6 +296,25 @@ func (v *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serve return &volume_server_pb.VolumeMarkReadonlyResponse{}, nil } +func (v *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) { + v.mu.Lock() + v.markWritableCalls++ + v.mu.Unlock() + return &volume_server_pb.VolumeMarkWritableResponse{}, nil +} + +func (v *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { + v.mu.Lock() + v.readFileStatusCalls++ + v.mu.Unlock() + return &volume_server_pb.ReadVolumeFileStatusResponse{ + VolumeId: req.VolumeId, + DatFileSize: 1024, + IdxFileSize: 16, + FileCount: 1, + }, nil +} + func (v *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) { v.mu.Lock() v.vacuumCheckCalls++ diff --git a/test/plugin_workers/volume_balance/detection_test.go b/test/plugin_workers/volume_balance/detection_test.go index 0aa7e3a37..dd3b3dd9f 100644 --- a/test/plugin_workers/volume_balance/detection_test.go +++ b/test/plugin_workers/volume_balance/detection_test.go @@ -37,19 +37,25 @@ func TestVolumeBalanceDetectionIntegration(t *testing.T) { MasterGrpcAddresses: []string{master.Address()}, }, 10) require.NoError(t, err) - // With 10 volumes on one server and 1 on the other (avg=5.5), - // multiple balance moves should be detected until imbalance is within threshold. - require.Greater(t, len(proposals), 1, "expected multiple balance proposals") + // With default batch_size=20 and 10 overloaded volumes vs 1 underloaded, + // all moves are grouped into a single batch proposal. 
+ require.Len(t, proposals, 1, "expected exactly one batch proposal") - for _, proposal := range proposals { - require.Equal(t, "volume_balance", proposal.JobType) - paramsValue := proposal.Parameters["task_params_pb"] - require.NotNil(t, paramsValue) + proposal := proposals[0] + require.Equal(t, "volume_balance", proposal.JobType) + paramsValue := proposal.Parameters["task_params_pb"] + require.NotNil(t, paramsValue) - params := &worker_pb.TaskParams{} - require.NoError(t, proto.Unmarshal(paramsValue.GetBytesValue(), params)) - require.NotEmpty(t, params.Sources) - require.NotEmpty(t, params.Targets) + params := &worker_pb.TaskParams{} + require.NoError(t, proto.Unmarshal(paramsValue.GetBytesValue(), params)) + + bp := params.GetBalanceParams() + require.NotNil(t, bp, "expected BalanceParams in batch proposal") + require.Greater(t, len(bp.Moves), 1, "batch proposal should contain multiple moves") + for _, move := range bp.Moves { + require.NotZero(t, move.VolumeId) + require.NotEmpty(t, move.SourceNode) + require.NotEmpty(t, move.TargetNode) } } diff --git a/test/plugin_workers/volume_balance/execution_test.go b/test/plugin_workers/volume_balance/execution_test.go index 0d2091fc6..e7c13154b 100644 --- a/test/plugin_workers/volume_balance/execution_test.go +++ b/test/plugin_workers/volume_balance/execution_test.go @@ -8,10 +8,12 @@ import ( pluginworkers "github.com/seaweedfs/seaweedfs/test/plugin_workers" "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/proto" ) func TestVolumeBalanceExecutionIntegration(t *testing.T) { @@ -60,8 +62,85 @@ func TestVolumeBalanceExecutionIntegration(t *testing.T) { require.GreaterOrEqual(t, source.MarkReadonlyCount(), 1) require.GreaterOrEqual(t, len(source.DeleteRequests()), 1) 
- copyCalls, mountCalls, tailCalls := target.BalanceStats() + copyCalls, _, tailCalls := target.BalanceStats() require.GreaterOrEqual(t, copyCalls, 1) - require.GreaterOrEqual(t, mountCalls, 1) require.GreaterOrEqual(t, tailCalls, 1) } + +func TestVolumeBalanceBatchExecutionIntegration(t *testing.T) { + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + handler := pluginworker.NewVolumeBalanceHandler(dialOption) + harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{ + WorkerOptions: pluginworker.WorkerOptions{ + GrpcDialOption: dialOption, + }, + Handlers: []pluginworker.JobHandler{handler}, + }) + harness.WaitForJobType("volume_balance") + + // Create one source and one target fake volume server. + source := pluginworkers.NewVolumeServer(t, "") + target := pluginworkers.NewVolumeServer(t, "") + + // Build a batch job with 3 volume moves from source → target. + volumeIDs := []uint32{401, 402, 403} + moves := make([]*worker_pb.BalanceMoveSpec, len(volumeIDs)) + for i, vid := range volumeIDs { + moves[i] = &worker_pb.BalanceMoveSpec{ + VolumeId: vid, + SourceNode: source.Address(), + TargetNode: target.Address(), + Collection: "batch-test", + } + } + + params := &worker_pb.TaskParams{ + TaskId: "batch-balance-test", + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{ + MaxConcurrentMoves: 2, + Moves: moves, + }, + }, + } + paramBytes, err := proto.Marshal(params) + require.NoError(t, err) + + job := &plugin_pb.JobSpec{ + JobId: "batch-balance-test", + JobType: "volume_balance", + Parameters: map[string]*plugin_pb.ConfigValue{ + "task_params_pb": { + Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: paramBytes}, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + result, err := harness.Plugin().ExecuteJob(ctx, job, nil, 1) + require.NoError(t, err) + require.NotNil(t, result) + require.True(t, result.Success, "batch balance 
job should succeed; result: %+v", result) + + // Each of the 3 moves should have marked the source readonly and deleted. + require.Equal(t, len(volumeIDs), source.MarkReadonlyCount(), + "each move should mark source volume readonly") + require.Equal(t, len(volumeIDs), len(source.DeleteRequests()), + "each move should delete the source volume") + + // Verify delete requests reference the expected volume IDs. + deletedVols := make(map[uint32]bool) + for _, req := range source.DeleteRequests() { + deletedVols[req.VolumeId] = true + } + for _, vid := range volumeIDs { + require.True(t, deletedVols[vid], "volume %d should have been deleted from source", vid) + } + + // Target should have received copy and tail calls for all 3 volumes. + copyCalls, _, tailCalls := target.BalanceStats() + require.Equal(t, len(volumeIDs), copyCalls, "target should receive one copy per volume") + require.Equal(t, len(volumeIDs), tailCalls, "target should receive one tail per volume") +} diff --git a/weed/admin/view/app/plugin.templ b/weed/admin/view/app/plugin.templ index 2b9e8fe27..2ccd6705c 100644 --- a/weed/admin/view/app/plugin.templ +++ b/weed/admin/view/app/plugin.templ @@ -1026,6 +1026,9 @@ templ Plugin(page string) { } var jobType = String(plan.job_type || '').trim().toLowerCase(); + if (jobType === 'volume_balance') { + return renderBalanceExecutionPlan(plan); + } if (jobType !== 'erasure_coding') { var fallbackText = toPrettyJson(plan); if (!fallbackText) { @@ -1107,6 +1110,44 @@ templ Plugin(page string) { return html; } + function renderBalanceExecutionPlan(plan) { + var html = '
Execution Plan
'; + var moves = Array.isArray(plan.moves) ? plan.moves : []; + + if (moves.length === 0) { + // Single-move balance job + var src = textOrDash(plan.source_node || plan.source_server); + var dst = textOrDash(plan.target_node || plan.target_server); + var vid = textOrDash(plan.volume_id); + var col = textOrDash(plan.collection); + html += `
+
Volume: ${escapeHtml(vid)}
+
Collection: ${escapeHtml(col)}
+
Source: ${escapeHtml(src)}
+
Target: ${escapeHtml(dst)}
+
`; + } else { + // Batch balance job + html += '
' + escapeHtml(String(moves.length)) + ' moves
'; + html += '
' + + ''; + for (var i = 0; i < moves.length; i++) { + var move = moves[i] || {}; + html += ` + + + + + + `; + } + html += '
#VolumeSourceTargetCollection
${escapeHtml(String(i + 1))}${escapeHtml(textOrDash(move.volume_id))}${escapeHtml(textOrDash(move.source_node))}${escapeHtml(textOrDash(move.target_node))}${escapeHtml(textOrDash(move.collection))}
'; + } + + html += '
'; + return html; + } + function isActiveJobState(candidateState) { var jobState = candidateState; if (candidateState && typeof candidateState === 'object' && candidateState.state !== undefined) { @@ -1676,9 +1717,15 @@ templ Plugin(page string) { barClass = 'bg-warning'; } + var jobTypeCell = escapeHtml(textOrDash(executionJob.job_type)); + var execLabels = executionJob.labels || {}; + if (execLabels.batch === 'true' && execLabels.batch_size) { + jobTypeCell += ' ' + escapeHtml(execLabels.batch_size) + ' moves'; + } + rows += '' + '' + renderJobLink(executionJob.job_id) + '' + - '' + escapeHtml(textOrDash(executionJob.job_type)) + '' + + '' + jobTypeCell + '' + '' + escapeHtml(textOrDash(executionJob.state)) + '' + '
' + Math.round(progress) + '%
' + '' + escapeHtml(textOrDash(executionJob.worker_id)) + '' + diff --git a/weed/admin/view/app/plugin_templ.go b/weed/admin/view/app/plugin_templ.go index cd9ae58d2..c43d04078 100644 --- a/weed/admin/view/app/plugin_templ.go +++ b/weed/admin/view/app/plugin_templ.go @@ -46,7 +46,7 @@ func Plugin(page string) templ.Component { if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } - templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 2, "\">

Workers

Cluster-wide worker status, per-job configuration, detection, queue, and execution workflows.

Workers

0

Active Jobs

0

Activities (recent)

0

Next Run

-

Per Job Type Summary
Job TypeActive JobsRecent Activities
Loading...
Scheduler State
Sequential scheduler with per-job runtime limits
Job TypeEnabledDetectorIn FlightMax RuntimeExec GlobalExec/WorkerExecutor WorkersEffective ExecLast Run
Loading...
Workers
WorkerAddressCapabilitiesLoad
Loading...
Job Type Configuration
Not loaded
Selected Job Type
-
Descriptor
Select a job type to load schema and config.
Admin Config Form
No admin form loaded.
Worker Config Form
No worker form loaded.
Job Scheduling Settings
How often to check for new work.
Next Run
Scheduler
-
Not scheduled
Run History
Keep last 10 success + last 10 errors
Successful Runs
TimeJob IDWorkerDuration
No data
Error Runs
TimeJob IDWorkerError
No data
Detection Results
Run detection to see proposals.
Job Queue
States: pending/assigned/running
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Detection Jobs
Detection activities for selected job type
TimeJob TypeRequest IDWorkerStageSourceMessage
Loading...
Execution Jobs
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Execution Activities
Non-detection events only
TimeJob TypeJob IDSourceStageMessage
Loading...
Job Detail
Select a job to view details.
") + templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 2, "\">

Workers

Cluster-wide worker status, per-job configuration, detection, queue, and execution workflows.

Workers

0

Active Jobs

0

Activities (recent)

0

Next Run

-

Per Job Type Summary
Job TypeActive JobsRecent Activities
Loading...
Scheduler State
Sequential scheduler with per-job runtime limits
Job TypeEnabledDetectorIn FlightMax RuntimeExec GlobalExec/WorkerExecutor WorkersEffective ExecLast Run
Loading...
Workers
WorkerAddressCapabilitiesLoad
Loading...
Job Type Configuration
Not loaded
Selected Job Type
-
Descriptor
Select a job type to load schema and config.
Admin Config Form
No admin form loaded.
Worker Config Form
No worker form loaded.
Job Scheduling Settings
How often to check for new work.
Next Run
Scheduler
-
Not scheduled
Run History
Keep last 10 success + last 10 errors
Successful Runs
TimeJob IDWorkerDuration
No data
Error Runs
TimeJob IDWorkerError
No data
Detection Results
Run detection to see proposals.
Job Queue
States: pending/assigned/running
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Detection Jobs
Detection activities for selected job type
TimeJob TypeRequest IDWorkerStageSourceMessage
Loading...
Execution Jobs
Job IDTypeStateProgressWorkerUpdatedMessage
Loading...
Execution Activities
Non-detection events only
TimeJob TypeJob IDSourceStageMessage
Loading...
Job Detail
Select a job to view details.
") if templ_7745c5c3_Err != nil { return templ_7745c5c3_Err } diff --git a/weed/pb/worker.proto b/weed/pb/worker.proto index bfb775151..9f9684473 100644 --- a/weed/pb/worker.proto +++ b/weed/pb/worker.proto @@ -157,10 +157,21 @@ message TaskTarget { +// BalanceMoveSpec describes a single volume move within a batch balance job +message BalanceMoveSpec { + uint32 volume_id = 1; // Volume to move + string source_node = 2; // Source server address (host:port) + string target_node = 3; // Destination server address (host:port) + string collection = 4; // Collection name + uint64 volume_size = 5; // Volume size in bytes (informational) +} + // BalanceTaskParams for volume balancing operations message BalanceTaskParams { bool force_move = 1; // Force move even with conflicts int32 timeout_seconds = 2; // Operation timeout + int32 max_concurrent_moves = 3; // Max concurrent moves in a batch job (0 = default 5) + repeated BalanceMoveSpec moves = 4; // Batch: multiple volume moves in one job } // ReplicationTaskParams for adding replicas diff --git a/weed/pb/worker_pb/worker.pb.go b/weed/pb/worker_pb/worker.pb.go index 1502f7867..7677b0837 100644 --- a/weed/pb/worker_pb/worker.pb.go +++ b/weed/pb/worker_pb/worker.pb.go @@ -1331,18 +1331,97 @@ func (x *TaskTarget) GetEstimatedSize() uint64 { return 0 } +// BalanceMoveSpec describes a single volume move within a batch balance job +type BalanceMoveSpec struct { + state protoimpl.MessageState `protogen:"open.v1"` + VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"` // Volume to move + SourceNode string `protobuf:"bytes,2,opt,name=source_node,json=sourceNode,proto3" json:"source_node,omitempty"` // Source server address (host:port) + TargetNode string `protobuf:"bytes,3,opt,name=target_node,json=targetNode,proto3" json:"target_node,omitempty"` // Destination server address (host:port) + Collection string `protobuf:"bytes,4,opt,name=collection,proto3" 
json:"collection,omitempty"` // Collection name + VolumeSize uint64 `protobuf:"varint,5,opt,name=volume_size,json=volumeSize,proto3" json:"volume_size,omitempty"` // Volume size in bytes (informational) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *BalanceMoveSpec) Reset() { + *x = BalanceMoveSpec{} + mi := &file_worker_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *BalanceMoveSpec) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BalanceMoveSpec) ProtoMessage() {} + +func (x *BalanceMoveSpec) ProtoReflect() protoreflect.Message { + mi := &file_worker_proto_msgTypes[13] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BalanceMoveSpec.ProtoReflect.Descriptor instead. +func (*BalanceMoveSpec) Descriptor() ([]byte, []int) { + return file_worker_proto_rawDescGZIP(), []int{13} +} + +func (x *BalanceMoveSpec) GetVolumeId() uint32 { + if x != nil { + return x.VolumeId + } + return 0 +} + +func (x *BalanceMoveSpec) GetSourceNode() string { + if x != nil { + return x.SourceNode + } + return "" +} + +func (x *BalanceMoveSpec) GetTargetNode() string { + if x != nil { + return x.TargetNode + } + return "" +} + +func (x *BalanceMoveSpec) GetCollection() string { + if x != nil { + return x.Collection + } + return "" +} + +func (x *BalanceMoveSpec) GetVolumeSize() uint64 { + if x != nil { + return x.VolumeSize + } + return 0 +} + // BalanceTaskParams for volume balancing operations type BalanceTaskParams struct { - state protoimpl.MessageState `protogen:"open.v1"` - ForceMove bool `protobuf:"varint,1,opt,name=force_move,json=forceMove,proto3" json:"force_move,omitempty"` // Force move even with conflicts - TimeoutSeconds int32 
`protobuf:"varint,2,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"` // Operation timeout - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + ForceMove bool `protobuf:"varint,1,opt,name=force_move,json=forceMove,proto3" json:"force_move,omitempty"` // Force move even with conflicts + TimeoutSeconds int32 `protobuf:"varint,2,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"` // Operation timeout + MaxConcurrentMoves int32 `protobuf:"varint,3,opt,name=max_concurrent_moves,json=maxConcurrentMoves,proto3" json:"max_concurrent_moves,omitempty"` // Max concurrent moves in a batch job (0 = default 5) + Moves []*BalanceMoveSpec `protobuf:"bytes,4,rep,name=moves,proto3" json:"moves,omitempty"` // Batch: multiple volume moves in one job + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *BalanceTaskParams) Reset() { *x = BalanceTaskParams{} - mi := &file_worker_proto_msgTypes[13] + mi := &file_worker_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1354,7 +1433,7 @@ func (x *BalanceTaskParams) String() string { func (*BalanceTaskParams) ProtoMessage() {} func (x *BalanceTaskParams) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[13] + mi := &file_worker_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1367,7 +1446,7 @@ func (x *BalanceTaskParams) ProtoReflect() protoreflect.Message { // Deprecated: Use BalanceTaskParams.ProtoReflect.Descriptor instead. 
func (*BalanceTaskParams) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{13} + return file_worker_proto_rawDescGZIP(), []int{14} } func (x *BalanceTaskParams) GetForceMove() bool { @@ -1384,6 +1463,20 @@ func (x *BalanceTaskParams) GetTimeoutSeconds() int32 { return 0 } +func (x *BalanceTaskParams) GetMaxConcurrentMoves() int32 { + if x != nil { + return x.MaxConcurrentMoves + } + return 0 +} + +func (x *BalanceTaskParams) GetMoves() []*BalanceMoveSpec { + if x != nil { + return x.Moves + } + return nil +} + // ReplicationTaskParams for adding replicas type ReplicationTaskParams struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -1395,7 +1488,7 @@ type ReplicationTaskParams struct { func (x *ReplicationTaskParams) Reset() { *x = ReplicationTaskParams{} - mi := &file_worker_proto_msgTypes[14] + mi := &file_worker_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1407,7 +1500,7 @@ func (x *ReplicationTaskParams) String() string { func (*ReplicationTaskParams) ProtoMessage() {} func (x *ReplicationTaskParams) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[14] + mi := &file_worker_proto_msgTypes[15] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1420,7 +1513,7 @@ func (x *ReplicationTaskParams) ProtoReflect() protoreflect.Message { // Deprecated: Use ReplicationTaskParams.ProtoReflect.Descriptor instead. 
func (*ReplicationTaskParams) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{14} + return file_worker_proto_rawDescGZIP(), []int{15} } func (x *ReplicationTaskParams) GetReplicaCount() int32 { @@ -1452,7 +1545,7 @@ type TaskUpdate struct { func (x *TaskUpdate) Reset() { *x = TaskUpdate{} - mi := &file_worker_proto_msgTypes[15] + mi := &file_worker_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1464,7 +1557,7 @@ func (x *TaskUpdate) String() string { func (*TaskUpdate) ProtoMessage() {} func (x *TaskUpdate) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[15] + mi := &file_worker_proto_msgTypes[16] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1477,7 +1570,7 @@ func (x *TaskUpdate) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskUpdate.ProtoReflect.Descriptor instead. func (*TaskUpdate) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{15} + return file_worker_proto_rawDescGZIP(), []int{16} } func (x *TaskUpdate) GetTaskId() string { @@ -1537,7 +1630,7 @@ type TaskComplete struct { func (x *TaskComplete) Reset() { *x = TaskComplete{} - mi := &file_worker_proto_msgTypes[16] + mi := &file_worker_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1549,7 +1642,7 @@ func (x *TaskComplete) String() string { func (*TaskComplete) ProtoMessage() {} func (x *TaskComplete) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[16] + mi := &file_worker_proto_msgTypes[17] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1562,7 +1655,7 @@ func (x *TaskComplete) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskComplete.ProtoReflect.Descriptor instead. 
func (*TaskComplete) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{16} + return file_worker_proto_rawDescGZIP(), []int{17} } func (x *TaskComplete) GetTaskId() string { @@ -1619,7 +1712,7 @@ type TaskCancellation struct { func (x *TaskCancellation) Reset() { *x = TaskCancellation{} - mi := &file_worker_proto_msgTypes[17] + mi := &file_worker_proto_msgTypes[18] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1631,7 +1724,7 @@ func (x *TaskCancellation) String() string { func (*TaskCancellation) ProtoMessage() {} func (x *TaskCancellation) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[17] + mi := &file_worker_proto_msgTypes[18] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1644,7 +1737,7 @@ func (x *TaskCancellation) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskCancellation.ProtoReflect.Descriptor instead. func (*TaskCancellation) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{17} + return file_worker_proto_rawDescGZIP(), []int{18} } func (x *TaskCancellation) GetTaskId() string { @@ -1680,7 +1773,7 @@ type WorkerShutdown struct { func (x *WorkerShutdown) Reset() { *x = WorkerShutdown{} - mi := &file_worker_proto_msgTypes[18] + mi := &file_worker_proto_msgTypes[19] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1692,7 +1785,7 @@ func (x *WorkerShutdown) String() string { func (*WorkerShutdown) ProtoMessage() {} func (x *WorkerShutdown) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[18] + mi := &file_worker_proto_msgTypes[19] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1705,7 +1798,7 @@ func (x *WorkerShutdown) ProtoReflect() protoreflect.Message { // Deprecated: Use WorkerShutdown.ProtoReflect.Descriptor instead. 
func (*WorkerShutdown) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{18} + return file_worker_proto_rawDescGZIP(), []int{19} } func (x *WorkerShutdown) GetWorkerId() string { @@ -1740,7 +1833,7 @@ type AdminShutdown struct { func (x *AdminShutdown) Reset() { *x = AdminShutdown{} - mi := &file_worker_proto_msgTypes[19] + mi := &file_worker_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1752,7 +1845,7 @@ func (x *AdminShutdown) String() string { func (*AdminShutdown) ProtoMessage() {} func (x *AdminShutdown) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[19] + mi := &file_worker_proto_msgTypes[20] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1765,7 +1858,7 @@ func (x *AdminShutdown) ProtoReflect() protoreflect.Message { // Deprecated: Use AdminShutdown.ProtoReflect.Descriptor instead. func (*AdminShutdown) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{19} + return file_worker_proto_rawDescGZIP(), []int{20} } func (x *AdminShutdown) GetReason() string { @@ -1798,7 +1891,7 @@ type TaskLogRequest struct { func (x *TaskLogRequest) Reset() { *x = TaskLogRequest{} - mi := &file_worker_proto_msgTypes[20] + mi := &file_worker_proto_msgTypes[21] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1810,7 +1903,7 @@ func (x *TaskLogRequest) String() string { func (*TaskLogRequest) ProtoMessage() {} func (x *TaskLogRequest) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[20] + mi := &file_worker_proto_msgTypes[21] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1823,7 +1916,7 @@ func (x *TaskLogRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskLogRequest.ProtoReflect.Descriptor instead. 
func (*TaskLogRequest) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{20} + return file_worker_proto_rawDescGZIP(), []int{21} } func (x *TaskLogRequest) GetTaskId() string { @@ -1890,7 +1983,7 @@ type TaskLogResponse struct { func (x *TaskLogResponse) Reset() { *x = TaskLogResponse{} - mi := &file_worker_proto_msgTypes[21] + mi := &file_worker_proto_msgTypes[22] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1902,7 +1995,7 @@ func (x *TaskLogResponse) String() string { func (*TaskLogResponse) ProtoMessage() {} func (x *TaskLogResponse) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[21] + mi := &file_worker_proto_msgTypes[22] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1915,7 +2008,7 @@ func (x *TaskLogResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskLogResponse.ProtoReflect.Descriptor instead. func (*TaskLogResponse) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{21} + return file_worker_proto_rawDescGZIP(), []int{22} } func (x *TaskLogResponse) GetTaskId() string { @@ -1983,7 +2076,7 @@ type TaskLogMetadata struct { func (x *TaskLogMetadata) Reset() { *x = TaskLogMetadata{} - mi := &file_worker_proto_msgTypes[22] + mi := &file_worker_proto_msgTypes[23] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1995,7 +2088,7 @@ func (x *TaskLogMetadata) String() string { func (*TaskLogMetadata) ProtoMessage() {} func (x *TaskLogMetadata) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[22] + mi := &file_worker_proto_msgTypes[23] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2008,7 +2101,7 @@ func (x *TaskLogMetadata) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskLogMetadata.ProtoReflect.Descriptor instead. 
func (*TaskLogMetadata) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{22} + return file_worker_proto_rawDescGZIP(), []int{23} } func (x *TaskLogMetadata) GetTaskId() string { @@ -2124,7 +2217,7 @@ type TaskLogEntry struct { func (x *TaskLogEntry) Reset() { *x = TaskLogEntry{} - mi := &file_worker_proto_msgTypes[23] + mi := &file_worker_proto_msgTypes[24] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2136,7 +2229,7 @@ func (x *TaskLogEntry) String() string { func (*TaskLogEntry) ProtoMessage() {} func (x *TaskLogEntry) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[23] + mi := &file_worker_proto_msgTypes[24] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2149,7 +2242,7 @@ func (x *TaskLogEntry) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskLogEntry.ProtoReflect.Descriptor instead. func (*TaskLogEntry) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{23} + return file_worker_proto_rawDescGZIP(), []int{24} } func (x *TaskLogEntry) GetTimestamp() int64 { @@ -2212,7 +2305,7 @@ type MaintenanceConfig struct { func (x *MaintenanceConfig) Reset() { *x = MaintenanceConfig{} - mi := &file_worker_proto_msgTypes[24] + mi := &file_worker_proto_msgTypes[25] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2224,7 +2317,7 @@ func (x *MaintenanceConfig) String() string { func (*MaintenanceConfig) ProtoMessage() {} func (x *MaintenanceConfig) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[24] + mi := &file_worker_proto_msgTypes[25] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2237,7 +2330,7 @@ func (x *MaintenanceConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use MaintenanceConfig.ProtoReflect.Descriptor instead. 
func (*MaintenanceConfig) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{24} + return file_worker_proto_rawDescGZIP(), []int{25} } func (x *MaintenanceConfig) GetEnabled() bool { @@ -2316,7 +2409,7 @@ type MaintenancePolicy struct { func (x *MaintenancePolicy) Reset() { *x = MaintenancePolicy{} - mi := &file_worker_proto_msgTypes[25] + mi := &file_worker_proto_msgTypes[26] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2328,7 +2421,7 @@ func (x *MaintenancePolicy) String() string { func (*MaintenancePolicy) ProtoMessage() {} func (x *MaintenancePolicy) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[25] + mi := &file_worker_proto_msgTypes[26] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2341,7 +2434,7 @@ func (x *MaintenancePolicy) ProtoReflect() protoreflect.Message { // Deprecated: Use MaintenancePolicy.ProtoReflect.Descriptor instead. func (*MaintenancePolicy) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{25} + return file_worker_proto_rawDescGZIP(), []int{26} } func (x *MaintenancePolicy) GetTaskPolicies() map[string]*TaskPolicy { @@ -2394,7 +2487,7 @@ type TaskPolicy struct { func (x *TaskPolicy) Reset() { *x = TaskPolicy{} - mi := &file_worker_proto_msgTypes[26] + mi := &file_worker_proto_msgTypes[27] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2406,7 +2499,7 @@ func (x *TaskPolicy) String() string { func (*TaskPolicy) ProtoMessage() {} func (x *TaskPolicy) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[26] + mi := &file_worker_proto_msgTypes[27] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2419,7 +2512,7 @@ func (x *TaskPolicy) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskPolicy.ProtoReflect.Descriptor instead. 
func (*TaskPolicy) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{26} + return file_worker_proto_rawDescGZIP(), []int{27} } func (x *TaskPolicy) GetEnabled() bool { @@ -2533,7 +2626,7 @@ type VacuumTaskConfig struct { func (x *VacuumTaskConfig) Reset() { *x = VacuumTaskConfig{} - mi := &file_worker_proto_msgTypes[27] + mi := &file_worker_proto_msgTypes[28] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2545,7 +2638,7 @@ func (x *VacuumTaskConfig) String() string { func (*VacuumTaskConfig) ProtoMessage() {} func (x *VacuumTaskConfig) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[27] + mi := &file_worker_proto_msgTypes[28] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2558,7 +2651,7 @@ func (x *VacuumTaskConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use VacuumTaskConfig.ProtoReflect.Descriptor instead. func (*VacuumTaskConfig) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{27} + return file_worker_proto_rawDescGZIP(), []int{28} } func (x *VacuumTaskConfig) GetGarbageThreshold() float64 { @@ -2596,7 +2689,7 @@ type ErasureCodingTaskConfig struct { func (x *ErasureCodingTaskConfig) Reset() { *x = ErasureCodingTaskConfig{} - mi := &file_worker_proto_msgTypes[28] + mi := &file_worker_proto_msgTypes[29] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2608,7 +2701,7 @@ func (x *ErasureCodingTaskConfig) String() string { func (*ErasureCodingTaskConfig) ProtoMessage() {} func (x *ErasureCodingTaskConfig) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[28] + mi := &file_worker_proto_msgTypes[29] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2621,7 +2714,7 @@ func (x *ErasureCodingTaskConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use 
ErasureCodingTaskConfig.ProtoReflect.Descriptor instead. func (*ErasureCodingTaskConfig) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{28} + return file_worker_proto_rawDescGZIP(), []int{29} } func (x *ErasureCodingTaskConfig) GetFullnessRatio() float64 { @@ -2670,7 +2763,7 @@ type BalanceTaskConfig struct { func (x *BalanceTaskConfig) Reset() { *x = BalanceTaskConfig{} - mi := &file_worker_proto_msgTypes[29] + mi := &file_worker_proto_msgTypes[30] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2682,7 +2775,7 @@ func (x *BalanceTaskConfig) String() string { func (*BalanceTaskConfig) ProtoMessage() {} func (x *BalanceTaskConfig) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[29] + mi := &file_worker_proto_msgTypes[30] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2695,7 +2788,7 @@ func (x *BalanceTaskConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use BalanceTaskConfig.ProtoReflect.Descriptor instead. 
func (*BalanceTaskConfig) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{29} + return file_worker_proto_rawDescGZIP(), []int{30} } func (x *BalanceTaskConfig) GetImbalanceThreshold() float64 { @@ -2722,7 +2815,7 @@ type ReplicationTaskConfig struct { func (x *ReplicationTaskConfig) Reset() { *x = ReplicationTaskConfig{} - mi := &file_worker_proto_msgTypes[30] + mi := &file_worker_proto_msgTypes[31] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2734,7 +2827,7 @@ func (x *ReplicationTaskConfig) String() string { func (*ReplicationTaskConfig) ProtoMessage() {} func (x *ReplicationTaskConfig) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[30] + mi := &file_worker_proto_msgTypes[31] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2747,7 +2840,7 @@ func (x *ReplicationTaskConfig) ProtoReflect() protoreflect.Message { // Deprecated: Use ReplicationTaskConfig.ProtoReflect.Descriptor instead. 
func (*ReplicationTaskConfig) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{30} + return file_worker_proto_rawDescGZIP(), []int{31} } func (x *ReplicationTaskConfig) GetTargetReplicaCount() int32 { @@ -2791,7 +2884,7 @@ type MaintenanceTaskData struct { func (x *MaintenanceTaskData) Reset() { *x = MaintenanceTaskData{} - mi := &file_worker_proto_msgTypes[31] + mi := &file_worker_proto_msgTypes[32] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2803,7 +2896,7 @@ func (x *MaintenanceTaskData) String() string { func (*MaintenanceTaskData) ProtoMessage() {} func (x *MaintenanceTaskData) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[31] + mi := &file_worker_proto_msgTypes[32] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2816,7 +2909,7 @@ func (x *MaintenanceTaskData) ProtoReflect() protoreflect.Message { // Deprecated: Use MaintenanceTaskData.ProtoReflect.Descriptor instead. 
func (*MaintenanceTaskData) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{31} + return file_worker_proto_rawDescGZIP(), []int{32} } func (x *MaintenanceTaskData) GetId() string { @@ -3001,7 +3094,7 @@ type TaskAssignmentRecord struct { func (x *TaskAssignmentRecord) Reset() { *x = TaskAssignmentRecord{} - mi := &file_worker_proto_msgTypes[32] + mi := &file_worker_proto_msgTypes[33] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3013,7 +3106,7 @@ func (x *TaskAssignmentRecord) String() string { func (*TaskAssignmentRecord) ProtoMessage() {} func (x *TaskAssignmentRecord) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[32] + mi := &file_worker_proto_msgTypes[33] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3026,7 +3119,7 @@ func (x *TaskAssignmentRecord) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskAssignmentRecord.ProtoReflect.Descriptor instead. 
func (*TaskAssignmentRecord) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{32} + return file_worker_proto_rawDescGZIP(), []int{33} } func (x *TaskAssignmentRecord) GetWorkerId() string { @@ -3078,7 +3171,7 @@ type TaskCreationMetrics struct { func (x *TaskCreationMetrics) Reset() { *x = TaskCreationMetrics{} - mi := &file_worker_proto_msgTypes[33] + mi := &file_worker_proto_msgTypes[34] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3090,7 +3183,7 @@ func (x *TaskCreationMetrics) String() string { func (*TaskCreationMetrics) ProtoMessage() {} func (x *TaskCreationMetrics) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[33] + mi := &file_worker_proto_msgTypes[34] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3103,7 +3196,7 @@ func (x *TaskCreationMetrics) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskCreationMetrics.ProtoReflect.Descriptor instead. 
func (*TaskCreationMetrics) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{33} + return file_worker_proto_rawDescGZIP(), []int{34} } func (x *TaskCreationMetrics) GetTriggerMetric() string { @@ -3160,7 +3253,7 @@ type VolumeHealthMetrics struct { func (x *VolumeHealthMetrics) Reset() { *x = VolumeHealthMetrics{} - mi := &file_worker_proto_msgTypes[34] + mi := &file_worker_proto_msgTypes[35] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3172,7 +3265,7 @@ func (x *VolumeHealthMetrics) String() string { func (*VolumeHealthMetrics) ProtoMessage() {} func (x *VolumeHealthMetrics) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[34] + mi := &file_worker_proto_msgTypes[35] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3185,7 +3278,7 @@ func (x *VolumeHealthMetrics) ProtoReflect() protoreflect.Message { // Deprecated: Use VolumeHealthMetrics.ProtoReflect.Descriptor instead. 
func (*VolumeHealthMetrics) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{34} + return file_worker_proto_rawDescGZIP(), []int{35} } func (x *VolumeHealthMetrics) GetTotalSize() uint64 { @@ -3270,7 +3363,7 @@ type TaskStateFile struct { func (x *TaskStateFile) Reset() { *x = TaskStateFile{} - mi := &file_worker_proto_msgTypes[35] + mi := &file_worker_proto_msgTypes[36] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3282,7 +3375,7 @@ func (x *TaskStateFile) String() string { func (*TaskStateFile) ProtoMessage() {} func (x *TaskStateFile) ProtoReflect() protoreflect.Message { - mi := &file_worker_proto_msgTypes[35] + mi := &file_worker_proto_msgTypes[36] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3295,7 +3388,7 @@ func (x *TaskStateFile) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskStateFile.ProtoReflect.Descriptor instead. func (*TaskStateFile) Descriptor() ([]byte, []int) { - return file_worker_proto_rawDescGZIP(), []int{35} + return file_worker_proto_rawDescGZIP(), []int{36} } func (x *TaskStateFile) GetTask() *MaintenanceTaskData { @@ -3441,11 +3534,24 @@ const file_worker_proto_rawDesc = "" + "dataCenter\x12\x1b\n" + "\tvolume_id\x18\x05 \x01(\rR\bvolumeId\x12\x1b\n" + "\tshard_ids\x18\x06 \x03(\rR\bshardIds\x12%\n" + - "\x0eestimated_size\x18\a \x01(\x04R\restimatedSize\"[\n" + + "\x0eestimated_size\x18\a \x01(\x04R\restimatedSize\"\xb1\x01\n" + + "\x0fBalanceMoveSpec\x12\x1b\n" + + "\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x1f\n" + + "\vsource_node\x18\x02 \x01(\tR\n" + + "sourceNode\x12\x1f\n" + + "\vtarget_node\x18\x03 \x01(\tR\n" + + "targetNode\x12\x1e\n" + + "\n" + + "collection\x18\x04 \x01(\tR\n" + + "collection\x12\x1f\n" + + "\vvolume_size\x18\x05 \x01(\x04R\n" + + "volumeSize\"\xbf\x01\n" + "\x11BalanceTaskParams\x12\x1d\n" + "\n" + "force_move\x18\x01 \x01(\bR\tforceMove\x12'\n" + - 
"\x0ftimeout_seconds\x18\x02 \x01(\x05R\x0etimeoutSeconds\"k\n" + + "\x0ftimeout_seconds\x18\x02 \x01(\x05R\x0etimeoutSeconds\x120\n" + + "\x14max_concurrent_moves\x18\x03 \x01(\x05R\x12maxConcurrentMoves\x120\n" + + "\x05moves\x18\x04 \x03(\v2\x1a.worker_pb.BalanceMoveSpecR\x05moves\"k\n" + "\x15ReplicationTaskParams\x12#\n" + "\rreplica_count\x18\x01 \x01(\x05R\freplicaCount\x12-\n" + "\x12verify_consistency\x18\x02 \x01(\bR\x11verifyConsistency\"\x8e\x02\n" + @@ -3667,7 +3773,7 @@ func file_worker_proto_rawDescGZIP() []byte { return file_worker_proto_rawDescData } -var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 45) +var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 46) var file_worker_proto_goTypes = []any{ (*WorkerMessage)(nil), // 0: worker_pb.WorkerMessage (*AdminMessage)(nil), // 1: worker_pb.AdminMessage @@ -3682,89 +3788,91 @@ var file_worker_proto_goTypes = []any{ (*ErasureCodingTaskParams)(nil), // 10: worker_pb.ErasureCodingTaskParams (*TaskSource)(nil), // 11: worker_pb.TaskSource (*TaskTarget)(nil), // 12: worker_pb.TaskTarget - (*BalanceTaskParams)(nil), // 13: worker_pb.BalanceTaskParams - (*ReplicationTaskParams)(nil), // 14: worker_pb.ReplicationTaskParams - (*TaskUpdate)(nil), // 15: worker_pb.TaskUpdate - (*TaskComplete)(nil), // 16: worker_pb.TaskComplete - (*TaskCancellation)(nil), // 17: worker_pb.TaskCancellation - (*WorkerShutdown)(nil), // 18: worker_pb.WorkerShutdown - (*AdminShutdown)(nil), // 19: worker_pb.AdminShutdown - (*TaskLogRequest)(nil), // 20: worker_pb.TaskLogRequest - (*TaskLogResponse)(nil), // 21: worker_pb.TaskLogResponse - (*TaskLogMetadata)(nil), // 22: worker_pb.TaskLogMetadata - (*TaskLogEntry)(nil), // 23: worker_pb.TaskLogEntry - (*MaintenanceConfig)(nil), // 24: worker_pb.MaintenanceConfig - (*MaintenancePolicy)(nil), // 25: worker_pb.MaintenancePolicy - (*TaskPolicy)(nil), // 26: worker_pb.TaskPolicy - (*VacuumTaskConfig)(nil), // 27: worker_pb.VacuumTaskConfig - 
(*ErasureCodingTaskConfig)(nil), // 28: worker_pb.ErasureCodingTaskConfig - (*BalanceTaskConfig)(nil), // 29: worker_pb.BalanceTaskConfig - (*ReplicationTaskConfig)(nil), // 30: worker_pb.ReplicationTaskConfig - (*MaintenanceTaskData)(nil), // 31: worker_pb.MaintenanceTaskData - (*TaskAssignmentRecord)(nil), // 32: worker_pb.TaskAssignmentRecord - (*TaskCreationMetrics)(nil), // 33: worker_pb.TaskCreationMetrics - (*VolumeHealthMetrics)(nil), // 34: worker_pb.VolumeHealthMetrics - (*TaskStateFile)(nil), // 35: worker_pb.TaskStateFile - nil, // 36: worker_pb.WorkerRegistration.MetadataEntry - nil, // 37: worker_pb.TaskAssignment.MetadataEntry - nil, // 38: worker_pb.TaskUpdate.MetadataEntry - nil, // 39: worker_pb.TaskComplete.ResultMetadataEntry - nil, // 40: worker_pb.TaskLogMetadata.CustomDataEntry - nil, // 41: worker_pb.TaskLogEntry.FieldsEntry - nil, // 42: worker_pb.MaintenancePolicy.TaskPoliciesEntry - nil, // 43: worker_pb.MaintenanceTaskData.TagsEntry - nil, // 44: worker_pb.TaskCreationMetrics.AdditionalDataEntry + (*BalanceMoveSpec)(nil), // 13: worker_pb.BalanceMoveSpec + (*BalanceTaskParams)(nil), // 14: worker_pb.BalanceTaskParams + (*ReplicationTaskParams)(nil), // 15: worker_pb.ReplicationTaskParams + (*TaskUpdate)(nil), // 16: worker_pb.TaskUpdate + (*TaskComplete)(nil), // 17: worker_pb.TaskComplete + (*TaskCancellation)(nil), // 18: worker_pb.TaskCancellation + (*WorkerShutdown)(nil), // 19: worker_pb.WorkerShutdown + (*AdminShutdown)(nil), // 20: worker_pb.AdminShutdown + (*TaskLogRequest)(nil), // 21: worker_pb.TaskLogRequest + (*TaskLogResponse)(nil), // 22: worker_pb.TaskLogResponse + (*TaskLogMetadata)(nil), // 23: worker_pb.TaskLogMetadata + (*TaskLogEntry)(nil), // 24: worker_pb.TaskLogEntry + (*MaintenanceConfig)(nil), // 25: worker_pb.MaintenanceConfig + (*MaintenancePolicy)(nil), // 26: worker_pb.MaintenancePolicy + (*TaskPolicy)(nil), // 27: worker_pb.TaskPolicy + (*VacuumTaskConfig)(nil), // 28: worker_pb.VacuumTaskConfig + 
(*ErasureCodingTaskConfig)(nil), // 29: worker_pb.ErasureCodingTaskConfig + (*BalanceTaskConfig)(nil), // 30: worker_pb.BalanceTaskConfig + (*ReplicationTaskConfig)(nil), // 31: worker_pb.ReplicationTaskConfig + (*MaintenanceTaskData)(nil), // 32: worker_pb.MaintenanceTaskData + (*TaskAssignmentRecord)(nil), // 33: worker_pb.TaskAssignmentRecord + (*TaskCreationMetrics)(nil), // 34: worker_pb.TaskCreationMetrics + (*VolumeHealthMetrics)(nil), // 35: worker_pb.VolumeHealthMetrics + (*TaskStateFile)(nil), // 36: worker_pb.TaskStateFile + nil, // 37: worker_pb.WorkerRegistration.MetadataEntry + nil, // 38: worker_pb.TaskAssignment.MetadataEntry + nil, // 39: worker_pb.TaskUpdate.MetadataEntry + nil, // 40: worker_pb.TaskComplete.ResultMetadataEntry + nil, // 41: worker_pb.TaskLogMetadata.CustomDataEntry + nil, // 42: worker_pb.TaskLogEntry.FieldsEntry + nil, // 43: worker_pb.MaintenancePolicy.TaskPoliciesEntry + nil, // 44: worker_pb.MaintenanceTaskData.TagsEntry + nil, // 45: worker_pb.TaskCreationMetrics.AdditionalDataEntry } var file_worker_proto_depIdxs = []int32{ 2, // 0: worker_pb.WorkerMessage.registration:type_name -> worker_pb.WorkerRegistration 4, // 1: worker_pb.WorkerMessage.heartbeat:type_name -> worker_pb.WorkerHeartbeat 6, // 2: worker_pb.WorkerMessage.task_request:type_name -> worker_pb.TaskRequest - 15, // 3: worker_pb.WorkerMessage.task_update:type_name -> worker_pb.TaskUpdate - 16, // 4: worker_pb.WorkerMessage.task_complete:type_name -> worker_pb.TaskComplete - 18, // 5: worker_pb.WorkerMessage.shutdown:type_name -> worker_pb.WorkerShutdown - 21, // 6: worker_pb.WorkerMessage.task_log_response:type_name -> worker_pb.TaskLogResponse + 16, // 3: worker_pb.WorkerMessage.task_update:type_name -> worker_pb.TaskUpdate + 17, // 4: worker_pb.WorkerMessage.task_complete:type_name -> worker_pb.TaskComplete + 19, // 5: worker_pb.WorkerMessage.shutdown:type_name -> worker_pb.WorkerShutdown + 22, // 6: worker_pb.WorkerMessage.task_log_response:type_name -> 
worker_pb.TaskLogResponse 3, // 7: worker_pb.AdminMessage.registration_response:type_name -> worker_pb.RegistrationResponse 5, // 8: worker_pb.AdminMessage.heartbeat_response:type_name -> worker_pb.HeartbeatResponse 7, // 9: worker_pb.AdminMessage.task_assignment:type_name -> worker_pb.TaskAssignment - 17, // 10: worker_pb.AdminMessage.task_cancellation:type_name -> worker_pb.TaskCancellation - 19, // 11: worker_pb.AdminMessage.admin_shutdown:type_name -> worker_pb.AdminShutdown - 20, // 12: worker_pb.AdminMessage.task_log_request:type_name -> worker_pb.TaskLogRequest - 36, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry + 18, // 10: worker_pb.AdminMessage.task_cancellation:type_name -> worker_pb.TaskCancellation + 20, // 11: worker_pb.AdminMessage.admin_shutdown:type_name -> worker_pb.AdminShutdown + 21, // 12: worker_pb.AdminMessage.task_log_request:type_name -> worker_pb.TaskLogRequest + 37, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry 8, // 14: worker_pb.TaskAssignment.params:type_name -> worker_pb.TaskParams - 37, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry + 38, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry 11, // 16: worker_pb.TaskParams.sources:type_name -> worker_pb.TaskSource 12, // 17: worker_pb.TaskParams.targets:type_name -> worker_pb.TaskTarget 9, // 18: worker_pb.TaskParams.vacuum_params:type_name -> worker_pb.VacuumTaskParams 10, // 19: worker_pb.TaskParams.erasure_coding_params:type_name -> worker_pb.ErasureCodingTaskParams - 13, // 20: worker_pb.TaskParams.balance_params:type_name -> worker_pb.BalanceTaskParams - 14, // 21: worker_pb.TaskParams.replication_params:type_name -> worker_pb.ReplicationTaskParams - 38, // 22: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry - 39, // 23: 
worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry - 22, // 24: worker_pb.TaskLogResponse.metadata:type_name -> worker_pb.TaskLogMetadata - 23, // 25: worker_pb.TaskLogResponse.log_entries:type_name -> worker_pb.TaskLogEntry - 40, // 26: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry - 41, // 27: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry - 25, // 28: worker_pb.MaintenanceConfig.policy:type_name -> worker_pb.MaintenancePolicy - 42, // 29: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry - 27, // 30: worker_pb.TaskPolicy.vacuum_config:type_name -> worker_pb.VacuumTaskConfig - 28, // 31: worker_pb.TaskPolicy.erasure_coding_config:type_name -> worker_pb.ErasureCodingTaskConfig - 29, // 32: worker_pb.TaskPolicy.balance_config:type_name -> worker_pb.BalanceTaskConfig - 30, // 33: worker_pb.TaskPolicy.replication_config:type_name -> worker_pb.ReplicationTaskConfig - 8, // 34: worker_pb.MaintenanceTaskData.typed_params:type_name -> worker_pb.TaskParams - 32, // 35: worker_pb.MaintenanceTaskData.assignment_history:type_name -> worker_pb.TaskAssignmentRecord - 43, // 36: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry - 33, // 37: worker_pb.MaintenanceTaskData.creation_metrics:type_name -> worker_pb.TaskCreationMetrics - 34, // 38: worker_pb.TaskCreationMetrics.volume_metrics:type_name -> worker_pb.VolumeHealthMetrics - 44, // 39: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry - 31, // 40: worker_pb.TaskStateFile.task:type_name -> worker_pb.MaintenanceTaskData - 26, // 41: worker_pb.MaintenancePolicy.TaskPoliciesEntry.value:type_name -> worker_pb.TaskPolicy - 0, // 42: worker_pb.WorkerService.WorkerStream:input_type -> worker_pb.WorkerMessage - 1, // 43: 
worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage - 43, // [43:44] is the sub-list for method output_type - 42, // [42:43] is the sub-list for method input_type - 42, // [42:42] is the sub-list for extension type_name - 42, // [42:42] is the sub-list for extension extendee - 0, // [0:42] is the sub-list for field type_name + 14, // 20: worker_pb.TaskParams.balance_params:type_name -> worker_pb.BalanceTaskParams + 15, // 21: worker_pb.TaskParams.replication_params:type_name -> worker_pb.ReplicationTaskParams + 13, // 22: worker_pb.BalanceTaskParams.moves:type_name -> worker_pb.BalanceMoveSpec + 39, // 23: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry + 40, // 24: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry + 23, // 25: worker_pb.TaskLogResponse.metadata:type_name -> worker_pb.TaskLogMetadata + 24, // 26: worker_pb.TaskLogResponse.log_entries:type_name -> worker_pb.TaskLogEntry + 41, // 27: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry + 42, // 28: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry + 26, // 29: worker_pb.MaintenanceConfig.policy:type_name -> worker_pb.MaintenancePolicy + 43, // 30: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry + 28, // 31: worker_pb.TaskPolicy.vacuum_config:type_name -> worker_pb.VacuumTaskConfig + 29, // 32: worker_pb.TaskPolicy.erasure_coding_config:type_name -> worker_pb.ErasureCodingTaskConfig + 30, // 33: worker_pb.TaskPolicy.balance_config:type_name -> worker_pb.BalanceTaskConfig + 31, // 34: worker_pb.TaskPolicy.replication_config:type_name -> worker_pb.ReplicationTaskConfig + 8, // 35: worker_pb.MaintenanceTaskData.typed_params:type_name -> worker_pb.TaskParams + 33, // 36: worker_pb.MaintenanceTaskData.assignment_history:type_name -> worker_pb.TaskAssignmentRecord + 44, // 37: 
worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry
+	34, // 38: worker_pb.MaintenanceTaskData.creation_metrics:type_name -> worker_pb.TaskCreationMetrics
+	35, // 39: worker_pb.TaskCreationMetrics.volume_metrics:type_name -> worker_pb.VolumeHealthMetrics
+	45, // 40: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry
+	32, // 41: worker_pb.TaskStateFile.task:type_name -> worker_pb.MaintenanceTaskData
+	27, // 42: worker_pb.MaintenancePolicy.TaskPoliciesEntry.value:type_name -> worker_pb.TaskPolicy
+	0,  // 43: worker_pb.WorkerService.WorkerStream:input_type -> worker_pb.WorkerMessage
+	1,  // 44: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage
+	44, // [44:45] is the sub-list for method output_type
+	43, // [43:44] is the sub-list for method input_type
+	43, // [43:43] is the sub-list for extension type_name
+	43, // [43:43] is the sub-list for extension extendee
+	0,  // [0:43] is the sub-list for field type_name
 }
 
 func init() { file_worker_proto_init() }
@@ -3795,7 +3903,7 @@ func file_worker_proto_init() {
 			(*TaskParams_BalanceParams)(nil),
 			(*TaskParams_ReplicationParams)(nil),
 		}
-		file_worker_proto_msgTypes[26].OneofWrappers = []any{
+		file_worker_proto_msgTypes[27].OneofWrappers = []any{
 			(*TaskPolicy_VacuumConfig)(nil),
 			(*TaskPolicy_ErasureCodingConfig)(nil),
 			(*TaskPolicy_BalanceConfig)(nil),
@@ -3807,7 +3915,7 @@ func file_worker_proto_init() {
 		GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 		RawDescriptor: unsafe.Slice(unsafe.StringData(file_worker_proto_rawDesc), len(file_worker_proto_rawDesc)),
 		NumEnums:      0,
-		NumMessages:   45,
+		NumMessages:   46,
 		NumExtensions: 0,
 		NumServices:   1,
 	},
diff --git a/weed/plugin/worker/admin_script_handler.go b/weed/plugin/worker/admin_script_handler.go
index 5338a1162..8aee33a3c 100644
--- a/weed/plugin/worker/admin_script_handler.go
+++ b/weed/plugin/worker/admin_script_handler.go
@@ -284,6 +284,7 @@ func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.Exe
 		_, _ = fmt.Fprintf(output, "$ %s\n", commandLine)
 		found := false
+		sendBroken := false
 		for _, command := range shell.Commands {
 			if command.Name() != cmd.Name {
 				continue
 			}
@@ -292,7 +293,7 @@ func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.Exe
 			if err := command.Do(cmd.Args, commandEnv, output); err != nil {
 				msg := fmt.Sprintf("%s: %v", cmd.Name, err)
 				errorMessages = append(errorMessages, msg)
-				_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
+				if sendErr := sender.SendProgress(&plugin_pb.JobProgressUpdate{
 					State:           plugin_pb.JobState_JOB_STATE_RUNNING,
 					ProgressPercent: percentProgress(executed+1, len(execCommands)),
 					Stage:           "error",
@@ -300,15 +301,20 @@ func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.Exe
 					Activities: []*plugin_pb.ActivityEvent{
 						BuildExecutorActivity("error", msg),
 					},
-				})
+				}); sendErr != nil {
+					sendBroken = true
+				}
 			}
 			break
 		}
+		if sendBroken {
+			break
+		}
 
 		if !found {
 			msg := fmt.Sprintf("unknown admin command: %s", cmd.Name)
 			errorMessages = append(errorMessages, msg)
-			_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
+			if sendErr := sender.SendProgress(&plugin_pb.JobProgressUpdate{
 				State:           plugin_pb.JobState_JOB_STATE_RUNNING,
 				ProgressPercent: percentProgress(executed+1, len(execCommands)),
 				Stage:           "error",
@@ -316,12 +322,14 @@ func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.Exe
 				Activities: []*plugin_pb.ActivityEvent{
 					BuildExecutorActivity("error", msg),
 				},
-			})
+			}); sendErr != nil {
+				break
+			}
 		}
 
 		executed++
 		progress := percentProgress(executed, len(execCommands))
-		_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
+		if sendErr := sender.SendProgress(&plugin_pb.JobProgressUpdate{
 			State:           plugin_pb.JobState_JOB_STATE_RUNNING,
 			ProgressPercent: progress,
 			Stage:           "running",
@@ -329,7 +337,9 @@ func (h *AdminScriptHandler) Execute(ctx context.Context, request *plugin_pb.Exe
 			Activities: []*plugin_pb.ActivityEvent{
 				BuildExecutorActivity("running", commandLine),
 			},
-		})
+		}); sendErr != nil {
+			break
+		}
 	}
 
 	scriptHash := hashAdminScript(script)
diff --git a/weed/plugin/worker/erasure_coding_handler.go b/weed/plugin/worker/erasure_coding_handler.go
index 881181ae6..d5f32c3f4 100644
--- a/weed/plugin/worker/erasure_coding_handler.go
+++ b/weed/plugin/worker/erasure_coding_handler.go
@@ -518,12 +518,14 @@ func (h *ErasureCodingHandler) Execute(
 		params.Collection,
 		h.grpcDialOption,
 	)
+	execCtx, execCancel := context.WithCancel(ctx)
+	defer execCancel()
 	task.SetProgressCallback(func(progress float64, stage string) {
 		message := fmt.Sprintf("erasure coding progress %.0f%%", progress)
 		if strings.TrimSpace(stage) != "" {
 			message = stage
 		}
-		_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
+		if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
 			JobId:           request.Job.JobId,
 			JobType:         request.Job.JobType,
 			State:           plugin_pb.JobState_JOB_STATE_RUNNING,
@@ -533,7 +535,9 @@ func (h *ErasureCodingHandler) Execute(
 			Activities: []*plugin_pb.ActivityEvent{
 				BuildExecutorActivity(stage, message),
 			},
-		})
+		}); err != nil {
+			execCancel()
+		}
 	})
 
 	if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
@@ -550,7 +554,7 @@ func (h *ErasureCodingHandler) Execute(
 		return err
 	}
 
-	if err := task.Execute(ctx, params); err != nil {
+	if err := task.Execute(execCtx, params); err != nil {
 		_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
 			JobId:   request.Job.JobId,
 			JobType: request.Job.JobType,
diff --git a/weed/plugin/worker/vacuum_handler.go b/weed/plugin/worker/vacuum_handler.go
index c520e7efc..d1a93d299 100644
--- a/weed/plugin/worker/vacuum_handler.go
+++ b/weed/plugin/worker/vacuum_handler.go
@@ -427,12 +427,14 @@ func (h *VacuumHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJ
 		params.Collection,
 		h.grpcDialOption,
 	)
+	execCtx, execCancel := context.WithCancel(ctx)
+	defer execCancel()
 	task.SetProgressCallback(func(progress float64, stage string) {
message := fmt.Sprintf("vacuum progress %.0f%%", progress) if strings.TrimSpace(stage) != "" { message = stage } - _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ + if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ JobId: request.Job.JobId, JobType: request.Job.JobType, State: plugin_pb.JobState_JOB_STATE_RUNNING, @@ -442,7 +444,9 @@ func (h *VacuumHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJ Activities: []*plugin_pb.ActivityEvent{ BuildExecutorActivity(stage, message), }, - }) + }); err != nil { + execCancel() + } }) if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ @@ -459,7 +463,7 @@ func (h *VacuumHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJ return err } - if err := task.Execute(ctx, params); err != nil { + if err := task.Execute(execCtx, params); err != nil { _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ JobId: request.Job.JobId, JobType: request.Job.JobType, diff --git a/weed/plugin/worker/volume_balance_handler.go b/weed/plugin/worker/volume_balance_handler.go index 1a3af2b0d..01b8bc1d3 100644 --- a/weed/plugin/worker/volume_balance_handler.go +++ b/weed/plugin/worker/volume_balance_handler.go @@ -2,9 +2,12 @@ package pluginworker import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" "sort" "strings" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/admin/topology" @@ -19,6 +22,7 @@ import ( const ( defaultBalanceTimeoutSeconds = int32(10 * 60) + maxProposalStringLength = 200 ) func init() { @@ -35,6 +39,8 @@ func init() { type volumeBalanceWorkerConfig struct { TaskConfig *balancetask.Config MinIntervalSeconds int + MaxConcurrentMoves int + BatchSize int } // VolumeBalanceHandler is the plugin job handler for volume balancing. @@ -133,6 +139,31 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { }, }, }, + { + SectionId: "batch_execution", + Title: "Batch Execution", + Description: "Controls for running multiple volume moves per job. 
The worker coordinates moves via gRPC and is not on the data path.", + Fields: []*plugin_pb.ConfigField{ + { + Name: "max_concurrent_moves", + Label: "Max Concurrent Moves", + Description: "Maximum number of volume moves to run concurrently within a single batch job.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 50}}, + }, + { + Name: "batch_size", + Label: "Batch Size", + Description: "Maximum number of volume moves to group into a single job. Set to 1 to disable batching.", + FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64, + Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER, + MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}}, + MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 100}}, + }, + }, + }, }, DefaultValues: map[string]*plugin_pb.ConfigValue{ "imbalance_threshold": { @@ -144,6 +175,12 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { "min_interval_seconds": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60}, }, + "max_concurrent_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20}, + }, }, }, AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{ @@ -167,6 +204,12 @@ func (h *VolumeBalanceHandler) Descriptor() *plugin_pb.JobTypeDescriptor { "min_interval_seconds": { Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30 * 60}, }, + "max_concurrent_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(defaultMaxConcurrentMoves)}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 20}, + }, }, } } @@ -234,14 
+277,19 @@ func (h *VolumeBalanceHandler) Detect( glog.Warningf("Plugin worker failed to emit volume_balance detection trace: %v", traceErr) } - proposals := make([]*plugin_pb.JobProposal, 0, len(results)) - for _, result := range results { - proposal, proposalErr := buildVolumeBalanceProposal(result) - if proposalErr != nil { - glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", proposalErr) - continue + var proposals []*plugin_pb.JobProposal + if workerConfig.BatchSize > 1 && len(results) > 1 { + proposals = buildBatchVolumeBalanceProposals(results, workerConfig.BatchSize, workerConfig.MaxConcurrentMoves) + } else { + proposals = make([]*plugin_pb.JobProposal, 0, len(results)) + for _, result := range results { + proposal, proposalErr := buildVolumeBalanceProposal(result) + if proposalErr != nil { + glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", proposalErr) + continue + } + proposals = append(proposals, proposal) } - proposals = append(proposals, proposal) } if err := sender.SendProposals(&plugin_pb.DetectionProposals{ @@ -511,6 +559,12 @@ func countBalanceDiskTypes(metrics []*workertypes.VolumeHealthMetrics) int { return len(diskTypes) } +const ( + defaultMaxConcurrentMoves = 5 + maxConcurrentMovesLimit = 50 + maxBatchMoves = 100 +) + func (h *VolumeBalanceHandler) Execute( ctx context.Context, request *plugin_pb.ExecuteJobRequest, @@ -530,6 +584,24 @@ func (h *VolumeBalanceHandler) Execute( if err != nil { return err } + + applyBalanceExecutionDefaults(params) + + // Batch path: if BalanceTaskParams has moves, execute them concurrently + if bp := params.GetBalanceParams(); bp != nil && len(bp.Moves) > 0 { + return h.executeBatchMoves(ctx, request, params, sender) + } + + // Single-move path (backward compatible) + return h.executeSingleMove(ctx, request, params, sender) +} + +func (h *VolumeBalanceHandler) executeSingleMove( + ctx context.Context, + request *plugin_pb.ExecuteJobRequest, + params 
*worker_pb.TaskParams, + sender ExecutionSender, +) error { if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" { return fmt.Errorf("volume balance source node is required") } @@ -537,8 +609,6 @@ func (h *VolumeBalanceHandler) Execute( return fmt.Errorf("volume balance target node is required") } - applyBalanceExecutionDefaults(params) - task := balancetask.NewBalanceTask( request.Job.JobId, params.Sources[0].Node, @@ -546,12 +616,14 @@ func (h *VolumeBalanceHandler) Execute( params.Collection, h.grpcDialOption, ) + execCtx, execCancel := context.WithCancel(ctx) + defer execCancel() task.SetProgressCallback(func(progress float64, stage string) { message := fmt.Sprintf("balance progress %.0f%%", progress) if strings.TrimSpace(stage) != "" { message = stage } - _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ + if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ JobId: request.Job.JobId, JobType: request.Job.JobType, State: plugin_pb.JobState_JOB_STATE_RUNNING, @@ -561,7 +633,9 @@ func (h *VolumeBalanceHandler) Execute( Activities: []*plugin_pb.ActivityEvent{ BuildExecutorActivity(stage, message), }, - }) + }); err != nil { + execCancel() + } }) if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ @@ -578,7 +652,7 @@ func (h *VolumeBalanceHandler) Execute( return err } - if err := task.Execute(ctx, params); err != nil { + if err := task.Execute(execCtx, params); err != nil { _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ JobId: request.Job.JobId, JobType: request.Job.JobType, @@ -621,6 +695,233 @@ func (h *VolumeBalanceHandler) Execute( }) } +// executeBatchMoves runs multiple volume moves concurrently within a single job. 
+func (h *VolumeBalanceHandler) executeBatchMoves( + ctx context.Context, + request *plugin_pb.ExecuteJobRequest, + params *worker_pb.TaskParams, + sender ExecutionSender, +) error { + bp := params.GetBalanceParams() + if len(bp.Moves) == 0 { + return fmt.Errorf("batch balance job has no moves") + } + if len(bp.Moves) > maxBatchMoves { + return fmt.Errorf("batch balance job has %d moves, exceeding limit of %d", len(bp.Moves), maxBatchMoves) + } + + // Filter out nil or incomplete moves before scheduling. + validMoves := make([]*worker_pb.BalanceMoveSpec, 0, len(bp.Moves)) + for _, m := range bp.Moves { + if m == nil { + continue + } + if strings.TrimSpace(m.SourceNode) == "" || strings.TrimSpace(m.TargetNode) == "" || m.VolumeId == 0 { + glog.Warningf("batch balance: skipping invalid move (vol:%d src:%q tgt:%q)", m.VolumeId, m.SourceNode, m.TargetNode) + continue + } + validMoves = append(validMoves, m) + } + if len(validMoves) == 0 { + return fmt.Errorf("batch balance job has no valid moves after validation") + } + moves := validMoves + + maxConcurrent := int(bp.MaxConcurrentMoves) + if maxConcurrent <= 0 { + maxConcurrent = defaultMaxConcurrentMoves + } + // Clamp to the worker-side upper bound so a stale or malicious job + // cannot request unbounded fan-out of concurrent volume moves. 
+ if maxConcurrent > maxConcurrentMovesLimit { + maxConcurrent = maxConcurrentMovesLimit + } + + totalMoves := len(moves) + glog.Infof("batch volume balance: %d moves, max concurrent %d", totalMoves, maxConcurrent) + + if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + State: plugin_pb.JobState_JOB_STATE_ASSIGNED, + ProgressPercent: 0, + Stage: "assigned", + Message: fmt.Sprintf("batch volume balance accepted: %d moves", totalMoves), + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity("assigned", fmt.Sprintf("batch volume balance: %d moves, concurrency %d", totalMoves, maxConcurrent)), + }, + }); err != nil { + return err + } + + // Derive a cancellable context so we can abort remaining moves if the + // progress stream breaks (client disconnect, context cancelled). + batchCtx, batchCancel := context.WithCancel(ctx) + defer batchCancel() + + // Per-move progress tracking. The mutex serializes both the progress + // bookkeeping and the sender.SendProgress call, since the underlying + // gRPC stream is not safe for concurrent writes. 
+ var progressMu sync.Mutex + moveProgress := make([]float64, totalMoves) + var sendErr error // first progress send error + + reportAggregate := func(moveIndex int, progress float64, stage string) { + progressMu.Lock() + defer progressMu.Unlock() + + if sendErr != nil { + return // stream already broken, skip further sends + } + + moveProgress[moveIndex] = progress + total := 0.0 + for _, p := range moveProgress { + total += p + } + + aggregate := total / float64(totalMoves) + move := moves[moveIndex] + message := fmt.Sprintf("[Move %d/%d vol:%d] %s", moveIndex+1, totalMoves, move.VolumeId, stage) + + if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + State: plugin_pb.JobState_JOB_STATE_RUNNING, + ProgressPercent: aggregate, + Stage: fmt.Sprintf("move %d/%d", moveIndex+1, totalMoves), + Message: message, + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity(fmt.Sprintf("move-%d", moveIndex+1), message), + }, + }); err != nil { + sendErr = err + batchCancel() // cancel in-flight and pending moves + } + } + + type moveResult struct { + index int + volumeID uint32 + source string + target string + err error + } + + sem := make(chan struct{}, maxConcurrent) + results := make(chan moveResult, totalMoves) + + for i, move := range moves { + sem <- struct{}{} // acquire slot + go func(idx int, m *worker_pb.BalanceMoveSpec) { + defer func() { <-sem }() // release slot + + task := balancetask.NewBalanceTask( + fmt.Sprintf("%s-move-%d", request.Job.JobId, idx), + m.SourceNode, + m.VolumeId, + m.Collection, + h.grpcDialOption, + ) + task.SetProgressCallback(func(progress float64, stage string) { + reportAggregate(idx, progress, stage) + }) + + moveParams := buildMoveTaskParams(m, bp) + err := task.Execute(batchCtx, moveParams) + results <- moveResult{ + index: idx, + volumeID: m.VolumeId, + source: m.SourceNode, + target: m.TargetNode, + err: err, + } + }(i, move) + } + + // Collect all results 
+ var succeeded, failed int + var errMessages []string + for range moves { + r := <-results + if r.err != nil { + failed++ + errMessages = append(errMessages, fmt.Sprintf("volume %d (%s→%s): %v", r.volumeID, r.source, r.target, r.err)) + glog.Warningf("batch balance move %d failed: volume %d %s→%s: %v", r.index, r.volumeID, r.source, r.target, r.err) + } else { + succeeded++ + } + } + + summary := fmt.Sprintf("%d/%d volumes moved successfully", succeeded, totalMoves) + if failed > 0 { + summary += fmt.Sprintf("; %d failed", failed) + } + + // Mark the job as successful if at least one move succeeded. This avoids + // the standard retry path re-running already-completed moves. The failed + // move details are available in ErrorMessage and result metadata so a + // retry mechanism can operate only on the failed items. + success := succeeded > 0 || failed == 0 + var errMsg string + if failed > 0 { + errMsg = strings.Join(errMessages, "; ") + } + + return sender.SendCompleted(&plugin_pb.JobCompleted{ + JobId: request.Job.JobId, + JobType: request.Job.JobType, + Success: success, + ErrorMessage: errMsg, + Result: &plugin_pb.JobResult{ + Summary: summary, + OutputValues: map[string]*plugin_pb.ConfigValue{ + "total_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalMoves)}, + }, + "succeeded": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(succeeded)}, + }, + "failed": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(failed)}, + }, + }, + }, + Activities: []*plugin_pb.ActivityEvent{ + BuildExecutorActivity("completed", summary), + }, + }) +} + +// buildMoveTaskParams constructs a TaskParams for a single move within a batch. 
+func buildMoveTaskParams(move *worker_pb.BalanceMoveSpec, outerParams *worker_pb.BalanceTaskParams) *worker_pb.TaskParams {
+	timeoutSeconds := defaultBalanceTimeoutSeconds
+	forceMove := false
+	if outerParams != nil {
+		if outerParams.TimeoutSeconds > 0 {
+			timeoutSeconds = outerParams.TimeoutSeconds
+		}
+		forceMove = outerParams.ForceMove
+	}
+	return &worker_pb.TaskParams{
+		VolumeId:   move.VolumeId,
+		Collection: move.Collection,
+		VolumeSize: move.VolumeSize,
+		Sources: []*worker_pb.TaskSource{
+			{Node: move.SourceNode, VolumeId: move.VolumeId},
+		},
+		Targets: []*worker_pb.TaskTarget{
+			{Node: move.TargetNode, VolumeId: move.VolumeId},
+		},
+		TaskParams: &worker_pb.TaskParams_BalanceParams{
+			BalanceParams: &worker_pb.BalanceTaskParams{
+				ForceMove:      forceMove,
+				TimeoutSeconds: timeoutSeconds,
+			},
+		},
+	}
+}
+
 func (h *VolumeBalanceHandler) collectVolumeMetrics(
 	ctx context.Context,
 	masterAddresses []string,
@@ -654,9 +955,29 @@ func deriveBalanceWorkerConfig(values map[string]*plugin_pb.ConfigValue) *volume
 		minIntervalSeconds = 0
 	}
 
+	maxConcurrentMoves64 := readInt64Config(values, "max_concurrent_moves", int64(defaultMaxConcurrentMoves))
+	if maxConcurrentMoves64 < 1 {
+		maxConcurrentMoves64 = 1
+	}
+	if maxConcurrentMoves64 > maxConcurrentMovesLimit {
+		maxConcurrentMoves64 = maxConcurrentMovesLimit
+	}
+	maxConcurrentMoves := int(maxConcurrentMoves64)
+
+	batchSize64 := readInt64Config(values, "batch_size", 20)
+	if batchSize64 < 1 {
+		batchSize64 = 1
+	}
+	if batchSize64 > maxBatchMoves {
+		batchSize64 = maxBatchMoves
+	}
+	batchSize := int(batchSize64)
+
 	return &volumeBalanceWorkerConfig{
 		TaskConfig:         taskConfig,
 		MinIntervalSeconds: minIntervalSeconds,
+		MaxConcurrentMoves: maxConcurrentMoves,
+		BatchSize:          batchSize,
 	}
 }
 
@@ -738,6 +1059,163 @@ func buildVolumeBalanceProposal(
 	}, nil
 }
 
+// buildBatchVolumeBalanceProposals groups detection results into batch proposals.
+// Each batch proposal encodes multiple moves in BalanceTaskParams.Moves.
+func buildBatchVolumeBalanceProposals( + results []*workertypes.TaskDetectionResult, + batchSize int, + maxConcurrentMoves int, +) []*plugin_pb.JobProposal { + if batchSize <= 0 { + batchSize = 1 + } + if maxConcurrentMoves <= 0 { + maxConcurrentMoves = defaultMaxConcurrentMoves + } + + var proposals []*plugin_pb.JobProposal + + for batchStart := 0; batchStart < len(results); batchStart += batchSize { + batchEnd := batchStart + batchSize + if batchEnd > len(results) { + batchEnd = len(results) + } + batch := results[batchStart:batchEnd] + + // If only one result in this batch, emit a single-move proposal + if len(batch) == 1 { + proposal, err := buildVolumeBalanceProposal(batch[0]) + if err != nil { + glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", err) + continue + } + proposals = append(proposals, proposal) + continue + } + + // Build batch proposal with BalanceMoveSpec entries + moves := make([]*worker_pb.BalanceMoveSpec, 0, len(batch)) + var volumeIDs []string + var dedupeKeys []string + highestPriority := workertypes.TaskPriorityLow + + for _, result := range batch { + if result == nil || result.TypedParams == nil { + continue + } + sourceNode := "" + targetNode := "" + if len(result.TypedParams.Sources) > 0 { + sourceNode = result.TypedParams.Sources[0].Node + } + if len(result.TypedParams.Targets) > 0 { + targetNode = result.TypedParams.Targets[0].Node + } + // Skip moves with missing required fields that would fail at execution time. 
+ if result.VolumeID == 0 || sourceNode == "" || targetNode == "" { + glog.Warningf("Plugin worker skip invalid batch move: volume=%d source=%q target=%q", result.VolumeID, sourceNode, targetNode) + continue + } + moves = append(moves, &worker_pb.BalanceMoveSpec{ + VolumeId: uint32(result.VolumeID), + SourceNode: sourceNode, + TargetNode: targetNode, + Collection: result.Collection, + VolumeSize: result.TypedParams.VolumeSize, + }) + volumeIDs = append(volumeIDs, fmt.Sprintf("%d", result.VolumeID)) + + dedupeKey := fmt.Sprintf("volume_balance:%d", result.VolumeID) + if result.Collection != "" { + dedupeKey += ":" + result.Collection + } + dedupeKeys = append(dedupeKeys, dedupeKey) + + if result.Priority > highestPriority { + highestPriority = result.Priority + } + } + + if len(moves) == 0 { + continue + } + + // After filtering, if only one valid move remains, emit a single-move + // proposal instead of a batch to preserve the simpler execution path. + if len(moves) == 1 { + // Find the matching result for the single valid move + for _, result := range batch { + if result != nil && uint32(result.VolumeID) == moves[0].VolumeId { + proposal, err := buildVolumeBalanceProposal(result) + if err != nil { + glog.Warningf("Plugin worker skip invalid volume_balance proposal: %v", err) + } else { + proposals = append(proposals, proposal) + } + break + } + } + continue + } + + // Serialize batch params + taskParams := &worker_pb.TaskParams{ + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{ + TimeoutSeconds: defaultBalanceTimeoutSeconds, + MaxConcurrentMoves: int32(maxConcurrentMoves), + Moves: moves, + }, + }, + } + payload, err := proto.Marshal(taskParams) + if err != nil { + glog.Warningf("Plugin worker failed to marshal batch balance proposal: %v", err) + continue + } + + proposalID := fmt.Sprintf("volume-balance-batch-%d-%d", batchStart, time.Now().UnixNano()) + summary := fmt.Sprintf("Batch balance %d volumes (%s)", 
len(moves), strings.Join(volumeIDs, ",")) + if len(summary) > maxProposalStringLength { + summary = fmt.Sprintf("Batch balance %d volumes", len(moves)) + } + + // Use composite dedupe key for the batch. When the full key exceeds + // the length limit, fall back to a deterministic hash of the sorted + // keys so the same batch always produces the same dedupe key. + sort.Strings(dedupeKeys) + compositeDedupeKey := fmt.Sprintf("volume_balance_batch:%s", strings.Join(dedupeKeys, "+")) + if len(compositeDedupeKey) > maxProposalStringLength { + h := sha256.Sum256([]byte(strings.Join(dedupeKeys, "+"))) + compositeDedupeKey = fmt.Sprintf("volume_balance_batch:%d-%s", batchStart, hex.EncodeToString(h[:12])) + } + + proposals = append(proposals, &plugin_pb.JobProposal{ + ProposalId: proposalID, + DedupeKey: compositeDedupeKey, + JobType: "volume_balance", + Priority: mapTaskPriority(highestPriority), + Summary: summary, + Detail: fmt.Sprintf("Batch of %d volume moves with concurrency %d", len(moves), maxConcurrentMoves), + Parameters: map[string]*plugin_pb.ConfigValue{ + "task_params_pb": { + Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(moves))}, + }, + }, + Labels: map[string]string{ + "task_type": "balance", + "batch": "true", + "batch_size": fmt.Sprintf("%d", len(moves)), + }, + }) + } + + return proposals +} + func decodeVolumeBalanceTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) { if job == nil { return nil, fmt.Errorf("job spec is nil") diff --git a/weed/plugin/worker/volume_balance_handler_test.go b/weed/plugin/worker/volume_balance_handler_test.go index ace71ae3a..ccfdd5a7c 100644 --- a/weed/plugin/worker/volume_balance_handler_test.go +++ b/weed/plugin/worker/volume_balance_handler_test.go @@ -2,7 +2,9 @@ package pluginworker import ( "context" + "fmt" "strings" + "sync" "testing" "time" @@ -111,6 +113,167 @@ func TestDeriveBalanceWorkerConfig(t 
*testing.T) { if cfg.MinIntervalSeconds != 33 { t.Fatalf("expected min_interval_seconds 33, got %d", cfg.MinIntervalSeconds) } + // Defaults for batch config when not specified + if cfg.MaxConcurrentMoves != defaultMaxConcurrentMoves { + t.Fatalf("expected default max_concurrent_moves %d, got %d", defaultMaxConcurrentMoves, cfg.MaxConcurrentMoves) + } + if cfg.BatchSize != 20 { + t.Fatalf("expected default batch_size 20, got %d", cfg.BatchSize) + } +} + +func TestDeriveBalanceWorkerConfigBatchFields(t *testing.T) { + values := map[string]*plugin_pb.ConfigValue{ + "max_concurrent_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 10}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 50}, + }, + } + + cfg := deriveBalanceWorkerConfig(values) + if cfg.MaxConcurrentMoves != 10 { + t.Fatalf("expected max_concurrent_moves 10, got %d", cfg.MaxConcurrentMoves) + } + if cfg.BatchSize != 50 { + t.Fatalf("expected batch_size 50, got %d", cfg.BatchSize) + } +} + +func TestDeriveBalanceWorkerConfigBatchClamping(t *testing.T) { + values := map[string]*plugin_pb.ConfigValue{ + "max_concurrent_moves": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 999}, + }, + "batch_size": { + Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}, + }, + } + + cfg := deriveBalanceWorkerConfig(values) + if cfg.MaxConcurrentMoves != 50 { + t.Fatalf("expected max_concurrent_moves clamped to 50, got %d", cfg.MaxConcurrentMoves) + } + if cfg.BatchSize != 1 { + t.Fatalf("expected batch_size clamped to 1, got %d", cfg.BatchSize) + } +} + +func makeDetectionResult(volumeID uint32, source, target, collection string) *workertypes.TaskDetectionResult { + return &workertypes.TaskDetectionResult{ + TaskID: fmt.Sprintf("balance-%d", volumeID), + TaskType: workertypes.TaskTypeBalance, + VolumeID: volumeID, + Server: source, + Collection: collection, + Priority: workertypes.TaskPriorityNormal, + Reason: "imbalanced", + TypedParams: &worker_pb.TaskParams{ + 
VolumeId: volumeID, + Collection: collection, + VolumeSize: 1024, + Sources: []*worker_pb.TaskSource{ + {Node: source, VolumeId: volumeID}, + }, + Targets: []*worker_pb.TaskTarget{ + {Node: target, VolumeId: volumeID}, + }, + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{TimeoutSeconds: 600}, + }, + }, + } +} + +func TestBuildBatchVolumeBalanceProposals_SingleBatch(t *testing.T) { + results := []*workertypes.TaskDetectionResult{ + makeDetectionResult(1, "s1:8080", "t1:8080", "c1"), + makeDetectionResult(2, "s2:8080", "t2:8080", "c1"), + makeDetectionResult(3, "s1:8080", "t2:8080", "c1"), + } + + proposals := buildBatchVolumeBalanceProposals(results, 10, 5) + if len(proposals) != 1 { + t.Fatalf("expected 1 batch proposal, got %d", len(proposals)) + } + + p := proposals[0] + if p.Labels["batch"] != "true" { + t.Fatalf("expected batch label") + } + if p.Labels["batch_size"] != "3" { + t.Fatalf("expected batch_size label '3', got %q", p.Labels["batch_size"]) + } + + // Decode and verify moves + payload := p.Parameters["task_params_pb"].GetBytesValue() + if len(payload) == 0 { + t.Fatalf("expected task_params_pb payload") + } + decoded := &worker_pb.TaskParams{} + if err := proto.Unmarshal(payload, decoded); err != nil { + t.Fatalf("unmarshal: %v", err) + } + moves := decoded.GetBalanceParams().GetMoves() + if len(moves) != 3 { + t.Fatalf("expected 3 moves, got %d", len(moves)) + } + if moves[0].VolumeId != 1 || moves[1].VolumeId != 2 || moves[2].VolumeId != 3 { + t.Fatalf("unexpected volume IDs: %v", moves) + } + if decoded.GetBalanceParams().MaxConcurrentMoves != 5 { + t.Fatalf("expected MaxConcurrentMoves 5, got %d", decoded.GetBalanceParams().MaxConcurrentMoves) + } +} + +func TestBuildBatchVolumeBalanceProposals_MultipleBatches(t *testing.T) { + results := make([]*workertypes.TaskDetectionResult, 5) + for i := range results { + results[i] = makeDetectionResult(uint32(i+1), "s1:8080", "t1:8080", "c1") + } + + 
proposals := buildBatchVolumeBalanceProposals(results, 2, 3) + // 5 results / batch_size 2 = 3 proposals (2, 2, 1) + if len(proposals) != 3 { + t.Fatalf("expected 3 proposals, got %d", len(proposals)) + } + + // First two should be batch proposals + if proposals[0].Labels["batch"] != "true" { + t.Fatalf("first proposal should be batch") + } + if proposals[1].Labels["batch"] != "true" { + t.Fatalf("second proposal should be batch") + } + // Last one has only 1 result, should fall back to single-move proposal + if proposals[2].Labels["batch"] == "true" { + t.Fatalf("last proposal with 1 result should be single-move, not batch") + } +} + +func TestBuildBatchVolumeBalanceProposals_BatchSizeOne(t *testing.T) { + results := []*workertypes.TaskDetectionResult{ + makeDetectionResult(1, "s1:8080", "t1:8080", "c1"), + makeDetectionResult(2, "s2:8080", "t2:8080", "c1"), + } + + // batch_size=1 should not be called (Detect guards this), but test the function directly + proposals := buildBatchVolumeBalanceProposals(results, 1, 5) + // Each result becomes its own single-move proposal + if len(proposals) != 2 { + t.Fatalf("expected 2 proposals, got %d", len(proposals)) + } +} + +func TestVolumeBalanceDescriptorHasBatchFields(t *testing.T) { + descriptor := NewVolumeBalanceHandler(nil).Descriptor() + if !workerConfigFormHasField(descriptor.WorkerConfigForm, "max_concurrent_moves") { + t.Fatalf("expected max_concurrent_moves in worker config form") + } + if !workerConfigFormHasField(descriptor.WorkerConfigForm, "batch_size") { + t.Fatalf("expected batch_size in worker config form") + } } func TestBuildVolumeBalanceProposal(t *testing.T) { @@ -265,6 +428,207 @@ func TestVolumeBalanceDescriptorOmitsExecutionTuningFields(t *testing.T) { } } +type recordingExecutionSender struct { + mu sync.Mutex + progress []*plugin_pb.JobProgressUpdate + completed *plugin_pb.JobCompleted +} + +func (r *recordingExecutionSender) SendProgress(p *plugin_pb.JobProgressUpdate) error { + r.mu.Lock() + 
defer r.mu.Unlock() + r.progress = append(r.progress, proto.Clone(p).(*plugin_pb.JobProgressUpdate)) + return nil +} + +func (r *recordingExecutionSender) SendCompleted(c *plugin_pb.JobCompleted) error { + r.mu.Lock() + defer r.mu.Unlock() + r.completed = proto.Clone(c).(*plugin_pb.JobCompleted) + return nil +} + +func TestBuildMoveTaskParams(t *testing.T) { + move := &worker_pb.BalanceMoveSpec{ + VolumeId: 42, + SourceNode: "10.0.0.1:8080", + TargetNode: "10.0.0.2:8080", + Collection: "photos", + VolumeSize: 1024 * 1024, + } + + outerParams := &worker_pb.BalanceTaskParams{ + ForceMove: true, + TimeoutSeconds: 300, + } + params := buildMoveTaskParams(move, outerParams) + if params.VolumeId != 42 { + t.Fatalf("expected volume_id 42, got %d", params.VolumeId) + } + if params.Collection != "photos" { + t.Fatalf("expected collection photos, got %s", params.Collection) + } + if params.VolumeSize != 1024*1024 { + t.Fatalf("expected volume_size %d, got %d", 1024*1024, params.VolumeSize) + } + if len(params.Sources) != 1 || params.Sources[0].Node != "10.0.0.1:8080" { + t.Fatalf("unexpected sources: %+v", params.Sources) + } + if len(params.Targets) != 1 || params.Targets[0].Node != "10.0.0.2:8080" { + t.Fatalf("unexpected targets: %+v", params.Targets) + } + bp := params.GetBalanceParams() + if bp == nil { + t.Fatalf("expected balance params") + } + if bp.TimeoutSeconds != 300 { + t.Fatalf("expected timeout 300, got %d", bp.TimeoutSeconds) + } + if !bp.ForceMove { + t.Fatalf("expected force_move to be propagated from outer params") + } +} + +func TestBuildMoveTaskParamsDefaultTimeout(t *testing.T) { + move := &worker_pb.BalanceMoveSpec{ + VolumeId: 1, + SourceNode: "a:8080", + TargetNode: "b:8080", + } + params := buildMoveTaskParams(move, nil) + if params.GetBalanceParams().TimeoutSeconds != defaultBalanceTimeoutSeconds { + t.Fatalf("expected default timeout %d, got %d", defaultBalanceTimeoutSeconds, params.GetBalanceParams().TimeoutSeconds) + } + if 
params.GetBalanceParams().ForceMove { + t.Fatalf("expected force_move to default to false with nil outer params") + } +} + +func TestExecuteDispatchesBatchPath(t *testing.T) { + // Build a job with batch moves in BalanceTaskParams + bp := &worker_pb.BalanceTaskParams{ + TimeoutSeconds: 60, + MaxConcurrentMoves: 2, + Moves: []*worker_pb.BalanceMoveSpec{ + {VolumeId: 1, SourceNode: "s1:8080", TargetNode: "t1:8080", Collection: "c1"}, + {VolumeId: 2, SourceNode: "s2:8080", TargetNode: "t2:8080", Collection: "c1"}, + }, + } + taskParams := &worker_pb.TaskParams{ + TaskId: "batch-1", + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: bp, + }, + } + payload, err := proto.Marshal(taskParams) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + job := &plugin_pb.JobSpec{ + JobId: "batch-job-1", + JobType: "volume_balance", + Parameters: map[string]*plugin_pb.ConfigValue{ + "task_params_pb": {Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: payload}}, + }, + } + + handler := NewVolumeBalanceHandler(nil) + sender := &recordingExecutionSender{} + + // Execute will enter the batch path. It will fail because there's no real gRPC server, + // but we verify it sends the assigned progress and eventually a completion. 
+ err = handler.Execute(context.Background(), &plugin_pb.ExecuteJobRequest{ + Job: job, + }, sender) + + // Expect an error since no real volume servers exist + // But verify the batch path was taken by checking the assigned message + sender.mu.Lock() + defer sender.mu.Unlock() + + if len(sender.progress) == 0 { + t.Fatalf("expected progress messages from batch path") + } + + // First progress should be "assigned" with batch info + first := sender.progress[0] + if first.Stage != "assigned" { + t.Fatalf("expected first stage 'assigned', got %q", first.Stage) + } + if !strings.Contains(first.Message, "batch") || !strings.Contains(first.Message, "2 moves") { + t.Fatalf("expected batch assigned message, got %q", first.Message) + } + + // Should have a completion with failure details (since no servers) + if sender.completed == nil { + t.Fatalf("expected completion message") + } + if sender.completed.Success { + t.Fatalf("expected failure since no real gRPC servers") + } + // Should report 0 succeeded, 2 failed + if v, ok := sender.completed.Result.OutputValues["failed"]; !ok || v.GetInt64Value() != 2 { + t.Fatalf("expected 2 failed moves, got %+v", sender.completed.Result.OutputValues) + } +} + +func TestExecuteSingleMovePathUnchanged(t *testing.T) { + // Build a single-move job (no batch moves) + taskParams := &worker_pb.TaskParams{ + TaskId: "single-1", + VolumeId: 99, + Collection: "videos", + Sources: []*worker_pb.TaskSource{ + {Node: "src:8080", VolumeId: 99}, + }, + Targets: []*worker_pb.TaskTarget{ + {Node: "dst:8080", VolumeId: 99}, + }, + TaskParams: &worker_pb.TaskParams_BalanceParams{ + BalanceParams: &worker_pb.BalanceTaskParams{ + TimeoutSeconds: 60, + }, + }, + } + payload, err := proto.Marshal(taskParams) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + job := &plugin_pb.JobSpec{ + JobId: "single-job-1", + JobType: "volume_balance", + Parameters: map[string]*plugin_pb.ConfigValue{ + "task_params_pb": {Kind: 
&plugin_pb.ConfigValue_BytesValue{BytesValue: payload}}, + }, + } + + handler := NewVolumeBalanceHandler(nil) + sender := &recordingExecutionSender{} + + // Execute single-move path. Will fail on gRPC but verify it takes the single-move path. + _ = handler.Execute(context.Background(), &plugin_pb.ExecuteJobRequest{ + Job: job, + }, sender) + + sender.mu.Lock() + defer sender.mu.Unlock() + + if len(sender.progress) == 0 { + t.Fatalf("expected progress messages") + } + + // Single-move path sends "volume balance job accepted" not "batch volume balance" + first := sender.progress[0] + if first.Stage != "assigned" { + t.Fatalf("expected first stage 'assigned', got %q", first.Stage) + } + if strings.Contains(first.Message, "batch") { + t.Fatalf("single-move path should not mention batch, got %q", first.Message) + } +} + func workerConfigFormHasField(form *plugin_pb.ConfigForm, fieldName string) bool { if form == nil { return false diff --git a/weed/worker/tasks/balance/balance_task.go b/weed/worker/tasks/balance/balance_task.go index ae7af01f3..140e5db06 100644 --- a/weed/worker/tasks/balance/balance_task.go +++ b/weed/worker/tasks/balance/balance_task.go @@ -82,38 +82,70 @@ func (t *BalanceTask) Execute(ctx context.Context, params *worker_pb.TaskParams) // Step 1: Mark volume readonly t.ReportProgress(10.0) t.GetLogger().Info("Marking volume readonly for move") - if err := t.markVolumeReadonly(sourceServer, volumeId); err != nil { + if err := t.markVolumeReadonly(ctx, sourceServer, volumeId); err != nil { return fmt.Errorf("failed to mark volume readonly: %v", err) } + // Restore source writability if any subsequent step fails, so the + // source volume is not left permanently readonly on abort. 
+ sourceMarkedReadonly := true + defer func() { + if sourceMarkedReadonly { + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cleanupCancel() + if wErr := t.markVolumeWritable(cleanupCtx, sourceServer, volumeId); wErr != nil { + glog.Warningf("failed to restore volume %d writability on %s: %v", volumeId, sourceServer, wErr) + } + } + }() - // Step 2: Copy volume to destination - t.ReportProgress(20.0) - t.GetLogger().Info("Copying volume to destination") - lastAppendAtNs, err := t.copyVolume(sourceServer, targetServer, volumeId) + // Step 2: Read source volume size before copy (for post-copy verification) + t.ReportProgress(15.0) + sourceStatus, err := t.readVolumeFileStatus(ctx, sourceServer, volumeId) if err != nil { - return fmt.Errorf("failed to copy volume: %v", err) + return fmt.Errorf("failed to read source volume status: %v", err) } - // Step 3: Mount volume on target and mark it readonly - t.ReportProgress(60.0) - t.GetLogger().Info("Mounting volume on target server") - if err := t.mountVolume(targetServer, volumeId); err != nil { - return fmt.Errorf("failed to mount volume on target: %v", err) + // Step 3: Copy volume to destination (VolumeCopy also mounts the volume) + t.ReportProgress(20.0) + t.GetLogger().Info("Copying volume to destination") + lastAppendAtNs, err := t.copyVolume(ctx, sourceServer, targetServer, volumeId) + if err != nil { + return fmt.Errorf("failed to copy volume: %v", err) } // Step 4: Tail for updates t.ReportProgress(70.0) t.GetLogger().Info("Syncing final updates") - if err := t.tailVolume(sourceServer, targetServer, volumeId, lastAppendAtNs); err != nil { + if err := t.tailVolume(ctx, sourceServer, targetServer, volumeId, lastAppendAtNs); err != nil { glog.Warningf("Tail operation failed (may be normal): %v", err) } - // Step 5: Delete from source + // Step 5: Verify the volume on target before deleting source. 
+ // This is a critical safety check — once the source is deleted, data loss + // is irreversible. We verify the target has the volume with matching size. + t.ReportProgress(85.0) + t.GetLogger().Info("Verifying volume on target before deleting source") + targetStatus, err := t.readVolumeFileStatus(ctx, targetServer, volumeId) + if err != nil { + return fmt.Errorf("aborting: cannot verify volume %d on target %s before deleting source: %v", volumeId, targetServer, err) + } + if targetStatus.DatFileSize != sourceStatus.DatFileSize { + return fmt.Errorf("aborting: volume %d .dat size mismatch — source %d bytes, target %d bytes", + volumeId, sourceStatus.DatFileSize, targetStatus.DatFileSize) + } + if targetStatus.FileCount != sourceStatus.FileCount { + return fmt.Errorf("aborting: volume %d file count mismatch — source %d, target %d", + volumeId, sourceStatus.FileCount, targetStatus.FileCount) + } + + // Step 6: Delete from source — after this, the move is committed. + // Clear the readonly flag so the defer doesn't try to restore writability. t.ReportProgress(90.0) t.GetLogger().Info("Deleting volume from source server") - if err := t.deleteVolume(sourceServer, volumeId); err != nil { + if err := t.deleteVolume(ctx, sourceServer, volumeId); err != nil { return fmt.Errorf("failed to delete volume from source: %v", err) } + sourceMarkedReadonly = false t.ReportProgress(100.0) glog.Infof("Balance task completed successfully: volume %d moved from %s to %s", @@ -164,24 +196,35 @@ func (t *BalanceTask) GetProgress() float64 { // Helper methods for real balance operations -// markVolumeReadonly marks the volume readonly -func (t *BalanceTask) markVolumeReadonly(server pb.ServerAddress, volumeId needle.VolumeId) error { +// markVolumeReadonly marks the volume readonly on the source server. 
+func (t *BalanceTask) markVolumeReadonly(ctx context.Context, server pb.ServerAddress, volumeId needle.VolumeId) error { return operation.WithVolumeServerClient(false, server, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ + _, err := client.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: uint32(volumeId), }) return err }) } -// copyVolume copies volume from source to target server -func (t *BalanceTask) copyVolume(sourceServer, targetServer pb.ServerAddress, volumeId needle.VolumeId) (uint64, error) { +// markVolumeWritable restores the volume to writable on the source server. +func (t *BalanceTask) markVolumeWritable(ctx context.Context, server pb.ServerAddress, volumeId needle.VolumeId) error { + return operation.WithVolumeServerClient(false, server, t.grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + _, err := client.VolumeMarkWritable(ctx, &volume_server_pb.VolumeMarkWritableRequest{ + VolumeId: uint32(volumeId), + }) + return err + }) +} + +// copyVolume copies volume from source to target server. 
+func (t *BalanceTask) copyVolume(ctx context.Context, sourceServer, targetServer pb.ServerAddress, volumeId needle.VolumeId) (uint64, error) { var lastAppendAtNs uint64 err := operation.WithVolumeServerClient(true, targetServer, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - stream, err := client.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ + stream, err := client.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ VolumeId: uint32(volumeId), SourceDataNode: string(sourceServer), }) @@ -213,22 +256,11 @@ func (t *BalanceTask) copyVolume(sourceServer, targetServer pb.ServerAddress, vo return lastAppendAtNs, err } -// mountVolume mounts the volume on the target server -func (t *BalanceTask) mountVolume(server pb.ServerAddress, volumeId needle.VolumeId) error { - return operation.WithVolumeServerClient(false, server, t.grpcDialOption, - func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ - VolumeId: uint32(volumeId), - }) - return err - }) -} - -// tailVolume syncs remaining updates from source to target -func (t *BalanceTask) tailVolume(sourceServer, targetServer pb.ServerAddress, volumeId needle.VolumeId, sinceNs uint64) error { +// tailVolume syncs remaining updates from source to target. 
+func (t *BalanceTask) tailVolume(ctx context.Context, sourceServer, targetServer pb.ServerAddress, volumeId needle.VolumeId, sinceNs uint64) error { return operation.WithVolumeServerClient(true, targetServer, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{ + _, err := client.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{ VolumeId: uint32(volumeId), SinceNs: sinceNs, IdleTimeoutSeconds: 60, // 1 minute timeout @@ -238,11 +270,26 @@ func (t *BalanceTask) tailVolume(sourceServer, targetServer pb.ServerAddress, vo }) } -// deleteVolume deletes the volume from the server -func (t *BalanceTask) deleteVolume(server pb.ServerAddress, volumeId needle.VolumeId) error { +// readVolumeFileStatus reads the volume's file status (sizes, file count) from a server. +func (t *BalanceTask) readVolumeFileStatus(ctx context.Context, server pb.ServerAddress, volumeId needle.VolumeId) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { + var resp *volume_server_pb.ReadVolumeFileStatusResponse + err := operation.WithVolumeServerClient(false, server, t.grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + var err error + resp, err = client.ReadVolumeFileStatus(ctx, + &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: uint32(volumeId), + }) + return err + }) + return resp, err +} + +// deleteVolume deletes the volume from the server. 
+func (t *BalanceTask) deleteVolume(ctx context.Context, server pb.ServerAddress, volumeId needle.VolumeId) error { return operation.WithVolumeServerClient(false, server, t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - _, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ + _, err := client.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ VolumeId: uint32(volumeId), OnlyEmpty: false, }) diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index ac1ad488f..d10033dd3 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -230,6 +230,33 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics break } + // When the global max and min effective counts differ by at most 1, + // no single move can improve balance — it would just swap which server + // is min vs max. Stop here to avoid infinite oscillation when the + // threshold is unachievable (e.g., 11 vols across 4 servers: best is + // 3/3/3/2, imbalance=36%). We scan ALL servers' effective counts so the + // check works regardless of whether utilization or raw counts are used. + globalMaxCount, globalMinCount := 0, math.MaxInt + for _, c := range effectiveCounts { + if c > globalMaxCount { + globalMaxCount = c + } + if c < globalMinCount { + globalMinCount = c + } + } + if globalMaxCount-globalMinCount <= 1 { + if len(results) == 0 { + glog.Infof("BALANCE [%s]: No tasks created - cluster as balanced as possible. Imbalance=%.1f%% (threshold=%.1f%%), but max-min diff is %d", + diskType, imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, globalMaxCount-globalMinCount) + } else { + glog.Infof("BALANCE [%s]: Created %d task(s), cluster as balanced as possible. Imbalance=%.1f%% (threshold=%.1f%%), max-min diff=%d", + diskType, len(results), imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, globalMaxCount-globalMinCount) + } + balanced = true + break + } + // Select a volume from the overloaded server using per-server cursor var selectedVolume *types.VolumeHealthMetrics serverVols := volumesByServer[maxServer] @@ -252,11 +279,13 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics continue } - // Plan destination and create task. - // On failure, continue to the next volume on the same server rather - // than exhausting the entire server — the failure may be per-volume - // (e.g., volume not found in topology, AddPendingTask failed). - task, destServerID := createBalanceTask(diskType, selectedVolume, clusterInfo) + // Create task targeting minServer — the greedy algorithm's natural choice. + // Using minServer instead of letting planBalanceDestination independently + // pick a destination ensures that the detection loop's effective counts + // and the destination selection stay in sync. Without this, the topology's + // LoadCount-based scoring can diverge from the adjustment-based effective + // counts, causing moves to pile onto one server or oscillate (A→B, B→A). + task, destServerID := createBalanceTask(diskType, selectedVolume, clusterInfo, minServer) if task == nil { glog.V(1).Infof("BALANCE [%s]: Cannot plan task for volume %d on server %s, trying next volume", diskType, selectedVolume.VolumeID, maxServer) continue @@ -264,10 +293,18 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics results = append(results, task) - // Adjust effective counts for the next iteration + // Adjust effective counts for the next iteration. 
adjustments[maxServer]-- if destServerID != "" { adjustments[destServerID]++ + // If the destination server wasn't in serverVolumeCounts (e.g., a + // server with 0 volumes not seeded from topology), add it so + // subsequent iterations include it in effective/average/min/max. + if _, exists := serverVolumeCounts[destServerID]; !exists { + serverVolumeCounts[destServerID] = 0 + sortedServers = append(sortedServers, destServerID) + sort.Strings(sortedServers) + } } } @@ -277,9 +314,10 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics } // createBalanceTask creates a single balance task for the selected volume. +// targetServer is the server ID chosen by the detection loop's greedy algorithm. // Returns (nil, "") if destination planning fails. // On success, returns the task result and the canonical destination server ID. -func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo) (*types.TaskDetectionResult, string) { +func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, targetServer string) (*types.TaskDetectionResult, string) { taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().UnixNano()) task := &types.TaskDetectionResult{ @@ -300,10 +338,19 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric return nil, "" } - destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) + // Resolve the target server chosen by the detection loop's effective counts. + // This keeps destination selection in sync with the greedy algorithm rather + // than relying on topology LoadCount which can diverge across iterations. 
+ destinationPlan, err := resolveBalanceDestination(clusterInfo.ActiveTopology, selectedVolume, targetServer) if err != nil { - glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) - return nil, "" + // Fall back to score-based planning if the preferred target can't be resolved + glog.V(1).Infof("BALANCE [%s]: Cannot resolve target %s for volume %d, falling back to score-based planning: %v", + diskType, targetServer, selectedVolume.VolumeID, err) + destinationPlan, err = planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume) + if err != nil { + glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err) + return nil, "" + } } // Find the actual disk containing the volume on the source server @@ -383,8 +430,54 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric return task, destinationPlan.TargetNode } -// planBalanceDestination plans the destination for a balance operation -// This function implements destination planning logic directly in the detection phase +// resolveBalanceDestination resolves the destination for a balance operation +// when the target server is already known (chosen by the detection loop's +// effective volume counts). It finds the appropriate disk and address for the +// target server in the topology. 
+func resolveBalanceDestination(activeTopology *topology.ActiveTopology, selectedVolume *types.VolumeHealthMetrics, targetServer string) (*topology.DestinationPlan, error) { + topologyInfo := activeTopology.GetTopologyInfo() + if topologyInfo == nil { + return nil, fmt.Errorf("no topology info available") + } + + // Find the target node in the topology and get its disk info + for _, dc := range topologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + if node.Id != targetServer { + continue + } + // Find an available disk matching the volume's disk type + for diskTypeName, diskInfo := range node.DiskInfos { + if diskTypeName != selectedVolume.DiskType { + continue + } + if diskInfo.MaxVolumeCount > 0 && diskInfo.VolumeCount >= diskInfo.MaxVolumeCount { + continue // disk is full + } + targetAddress, err := util.ResolveServerAddress(node.Id, activeTopology) + if err != nil { + return nil, fmt.Errorf("failed to resolve address for target server %s: %v", node.Id, err) + } + return &topology.DestinationPlan{ + TargetNode: node.Id, + TargetAddress: targetAddress, + TargetDisk: diskInfo.DiskId, + TargetRack: rack.Id, + TargetDC: dc.Id, + ExpectedSize: selectedVolume.Size, + }, nil + } + return nil, fmt.Errorf("target server %s has no available disk of type %s", targetServer, selectedVolume.DiskType) + } + } + } + return nil, fmt.Errorf("target server %s not found in topology", targetServer) +} + +// planBalanceDestination plans the destination for a balance operation using +// score-based selection. Used as a fallback when the preferred target cannot +// be resolved, and for single-move scenarios outside the detection loop. 
func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVolume *types.VolumeHealthMetrics) (*topology.DestinationPlan, error) { // Get source node information from topology var sourceRack, sourceDC string diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go index af7b5bc9f..aee5b5420 100644 --- a/weed/worker/tasks/balance/detection_test.go +++ b/weed/worker/tasks/balance/detection_test.go @@ -917,3 +917,86 @@ func TestDetection_HeterogeneousCapacity(t *testing.T) { } t.Logf("First balance task: move from %s (correct: highest utilization)", firstSource) } + +// TestDetection_ZeroVolumeServerIncludedInBalance verifies that a server +// with zero volumes (seeded from topology with a matching disk type) is +// correctly included in the balance calculation and receives moves to +// equalize the distribution. +func TestDetection_ZeroVolumeServerIncludedInBalance(t *testing.T) { + // 4 servers total, but only 3 have volumes. + // node-d has a disk of the same type but zero volumes, so it appears in the + // topology and is seeded into serverVolumeCounts with count=0. + + servers := []serverSpec{ + {id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1", maxVolumes: 20}, + {id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1", maxVolumes: 20}, + {id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1", maxVolumes: 20}, + {id: "node-d", diskType: "hdd", diskID: 4, dc: "dc1", rack: "rack1", maxVolumes: 20}, + } + + // node-a: 8 volumes, node-b: 2, node-c: 1, node-d: 0 + var metrics []*types.VolumeHealthMetrics + metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "", 1, 8)...) + metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "", 20, 2)...) + metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "", 30, 1)...) 
+ // node-d has 0 volumes — no metrics + + at := buildTopology(servers, metrics) + clusterInfo := &types.ClusterInfo{ActiveTopology: at} + + tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 100) + if err != nil { + t.Fatalf("Detection failed: %v", err) + } + + if len(tasks) == 0 { + t.Fatal("Expected balance tasks for 8/2/1/0 distribution, got 0") + } + + assertNoDuplicateVolumes(t, tasks) + + // With 11 volumes across 4 servers, the best achievable is 3/3/3/2 + // (imbalance=36.4%), which exceeds the 20% threshold. The algorithm should + // stop when max-min<=1 rather than oscillating endlessly. + effective := computeEffectiveCounts(servers, metrics, tasks) + total := 0 + maxC, minC := 0, len(metrics) + for _, c := range effective { + total += c + if c > maxC { + maxC = c + } + if c < minC { + minC = c + } + } + + // The diff between max and min should be at most 1 (as balanced as possible) + if maxC-minC > 1 { + t.Errorf("After %d moves, distribution not optimally balanced: effective=%v, max-min=%d (want ≤1)", + len(tasks), effective, maxC-minC) + } + + // Count destinations — moves should spread, not pile onto one server + destCounts := make(map[string]int) + for _, task := range tasks { + if task.TypedParams != nil && len(task.TypedParams.Targets) > 0 { + destCounts[task.TypedParams.Targets[0].Node]++ + } + } + + // Moves should go to at least 2 different destinations + if len(destCounts) < 2 { + t.Errorf("Expected moves to spread across destinations, but got: %v", destCounts) + } + + // Should need only ~5 moves for 8/2/1/0 → 3/3/3/2, not 8+ (oscillation) + if len(tasks) > 8 { + t.Errorf("Too many moves (%d) — likely oscillating; expected ≤8 for this distribution", len(tasks)) + } + + avg := float64(total) / float64(len(effective)) + imbalance := float64(maxC-minC) / avg + t.Logf("Distribution 8/2/1/0 → %v after %d moves (imbalance=%.1f%%)", + effective, len(tasks), imbalance*100) +}