Files
seaweedFS/weed/worker/worker.go
Mariano Ntrougkas f06ddd05cc Improve-worker (#7367)
* ♻️ refactor(worker): remove goto

* ♻️ refactor(worker): let manager loop exit by itself

* ♻️ refactor(worker): fix race condition when closing worker

CloseSend is not safe to call when another
goroutine concurrently calls Send. streamCancel
already handles proper stream closure. Also,
streamExit signal should be called AFTER
sending shutdownMsg

Now the worker has no race condition if stopped
during any moment (hopefully, tested with -race
flag)

* 🐛 fix(task_logger): deadlock in log closure

* 🐛 fix(balance): fix balance task

Removes the outdated "UnloadVolume" step as it is handled by "DeleteVolume".

#7346
2025-10-23 17:09:46 -07:00

1049 lines
32 KiB
Go

package worker
import (
"context"
"crypto/rand"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
// Import task packages to trigger their auto-registration
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
// Worker represents a maintenance worker instance
type Worker struct {
id string
config *types.WorkerConfig
registry *tasks.TaskRegistry
cmds chan workerCommand
state *workerState
taskLogHandler *tasks.TaskLogHandler
}
type workerState struct {
running bool
adminClient AdminClient
startTime time.Time
stopChan chan struct{}
heartbeatTicker *time.Ticker
requestTicker *time.Ticker
currentTasks map[string]*types.TaskInput
tasksCompleted int
tasksFailed int
}
type workerAction string
const (
ActionStart workerAction = "start"
ActionStop workerAction = "stop"
ActionGetStatus workerAction = "getstatus"
ActionGetTaskLoad workerAction = "getload"
ActionSetTask workerAction = "settask"
ActionSetAdmin workerAction = "setadmin"
ActionRemoveTask workerAction = "removetask"
ActionGetAdmin workerAction = "getadmin"
ActionIncTaskFail workerAction = "inctaskfail"
ActionIncTaskComplete workerAction = "inctaskcomplete"
ActionGetHbTick workerAction = "gethbtick"
ActionGetReqTick workerAction = "getreqtick"
ActionGetStopChan workerAction = "getstopchan"
ActionSetHbTick workerAction = "sethbtick"
ActionSetReqTick workerAction = "setreqtick"
ActionGetStartTime workerAction = "getstarttime"
ActionGetCompletedTasks workerAction = "getcompletedtasks"
ActionGetFailedTasks workerAction = "getfailedtasks"
ActionCancelTask workerAction = "canceltask"
// ... other worker actions like Stop, Status, etc.
)
type statusResponse chan types.WorkerStatus
type workerCommand struct {
action workerAction
data any
resp chan error // for reporting success/failure
}
// AdminClient defines the interface for communicating with the admin server
type AdminClient interface {
Connect() error
Disconnect() error
RegisterWorker(worker *types.WorkerData) error
SendHeartbeat(workerID string, status *types.WorkerStatus) error
RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error)
CompleteTask(taskID string, success bool, errorMsg string) error
CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error
UpdateTaskProgress(taskID string, progress float64) error
IsConnected() bool
}
// GenerateOrLoadWorkerID generates a unique worker ID or loads existing one from working directory
func GenerateOrLoadWorkerID(workingDir string) (string, error) {
const workerIDFile = "worker.id"
var idFilePath string
if workingDir != "" {
idFilePath = filepath.Join(workingDir, workerIDFile)
} else {
// Use current working directory if none specified
wd, err := os.Getwd()
if err != nil {
return "", fmt.Errorf("failed to get working directory: %w", err)
}
idFilePath = filepath.Join(wd, workerIDFile)
}
// Try to read existing worker ID
if data, err := os.ReadFile(idFilePath); err == nil {
workerID := strings.TrimSpace(string(data))
if workerID != "" {
glog.Infof("Loaded existing worker ID from %s: %s", idFilePath, workerID)
return workerID, nil
}
}
// Generate simplified worker ID
hostname, _ := os.Hostname()
if hostname == "" {
hostname = "unknown"
}
// Use short hostname - take first 6 chars or last part after dots
shortHostname := hostname
if len(hostname) > 6 {
if parts := strings.Split(hostname, "."); len(parts) > 1 {
// Use last part before domain (e.g., "worker1" from "worker1.example.com")
shortHostname = parts[0]
if len(shortHostname) > 6 {
shortHostname = shortHostname[:6]
}
} else {
// Use first 6 characters
shortHostname = hostname[:6]
}
}
// Generate random component for uniqueness (2 bytes = 4 hex chars)
randomBytes := make([]byte, 2)
var workerID string
if _, err := rand.Read(randomBytes); err != nil {
// Fallback to short timestamp if crypto/rand fails
timestamp := time.Now().Unix() % 10000 // last 4 digits
workerID = fmt.Sprintf("w-%s-%04d", shortHostname, timestamp)
glog.Infof("Generated fallback worker ID: %s", workerID)
} else {
// Use random hex for uniqueness
randomHex := fmt.Sprintf("%x", randomBytes)
workerID = fmt.Sprintf("w-%s-%s", shortHostname, randomHex)
glog.Infof("Generated new worker ID: %s", workerID)
}
// Save worker ID to file
if err := os.WriteFile(idFilePath, []byte(workerID), 0644); err != nil {
glog.Warningf("Failed to save worker ID to %s: %v", idFilePath, err)
} else {
glog.Infof("Saved worker ID to %s", idFilePath)
}
return workerID, nil
}
// NewWorker creates a new worker instance
func NewWorker(config *types.WorkerConfig) (*Worker, error) {
if config == nil {
config = types.DefaultWorkerConfig()
}
// Generate or load persistent worker ID
workerID, err := GenerateOrLoadWorkerID(config.BaseWorkingDir)
if err != nil {
return nil, fmt.Errorf("failed to generate or load worker ID: %w", err)
}
// Use the global unified registry that already has all tasks registered
registry := tasks.GetGlobalTaskRegistry()
// Initialize task log handler
logDir := filepath.Join(config.BaseWorkingDir, "task_logs")
// Ensure the base task log directory exists to avoid errors when admin requests logs
if err := os.MkdirAll(logDir, 0755); err != nil {
glog.Warningf("Failed to create task log base directory %s: %v", logDir, err)
}
taskLogHandler := tasks.NewTaskLogHandler(logDir)
worker := &Worker{
id: workerID,
config: config,
registry: registry,
taskLogHandler: taskLogHandler,
cmds: make(chan workerCommand),
}
glog.V(1).Infof("Worker created with %d registered task types", len(registry.GetAll()))
go worker.managerLoop()
return worker, nil
}
func (w *Worker) managerLoop() {
w.state = &workerState{
startTime: time.Now(),
stopChan: make(chan struct{}),
currentTasks: make(map[string]*types.TaskInput),
}
out:
for cmd := range w.cmds {
switch cmd.action {
case ActionStart:
w.handleStart(cmd)
case ActionStop:
w.handleStop(cmd)
break out
case ActionGetStatus:
respCh := cmd.data.(statusResponse)
var currentTasks []types.TaskInput
for _, task := range w.state.currentTasks {
currentTasks = append(currentTasks, *task)
}
statusStr := "active"
if len(w.state.currentTasks) >= w.config.MaxConcurrent {
statusStr = "busy"
}
status := types.WorkerStatus{
WorkerID: w.id,
Status: statusStr,
Capabilities: w.config.Capabilities,
MaxConcurrent: w.config.MaxConcurrent,
CurrentLoad: len(w.state.currentTasks),
LastHeartbeat: time.Now(),
CurrentTasks: currentTasks,
Uptime: time.Since(w.state.startTime),
TasksCompleted: w.state.tasksCompleted,
TasksFailed: w.state.tasksFailed,
}
respCh <- status
case ActionGetTaskLoad:
respCh := cmd.data.(chan int)
respCh <- len(w.state.currentTasks)
case ActionSetTask:
currentLoad := len(w.state.currentTasks)
if currentLoad >= w.config.MaxConcurrent {
cmd.resp <- fmt.Errorf("worker is at capacity")
}
task := cmd.data.(*types.TaskInput)
w.state.currentTasks[task.ID] = task
cmd.resp <- nil
case ActionSetAdmin:
admin := cmd.data.(AdminClient)
w.state.adminClient = admin
case ActionRemoveTask:
taskID := cmd.data.(string)
delete(w.state.currentTasks, taskID)
case ActionGetAdmin:
respCh := cmd.data.(chan AdminClient)
respCh <- w.state.adminClient
case ActionIncTaskFail:
w.state.tasksFailed++
case ActionIncTaskComplete:
w.state.tasksCompleted++
case ActionGetHbTick:
respCh := cmd.data.(chan *time.Ticker)
respCh <- w.state.heartbeatTicker
case ActionGetReqTick:
respCh := cmd.data.(chan *time.Ticker)
respCh <- w.state.requestTicker
case ActionSetHbTick:
w.state.heartbeatTicker = cmd.data.(*time.Ticker)
case ActionSetReqTick:
w.state.requestTicker = cmd.data.(*time.Ticker)
case ActionGetStopChan:
cmd.data.(chan chan struct{}) <- w.state.stopChan
case ActionGetStartTime:
cmd.data.(chan time.Time) <- w.state.startTime
case ActionGetCompletedTasks:
cmd.data.(chan int) <- w.state.tasksCompleted
case ActionGetFailedTasks:
cmd.data.(chan int) <- w.state.tasksFailed
case ActionCancelTask:
taskID := cmd.data.(string)
if task, exists := w.state.currentTasks[taskID]; exists {
glog.Infof("Cancelling task %s", task.ID)
// TODO: Implement actual task cancellation logic
} else {
glog.Warningf("Cannot cancel task %s: task not found", taskID)
}
}
}
}
func (w *Worker) getTaskLoad() int {
respCh := make(chan int, 1)
w.cmds <- workerCommand{
action: ActionGetTaskLoad,
data: respCh,
resp: nil,
}
return <-respCh
}
func (w *Worker) setTask(task *types.TaskInput) error {
resp := make(chan error)
w.cmds <- workerCommand{
action: ActionSetTask,
data: task,
resp: resp,
}
if err := <-resp; err != nil {
glog.Errorf("TASK REJECTED: Worker %s at capacity (%d/%d) - rejecting task %s",
w.id, w.getTaskLoad(), w.config.MaxConcurrent, task.ID)
return err
}
newLoad := w.getTaskLoad()
glog.Infof("TASK ACCEPTED: Worker %s accepted task %s - current load: %d/%d",
w.id, task.ID, newLoad, w.config.MaxConcurrent)
return nil
}
func (w *Worker) removeTask(task *types.TaskInput) int {
w.cmds <- workerCommand{
action: ActionRemoveTask,
data: task.ID,
}
return w.getTaskLoad()
}
func (w *Worker) getAdmin() AdminClient {
respCh := make(chan AdminClient, 1)
w.cmds <- workerCommand{
action: ActionGetAdmin,
data: respCh,
}
return <-respCh
}
func (w *Worker) getStopChan() chan struct{} {
respCh := make(chan chan struct{}, 1)
w.cmds <- workerCommand{
action: ActionGetStopChan,
data: respCh,
}
return <-respCh
}
func (w *Worker) getHbTick() *time.Ticker {
respCh := make(chan *time.Ticker, 1)
w.cmds <- workerCommand{
action: ActionGetHbTick,
data: respCh,
}
return <-respCh
}
func (w *Worker) getReqTick() *time.Ticker {
respCh := make(chan *time.Ticker, 1)
w.cmds <- workerCommand{
action: ActionGetReqTick,
data: respCh,
}
return <-respCh
}
func (w *Worker) setHbTick(tick *time.Ticker) *time.Ticker {
w.cmds <- workerCommand{
action: ActionSetHbTick,
data: tick,
}
return w.getHbTick()
}
func (w *Worker) setReqTick(tick *time.Ticker) *time.Ticker {
w.cmds <- workerCommand{
action: ActionSetReqTick,
data: tick,
}
return w.getReqTick()
}
func (w *Worker) getStartTime() time.Time {
respCh := make(chan time.Time, 1)
w.cmds <- workerCommand{
action: ActionGetStartTime,
data: respCh,
}
return <-respCh
}
func (w *Worker) getCompletedTasks() int {
respCh := make(chan int, 1)
w.cmds <- workerCommand{
action: ActionGetCompletedTasks,
data: respCh,
}
return <-respCh
}
func (w *Worker) getFailedTasks() int {
respCh := make(chan int, 1)
w.cmds <- workerCommand{
action: ActionGetFailedTasks,
data: respCh,
}
return <-respCh
}
// getTaskLoggerConfig returns the task logger configuration with worker's log directory
func (w *Worker) getTaskLoggerConfig() tasks.TaskLoggerConfig {
config := tasks.DefaultTaskLoggerConfig()
// Use worker's configured log directory (BaseWorkingDir is guaranteed to be non-empty)
logDir := filepath.Join(w.config.BaseWorkingDir, "task_logs")
config.BaseLogDir = logDir
return config
}
// ID returns the worker ID
func (w *Worker) ID() string {
return w.id
}
func (w *Worker) Start() error {
resp := make(chan error)
w.cmds <- workerCommand{
action: ActionStart,
resp: resp,
}
return <-resp
}
// Start starts the worker
func (w *Worker) handleStart(cmd workerCommand) {
if w.state.running {
cmd.resp <- fmt.Errorf("worker is already running")
return
}
if w.state.adminClient == nil {
cmd.resp <- fmt.Errorf("admin client is not set")
return
}
w.state.running = true
w.state.startTime = time.Now()
// Prepare worker info for registration
workerInfo := &types.WorkerData{
ID: w.id,
Capabilities: w.config.Capabilities,
MaxConcurrent: w.config.MaxConcurrent,
Status: "active",
CurrentLoad: 0,
LastHeartbeat: time.Now(),
}
// Register worker info with client first (this stores it for use during connection)
if err := w.state.adminClient.RegisterWorker(workerInfo); err != nil {
glog.V(1).Infof("Worker info stored for registration: %v", err)
// This is expected if not connected yet
}
// Start connection attempt (will register immediately if successful)
glog.Infof("WORKER STARTING: Worker %s starting with capabilities %v, max concurrent: %d",
w.id, w.config.Capabilities, w.config.MaxConcurrent)
// Try initial connection, but don't fail if it doesn't work immediately
if err := w.state.adminClient.Connect(); err != nil {
glog.Warningf("INITIAL CONNECTION FAILED: Worker %s initial connection to admin server failed, will keep retrying: %v", w.id, err)
// Don't return error - let the reconnection loop handle it
} else {
glog.Infof("INITIAL CONNECTION SUCCESS: Worker %s successfully connected to admin server", w.id)
}
// Start worker loops regardless of initial connection status
// They will handle connection failures gracefully
glog.V(1).Infof("STARTING LOOPS: Worker %s starting background loops", w.id)
go w.heartbeatLoop()
go w.taskRequestLoop()
go w.connectionMonitorLoop()
go w.messageProcessingLoop()
glog.Infof("WORKER STARTED: Worker %s started successfully (connection attempts will continue in background)", w.id)
cmd.resp <- nil
}
func (w *Worker) Stop() error {
resp := make(chan error)
w.cmds <- workerCommand{
action: ActionStop,
resp: resp,
}
if err := <-resp; err != nil {
return err
}
// Wait for tasks to finish
timeout := time.NewTimer(30 * time.Second)
defer timeout.Stop()
out:
for w.getTaskLoad() > 0 {
select {
case <-timeout.C:
glog.Warningf("Worker %s stopping with %d tasks still running", w.id, w.getTaskLoad())
break out
case <-time.After(100 * time.Millisecond):
}
}
// Disconnect from admin server
if adminClient := w.getAdmin(); adminClient != nil {
if err := adminClient.Disconnect(); err != nil {
glog.Errorf("Error disconnecting from admin server: %v", err)
}
}
glog.Infof("Worker %s stopped", w.id)
return nil
}
// Stop stops the worker
func (w *Worker) handleStop(cmd workerCommand) {
if !w.state.running {
cmd.resp <- nil
return
}
w.state.running = false
close(w.state.stopChan)
// Stop tickers
if w.state.heartbeatTicker != nil {
w.state.heartbeatTicker.Stop()
}
if w.state.requestTicker != nil {
w.state.requestTicker.Stop()
}
cmd.resp <- nil
}
// RegisterTask registers a task factory
func (w *Worker) RegisterTask(taskType types.TaskType, factory types.TaskFactory) {
w.registry.Register(taskType, factory)
}
// GetCapabilities returns the worker capabilities
func (w *Worker) GetCapabilities() []types.TaskType {
return w.config.Capabilities
}
// GetStatus returns the current worker status
func (w *Worker) GetStatus() types.WorkerStatus {
respCh := make(statusResponse, 1)
w.cmds <- workerCommand{
action: ActionGetStatus,
data: respCh,
resp: nil,
}
return <-respCh
}
// HandleTask handles a task execution
func (w *Worker) HandleTask(task *types.TaskInput) error {
glog.V(1).Infof("Worker %s received task %s (type: %s, volume: %d)",
w.id, task.ID, task.Type, task.VolumeID)
if err := w.setTask(task); err != nil {
return err
}
// Execute task in goroutine
go w.executeTask(task)
return nil
}
// SetCapabilities sets the worker capabilities
func (w *Worker) SetCapabilities(capabilities []types.TaskType) {
w.config.Capabilities = capabilities
}
// SetMaxConcurrent sets the maximum concurrent tasks
func (w *Worker) SetMaxConcurrent(max int) {
w.config.MaxConcurrent = max
}
// SetHeartbeatInterval sets the heartbeat interval
func (w *Worker) SetHeartbeatInterval(interval time.Duration) {
w.config.HeartbeatInterval = interval
}
// SetTaskRequestInterval sets the task request interval
func (w *Worker) SetTaskRequestInterval(interval time.Duration) {
w.config.TaskRequestInterval = interval
}
// SetAdminClient sets the admin client
func (w *Worker) SetAdminClient(client AdminClient) {
w.cmds <- workerCommand{
action: ActionSetAdmin,
data: client,
}
}
// executeTask executes a task
func (w *Worker) executeTask(task *types.TaskInput) {
startTime := time.Now()
defer func() {
currentLoad := w.removeTask(task)
duration := time.Since(startTime)
glog.Infof("TASK EXECUTION FINISHED: Worker %s finished executing task %s after %v - current load: %d/%d",
w.id, task.ID, duration, currentLoad, w.config.MaxConcurrent)
}()
glog.Infof("TASK EXECUTION STARTED: Worker %s starting execution of task %s (type: %s, volume: %d, server: %s, collection: %s) at %v",
w.id, task.ID, task.Type, task.VolumeID, task.Server, task.Collection, startTime.Format(time.RFC3339))
// Report task start to admin server
if err := w.getAdmin().UpdateTaskProgress(task.ID, 0.0); err != nil {
glog.V(1).Infof("Failed to report task start to admin: %v", err)
}
// Determine task-specific working directory (BaseWorkingDir is guaranteed to be non-empty)
taskWorkingDir := filepath.Join(w.config.BaseWorkingDir, string(task.Type))
glog.V(2).Infof("WORKING DIRECTORY: Task %s using working directory: %s", task.ID, taskWorkingDir)
// Check if we have typed protobuf parameters
if task.TypedParams == nil {
w.completeTask(task.ID, false, "task has no typed parameters - task was not properly planned")
glog.Errorf("Worker %s rejecting task %s: no typed parameters", w.id, task.ID)
return
}
// Use new task execution system with unified Task interface
glog.V(1).Infof("Executing task %s with typed protobuf parameters", task.ID)
// Initialize a file-based task logger so admin can retrieve logs
// Build minimal params for logger metadata
loggerParams := types.TaskParams{
VolumeID: task.VolumeID,
Collection: task.Collection,
TypedParams: task.TypedParams,
}
loggerConfig := w.getTaskLoggerConfig()
fileLogger, logErr := tasks.NewTaskLogger(task.ID, task.Type, w.id, loggerParams, loggerConfig)
if logErr != nil {
glog.Warningf("Failed to initialize file logger for task %s: %v", task.ID, logErr)
} else {
defer func() {
if err := fileLogger.Close(); err != nil {
glog.V(1).Infof("Failed to close task logger for %s: %v", task.ID, err)
}
}()
fileLogger.Info("Task %s started (type=%s, server=%s, collection=%s)", task.ID, task.Type, task.Server, task.Collection)
}
taskFactory := w.registry.Get(task.Type)
if taskFactory == nil {
w.completeTask(task.ID, false, fmt.Sprintf("task factory not available for %s: task type not found", task.Type))
glog.Errorf("Worker %s failed to get task factory for %s type %v", w.id, task.ID, task.Type)
// Log supported task types for debugging
allFactories := w.registry.GetAll()
glog.Errorf("Available task types: %d", len(allFactories))
for taskType := range allFactories {
glog.Errorf("Supported task type: %v", taskType)
}
return
}
taskInstance, err := taskFactory.Create(task.TypedParams)
if err != nil {
w.completeTask(task.ID, false, fmt.Sprintf("failed to create task for %s: %v", task.Type, err))
glog.Errorf("Worker %s failed to create task %s type %v: %v", w.id, task.ID, task.Type, err)
return
}
// Task execution uses the new unified Task interface
glog.V(2).Infof("Executing task %s in working directory: %s", task.ID, taskWorkingDir)
// If we have a file logger, adapt it so task WithFields logs are captured into file
if fileLogger != nil {
if withLogger, ok := taskInstance.(interface{ SetLogger(types.Logger) }); ok {
withLogger.SetLogger(newTaskLoggerAdapter(fileLogger))
}
}
// Set progress callback that reports to admin server
taskInstance.SetProgressCallback(func(progress float64, stage string) {
// Report progress updates to admin server
glog.V(2).Infof("Task %s progress: %.1f%% - %s", task.ID, progress, stage)
if err := w.getAdmin().UpdateTaskProgress(task.ID, progress); err != nil {
glog.V(1).Infof("Failed to report task progress to admin: %v", err)
}
if fileLogger != nil {
// Use meaningful stage description or fallback to generic message
message := stage
if message == "" {
message = fmt.Sprintf("Progress: %.1f%%", progress)
}
fileLogger.LogProgress(progress, message)
}
})
// Execute task with context
ctx := context.Background()
err = taskInstance.Execute(ctx, task.TypedParams)
// Report completion
if err != nil {
w.completeTask(task.ID, false, err.Error())
w.cmds <- workerCommand{
action: ActionIncTaskFail,
}
glog.Errorf("Worker %s failed to execute task %s: %v", w.id, task.ID, err)
if fileLogger != nil {
fileLogger.LogStatus("failed", err.Error())
fileLogger.Error("Task %s failed: %v", task.ID, err)
}
} else {
w.completeTask(task.ID, true, "")
w.cmds <- workerCommand{
action: ActionIncTaskComplete,
}
glog.Infof("Worker %s completed task %s successfully", w.id, task.ID)
if fileLogger != nil {
fileLogger.Info("Task %s completed successfully", task.ID)
}
}
}
// completeTask reports task completion to admin server
func (w *Worker) completeTask(taskID string, success bool, errorMsg string) {
if w.getAdmin() != nil {
if err := w.getAdmin().CompleteTask(taskID, success, errorMsg); err != nil {
glog.Errorf("Failed to report task completion: %v", err)
}
}
}
// heartbeatLoop sends periodic heartbeats to the admin server
func (w *Worker) heartbeatLoop() {
defer w.setHbTick(time.NewTicker(w.config.HeartbeatInterval)).Stop()
ticker := w.getHbTick()
stopChan := w.getStopChan()
for {
select {
case <-stopChan:
return
case <-ticker.C:
w.sendHeartbeat()
}
}
}
// taskRequestLoop periodically requests new tasks from the admin server
func (w *Worker) taskRequestLoop() {
defer w.setReqTick(time.NewTicker(w.config.TaskRequestInterval)).Stop()
ticker := w.getReqTick()
stopChan := w.getStopChan()
for {
select {
case <-stopChan:
return
case <-ticker.C:
w.requestTasks()
}
}
}
// sendHeartbeat sends heartbeat to admin server
func (w *Worker) sendHeartbeat() {
if w.getAdmin() != nil {
if err := w.getAdmin().SendHeartbeat(w.id, &types.WorkerStatus{
WorkerID: w.id,
Status: "active",
Capabilities: w.config.Capabilities,
MaxConcurrent: w.config.MaxConcurrent,
CurrentLoad: w.getTaskLoad(),
LastHeartbeat: time.Now(),
}); err != nil {
glog.Warningf("Failed to send heartbeat: %v", err)
}
}
}
// requestTasks requests new tasks from the admin server
func (w *Worker) requestTasks() {
currentLoad := w.getTaskLoad()
if currentLoad >= w.config.MaxConcurrent {
glog.V(3).Infof("TASK REQUEST SKIPPED: Worker %s at capacity (%d/%d)",
w.id, currentLoad, w.config.MaxConcurrent)
return // Already at capacity
}
if w.getAdmin() != nil {
glog.V(3).Infof("REQUESTING TASK: Worker %s requesting task from admin server (current load: %d/%d, capabilities: %v)",
w.id, currentLoad, w.config.MaxConcurrent, w.config.Capabilities)
task, err := w.getAdmin().RequestTask(w.id, w.config.Capabilities)
if err != nil {
glog.V(2).Infof("TASK REQUEST FAILED: Worker %s failed to request task: %v", w.id, err)
return
}
if task != nil {
glog.Infof("TASK RESPONSE RECEIVED: Worker %s received task from admin server - ID: %s, Type: %s",
w.id, task.ID, task.Type)
if err := w.HandleTask(task); err != nil {
glog.Errorf("TASK HANDLING FAILED: Worker %s failed to handle task %s: %v", w.id, task.ID, err)
}
} else {
glog.V(3).Infof("NO TASK AVAILABLE: Worker %s - admin server has no tasks available", w.id)
}
}
}
// GetTaskRegistry returns the task registry
func (w *Worker) GetTaskRegistry() *tasks.TaskRegistry {
return w.registry
}
// registerWorker registers the worker with the admin server
func (w *Worker) registerWorker() {
workerInfo := &types.WorkerData{
ID: w.id,
Capabilities: w.config.Capabilities,
MaxConcurrent: w.config.MaxConcurrent,
Status: "active",
CurrentLoad: 0,
LastHeartbeat: time.Now(),
}
if err := w.getAdmin().RegisterWorker(workerInfo); err != nil {
glog.Warningf("Failed to register worker (will retry on next heartbeat): %v", err)
} else {
glog.Infof("Worker %s registered successfully with admin server", w.id)
}
}
// connectionMonitorLoop monitors connection status
func (w *Worker) connectionMonitorLoop() {
ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
defer ticker.Stop()
lastConnectionStatus := false
stopChan := w.getStopChan()
for {
select {
case <-stopChan:
glog.V(1).Infof("CONNECTION MONITOR STOPPING: Worker %s connection monitor loop stopping", w.id)
return
case <-ticker.C:
// Monitor connection status and log changes
currentConnectionStatus := w.getAdmin() != nil && w.getAdmin().IsConnected()
if currentConnectionStatus != lastConnectionStatus {
if currentConnectionStatus {
glog.Infof("CONNECTION RESTORED: Worker %s connection status changed: connected", w.id)
} else {
glog.Warningf("CONNECTION LOST: Worker %s connection status changed: disconnected", w.id)
}
lastConnectionStatus = currentConnectionStatus
} else {
if currentConnectionStatus {
glog.V(3).Infof("CONNECTION OK: Worker %s connection status: connected", w.id)
} else {
glog.V(1).Infof("CONNECTION DOWN: Worker %s connection status: disconnected, reconnection in progress", w.id)
}
}
}
}
}
// GetConfig returns the worker configuration
func (w *Worker) GetConfig() *types.WorkerConfig {
return w.config
}
// GetPerformanceMetrics returns performance metrics
func (w *Worker) GetPerformanceMetrics() *types.WorkerPerformance {
uptime := time.Since(w.getStartTime())
var successRate float64
totalTasks := w.getCompletedTasks() + w.getFailedTasks()
if totalTasks > 0 {
successRate = float64(w.getCompletedTasks()) / float64(totalTasks) * 100
}
return &types.WorkerPerformance{
TasksCompleted: w.getCompletedTasks(),
TasksFailed: w.getFailedTasks(),
AverageTaskTime: 0, // Would need to track this
Uptime: uptime,
SuccessRate: successRate,
}
}
// messageProcessingLoop processes incoming admin messages
func (w *Worker) messageProcessingLoop() {
glog.Infof("MESSAGE LOOP STARTED: Worker %s message processing loop started", w.id)
// Get access to the incoming message channel from gRPC client
grpcClient, ok := w.getAdmin().(*GrpcAdminClient)
if !ok {
glog.Warningf("MESSAGE LOOP UNAVAILABLE: Worker %s admin client is not gRPC client, message processing not available", w.id)
return
}
incomingChan := grpcClient.GetIncomingChannel()
glog.V(1).Infof("MESSAGE CHANNEL READY: Worker %s connected to incoming message channel", w.id)
stopChan := w.getStopChan()
for {
select {
case <-stopChan:
glog.Infof("MESSAGE LOOP STOPPING: Worker %s message processing loop stopping", w.id)
return
case message := <-incomingChan:
if message != nil {
glog.V(3).Infof("MESSAGE PROCESSING: Worker %s processing incoming message", w.id)
w.processAdminMessage(message)
} else {
glog.V(3).Infof("NULL MESSAGE: Worker %s received nil message", w.id)
}
}
}
}
// processAdminMessage processes different types of admin messages
func (w *Worker) processAdminMessage(message *worker_pb.AdminMessage) {
glog.V(4).Infof("ADMIN MESSAGE RECEIVED: Worker %s received admin message: %T", w.id, message.Message)
switch msg := message.Message.(type) {
case *worker_pb.AdminMessage_RegistrationResponse:
glog.V(2).Infof("REGISTRATION RESPONSE: Worker %s received registration response", w.id)
w.handleRegistrationResponse(msg.RegistrationResponse)
case *worker_pb.AdminMessage_HeartbeatResponse:
glog.V(3).Infof("HEARTBEAT RESPONSE: Worker %s received heartbeat response", w.id)
w.handleHeartbeatResponse(msg.HeartbeatResponse)
case *worker_pb.AdminMessage_TaskLogRequest:
glog.V(1).Infof("TASK LOG REQUEST: Worker %s received task log request for task %s", w.id, msg.TaskLogRequest.TaskId)
w.handleTaskLogRequest(msg.TaskLogRequest)
case *worker_pb.AdminMessage_TaskAssignment:
taskAssign := msg.TaskAssignment
glog.V(1).Infof("Worker %s received direct task assignment %s (type: %s, volume: %d)",
w.id, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)
// Convert to task and handle it
task := &types.TaskInput{
ID: taskAssign.TaskId,
Type: types.TaskType(taskAssign.TaskType),
Status: types.TaskStatusAssigned,
VolumeID: taskAssign.Params.VolumeId,
Server: getServerFromParams(taskAssign.Params),
Collection: taskAssign.Params.Collection,
Priority: types.TaskPriority(taskAssign.Priority),
CreatedAt: time.Unix(taskAssign.CreatedTime, 0),
TypedParams: taskAssign.Params,
}
if err := w.HandleTask(task); err != nil {
glog.Errorf("DIRECT TASK ASSIGNMENT FAILED: Worker %s failed to handle direct task assignment %s: %v", w.id, task.ID, err)
}
case *worker_pb.AdminMessage_TaskCancellation:
glog.Infof("TASK CANCELLATION: Worker %s received task cancellation for task %s", w.id, msg.TaskCancellation.TaskId)
w.handleTaskCancellation(msg.TaskCancellation)
case *worker_pb.AdminMessage_AdminShutdown:
glog.Infof("ADMIN SHUTDOWN: Worker %s received admin shutdown message", w.id)
w.handleAdminShutdown(msg.AdminShutdown)
default:
glog.V(1).Infof("UNKNOWN MESSAGE: Worker %s received unknown admin message type: %T", w.id, message.Message)
}
}
// handleTaskLogRequest processes task log requests from admin server
func (w *Worker) handleTaskLogRequest(request *worker_pb.TaskLogRequest) {
glog.V(1).Infof("Worker %s handling task log request for task %s", w.id, request.TaskId)
// Use the task log handler to process the request
response := w.taskLogHandler.HandleLogRequest(request)
// Send response back to admin server
responseMsg := &worker_pb.WorkerMessage{
WorkerId: w.id,
Timestamp: time.Now().Unix(),
Message: &worker_pb.WorkerMessage_TaskLogResponse{
TaskLogResponse: response,
},
}
grpcClient, ok := w.getAdmin().(*GrpcAdminClient)
if !ok {
glog.Errorf("Cannot send task log response: admin client is not gRPC client")
return
}
select {
case grpcClient.outgoing <- responseMsg:
glog.V(1).Infof("Task log response sent for task %s", request.TaskId)
case <-time.After(5 * time.Second):
glog.Errorf("Failed to send task log response for task %s: timeout", request.TaskId)
}
}
// handleTaskCancellation processes task cancellation requests
func (w *Worker) handleTaskCancellation(cancellation *worker_pb.TaskCancellation) {
glog.Infof("Worker %s received task cancellation for task %s", w.id, cancellation.TaskId)
w.cmds <- workerCommand{
action: ActionCancelTask,
data: cancellation.TaskId,
resp: nil,
}
}
// handleAdminShutdown processes admin shutdown notifications
func (w *Worker) handleAdminShutdown(shutdown *worker_pb.AdminShutdown) {
glog.Infof("Worker %s received admin shutdown notification: %s", w.id, shutdown.Reason)
gracefulSeconds := shutdown.GracefulShutdownSeconds
if gracefulSeconds > 0 {
glog.Infof("Graceful shutdown in %d seconds", gracefulSeconds)
time.AfterFunc(time.Duration(gracefulSeconds)*time.Second, func() {
w.Stop()
})
} else {
// Immediate shutdown
go w.Stop()
}
}
// handleRegistrationResponse processes registration response from admin server
func (w *Worker) handleRegistrationResponse(response *worker_pb.RegistrationResponse) {
glog.V(2).Infof("Worker %s processed registration response: success=%v", w.id, response.Success)
if !response.Success {
glog.Warningf("Worker %s registration failed: %s", w.id, response.Message)
}
// Registration responses are typically handled by the gRPC client during connection setup
// No additional action needed here
}
// handleHeartbeatResponse processes heartbeat response from admin server
func (w *Worker) handleHeartbeatResponse(response *worker_pb.HeartbeatResponse) {
glog.V(4).Infof("Worker %s processed heartbeat response", w.id)
// Heartbeat responses are mainly for keeping the connection alive
// The admin may include configuration updates or status information in the future
// For now, just acknowledge receipt
}