seaweedFS/weed/pb/worker.proto
Chris Lu a838661b83 feat(plugin): EC shard balance handler for plugin worker (#8629)
* feat(ec_balance): add TaskTypeECBalance constant and protobuf definitions

Add the ec_balance task type constant to both topology and worker type
systems. Define EcBalanceTaskParams, EcShardMoveSpec, and
EcBalanceTaskConfig protobuf messages for EC shard balance operations.

* feat(ec_balance): add configuration for EC shard balance task

Config includes imbalance threshold, min server count, collection
filter, disk type, and preferred tags for tag-aware placement.

* feat(ec_balance): add multi-phase EC shard balance detection algorithm

Implements four detection phases adapted from the ec.balance shell
command:
1. Duplicate shard detection and removal proposals
2. Cross-rack shard distribution balancing
3. Within-rack node-level shard balancing
4. Global shard count equalization across nodes

Detection is side-effect-free: it builds an EC topology view from
ActiveTopology and generates move proposals without executing them.

* feat(ec_balance): add EC shard move task execution

Implements the shard move sequence using the same VolumeEcShardsCopy,
VolumeEcShardsMount, VolumeEcShardsUnmount, and VolumeEcShardsDelete
RPCs as the shell ec.balance command. Supports both regular shard
moves and dedup-phase deletions (unmount+delete without copy).

* feat(ec_balance): add task registration and scheduling

Register EC balance task definition with auto-config update support.
Scheduling respects max concurrent limits and worker capabilities.

* feat(ec_balance): add plugin handler for EC shard balance

Implements the full plugin handler with detection, execution, admin
and worker config forms, proposal building, and decision trace
reporting. Supports collection/DC/disk type filtering, preferred tag
placement, and configurable detection intervals. Auto-registered via
init() with the handler registry.
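The init()-based auto-registration follows the standard Go pattern, sketched here with illustrative names (the actual registry API in the plugin worker differs):

```go
package main

import "fmt"

// jobHandler and handlerRegistry are illustrative stand-ins for the
// plugin worker's handler registry.
type jobHandler interface{ JobType() string }

var handlerRegistry = map[string]jobHandler{}

func registerHandler(h jobHandler) { handlerRegistry[h.JobType()] = h }

type ecBalanceHandler struct{}

func (ecBalanceHandler) JobType() string { return "ec_balance" }

// init runs when the package is imported, so a blank import of the
// handler package is enough to make ec_balance discoverable.
func init() { registerHandler(ecBalanceHandler{}) }

func main() {
	_, ok := handlerRegistry["ec_balance"]
	fmt.Println(ok) // true
}
```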

* test(ec_balance): add tests for detection algorithm and plugin handler

Detection tests cover: duplicate shard detection, cross-rack imbalance,
within-rack imbalance, global rebalancing, topology building, collection
filtering, and edge cases. Handler tests cover: config derivation with
clamping, proposal building, protobuf encode/decode round-trip, fallback
parameter decoding, capability, and config policy round-trip.

* fix(ec_balance): address PR review feedback and fix CI test failure

- Update TestWorkerDefaultJobTypes to expect 6 handlers (was 5)
- Extract threshold constants (ecBalanceMinImbalanceThreshold, etc.)
  to eliminate magic numbers in Descriptor and config derivation
- Remove duplicate ShardIdsToUint32 helper (use erasure_coding package)
- Add bounds checks for int64→int/uint32 conversions to fix CodeQL
  integer conversion warnings

* fix(ec_balance): address code review findings

storage_impact.go:
- Add TaskTypeECBalance case returning shard-level reservation
  (ShardSlots: -1/+1) instead of falling through to default which
  incorrectly reserves a full volume slot on target.

detection.go:
- Use dc:rack composite key to avoid cross-DC rack name collisions.
  Only create rack entries after confirming node has matching disks.
- Add exceedsImbalanceThreshold check to cross-rack, within-rack,
  and global phases so trivial skews below the configured threshold
  are ignored. Dedup phase always runs since duplicates are errors.
- Reserve destination capacity after each planned move (decrement
  destNode.freeSlots, update rackShardCount/nodeShardCount) to
  prevent overbooking the same destination.
- Skip nodes with freeSlots <= 0 when selecting minNode in global
  balance to avoid proposing moves to full nodes.
- Include loop index and source/target node IDs in TaskID to
  guarantee uniqueness across moves with the same volumeID/shardID.
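The uniqueness fix boils down to widening the ID's key space. A hypothetical format (the real code's exact layout may differ) shows why index plus both node IDs disambiguate moves sharing the same volumeID/shardID:

```go
package main

import "fmt"

// ecBalanceTaskID composes a task ID from the move's loop index, source
// and target nodes, and volume/shard IDs. Format is illustrative.
func ecBalanceTaskID(i int, source, target string, volumeID, shardID uint32) string {
	return fmt.Sprintf("ec_balance-%d-%s-%s-v%d-s%d", i, source, target, volumeID, shardID)
}

func main() {
	// Two planned moves of the same shard to different targets: with only
	// volumeID/shardID in the ID these would collide.
	a := ecBalanceTaskID(0, "srv1:8080", "srv2:8080", 7, 3)
	b := ecBalanceTaskID(1, "srv1:8080", "srv3:8080", 7, 3)
	fmt.Println(a != b) // true
}
```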

ec_balance_handler.go:
- Fail fast with error when shard_id is absent in fallback parameter
  decoding instead of silently defaulting to shard 0.

ec_balance_task.go:
- Delegate GetProgress() to BaseTask.GetProgress() so progress
  updates from ReportProgressWithStage are visible to callers.
- Add fail-fast guard rejecting multiple sources/targets until
  batch execution is implemented.

Findings verified but not changed (matches existing codebase pattern
in vacuum/balance/erasure_coding handlers):
- register.go globalTaskDef.Config race: same unsynchronized pattern
  in all 4 task packages.
- CreateTask using generated ID: same fmt.Sprintf pattern in all 4
  task packages.

* fix(ec_balance): harden parameter decoding, progress tracking, and validation

ec_balance_handler.go (decodeECBalanceTaskParams):
- Validate execution-critical fields (Sources[0].Node, ShardIds,
  Targets[0].Node, ShardIds) after protobuf deserialization.
- Require source_disk_id and target_disk_id in legacy fallback path
  so Targets[0].DiskId is populated for VolumeEcShardsCopyRequest.
- All error messages reference decodeECBalanceTaskParams and the
  specific missing field (TaskParams, shard_id, Targets[0].DiskId,
  EcBalanceTaskParams) for debuggability.

ec_balance_task.go:
- Track progress in ECBalanceTask.progress field, updated via
  reportProgress() helper called before ReportProgressWithStage(),
  so GetProgress() returns real stage progress instead of stale 0.
- Validate: require exactly 1 source and 1 target (mirrors Execute
  guard), require ShardIds on both, with error messages referencing
  ECBalanceTask.Validate and the specific field.

* fix(ec_balance): fix dedup execution path, stale topology, collection filter, timeout, and dedupeKey

detection.go:
- Dedup moves now set target=source so isDedupPhase() triggers the
  unmount+delete-only execution path instead of attempting a copy.
- Apply moves to in-memory topology between phases via
  applyMovesToTopology() so subsequent phases see updated shard
  placement and don't conflict with already-planned moves.
- detectGlobalImbalance now accepts allowedVids and filters both
  shard counting and shard selection to respect CollectionFilter.

ec_balance_task.go:
- Apply EcBalanceTaskParams.TimeoutSeconds to the context via
  context.WithTimeout so all RPC operations respect the configured
  timeout instead of hanging indefinitely.

ec_balance_handler.go:
- Include source node ID in dedupeKey so dedup deletions from
  different source nodes for the same shard aren't collapsed.
- Clamp minServerCountRaw and minIntervalRaw lower bounds on int64
  before narrowing to int, preventing undefined overflow on 32-bit.

* fix(ec_balance): log warning before cancelling on progress send failure

Log the error, job ID, job type, progress percentage, and stage
before calling execCancel() in the progress callback so failed
progress sends are diagnosable instead of silently cancelling.
2026-03-14 21:34:53 -07:00

441 lines
15 KiB
Protocol Buffer

syntax = "proto3";

package worker_pb;

option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb";

// WorkerService provides bidirectional communication between admin and worker
service WorkerService {
  // WorkerStream maintains a bidirectional stream for worker communication
  rpc WorkerStream(stream WorkerMessage) returns (stream AdminMessage);
}

// WorkerMessage represents messages from worker to admin
message WorkerMessage {
  string worker_id = 1;
  int64 timestamp = 2;
  oneof message {
    WorkerRegistration registration = 3;
    WorkerHeartbeat heartbeat = 4;
    TaskRequest task_request = 5;
    TaskUpdate task_update = 6;
    TaskComplete task_complete = 7;
    WorkerShutdown shutdown = 8;
    TaskLogResponse task_log_response = 9;
  }
}

// AdminMessage represents messages from admin to worker
message AdminMessage {
  string admin_id = 1;
  int64 timestamp = 2;
  oneof message {
    RegistrationResponse registration_response = 3;
    HeartbeatResponse heartbeat_response = 4;
    TaskAssignment task_assignment = 5;
    TaskCancellation task_cancellation = 6;
    AdminShutdown admin_shutdown = 7;
    TaskLogRequest task_log_request = 8;
  }
}

// WorkerRegistration message when worker connects
message WorkerRegistration {
  string worker_id = 1;
  string address = 2;
  repeated string capabilities = 3;
  int32 max_concurrent = 4;
  map<string, string> metadata = 5;
}

// RegistrationResponse confirms worker registration
message RegistrationResponse {
  bool success = 1;
  string message = 2;
  string assigned_worker_id = 3;
}

// WorkerHeartbeat sent periodically by worker
message WorkerHeartbeat {
  string worker_id = 1;
  string status = 2;
  int32 current_load = 3;
  int32 max_concurrent = 4;
  repeated string current_task_ids = 5;
  int32 tasks_completed = 6;
  int32 tasks_failed = 7;
  int64 uptime_seconds = 8;
}

// HeartbeatResponse acknowledges heartbeat
message HeartbeatResponse {
  bool success = 1;
  string message = 2;
}

// TaskRequest from worker asking for new tasks
message TaskRequest {
  string worker_id = 1;
  repeated string capabilities = 2;
  int32 available_slots = 3;
}

// TaskAssignment from admin to worker
message TaskAssignment {
  string task_id = 1;
  string task_type = 2;
  TaskParams params = 3;
  int32 priority = 4;
  int64 created_time = 5;
  map<string, string> metadata = 6;
}

// TaskParams contains task-specific parameters with typed variants
message TaskParams {
  string task_id = 1; // ActiveTopology task ID for lifecycle management
  uint32 volume_id = 2; // Primary volume ID for the task
  string collection = 3; // Collection name
  string data_center = 4; // Primary data center
  string rack = 5; // Primary rack
  uint64 volume_size = 6; // Original volume size in bytes for tracking size changes

  // Unified source and target arrays for all task types
  repeated TaskSource sources = 7; // Source locations (volume replicas, EC shards, etc.)
  repeated TaskTarget targets = 8; // Target locations (destinations, new replicas, etc.)

  // Typed task parameters
  oneof task_params {
    VacuumTaskParams vacuum_params = 9;
    ErasureCodingTaskParams erasure_coding_params = 10;
    BalanceTaskParams balance_params = 11;
    ReplicationTaskParams replication_params = 12;
    EcBalanceTaskParams ec_balance_params = 13;
  }
}

// VacuumTaskParams for vacuum operations
message VacuumTaskParams {
  double garbage_threshold = 1; // Minimum garbage ratio to trigger vacuum
  bool force_vacuum = 2; // Force vacuum even if below threshold
  int32 batch_size = 3; // Number of files to process per batch
  string working_dir = 4; // Working directory for temporary files
  bool verify_checksum = 5; // Verify file checksums during vacuum
}

// ErasureCodingTaskParams for EC encoding operations
message ErasureCodingTaskParams {
  uint64 estimated_shard_size = 1; // Estimated size per shard
  int32 data_shards = 2; // Number of data shards (default: 10)
  int32 parity_shards = 3; // Number of parity shards (default: 4)
  string working_dir = 4; // Working directory for EC processing
  string master_client = 5; // Master server address
  bool cleanup_source = 6; // Whether to cleanup source volume after EC
}

// TaskSource represents a unified source location for any task type
message TaskSource {
  string node = 1; // Source server address
  uint32 disk_id = 2; // Source disk ID
  string rack = 3; // Source rack for tracking
  string data_center = 4; // Source data center for tracking
  uint32 volume_id = 5; // Volume ID (for volume operations)
  repeated uint32 shard_ids = 6; // Shard IDs (for EC shard operations)
  uint64 estimated_size = 7; // Estimated size to be processed
}

// TaskTarget represents a unified target location for any task type
message TaskTarget {
  string node = 1; // Target server address
  uint32 disk_id = 2; // Target disk ID
  string rack = 3; // Target rack for tracking
  string data_center = 4; // Target data center for tracking
  uint32 volume_id = 5; // Volume ID (for volume operations)
  repeated uint32 shard_ids = 6; // Shard IDs (for EC shard operations)
  uint64 estimated_size = 7; // Estimated size to be created
}

// BalanceMoveSpec describes a single volume move within a batch balance job
message BalanceMoveSpec {
  uint32 volume_id = 1; // Volume to move
  string source_node = 2; // Source server address (host:port)
  string target_node = 3; // Destination server address (host:port)
  string collection = 4; // Collection name
  uint64 volume_size = 5; // Volume size in bytes (informational)
}

// BalanceTaskParams for volume balancing operations
message BalanceTaskParams {
  bool force_move = 1; // Force move even with conflicts
  int32 timeout_seconds = 2; // Operation timeout
  int32 max_concurrent_moves = 3; // Max concurrent moves in a batch job (0 = default 5)
  repeated BalanceMoveSpec moves = 4; // Batch: multiple volume moves in one job
}

// ReplicationTaskParams for adding replicas
message ReplicationTaskParams {
  int32 replica_count = 1; // Target replica count
  bool verify_consistency = 2; // Verify replica consistency after creation
}

// TaskUpdate reports task progress
message TaskUpdate {
  string task_id = 1;
  string worker_id = 2;
  string status = 3;
  float progress = 4;
  string message = 5;
  map<string, string> metadata = 6;
}

// TaskComplete reports task completion
message TaskComplete {
  string task_id = 1;
  string worker_id = 2;
  bool success = 3;
  string error_message = 4;
  int64 completion_time = 5;
  map<string, string> result_metadata = 6;
}

// TaskCancellation from admin to cancel a task
message TaskCancellation {
  string task_id = 1;
  string reason = 2;
  bool force = 3;
}

// WorkerShutdown notifies admin that worker is shutting down
message WorkerShutdown {
  string worker_id = 1;
  string reason = 2;
  repeated string pending_task_ids = 3;
}

// AdminShutdown notifies worker that admin is shutting down
message AdminShutdown {
  string reason = 1;
  int32 graceful_shutdown_seconds = 2;
}

// ========== Task Log Messages ==========

// TaskLogRequest requests logs for a specific task
message TaskLogRequest {
  string task_id = 1;
  string worker_id = 2;
  bool include_metadata = 3; // Include task metadata
  int32 max_entries = 4; // Maximum number of log entries (0 = all)
  string log_level = 5; // Filter by log level (INFO, WARNING, ERROR, DEBUG)
  int64 start_time = 6; // Unix timestamp for start time filter
  int64 end_time = 7; // Unix timestamp for end time filter
}

// TaskLogResponse returns task logs and metadata
message TaskLogResponse {
  string task_id = 1;
  string worker_id = 2;
  bool success = 3;
  string error_message = 4;
  TaskLogMetadata metadata = 5;
  repeated TaskLogEntry log_entries = 6;
}

// TaskLogMetadata contains metadata about task execution
message TaskLogMetadata {
  string task_id = 1;
  string task_type = 2;
  string worker_id = 3;
  int64 start_time = 4;
  int64 end_time = 5;
  int64 duration_ms = 6;
  string status = 7;
  float progress = 8;
  uint32 volume_id = 9;
  string server = 10;
  string collection = 11;
  string log_file_path = 12;
  int64 created_at = 13;
  map<string, string> custom_data = 14;
}

// TaskLogEntry represents a single log entry
message TaskLogEntry {
  int64 timestamp = 1;
  string level = 2;
  string message = 3;
  map<string, string> fields = 4;
  float progress = 5;
  string status = 6;
}

// ========== Maintenance Configuration Messages ==========

// MaintenanceConfig holds configuration for the maintenance system
message MaintenanceConfig {
  bool enabled = 1;
  int32 scan_interval_seconds = 2; // How often to scan for maintenance needs
  int32 worker_timeout_seconds = 3; // Worker heartbeat timeout
  int32 task_timeout_seconds = 4; // Individual task timeout
  int32 retry_delay_seconds = 5; // Delay between retries
  int32 max_retries = 6; // Default max retries for tasks
  int32 cleanup_interval_seconds = 7; // How often to clean up old tasks
  int32 task_retention_seconds = 8; // How long to keep completed/failed tasks
  MaintenancePolicy policy = 9;
}

// MaintenancePolicy defines policies for maintenance operations
message MaintenancePolicy {
  map<string, TaskPolicy> task_policies = 1; // Task type -> policy mapping
  int32 global_max_concurrent = 2; // Overall limit across all task types
  int32 default_repeat_interval_seconds = 3; // Default seconds if task doesn't specify
  int32 default_check_interval_seconds = 4; // Default seconds for periodic checks
}

// TaskPolicy represents configuration for a specific task type
message TaskPolicy {
  bool enabled = 1;
  int32 max_concurrent = 2;
  int32 repeat_interval_seconds = 3; // Seconds to wait before repeating
  int32 check_interval_seconds = 4; // Seconds between checks

  // Typed task-specific configuration (replaces generic map)
  oneof task_config {
    VacuumTaskConfig vacuum_config = 5;
    ErasureCodingTaskConfig erasure_coding_config = 6;
    BalanceTaskConfig balance_config = 7;
    ReplicationTaskConfig replication_config = 8;
    EcBalanceTaskConfig ec_balance_config = 9;
  }
}

// Task-specific configuration messages

// VacuumTaskConfig contains vacuum-specific configuration
message VacuumTaskConfig {
  double garbage_threshold = 1; // Minimum garbage ratio to trigger vacuum (0.0-1.0)
  int32 min_volume_age_hours = 2; // Minimum age before vacuum is considered
  int32 min_interval_seconds = 3; // Minimum time between vacuum operations on the same volume
}

// ErasureCodingTaskConfig contains EC-specific configuration
message ErasureCodingTaskConfig {
  double fullness_ratio = 1; // Minimum fullness ratio to trigger EC (0.0-1.0)
  int32 quiet_for_seconds = 2; // Minimum quiet time before EC
  int32 min_volume_size_mb = 3; // Minimum volume size for EC
  string collection_filter = 4; // Only process volumes from specific collections
  repeated string preferred_tags = 5; // Disk tags to prioritize for EC shard placement
}

// BalanceTaskConfig contains balance-specific configuration
message BalanceTaskConfig {
  double imbalance_threshold = 1; // Threshold for triggering rebalancing (0.0-1.0)
  int32 min_server_count = 2; // Minimum number of servers required for balancing
}

// ReplicationTaskConfig contains replication-specific configuration
message ReplicationTaskConfig {
  int32 target_replica_count = 1; // Target number of replicas
}

// EcBalanceTaskParams for EC shard balancing operations
message EcBalanceTaskParams {
  string disk_type = 1; // Disk type filter (hdd, ssd, "")
  int32 max_parallelization = 2; // Max parallel shard moves within a batch
  int32 timeout_seconds = 3; // Operation timeout per move
  repeated EcShardMoveSpec moves = 4; // Batch: multiple shard moves in one job
}

// EcShardMoveSpec describes a single EC shard move within a batch
message EcShardMoveSpec {
  uint32 volume_id = 1; // EC volume ID
  uint32 shard_id = 2; // Shard ID (0-13)
  string collection = 3; // Collection name
  string source_node = 4; // Source server address
  uint32 source_disk_id = 5; // Source disk ID
  string target_node = 6; // Target server address
  uint32 target_disk_id = 7; // Target disk ID
}

// EcBalanceTaskConfig contains EC balance-specific configuration
message EcBalanceTaskConfig {
  double imbalance_threshold = 1; // Threshold for triggering EC shard rebalancing
  int32 min_server_count = 2; // Minimum number of servers required
  string collection_filter = 3; // Collection filter
  string disk_type = 4; // Disk type filter
  repeated string preferred_tags = 5; // Preferred disk tags for placement
}

// ========== Task Persistence Messages ==========

// MaintenanceTaskData represents complete task state for persistence
message MaintenanceTaskData {
  string id = 1;
  string type = 2;
  string priority = 3;
  string status = 4;
  uint32 volume_id = 5;
  string server = 6;
  string collection = 7;
  TaskParams typed_params = 8;
  string reason = 9;
  int64 created_at = 10;
  int64 scheduled_at = 11;
  int64 started_at = 12;
  int64 completed_at = 13;
  string worker_id = 14;
  string error = 15;
  double progress = 16;
  int32 retry_count = 17;
  int32 max_retries = 18;

  // Enhanced fields for detailed task tracking
  string created_by = 19;
  string creation_context = 20;
  repeated TaskAssignmentRecord assignment_history = 21;
  string detailed_reason = 22;
  map<string, string> tags = 23;
  TaskCreationMetrics creation_metrics = 24;
}

// TaskAssignmentRecord tracks worker assignments for a task
message TaskAssignmentRecord {
  string worker_id = 1;
  string worker_address = 2;
  int64 assigned_at = 3;
  int64 unassigned_at = 4; // Optional: when worker was unassigned
  string reason = 5; // Reason for assignment/unassignment
}

// TaskCreationMetrics tracks why and how a task was created
message TaskCreationMetrics {
  string trigger_metric = 1; // Name of metric that triggered creation
  double metric_value = 2; // Value that triggered creation
  double threshold = 3; // Threshold that was exceeded
  VolumeHealthMetrics volume_metrics = 4; // Volume health at creation time
  map<string, string> additional_data = 5; // Additional context data
}

// VolumeHealthMetrics captures volume state at task creation
message VolumeHealthMetrics {
  uint64 total_size = 1;
  uint64 used_size = 2;
  uint64 garbage_size = 3;
  double garbage_ratio = 4;
  int32 file_count = 5;
  int32 deleted_file_count = 6;
  int64 last_modified = 7;
  int32 replica_count = 8;
  bool is_ec_volume = 9;
  string collection = 10;
}

// TaskStateFile wraps task data with metadata for persistence
message TaskStateFile {
  MaintenanceTaskData task = 1;
  int64 last_updated = 2;
  string admin_version = 3;
}