* Fix worker reconnection race condition causing "context canceled" errors

  Fixes #7824

  This commit fixes critical connection stability issues between the admin
  server and workers that manifested as rapid reconnection cycles with
  "context canceled" errors, particularly after 24+ hours of operation in
  containerized environments.

  Root Cause:
  -----------
  A race condition where TWO goroutines called stream.Recv() on the same gRPC
  bidirectional stream concurrently:

  1. sendRegistrationSync() started a goroutine that calls stream.Recv()
  2. handleIncoming() also calls stream.Recv() in a loop

  Per the gRPC specification, only ONE goroutine may call Recv() on a stream
  at a time. Concurrent Recv() calls cause undefined behavior, manifesting as
  "context canceled" errors and stream corruption.

  The race occurred during worker reconnection:
  - Sometimes the sendRegistrationSync goroutine read the registration
    response first (success)
  - Sometimes handleIncoming read it first, causing sendRegistrationSync to
    time out
  - This left the stream in an inconsistent state, triggering the "context
    canceled" error
  - The error triggered rapid reconnection attempts, creating a reconnection
    storm

  Why it happened after 24 hours: container orchestration systems (Docker
  Swarm/Kubernetes) periodically restart pods. Over time, workers reconnect
  multiple times, and each reconnection had a chance of hitting the race
  condition. Eventually the race manifested and caused the connection storm.

  Changes:
  --------
  weed/worker/client.go:
  - Start the handleIncoming and handleOutgoing goroutines BEFORE sending
    registration
  - Use sendRegistration() instead of sendRegistrationSync()
  - Ensures only ONE goroutine (handleIncoming) calls stream.Recv()
  - Eliminates the race condition entirely (see the sketch after this
    message)

  weed/admin/dash/worker_grpc_server.go:
  - Clean up the old connection when a worker reconnects with the same ID
  - Cancel the old connection's context to stop its goroutines
  - Prevents resource leaks and stale connection accumulation

  Impact:
  -------
  Before: random "context canceled" errors during reconnection, rapid
  reconnection cycles, resource leaks, manual restart required to recover.
  After: reliable reconnection, a single Recv() goroutine, proper cleanup,
  stable operation over 24+ hours.

  Testing:
  --------
  Build verified successful with no compilation errors.

  How to reproduce the bug:
  1. Start the admin server and a worker
  2. Restart the admin server (simulates container recreation)
  3. The worker reconnects
  4. The race condition may manifest, causing a "context canceled" error
  5. Observe rapid reconnection cycles in the logs

  The fix is backward compatible and requires no configuration changes.
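  To make the single-reader invariant concrete, here is a minimal,
  self-contained sketch (illustrative names only, not the actual SeaweedFS
  code) of the pattern the fix enforces: exactly one goroutine owns
  stream.Recv() and fans messages out over a channel that everyone else
  (registration waiter, task-response waiters) reads from:

    package main

    import (
    	"errors"
    	"fmt"
    )

    // Illustrative stand-ins: msg plays the role of worker_pb.AdminMessage,
    // recvStream the role of the bidirectional gRPC stream client.
    type msg struct{ body string }

    type recvStream interface {
    	Recv() (*msg, error)
    }

    // fakeStream yields two messages, then fails like a closed stream.
    type fakeStream struct{ n int }

    func (f *fakeStream) Recv() (*msg, error) {
    	if f.n >= 2 {
    		return nil, errors.New("EOF")
    	}
    	f.n++
    	return &msg{body: fmt.Sprintf("message %d", f.n)}, nil
    }

    // receiveLoop is the ONLY goroutine that calls Recv(). Anything waiting
    // for a message reads from the incoming channel instead of touching the
    // stream directly, so Recv() is never called concurrently.
    func receiveLoop(s recvStream, incoming chan<- *msg) {
    	defer close(incoming) // tells consumers the stream has ended
    	for {
    		m, err := s.Recv()
    		if err != nil {
    			return
    		}
    		incoming <- m
    	}
    }

    func main() {
    	incoming := make(chan *msg, 10)
    	go receiveLoop(&fakeStream{}, incoming)
    	for m := range incoming {
    		fmt.Println("got", m.body)
    	}
    }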
* Add MaxConnectionAge to gRPC server for Docker Swarm DNS handling
  - Configure MaxConnectionAge and MaxConnectionAgeGrace for the gRPC server
    (see the hedged sketch after this list)
  - Expand error detection in shouldInvalidateConnection for better cache invalidation
  - Add connection lifecycle logging for debugging

* Add topology validation and nil-safety checks
  - Add validation guards in UpdateTopology to prevent invalid updates
  - Add nil-safety checks in rebuildIndexes
  - Add GetDiskCount method for diagnostic purposes

* Fix worker registration race condition
  - Reorder goroutine startup in WorkerStream to prevent race conditions
  - Add defensive cleanup in unregisterWorker with panic-safe channel closing

* Add comprehensive topology update logging
  - Enhance UpdateTopologyInfo with detailed logging of datacenter/node/disk counts
  - Add metrics logging for topology changes

* Add periodic diagnostic status logging
  - Implement topologyStatusLoop running every 5 minutes
  - Add logTopologyStatus function reporting system metrics
  - Run as a background goroutine in the maintenance manager

* Enhance master client connection logging
  - Add connection timing logs in tryConnectToMaster
  - Add reconnection attempt counting in KeepConnectedToMaster
  - Improve diagnostic visibility for connection issues

* Remove unused sendRegistrationSync function
  - The function is no longer called after switching to the asynchronous sendRegistration
  - It contained the problematic concurrent stream.Recv() pattern that caused the race condition
  - Cleanup as suggested in PR review

* Clarify comment for channel closing during disconnection
  - Improve the comment to explain why the channels are closed and what effect closing them has
  - Make the code more self-documenting, as suggested in PR review

* Address code review feedback: refactor and improvements
  - Extract topology counting logic into a shared helper, CountTopologyResources(), to eliminate duplication between topology_management.go and maintenance_integration.go
  - Use gRPC status codes for more robust error detection in shouldInvalidateConnection(), falling back to string matching for transport-level errors
  - Add a recover wrapper for channel-close consistency in cleanupStaleConnections() to match the unregisterWorker() pattern

* Update grpc_client_server.go

* Fix data race on lastSeen field access
  - Add mutex protection around conn.lastSeen = time.Now() in the WorkerStream method
  - Ensures thread-safe access, consistent with cleanupStaleConnections

* Fix goroutine leaks in worker reconnection logic
  - Close streamExit in reconnect() before creating a new connection
  - Close streamExit in attemptConnection() when sendRegistration fails
  - Prevents orphaned handleOutgoing/handleIncoming goroutines from previous connections
  - Ensures proper cleanup of goroutines competing for the shared outgoing channel

* Minor cleanup improvements for consistency and clarity
  - Remove redundant string checks in shouldInvalidateConnection that overlap with gRPC status codes
  - Add a recover block to the Stop() method for consistency with other channel-close operations
  - Maintains the valuable DNS- and transport-specific error detection while eliminating redundancy

* Improve topology update error handling
  - Return descriptive errors instead of silently preserving topology on invalid updates
  - Change the nil topologyInfo case to return "rejected invalid topology update: nil topologyInfo"
  - Change the empty DataCenterInfos case to return "rejected invalid topology update: empty DataCenterInfos (had X nodes, Y disks)"
  - Keep the existing glog.Warningf calls but append error details to the logs before returning errors
  - Allows callers to distinguish rejected updates and handle them appropriately

* Refactor safe channel closing into a helper method
  - Add a safeCloseOutgoingChannel helper method to eliminate code duplication
  - Replace the repeated recover blocks in Stop, unregisterWorker, and cleanupStaleConnections
  - Improves maintainability and ensures consistent error handling across all channel-close operations
  - Maintains the same panic-recovery behavior with contextual source identification

* Make connection invalidation string matching case-insensitive
  - Convert the error string to lowercase once for all strings.Contains checks
  - Improves robustness by catching error-message variations from different sources
  - Eliminates the need for separate "DNS resolution" and "dns" checks
  - Maintains the same error-detection coverage with better reliability

* Clean up warning logs in UpdateTopology to avoid duplicating error text
  - Remove duplicated error phrases from glog.Warningf messages
  - Keep concise contextual warnings that don't repeat the fmt.Errorf content
  - Maintain the same error returns for backward compatibility

* Add robust validation to prevent topology wipeout during master restart
  - Reject topology updates with 0 nodes when the current topology has nodes
  - Prevents a transient empty topology from overwriting valid state
  - Improves resilience during master restart scenarios
  - Maintains backward compatibility for legitimate empty-topology updates
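  The MaxConnectionAge item above maps onto grpc-go's standard keepalive
  server options. A minimal sketch of the shape of that configuration — the
  durations are illustrative assumptions, not SeaweedFS's actual values
  (which, per the commit above, presumably live in grpc_client_server.go):

    package main

    import (
    	"time"

    	"google.golang.org/grpc"
    	"google.golang.org/grpc/keepalive"
    )

    func main() {
    	// Durations are illustrative assumptions only.
    	srv := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
    		// Periodically close long-lived connections so clients re-dial
    		// and re-resolve DNS after Docker Swarm / Kubernetes replaces a
    		// container behind the same service name.
    		MaxConnectionAge: 10 * time.Minute,
    		// Give in-flight RPCs this long to drain before the hard close.
    		MaxConnectionAgeGrace: 30 * time.Second,
    	}))
    	defer srv.Stop()
    }

  Forcing periodic re-dials is what makes stale DNS entries self-healing:
  without a connection-age cap, a client can keep a TCP connection to a dead
  container address indefinitely.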
package worker

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
	"google.golang.org/grpc"
)

var (
	ErrAlreadyConnected = errors.New("already connected")
)

// GrpcAdminClient implements AdminClient using gRPC bidirectional streaming
type GrpcAdminClient struct {
	adminAddress string
	workerID     string
	dialOption   grpc.DialOption

	cmds chan grpcCommand

	// Reconnection parameters
	maxReconnectAttempts int
	reconnectBackoff     time.Duration
	maxReconnectBackoff  time.Duration
	reconnectMultiplier  float64

	// Channels for communication
	outgoing      chan *worker_pb.WorkerMessage
	incoming      chan *worker_pb.AdminMessage
	responseChans map[string]chan *worker_pb.AdminMessage
}

type grpcAction string

const (
	ActionConnect              grpcAction = "connect"
	ActionDisconnect           grpcAction = "disconnect"
	ActionReconnect            grpcAction = "reconnect"
	ActionStreamError          grpcAction = "stream_error"
	ActionRegisterWorker       grpcAction = "register_worker"
	ActionQueryReconnecting    grpcAction = "query_reconnecting"
	ActionQueryConnected       grpcAction = "query_connected"
	ActionQueryShouldReconnect grpcAction = "query_shouldreconnect"
)

type registrationRequest struct {
	Worker *types.WorkerData
	Resp   chan error // Used to send the registration result back
}

type grpcCommand struct {
	action grpcAction
	data   any
	resp   chan error // for reporting success/failure
}

type grpcState struct {
	connected       bool
	reconnecting    bool
	shouldReconnect bool
	conn            *grpc.ClientConn
	client          worker_pb.WorkerServiceClient
	stream          worker_pb.WorkerService_WorkerStreamClient
	streamCtx       context.Context
	streamCancel    context.CancelFunc
	lastWorkerInfo  *types.WorkerData
	reconnectStop   chan struct{}
	streamExit      chan struct{}
}

// NewGrpcAdminClient creates a new gRPC admin client
func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.DialOption) *GrpcAdminClient {
	// Admin uses HTTP port + 10000 as gRPC port
	grpcAddress := pb.ServerToGrpcAddress(adminAddress)

	c := &GrpcAdminClient{
		adminAddress:         grpcAddress,
		workerID:             workerID,
		dialOption:           dialOption,
		maxReconnectAttempts: 0, // 0 means infinite attempts
		reconnectBackoff:     1 * time.Second,
		maxReconnectBackoff:  30 * time.Second,
		reconnectMultiplier:  1.5,
		outgoing:             make(chan *worker_pb.WorkerMessage, 100),
		incoming:             make(chan *worker_pb.AdminMessage, 100),
		responseChans:        make(map[string]chan *worker_pb.AdminMessage),
		cmds:                 make(chan grpcCommand),
	}
	go c.managerLoop()
	return c
}

func (c *GrpcAdminClient) managerLoop() {
	state := &grpcState{shouldReconnect: true}

out:
	for cmd := range c.cmds {
		switch cmd.action {
		case ActionConnect:
			c.handleConnect(cmd, state)
		case ActionDisconnect:
			c.handleDisconnect(cmd, state)
			break out
		case ActionReconnect:
			if state.connected || state.reconnecting || !state.shouldReconnect {
				cmd.resp <- ErrAlreadyConnected
				continue
			}
			state.reconnecting = true // Manager acknowledges the attempt
			err := c.reconnect(state)
			state.reconnecting = false
			cmd.resp <- err
		case ActionStreamError:
			state.connected = false
		case ActionRegisterWorker:
			req := cmd.data.(registrationRequest)
			state.lastWorkerInfo = req.Worker
			if !state.connected {
				glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection")
				// Respond immediately with success (registration will happen later)
				req.Resp <- nil
				continue
			}
			err := c.sendRegistration(req.Worker)
			req.Resp <- err
		case ActionQueryConnected:
			respCh := cmd.data.(chan bool)
			respCh <- state.connected
		case ActionQueryReconnecting:
			respCh := cmd.data.(chan bool)
			respCh <- state.reconnecting
		case ActionQueryShouldReconnect:
			respCh := cmd.data.(chan bool)
			respCh <- state.shouldReconnect
		}
	}
}

// Connect establishes gRPC connection to admin server with TLS detection
func (c *GrpcAdminClient) Connect() error {
	resp := make(chan error)
	c.cmds <- grpcCommand{
		action: ActionConnect,
		resp:   resp,
	}
	return <-resp
}

func (c *GrpcAdminClient) handleConnect(cmd grpcCommand, s *grpcState) {
	if s.connected {
		cmd.resp <- fmt.Errorf("already connected")
		return
	}

	// Start reconnection loop immediately (async)
	stop := make(chan struct{})
	s.reconnectStop = stop
	go c.reconnectionLoop(stop)

	// Attempt the initial connection
	err := c.attemptConnection(s)
	if err != nil {
		glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err)
		cmd.resp <- err
		return
	}
	cmd.resp <- nil
}

// createConnection attempts to connect using the provided dial option
func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to admin server: %w", err)
	}

	glog.Infof("Connected to admin server at %s", c.adminAddress)
	return conn, nil
}

// attemptConnection tries to establish the connection without managing the reconnection loop
func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
	// Detect TLS support and create appropriate connection
	conn, err := c.createConnection()
	if err != nil {
		return fmt.Errorf("failed to connect to admin server: %w", err)
	}

	s.conn = conn
	s.client = worker_pb.NewWorkerServiceClient(conn)

	// Create bidirectional stream
	s.streamCtx, s.streamCancel = context.WithCancel(context.Background())
	stream, err := s.client.WorkerStream(s.streamCtx)
	if err != nil {
		s.conn.Close()
		return fmt.Errorf("failed to create worker stream: %w", err)
	}
	glog.Infof("Worker stream created")
	s.connected = true
	s.stream = stream

	// Start stream handlers BEFORE sending registration.
	// This ensures handleIncoming is ready to receive the registration
	// response, and keeps handleIncoming the only goroutine calling stream.Recv().
	s.streamExit = make(chan struct{})
	go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds)
	go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds)

	// Always check for worker info and send registration immediately as the very first message
	if s.lastWorkerInfo != nil {
		// Send registration via the normal outgoing channel and wait for the response via incoming
		if err := c.sendRegistration(s.lastWorkerInfo); err != nil {
			close(s.streamExit)
			s.streamCancel()
			s.conn.Close()
			s.connected = false
			return fmt.Errorf("failed to register worker: %w", err)
		}
		glog.Infof("Worker registered successfully with admin server")
	} else {
		// No worker info yet - stream will wait for registration
		glog.V(1).Infof("Connected to admin server, waiting for worker registration info")
	}

	glog.Infof("Connected to admin server at %s", c.adminAddress)
	return nil
}

// reconnect attempts to re-establish the connection
func (c *GrpcAdminClient) reconnect(s *grpcState) error {
	// Clean up existing connection completely
	if s.streamExit != nil {
		close(s.streamExit)
	}
	if s.streamCancel != nil {
		s.streamCancel()
	}
	if s.conn != nil {
		s.conn.Close()
	}
	s.connected = false

	// Attempt to re-establish connection using the same logic as initial connection
	if err := c.attemptConnection(s); err != nil {
		return fmt.Errorf("failed to reconnect: %w", err)
	}

	// Registration is now handled in attemptConnection if worker info is available
	return nil
}

// reconnectionLoop handles automatic reconnection with exponential backoff
func (c *GrpcAdminClient) reconnectionLoop(reconnectStop chan struct{}) {
	backoff := c.reconnectBackoff
	attempts := 0

	for {
		waitDuration := backoff
		if attempts == 0 {
			waitDuration = time.Second
		}
		select {
		case <-reconnectStop:
			return
		case <-time.After(waitDuration):
		}
		resp := make(chan error, 1)
		c.cmds <- grpcCommand{
			action: ActionReconnect,
			resp:   resp,
		}
		err := <-resp
		if err == nil {
			// Successful reconnection
			attempts = 0
			backoff = c.reconnectBackoff
			glog.Infof("Successfully reconnected to admin server")
		} else if errors.Is(err, ErrAlreadyConnected) {
			attempts = 0
			backoff = c.reconnectBackoff
		} else {
			attempts++
			glog.Errorf("Reconnection attempt %d failed: %v", attempts, err)

			// Check if we should give up
			if c.maxReconnectAttempts > 0 && attempts >= c.maxReconnectAttempts {
				glog.Errorf("Max reconnection attempts (%d) reached, giving up", c.maxReconnectAttempts)
				return
			}

			// Increase backoff
			backoff = time.Duration(float64(backoff) * c.reconnectMultiplier)
			if backoff > c.maxReconnectBackoff {
				backoff = c.maxReconnectBackoff
			}
			glog.Infof("Waiting %v before next reconnection attempt", backoff)
		}
	}
}
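
// Worked example of the backoff schedule (illustrative, using the defaults
// set in NewGrpcAdminClient: 1s initial backoff, 1.5x multiplier, 30s cap):
// consecutive failed attempts wait roughly 1s, 1.5s, 2.25s, 3.4s, 5.1s, ...
// until the 30s cap is reached; a successful (or already-connected) attempt
// resets the schedule back to the 1s base.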

// handleOutgoing processes outgoing messages to admin
func handleOutgoing(
	stream worker_pb.WorkerService_WorkerStreamClient,
	streamExit <-chan struct{},
	outgoing <-chan *worker_pb.WorkerMessage,
	cmds chan<- grpcCommand) {

	msgCh := make(chan *worker_pb.WorkerMessage)
	errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy
	// Goroutine to handle blocking stream.Send() while the outer loop
	// simultaneously watches for exit signals
	go func() {
		for msg := range msgCh {
			if err := stream.Send(msg); err != nil {
				errCh <- err
				return // Exit the sender goroutine on error/EOF
			}
		}
		close(errCh)
	}()

	for msg := range outgoing {
		select {
		case msgCh <- msg:
		case err := <-errCh:
			glog.Errorf("Failed to send message to admin: %v", err)
			cmds <- grpcCommand{action: ActionStreamError, data: err}
			return
		case <-streamExit:
			close(msgCh)
			<-errCh
			return
		}
	}
}

// handleIncoming processes incoming messages from admin
func handleIncoming(
	workerID string,
	stream worker_pb.WorkerService_WorkerStreamClient,
	streamExit <-chan struct{},
	incoming chan<- *worker_pb.AdminMessage,
	cmds chan<- grpcCommand) {
	glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", workerID)
	msgCh := make(chan *worker_pb.AdminMessage)
	errCh := make(chan error, 1) // Buffered to prevent blocking if the manager is busy
	// Goroutine to handle blocking stream.Recv() while the outer loop
	// simultaneously watches for exit signals. This is the ONLY goroutine
	// that calls Recv() on the stream.
	go func() {
		for {
			msg, err := stream.Recv()
			if err != nil {
				errCh <- err
				return // Exit the receiver goroutine on error/EOF
			}
			msgCh <- msg
		}
	}()

	for {
		glog.V(4).Infof("LISTENING: Worker %s waiting for message from admin server", workerID)

		select {
		case msg := <-msgCh:
			// Message successfully received from the stream
			glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", workerID, msg.Message)

			// Route the message to waiting goroutines or the general handler
			select {
			case incoming <- msg:
				glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", workerID)
			case <-time.After(time.Second):
				glog.Warningf("MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", workerID, msg.Message)
			}

		case err := <-errCh:
			// Stream receiver goroutine reported an error (EOF or network error)
			if err == io.EOF {
				glog.Infof("STREAM CLOSED: Worker %s admin server closed the stream", workerID)
			} else {
				glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", workerID, err)
			}

			// Report the failure as a command to the managerLoop (blocking)
			cmds <- grpcCommand{action: ActionStreamError, data: err}

			// Exit the main handler loop
			glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler due to stream error", workerID)
			return

		case <-streamExit:
			// Manager closed this channel, signaling a controlled disconnection.
			glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - received exit signal", workerID)
			return
		}
	}
}

// Disconnect closes the connection to the admin server
func (c *GrpcAdminClient) Disconnect() error {
	resp := make(chan error)
	c.cmds <- grpcCommand{
		action: ActionDisconnect,
		resp:   resp,
	}
	err := <-resp
	return err
}

func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) {
	if !s.connected {
		cmd.resp <- fmt.Errorf("already disconnected")
		return
	}

	// Send shutdown signal to stop reconnection loop
	close(s.reconnectStop)

	s.connected = false
	s.shouldReconnect = false

	// Send shutdown message
	shutdownMsg := &worker_pb.WorkerMessage{
		WorkerId:  c.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_Shutdown{
			Shutdown: &worker_pb.WorkerShutdown{
				WorkerId: c.workerID,
				Reason:   "normal shutdown",
			},
		},
	}

	// Try to hand the shutdown message to the outgoing handler before teardown
	select {
	case c.outgoing <- shutdownMsg:
	case <-time.After(time.Second):
		glog.Warningf("Failed to send shutdown message")
	}

	// Send shutdown signal to stop the handler loops
	close(s.streamExit)

	// Cancel stream context
	if s.streamCancel != nil {
		s.streamCancel()
	}

	// Close connection
	if s.conn != nil {
		s.conn.Close()
	}

	// Close channels to signal all goroutines to stop
	// This will cause any pending sends/receives to fail gracefully
	close(c.outgoing)
	close(c.incoming)

	glog.Infof("Disconnected from admin server")
	cmd.resp <- nil
}

// RegisterWorker registers the worker with the admin server
func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error {
	respCh := make(chan error, 1)
	request := registrationRequest{
		Worker: worker,
		Resp:   respCh,
	}
	c.cmds <- grpcCommand{
		action: ActionRegisterWorker,
		data:   request,
	}
	return <-respCh
}

// sendRegistration sends the registration message and waits for response
func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
	capabilities := make([]string, len(worker.Capabilities))
	for i, cap := range worker.Capabilities {
		capabilities[i] = string(cap)
	}

	msg := &worker_pb.WorkerMessage{
		WorkerId:  c.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_Registration{
			Registration: &worker_pb.WorkerRegistration{
				WorkerId:      c.workerID,
				Address:       worker.Address,
				Capabilities:  capabilities,
				MaxConcurrent: int32(worker.MaxConcurrent),
				Metadata:      make(map[string]string),
			},
		},
	}

	select {
	case c.outgoing <- msg:
	case <-time.After(5 * time.Second):
		return fmt.Errorf("failed to send registration message: timeout")
	}

	// Wait for the registration response; any non-registration messages
	// received while waiting are consumed and dropped
	timeout := time.NewTimer(10 * time.Second)
	defer timeout.Stop()

	for {
		select {
		case response := <-c.incoming:
			if regResp := response.GetRegistrationResponse(); regResp != nil {
				if regResp.Success {
					glog.Infof("Worker registered successfully: %s", regResp.Message)
					return nil
				}
				return fmt.Errorf("registration failed: %s", regResp.Message)
			}
		case <-timeout.C:
			return fmt.Errorf("registration timeout")
		}
	}
}

func (c *GrpcAdminClient) IsConnected() bool {
	respCh := make(chan bool, 1)

	c.cmds <- grpcCommand{
		action: ActionQueryConnected,
		data:   respCh,
	}

	return <-respCh
}

func (c *GrpcAdminClient) IsReconnecting() bool {
	respCh := make(chan bool, 1)

	c.cmds <- grpcCommand{
		action: ActionQueryReconnecting,
		data:   respCh,
	}

	return <-respCh
}

func (c *GrpcAdminClient) ShouldReconnect() bool {
	respCh := make(chan bool, 1)

	c.cmds <- grpcCommand{
		action: ActionQueryShouldReconnect,
		data:   respCh,
	}

	return <-respCh
}

// SendHeartbeat sends heartbeat to admin server
func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
	if !c.IsConnected() {
		// If we're currently reconnecting, don't wait - just skip the heartbeat
		reconnecting := c.IsReconnecting()

		if reconnecting {
			// Don't treat as an error - reconnection is in progress
			glog.V(2).Infof("Skipping heartbeat during reconnection")
			return nil
		}

		// Wait for reconnection for a short time
		if err := c.waitForConnection(10 * time.Second); err != nil {
			return fmt.Errorf("not connected to admin server: %w", err)
		}
	}

	taskIds := make([]string, len(status.CurrentTasks))
	for i, task := range status.CurrentTasks {
		taskIds[i] = task.ID
	}

	msg := &worker_pb.WorkerMessage{
		WorkerId:  c.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_Heartbeat{
			Heartbeat: &worker_pb.WorkerHeartbeat{
				WorkerId:       c.workerID,
				Status:         status.Status,
				CurrentLoad:    int32(status.CurrentLoad),
				MaxConcurrent:  int32(status.MaxConcurrent),
				CurrentTaskIds: taskIds,
				TasksCompleted: int32(status.TasksCompleted),
				TasksFailed:    int32(status.TasksFailed),
				UptimeSeconds:  int64(status.Uptime.Seconds()),
			},
		},
	}

	select {
	case c.outgoing <- msg:
		return nil
	case <-time.After(time.Second):
		return fmt.Errorf("failed to send heartbeat: timeout")
	}
}

// RequestTask requests a new task from admin server
func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) {
	if !c.IsConnected() {
		// If we're currently reconnecting, don't wait - just return no task
		reconnecting := c.IsReconnecting()

		if reconnecting {
			// Don't treat as an error - reconnection is in progress
			glog.V(2).Infof("RECONNECTING: Worker %s skipping task request during reconnection", workerID)
			return nil, nil
		}

		// Wait for reconnection for a short time
		if err := c.waitForConnection(5 * time.Second); err != nil {
			return nil, fmt.Errorf("not connected to admin server: %w", err)
		}
	}

	caps := make([]string, len(capabilities))
	for i, cap := range capabilities {
		caps[i] = string(cap)
	}

	glog.V(3).Infof("📤 SENDING TASK REQUEST: Worker %s sending task request to admin server with capabilities: %v",
		workerID, capabilities)

	msg := &worker_pb.WorkerMessage{
		WorkerId:  c.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_TaskRequest{
			TaskRequest: &worker_pb.TaskRequest{
				WorkerId:       c.workerID,
				Capabilities:   caps,
				AvailableSlots: 1, // Request one task
			},
		},
	}

	select {
	case c.outgoing <- msg:
		glog.V(3).Infof("TASK REQUEST SENT: Worker %s successfully sent task request to admin server", workerID)
	case <-time.After(time.Second):
		glog.Errorf("TASK REQUEST TIMEOUT: Worker %s failed to send task request: timeout", workerID)
		return nil, fmt.Errorf("failed to send task request: timeout")
	}

	// Wait for task assignment
	glog.V(3).Infof("WAITING FOR RESPONSE: Worker %s waiting for task assignment response (5s timeout)", workerID)
	timeout := time.NewTimer(5 * time.Second)
	defer timeout.Stop()

	for {
		select {
		case response := <-c.incoming:
			glog.V(3).Infof("RESPONSE RECEIVED: Worker %s received response from admin server: %T", workerID, response.Message)
			if taskAssign := response.GetTaskAssignment(); taskAssign != nil {
				glog.V(1).Infof("Worker %s received task assignment in response: %s (type: %s, volume: %d)",
					workerID, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)

				// Convert to our task type
				task := &types.TaskInput{
					ID:         taskAssign.TaskId,
					Type:       types.TaskType(taskAssign.TaskType),
					Status:     types.TaskStatusAssigned,
					VolumeID:   taskAssign.Params.VolumeId,
					Server:     getServerFromParams(taskAssign.Params),
					Collection: taskAssign.Params.Collection,
					Priority:   types.TaskPriority(taskAssign.Priority),
					CreatedAt:  time.Unix(taskAssign.CreatedTime, 0),
					// Use typed protobuf parameters directly
					TypedParams: taskAssign.Params,
				}
				return task, nil
			} else {
				glog.V(3).Infof("NON-TASK RESPONSE: Worker %s received non-task response: %T", workerID, response.Message)
			}
		case <-timeout.C:
			glog.V(3).Infof("TASK REQUEST TIMEOUT: Worker %s - no task assignment received within 5 seconds", workerID)
			return nil, nil // No task available
		}
	}
}

// CompleteTask reports task completion to admin server
func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
	return c.CompleteTaskWithMetadata(taskID, success, errorMsg, nil)
}

// CompleteTaskWithMetadata reports task completion with additional metadata
func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
	if !c.IsConnected() {
		// If we're currently reconnecting, don't wait - just skip the completion report
		reconnecting := c.IsReconnecting()

		if reconnecting {
			// Don't treat as an error - reconnection is in progress
			glog.V(2).Infof("Skipping task completion report during reconnection for task %s", taskID)
			return nil
		}

		// Wait for reconnection for a short time
		if err := c.waitForConnection(5 * time.Second); err != nil {
			return fmt.Errorf("not connected to admin server: %w", err)
		}
	}

	taskComplete := &worker_pb.TaskComplete{
		TaskId:         taskID,
		WorkerId:       c.workerID,
		Success:        success,
		ErrorMessage:   errorMsg,
		CompletionTime: time.Now().Unix(),
	}

	// Add metadata if provided
	if metadata != nil {
		taskComplete.ResultMetadata = metadata
	}

	msg := &worker_pb.WorkerMessage{
		WorkerId:  c.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_TaskComplete{
			TaskComplete: taskComplete,
		},
	}

	select {
	case c.outgoing <- msg:
		return nil
	case <-time.After(time.Second):
		return fmt.Errorf("failed to send task completion: timeout")
	}
}

// UpdateTaskProgress updates task progress to admin server
func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
	if !c.IsConnected() {
		// If we're currently reconnecting, don't wait - just skip the progress update
		reconnecting := c.IsReconnecting()

		if reconnecting {
			// Don't treat as an error - reconnection is in progress
			glog.V(2).Infof("Skipping task progress update during reconnection for task %s", taskID)
			return nil
		}

		// Wait for reconnection for a short time
		if err := c.waitForConnection(5 * time.Second); err != nil {
			return fmt.Errorf("not connected to admin server: %w", err)
		}
	}

	msg := &worker_pb.WorkerMessage{
		WorkerId:  c.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_TaskUpdate{
			TaskUpdate: &worker_pb.TaskUpdate{
				TaskId:   taskID,
				WorkerId: c.workerID,
				Status:   "in_progress",
				Progress: float32(progress),
			},
		},
	}

	select {
	case c.outgoing <- msg:
		return nil
	case <-time.After(time.Second):
		return fmt.Errorf("failed to send task progress: timeout")
	}
}

// waitForConnection waits for the connection to be established or timeout
func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error {
	deadline := time.Now().Add(timeout)

	for time.Now().Before(deadline) {
		connected := c.IsConnected()
		shouldReconnect := c.ShouldReconnect()

		if connected {
			return nil
		}

		if !shouldReconnect {
			return fmt.Errorf("reconnection is disabled")
		}

		time.Sleep(100 * time.Millisecond)
	}

	return fmt.Errorf("timeout waiting for connection")
}

// GetIncomingChannel returns the incoming message channel for message processing
// This allows the worker to process admin messages directly
func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage {
	return c.incoming
}

// CreateAdminClient creates an admin client with the provided dial option
func CreateAdminClient(adminServer string, workerID string, dialOption grpc.DialOption) (AdminClient, error) {
	return NewGrpcAdminClient(adminServer, workerID, dialOption), nil
}

// getServerFromParams extracts server address from unified sources
func getServerFromParams(params *worker_pb.TaskParams) string {
	// Defensive nil check, consistent with the nil-safety fixes elsewhere in this PR
	if params == nil || len(params.Sources) == 0 {
		return ""
	}
	return params.Sources[0].Node
}