seaweedFS/weed/worker/client.go
Chris Lu 9a4f32fc49 feat: add automatic port detection and fallback for mini command (#7836)
* feat: add automatic port detection and fallback for mini command

- Added port availability detection using TCP binding tests
- Implemented port fallback mechanism searching for available ports
- Support for both HTTP and gRPC port handling
- IP-aware port checking using actual service bind address
- Dual-interface verification (specific IP and wildcard 0.0.0.0)
- All services (Master, Volume, Filer, S3, WebDAV, Admin) auto-reallocate to available ports
- Enables multiple mini instances to run simultaneously without conflicts

* fix: use actual bind IP for service health checks

- Previously health checks were hardcoded to localhost (127.0.0.1)
- This caused failures when services bind to actual IP (e.g., 10.21.153.8)
- Now health checks use the same IP that services are binding to
- Fixes Volume and other service health check failures on non-localhost IPs

* refactor: improve port detection logic and remove gRPC handling duplication

- findAvailablePortOnIP now returns 0 on failure instead of unavailable port
  Allows callers to detect when port finding fails and handle appropriately

- Remove duplicate gRPC port handling from ensureAllPortsAvailableOnIP
  All gRPC port logic is now centralized in initializeGrpcPortsOnIP

- Log final port configuration only after all ports are finalized
  Both HTTP and gRPC ports are now correctly initialized before logging

- Add error logging when port allocation fails
  Makes debugging easier when ports can't be found

* refactor: fix race condition and clean up port detection code

- Convert parallel HTTP port checks to sequential to prevent race conditions
  where multiple goroutines could allocate the same available port
- Remove unused 'sync' import since WaitGroup is no longer used
- Add documentation to localhost wrapper functions explaining they are
  kept for backwards compatibility and future use
- All gRPC port logic is now exclusively handled in initializeGrpcPortsOnIP
  eliminating any duplication in ensureAllPortsAvailableOnIP

* refactor: address code review comments - constants, helper function, and cleanup

- Define GrpcPortOffset constant (10000) to replace magic numbers throughout
  the code for better maintainability and consistency
- Extract bindIp determination logic into getBindIp() helper function
  to eliminate code duplication between runMini and startMiniServices
- Remove redundant 'calculatedPort = calculatedPort' assignment that had no effect
- Update all gRPC port calculations to use GrpcPortOffset constant
  (lines 489, 886 and the error logging at line 501)

* refactor: remove unused wrapper functions and update documentation

- Remove unused localhost wrapper functions that were never called:
  - isPortOpen() - wrapper around isPortOpenOnIP with hardcoded 127.0.0.1
  - findAvailablePort() - wrapper around findAvailablePortOnIP with hardcoded 127.0.0.1
  - ensurePortAvailable() - wrapper around ensurePortAvailableOnIP with hardcoded 127.0.0.1
  - ensureAllPortsAvailable() - wrapper around ensureAllPortsAvailableOnIP with hardcoded 127.0.0.1

  Since this is new functionality with no backwards compatibility concerns,
  these wrapper functions were not needed. The comments claiming they were
  'kept for future use or backwards compatibility' are no longer valid.

- Update documentation to reference GrpcPortOffset constant instead of hardcoded 10000:
  - Update comment in ensureAllPortsAvailableOnIP to use GrpcPortOffset
  - Update admin.port.grpc flag help text to reference GrpcPortOffset

Note: getBindIp() is in use and should be retained. Contrary to the review comment
suggesting it was unused, it is called in both runMini and startMiniServices.

* refactor: prevent HTTP/gRPC port collisions and improve error handling

- Add upfront reservation of all calculated gRPC ports before allocating HTTP ports
  to prevent collisions where an HTTP port allocation could use a port that will
  later be needed for a gRPC port calculation.

  Example scenario that is now prevented:
  - Master HTTP reallocated from 9333 to 9334 (original in use)
  - Filer HTTP search finds 19334 available and assigns it
  - Master gRPC calculated as 9334 + GrpcPortOffset = 19334 → collision!

  Now: reserved gRPC ports are tracked upfront and HTTP port search skips them.

- Improve admin server gRPC port fallback error handling:
  - Change from silent V(1) verbose log to Warningf to make the error visible
  - Update comment to clarify this indicates a problem in the port initialization sequence
  - Add explanation that the fallback calculation may cause bind failure

- Update ensureAllPortsAvailableOnIP comment to clarify it avoids reserved ports

* fix: enforce reserved ports in HTTP allocation and improve admin gRPC fallback

Critical fixes for port allocation safety:

1. Make findAvailablePortOnIP and ensurePortAvailableOnIP aware of reservedPorts:
   - Add reservedPorts map parameter to both functions
   - findAvailablePortOnIP now skips reserved ports when searching for alternatives
   - ensurePortAvailableOnIP passes reservedPorts through to findAvailablePortOnIP
   - This prevents HTTP ports from being allocated to ports reserved for gRPC

2. Update ensureAllPortsAvailableOnIP to pass reservedPorts:
   - Pass the reservedPorts map to ensurePortAvailableOnIP calls
   - Maintains the map updates (delete/add) for accuracy as ports change

3. Replace blind admin gRPC port fallback with proper availability checks:
   - Previous code just calculated *miniAdminOptions.port + GrpcPortOffset
   - New code checks both the calculated port and finds alternatives if needed
   - Uses the same availability checking logic as initializeGrpcPortsOnIP
   - Properly logs the fallback process and any port changes
   - Will fail gracefully if no available ports found (consistent with other services)

These changes eliminate two critical vulnerabilities:
- HTTP port allocation can no longer accidentally claim gRPC ports
- Admin gRPC port fallback no longer blindly uses an unchecked port

* fix: prevent gRPC port collisions during multi-service fallback allocation

Critical fix for gRPC port allocation safety across multiple services:

Problem: When multiple services need gRPC port fallback allocation in sequence
(e.g., Master gRPC unavailable → finds alternative, then Filer gRPC unavailable
→ searches from calculated port), there was no tracking of previously allocated
gRPC ports. This could allow two services to claim the same port.

Scenario that is now prevented:
- Master gRPC: calculated 19333 unavailable → finds 19334 → assigns 19334
- Filer gRPC: calculated 18888 unavailable → searches from 18889, might land on
  19334 if consecutive ports in range are unavailable (especially with custom
  port configurations or in high-port-contention environments)

Solution:
- Add allocatedGrpcPorts map to track gRPC ports allocated within the function
- Check allocatedGrpcPorts before using calculated port for each service
- Pass allocatedGrpcPorts to findAvailablePortOnIP when finding fallback ports
- Add allocatedGrpcPorts[port] = true after each successful allocation
- This ensures no two services can allocate the same gRPC port

The fix handles both:
1. Calculated gRPC ports (when grpcPort == 0)
2. Explicitly set gRPC ports (when user provides -service.port.grpc value)

While default port spacing makes collision unlikely, this fix is essential for:
- Custom port configurations
- High-contention environments
- Edge cases with many unavailable consecutive ports
- Correctness and safety guarantees

* feat: enforce hard-fail behavior for explicitly specified ports

When users explicitly specify a port via command-line flags (e.g., -s3.port=8333),
the server should fail immediately if the port is unavailable, rather than silently
falling back to an alternative port. This prevents user confusion and makes misconfiguration
failures obvious.

Changes:
- Modified ensurePortAvailableOnIP() to check if a port was explicitly passed via isFlagPassed()
- If an explicit port is unavailable, return error instead of silently allocating alternative
- Updated ensureAllPortsAvailableOnIP() to handle the returned error and fail startup
- Modified runMini() to check error from ensureAllPortsAvailableOnIP() and return false on failure
- Default ports (not explicitly specified) continue to fallback to available alternatives

This ensures:
- Explicit ports: fail if unavailable (e.g., -s3.port=8333 fails if 8333 is taken)
- Default ports: fall back to alternatives (e.g., s3.port without a flag falls back to 8334 if 8333 is taken)
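
The explicit-versus-default policy reduces to one decision per port. The sketch below assumes a boolean `explicit` has already been computed from the command line. `ensurePort`, its parameters, and the error wording are hypothetical and do not reproduce `ensurePortAvailableOnIP`.

```go
package main

import "fmt"

// ensurePort returns the port a service should use. An explicitly requested
// port must be free, otherwise startup fails; a default port may fall back
// to the next free port within maxTries. isFree stands in for a bind test.
func ensurePort(name string, port int, explicit bool, isFree func(int) bool, maxTries int) (int, error) {
	if isFree(port) {
		return port, nil
	}
	if explicit {
		return 0, fmt.Errorf("port %d for %s was specified explicitly but is not available", port, name)
	}
	for p := port + 1; p <= port+maxTries; p++ {
		if isFree(p) {
			return p, nil
		}
	}
	return 0, fmt.Errorf("no available port for %s near %d", name, port)
}

func main() {
	isFree := func(p int) bool { return p != 8333 } // 8333 is taken
	fmt.Println(ensurePort("S3", 8333, false, isFree, 100)) // 8334 <nil>
	_, err := ensurePort("S3", 8333, true, isFree, 100)
	fmt.Println(err)
}
```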

* fix: accurate error messages for explicitly specified unavailable ports

When a port is explicitly specified via CLI flags but is unavailable, the error message
now correctly reports the originally requested port instead of reporting a fallback port
that was calculated internally.

The issue was that the config file applied after CLI flag parsing caused isFlagPassed()
to return true for ports loaded from the config file (since flag.Visit() was called during
config file application), incorrectly marking them as explicitly specified.

Solution: Capture which port flags were explicitly passed on the CLI BEFORE the config file
is applied, storing them in the explicitPortFlags map. This preserves the accurate
distinction between user-specified ports and defaults/config-file ports.

Example:
- User runs: weed mini -dir=. -s3.port=22
- Now correctly shows: 'port 22 for S3 (specified by flag s3.port) is not available'
- Previously incorrectly showed: 'port 8334 for S3...' (some calculated fallback)

* fix: respect explicitly specified ports and prevent config file override

When a port is explicitly specified via CLI flags (e.g., -s3.port=8333),
the config file options should NOT override it. Previously, config file
options would be applied if the flag value differed from default, but
this check wasn't sufficient to prevent override in all cases.

Solution: Check the explicitPortFlags map before applying any config file
port options. If a port was explicitly passed on the CLI, skip applying
the config file option for that port.

This ensures:
- Explicit ports take absolute precedence over config file ports
- Config file ports are only used if port wasn't specified on CLI
- Example: 'weed mini -s3.port=8333' will use 8333, never the config file value

* fix: don't print usage on port allocation error

When a port allocation fails (e.g., explicit port is unavailable), exit
immediately without showing the usage example. This provides cleaner
error output when the error is expected (port conflict).

* fix: increase worker registration timeout for reconnections

Increase the worker registration timeout from 10 seconds to 30 seconds.
The 10-second timeout was too aggressive for reconnections when the admin
server might be busy processing other operations. Reconnecting workers need
more time to:
1. Re-establish the gRPC connection
2. Send the registration message
3. Wait for the admin server to process and respond

This prevents spurious "registration timeout" errors during long-running
mini instances when brief network hiccups or admin server load cause delays.

* refactor: clean up code quality issues

Remove no-op assignment (calculatedPort = calculatedPort) that had no effect.
The variable already holds the correct value when no alternative port is
found.

Improve documentation for the defensive gRPC port initialization fallback
in startAdminServer. While this code shouldn't execute in normal flow
because ensureAllPortsAvailableOnIP is called earlier in runMini, the
fallback handles edge cases where port initialization may have been skipped
or failed silently due to configuration changes or error handling paths.
2025-12-21 23:25:30 -08:00

840 lines
24 KiB
Go

package worker

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
	"google.golang.org/grpc"
)

var (
	ErrAlreadyConnected = errors.New("already connected")
)

// GrpcAdminClient implements AdminClient using gRPC bidirectional streaming
type GrpcAdminClient struct {
	adminAddress string
	workerID     string
	dialOption   grpc.DialOption
	cmds         chan grpcCommand
	// Reconnection parameters
	maxReconnectAttempts int
	reconnectBackoff     time.Duration
	maxReconnectBackoff  time.Duration
	reconnectMultiplier  float64
	// Channels for communication
	outgoing      chan *worker_pb.WorkerMessage
	incoming      chan *worker_pb.AdminMessage
	responseChans map[string]chan *worker_pb.AdminMessage
}

type grpcAction string

const (
	ActionConnect              grpcAction = "connect"
	ActionDisconnect           grpcAction = "disconnect"
	ActionReconnect            grpcAction = "reconnect"
	ActionStreamError          grpcAction = "stream_error"
	ActionRegisterWorker       grpcAction = "register_worker"
	ActionQueryReconnecting    grpcAction = "query_reconnecting"
	ActionQueryConnected       grpcAction = "query_connected"
	ActionQueryShouldReconnect grpcAction = "query_shouldreconnect"
)

type registrationRequest struct {
	Worker *types.WorkerData
	Resp   chan error // Used to send the registration result back
}

type grpcCommand struct {
	action grpcAction
	data   any
	resp   chan error // for reporting success/failure
}

// grpcState holds the connection state. It is owned by the managerLoop
// goroutine; no other goroutine reads or writes it.
type grpcState struct {
	connected       bool
	reconnecting    bool
	shouldReconnect bool
	conn            *grpc.ClientConn
	client          worker_pb.WorkerServiceClient
	stream          worker_pb.WorkerService_WorkerStreamClient
	streamCtx       context.Context
	streamCancel    context.CancelFunc
	lastWorkerInfo  *types.WorkerData
	reconnectStop   chan struct{}
	streamExit      chan struct{}
}

// NewGrpcAdminClient creates a new gRPC admin client
func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.DialOption) *GrpcAdminClient {
	// Admin uses HTTP port + 10000 as gRPC port
	grpcAddress := pb.ServerToGrpcAddress(adminAddress)

	c := &GrpcAdminClient{
		adminAddress:         grpcAddress,
		workerID:             workerID,
		dialOption:           dialOption,
		maxReconnectAttempts: 0, // 0 means infinite attempts
		reconnectBackoff:     1 * time.Second,
		maxReconnectBackoff:  30 * time.Second,
		reconnectMultiplier:  1.5,
		outgoing:             make(chan *worker_pb.WorkerMessage, 100),
		incoming:             make(chan *worker_pb.AdminMessage, 100),
		responseChans:        make(map[string]chan *worker_pb.AdminMessage),
		cmds:                 make(chan grpcCommand),
	}
	go c.managerLoop()
	return c
}

// safeCloseChannel safely closes a channel and sets it to nil to prevent double-close panics.
// NOTE: This function is NOT thread-safe. It is safe to use in this codebase because all calls
// are serialized within the managerLoop goroutine. If this function is used in concurrent contexts
// in the future, synchronization (e.g., sync.Mutex) should be added.
func (c *GrpcAdminClient) safeCloseChannel(chPtr *chan struct{}) {
	if *chPtr != nil {
		close(*chPtr)
		*chPtr = nil
	}
}
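
Closing an already-closed channel panics in Go. `safeCloseChannel` avoids this by closing through a pointer and then setting the variable to nil. Goroutines that captured the channel value earlier, such as `reconnectionLoop(stop)`, still observe the close. The demonstration below uses a hypothetical free function, `closeOnce`, in place of the method.

```go
package main

import "fmt"

// closeOnce closes *ch if it is non-nil and then clears it, so repeated
// calls are no-ops. Like safeCloseChannel, it is not safe for concurrent use.
func closeOnce(ch *chan struct{}) {
	if *ch != nil {
		close(*ch)
		*ch = nil
	}
}

func main() {
	stop := make(chan struct{})
	waiter := stop // e.g. the copy passed to a background loop
	closeOnce(&stop)
	closeOnce(&stop) // no "close of closed channel" panic
	_, open := <-waiter
	fmt.Println(stop == nil, open) // true false
}
```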

// managerLoop owns grpcState and processes commands one at a time until a
// disconnect command has been handled.
func (c *GrpcAdminClient) managerLoop() {
	state := &grpcState{shouldReconnect: true}

out:
	for cmd := range c.cmds {
		switch cmd.action {
		case ActionConnect:
			c.handleConnect(cmd, state)
		case ActionDisconnect:
			c.handleDisconnect(cmd, state)
			break out
		case ActionReconnect:
			if state.connected || state.reconnecting || !state.shouldReconnect {
				cmd.resp <- ErrAlreadyConnected
				continue
			}
			state.reconnecting = true // Manager acknowledges the attempt
			err := c.reconnect(state)
			state.reconnecting = false
			cmd.resp <- err
		case ActionStreamError:
			state.connected = false
		case ActionRegisterWorker:
			req := cmd.data.(registrationRequest)
			state.lastWorkerInfo = req.Worker
			if !state.connected {
				glog.V(1).Infof("Not connected yet, worker info stored for registration upon connection")
				// Respond immediately with success (registration will happen later)
				req.Resp <- nil
				continue
			}
			err := c.sendRegistration(req.Worker)
			req.Resp <- err
		case ActionQueryConnected:
			respCh := cmd.data.(chan bool)
			respCh <- state.connected
		case ActionQueryReconnecting:
			respCh := cmd.data.(chan bool)
			respCh <- state.reconnecting
		case ActionQueryShouldReconnect:
			respCh := cmd.data.(chan bool)
			respCh <- state.shouldReconnect
		}
	}
}
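
`managerLoop` follows a common Go pattern. One goroutine owns all mutable state, and every other goroutine talks to it by sending a command that carries its own reply channel. The stripped-down sketch below shows that pattern. The `command` and `manager` types and the string actions are hypothetical.

```go
package main

import "fmt"

// command asks the owning goroutine to do something; reply carries the answer.
type command struct {
	action string
	reply  chan bool
}

// manager is a hypothetical, minimal stand-in for GrpcAdminClient.
type manager struct {
	cmds chan command
}

func newManager() *manager {
	m := &manager{cmds: make(chan command)}
	go m.loop()
	return m
}

// loop is the only goroutine that touches connected, so no mutex is needed.
func (m *manager) loop() {
	connected := false
	for cmd := range m.cmds {
		switch cmd.action {
		case "connect":
			connected = true
			cmd.reply <- true
		case "query":
			cmd.reply <- connected
		default:
			cmd.reply <- false
		}
	}
}

// do sends a command and waits for the manager's answer.
func (m *manager) do(action string) bool {
	reply := make(chan bool, 1) // buffered so the manager never blocks on replying
	m.cmds <- command{action: action, reply: reply}
	return <-reply
}

func main() {
	m := newManager()
	fmt.Println(m.do("query"), m.do("connect"), m.do("query")) // false true true
}
```

The trade-off is that one slow command delays every other command. In `managerLoop`, `reconnect` can wait up to 30 seconds for a registration response. Queries such as `IsConnected` sent during that time are answered only after the attempt finishes.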
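
// Connect establishes the gRPC connection to the admin server using the
// configured dial option. It also starts the background reconnection loop,
// which keeps retrying even if this initial attempt fails.
func (c *GrpcAdminClient) Connect() error {
	resp := make(chan error)
	c.cmds <- grpcCommand{
		action: ActionConnect,
		resp:   resp,
	}
	return <-resp
}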

func (c *GrpcAdminClient) handleConnect(cmd grpcCommand, s *grpcState) {
	if s.connected {
		cmd.resp <- fmt.Errorf("already connected")
		return
	}

	// Start reconnection loop immediately (async). Stop any loop left over
	// from an earlier Connect call first, so only one loop runs at a time.
	c.safeCloseChannel(&s.reconnectStop)
	stop := make(chan struct{})
	s.reconnectStop = stop
	go c.reconnectionLoop(stop)

	// Attempt the initial connection
	err := c.attemptConnection(s)
	if err != nil {
		glog.V(1).Infof("Initial connection failed, reconnection loop will retry: %v", err)
		cmd.resp <- err
		return
	}
	cmd.resp <- nil
}

// createConnection dials the admin server using the provided dial option
func (c *GrpcAdminClient) createConnection() (*grpc.ClientConn, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	conn, err := pb.GrpcDial(ctx, c.adminAddress, false, c.dialOption)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to admin server: %w", err)
	}
	// attemptConnection logs "Connected" once the stream is up
	glog.V(1).Infof("Dialed admin server at %s", c.adminAddress)
	return conn, nil
}

// attemptConnection tries to establish the connection without managing the reconnection loop
func (c *GrpcAdminClient) attemptConnection(s *grpcState) error {
	// Dial the admin server with the configured dial option
	conn, err := c.createConnection()
	if err != nil {
		return err // already wrapped by createConnection
	}
	s.conn = conn
	s.client = worker_pb.NewWorkerServiceClient(conn)

	// Create bidirectional stream
	s.streamCtx, s.streamCancel = context.WithCancel(context.Background())
	stream, err := s.client.WorkerStream(s.streamCtx)
	if err != nil {
		s.streamCancel()
		s.conn.Close()
		return fmt.Errorf("failed to create worker stream: %w", err)
	}
	glog.Infof("Worker stream created")
	s.connected = true
	s.stream = stream

	// Start stream handlers BEFORE sending registration
	// This ensures handleIncoming is ready to receive the registration response
	s.streamExit = make(chan struct{})
	go handleOutgoing(s.stream, s.streamExit, c.outgoing, c.cmds)
	go handleIncoming(c.workerID, s.stream, s.streamExit, c.incoming, c.cmds)

	// Always check for worker info and send registration immediately as the very first message
	if s.lastWorkerInfo != nil {
		// Send registration via the normal outgoing channel and wait for response via incoming
		if err := c.sendRegistration(s.lastWorkerInfo); err != nil {
			c.safeCloseChannel(&s.streamExit)
			s.streamCancel()
			s.conn.Close()
			s.connected = false
			return fmt.Errorf("failed to register worker: %w", err)
		}
		glog.Infof("Worker registered successfully with admin server")
	} else {
		// No worker info yet - stream will wait for registration
		glog.V(1).Infof("Connected to admin server, waiting for worker registration info")
	}

	glog.Infof("Connected to admin server at %s", c.adminAddress)
	return nil
}

// reconnect attempts to re-establish the connection
func (c *GrpcAdminClient) reconnect(s *grpcState) error {
	// Clean up existing connection completely
	c.safeCloseChannel(&s.streamExit)
	if s.streamCancel != nil {
		s.streamCancel()
	}
	if s.conn != nil {
		s.conn.Close()
	}
	s.connected = false

	// Attempt to re-establish connection using the same logic as initial connection
	if err := c.attemptConnection(s); err != nil {
		return fmt.Errorf("failed to reconnect: %w", err)
	}

	// Registration is now handled in attemptConnection if worker info is available
	return nil
}

// reconnectionLoop handles automatic reconnection with exponential backoff
func (c *GrpcAdminClient) reconnectionLoop(reconnectStop chan struct{}) {
	backoff := c.reconnectBackoff
	attempts := 0

	for {
		waitDuration := backoff
		if attempts == 0 {
			waitDuration = time.Second
		}
		select {
		case <-reconnectStop:
			return
		case <-time.After(waitDuration):
		}

		resp := make(chan error, 1)
		select {
		case c.cmds <- grpcCommand{action: ActionReconnect, resp: resp}:
		case <-reconnectStop:
			// Stopped while waiting for the manager, which may already have exited
			return
		}
		err := <-resp

		if err == nil {
			// Successful reconnection
			attempts = 0
			backoff = c.reconnectBackoff
			glog.Infof("Successfully reconnected to admin server")
		} else if errors.Is(err, ErrAlreadyConnected) {
			attempts = 0
			backoff = c.reconnectBackoff
		} else {
			attempts++
			glog.Errorf("Reconnection attempt %d failed: %v", attempts, err)

			// Check if we should give up
			if c.maxReconnectAttempts > 0 && attempts >= c.maxReconnectAttempts {
				glog.Errorf("Max reconnection attempts (%d) reached, giving up", c.maxReconnectAttempts)
				return
			}

			// Increase backoff
			backoff = time.Duration(float64(backoff) * c.reconnectMultiplier)
			if backoff > c.maxReconnectBackoff {
				backoff = c.maxReconnectBackoff
			}
			glog.Infof("Waiting %v before next reconnection attempt", backoff)
		}
	}
}
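
// handleOutgoing forwards messages from the outgoing channel to the admin stream
func handleOutgoing(
	stream worker_pb.WorkerService_WorkerStreamClient,
	streamExit <-chan struct{},
	outgoing <-chan *worker_pb.WorkerMessage,
	cmds chan<- grpcCommand) {

	msgCh := make(chan *worker_pb.WorkerMessage)
	errCh := make(chan error, 1) // Buffered so the sender goroutine never blocks when reporting

	// Goroutine to handle the blocking stream.Send() so the loop below can
	// simultaneously handle exit signals
	go func() {
		for msg := range msgCh {
			if err := stream.Send(msg); err != nil {
				errCh <- err
				return // Exit the sender goroutine on error/EOF
			}
		}
		close(errCh)
	}()

	// stopSender shuts down the sender goroutine and waits for it to exit
	stopSender := func() {
		close(msgCh)
		<-errCh
	}
	// reportError tells the managerLoop that the stream failed, unless the
	// manager has already torn this stream down
	reportError := func(err error) {
		glog.Errorf("Failed to send message to admin: %v", err)
		select {
		case cmds <- grpcCommand{action: ActionStreamError, data: err}:
		case <-streamExit:
		}
	}

	for {
		select {
		case msg, ok := <-outgoing:
			if !ok {
				// outgoing was closed during Disconnect
				stopSender()
				return
			}
			select {
			case msgCh <- msg:
			case err := <-errCh:
				reportError(err)
				return
			case <-streamExit:
				stopSender()
				return
			}
		case err := <-errCh:
			// A previous Send failed while no new message was pending
			reportError(err)
			return
		case <-streamExit:
			stopSender()
			return
		}
	}
}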

// handleIncoming processes incoming messages from admin
func handleIncoming(
	workerID string,
	stream worker_pb.WorkerService_WorkerStreamClient,
	streamExit <-chan struct{},
	incoming chan<- *worker_pb.AdminMessage,
	cmds chan<- grpcCommand) {

	glog.V(1).Infof("INCOMING HANDLER STARTED: Worker %s incoming message handler started", workerID)
	msgCh := make(chan *worker_pb.AdminMessage)
	errCh := make(chan error, 1) // Buffered so the receiver goroutine never blocks when reporting

	// Goroutine to handle the blocking stream.Recv() so the loop below can
	// simultaneously handle exit signals
	go func() {
		for {
			msg, err := stream.Recv()
			if err != nil {
				errCh <- err
				return // Exit the receiver goroutine on error/EOF
			}
			select {
			case msgCh <- msg:
			case <-streamExit:
				return // The handler loop has stopped; don't block forever
			}
		}
	}()

	for {
		glog.V(4).Infof("LISTENING: Worker %s waiting for message from admin server", workerID)
		select {
		case msg := <-msgCh:
			// Message successfully received from the stream
			glog.V(4).Infof("MESSAGE RECEIVED: Worker %s received message from admin server: %T", workerID, msg.Message)

			// Route message to waiting goroutines or general handler
			select {
			case incoming <- msg:
				glog.V(3).Infof("MESSAGE ROUTED: Worker %s successfully routed message to handler", workerID)
			case <-time.After(time.Second):
				glog.Warningf("MESSAGE DROPPED: Worker %s incoming message buffer full, dropping message: %T", workerID, msg.Message)
			}

		case err := <-errCh:
			// Stream receiver goroutine reported an error (EOF or network error)
			if errors.Is(err, io.EOF) {
				glog.Infof("STREAM CLOSED: Worker %s admin server closed the stream", workerID)
			} else {
				glog.Errorf("RECEIVE ERROR: Worker %s failed to receive message from admin: %v", workerID, err)
			}

			// Report the failure to the managerLoop, unless the manager has
			// already torn this stream down
			select {
			case cmds <- grpcCommand{action: ActionStreamError, data: err}:
			case <-streamExit:
			}

			// Exit the main handler loop
			glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler due to stream error", workerID)
			return

		case <-streamExit:
			// Manager closed this channel, signaling a controlled disconnection.
			glog.V(1).Infof("INCOMING HANDLER STOPPED: Worker %s stopping incoming handler - received exit signal", workerID)
			return
		}
	}
}
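
// Disconnect queues a shutdown message, stops the reconnection loop, and
// closes the connection to the admin server. The manager goroutine exits
// afterwards, so the client must not be used again.
func (c *GrpcAdminClient) Disconnect() error {
	resp := make(chan error)
	c.cmds <- grpcCommand{
		action: ActionDisconnect,
		resp:   resp,
	}
	err := <-resp
	return err
}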

func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) {
	// managerLoop exits after this handler returns, so stop the reconnection
	// loop and disable reconnection even when we are not connected
	c.safeCloseChannel(&s.reconnectStop)
	s.shouldReconnect = false

	if !s.connected {
		cmd.resp <- fmt.Errorf("already disconnected")
		return
	}
	s.connected = false

	// Build the shutdown message
	shutdownMsg := &worker_pb.WorkerMessage{
		WorkerId:  c.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_Shutdown{
			Shutdown: &worker_pb.WorkerShutdown{
				WorkerId: c.workerID,
				Reason:   "normal shutdown",
			},
		},
	}

	// Queue the shutdown message for handleOutgoing
	select {
	case c.outgoing <- shutdownMsg:
	case <-time.After(time.Second):
		glog.Warningf("Failed to send shutdown message")
	}

	// Send shutdown signal to stop handlers loop
	c.safeCloseChannel(&s.streamExit)

	// Cancel stream context
	if s.streamCancel != nil {
		s.streamCancel()
	}

	// Close connection
	if s.conn != nil {
		s.conn.Close()
	}

	// Close channels so receivers observe the shutdown. Any later send on
	// these channels would panic, which is why the client must not be
	// used after Disconnect.
	close(c.outgoing)
	close(c.incoming)

	glog.Infof("Disconnected from admin server")
	cmd.resp <- nil
}

// RegisterWorker registers the worker with the admin server
func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error {
	respCh := make(chan error, 1)
	request := registrationRequest{
		Worker: worker,
		Resp:   respCh,
	}
	c.cmds <- grpcCommand{
		action: ActionRegisterWorker,
		data:   request,
	}
	return <-respCh
}

// sendRegistration sends the registration message and waits for response
func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
	capabilities := make([]string, len(worker.Capabilities))
	for i, capability := range worker.Capabilities {
		capabilities[i] = string(capability)
	}

	msg := &worker_pb.WorkerMessage{
		WorkerId:  c.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_Registration{
			Registration: &worker_pb.WorkerRegistration{
				WorkerId:      c.workerID,
				Address:       worker.Address,
				Capabilities:  capabilities,
				MaxConcurrent: int32(worker.MaxConcurrent),
				Metadata:      make(map[string]string),
			},
		},
	}

	select {
	case c.outgoing <- msg:
	case <-time.After(5 * time.Second):
		return fmt.Errorf("failed to send registration message: timeout")
	}

	// Wait for registration response; other messages received meanwhile are discarded.
	// Use longer timeout for reconnections since admin server might be busy
	timeout := time.NewTimer(30 * time.Second)
	defer timeout.Stop()

	for {
		select {
		case response, ok := <-c.incoming:
			if !ok {
				return fmt.Errorf("registration failed: incoming channel closed")
			}
			if regResp := response.GetRegistrationResponse(); regResp != nil {
				if regResp.Success {
					glog.Infof("Worker registered successfully: %s", regResp.Message)
					return nil
				}
				return fmt.Errorf("registration failed: %s", regResp.Message)
			}
		case <-timeout.C:
			return fmt.Errorf("registration timeout")
		}
	}
}
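
// IsConnected reports whether the worker stream to the admin server is up
func (c *GrpcAdminClient) IsConnected() bool {
	respCh := make(chan bool, 1)
	c.cmds <- grpcCommand{
		action: ActionQueryConnected,
		data:   respCh,
	}
	return <-respCh
}

// IsReconnecting reports whether a reconnection attempt is in progress.
// The query is answered by managerLoop, which also runs the reconnect itself,
// so a caller that asks during an attempt waits for that attempt to finish.
func (c *GrpcAdminClient) IsReconnecting() bool {
	respCh := make(chan bool, 1)
	c.cmds <- grpcCommand{
		action: ActionQueryReconnecting,
		data:   respCh,
	}
	return <-respCh
}

// ShouldReconnect reports whether automatic reconnection is still enabled
func (c *GrpcAdminClient) ShouldReconnect() bool {
	respCh := make(chan bool, 1)
	c.cmds <- grpcCommand{
		action: ActionQueryShouldReconnect,
		data:   respCh,
	}
	return <-respCh
}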

// SendHeartbeat sends heartbeat to admin server
func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerStatus) error {
	if !c.IsConnected() {
		// If we're currently reconnecting, don't wait - just skip the heartbeat
		reconnecting := c.IsReconnecting()
		if reconnecting {
			// Don't treat as an error - reconnection is in progress
			glog.V(2).Infof("Skipping heartbeat during reconnection")
			return nil
		}

		// Wait for reconnection for a short time
		if err := c.waitForConnection(10 * time.Second); err != nil {
			return fmt.Errorf("not connected to admin server: %w", err)
		}
	}

	taskIds := make([]string, len(status.CurrentTasks))
	for i, task := range status.CurrentTasks {
		taskIds[i] = task.ID
	}

	msg := &worker_pb.WorkerMessage{
		WorkerId:  c.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_Heartbeat{
			Heartbeat: &worker_pb.WorkerHeartbeat{
				WorkerId:       c.workerID,
				Status:         status.Status,
				CurrentLoad:    int32(status.CurrentLoad),
				MaxConcurrent:  int32(status.MaxConcurrent),
				CurrentTaskIds: taskIds,
				TasksCompleted: int32(status.TasksCompleted),
				TasksFailed:    int32(status.TasksFailed),
				UptimeSeconds:  int64(status.Uptime.Seconds()),
			},
		},
	}

	select {
	case c.outgoing <- msg:
		return nil
	case <-time.After(time.Second):
		return fmt.Errorf("failed to send heartbeat: timeout")
	}
}

// RequestTask requests a new task from admin server
func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) {
	if !c.IsConnected() {
		// If we're currently reconnecting, don't wait - just return no task
		reconnecting := c.IsReconnecting()
		if reconnecting {
			// Don't treat as an error - reconnection is in progress
			glog.V(2).Infof("RECONNECTING: Worker %s skipping task request during reconnection", workerID)
			return nil, nil
		}

		// Wait for reconnection for a short time
		if err := c.waitForConnection(5 * time.Second); err != nil {
			return nil, fmt.Errorf("not connected to admin server: %w", err)
		}
	}

	caps := make([]string, len(capabilities))
	for i, capability := range capabilities {
		caps[i] = string(capability)
	}

	glog.V(3).Infof("📤 SENDING TASK REQUEST: Worker %s sending task request to admin server with capabilities: %v",
		workerID, capabilities)

	msg := &worker_pb.WorkerMessage{
		WorkerId:  c.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_TaskRequest{
			TaskRequest: &worker_pb.TaskRequest{
				WorkerId:       c.workerID,
				Capabilities:   caps,
				AvailableSlots: 1, // Request one task
			},
		},
	}

	select {
	case c.outgoing <- msg:
		glog.V(3).Infof("TASK REQUEST SENT: Worker %s successfully sent task request to admin server", workerID)
	case <-time.After(time.Second):
		glog.Errorf("TASK REQUEST TIMEOUT: Worker %s failed to send task request: timeout", workerID)
		return nil, fmt.Errorf("failed to send task request: timeout")
	}

	// Wait for task assignment; other messages received meanwhile are discarded
	glog.V(3).Infof("WAITING FOR RESPONSE: Worker %s waiting for task assignment response (5s timeout)", workerID)
	timeout := time.NewTimer(5 * time.Second)
	defer timeout.Stop()

	for {
		select {
		case response, ok := <-c.incoming:
			if !ok {
				return nil, fmt.Errorf("incoming channel closed while waiting for task assignment")
			}
			glog.V(3).Infof("RESPONSE RECEIVED: Worker %s received response from admin server: %T", workerID, response.Message)
			if taskAssign := response.GetTaskAssignment(); taskAssign != nil {
				// Nil-safe getters avoid a panic if Params is missing
				params := taskAssign.GetParams()
				glog.V(1).Infof("Worker %s received task assignment in response: %s (type: %s, volume: %d)",
					workerID, taskAssign.TaskId, taskAssign.TaskType, params.GetVolumeId())

				// Convert to our task type
				task := &types.TaskInput{
					ID:         taskAssign.TaskId,
					Type:       types.TaskType(taskAssign.TaskType),
					Status:     types.TaskStatusAssigned,
					VolumeID:   params.GetVolumeId(),
					Server:     getServerFromParams(params),
					Collection: params.GetCollection(),
					Priority:   types.TaskPriority(taskAssign.Priority),
					CreatedAt:  time.Unix(taskAssign.CreatedTime, 0),
					// Use typed protobuf parameters directly
					TypedParams: params,
				}
				return task, nil
			}
			glog.V(3).Infof("NON-TASK RESPONSE: Worker %s received non-task response: %T", workerID, response.Message)
		case <-timeout.C:
			glog.V(3).Infof("TASK REQUEST TIMEOUT: Worker %s - no task assignment received within 5 seconds", workerID)
			return nil, nil // No task available
		}
	}
}

// CompleteTask reports task completion to admin server
func (c *GrpcAdminClient) CompleteTask(taskID string, success bool, errorMsg string) error {
	return c.CompleteTaskWithMetadata(taskID, success, errorMsg, nil)
}

// CompleteTaskWithMetadata reports task completion with additional metadata
func (c *GrpcAdminClient) CompleteTaskWithMetadata(taskID string, success bool, errorMsg string, metadata map[string]string) error {
	if !c.IsConnected() {
		// If we're currently reconnecting, don't wait - just skip the completion report
		reconnecting := c.IsReconnecting()
		if reconnecting {
			// Don't treat as an error - reconnection is in progress
			glog.V(2).Infof("Skipping task completion report during reconnection for task %s", taskID)
			return nil
		}

		// Wait for reconnection for a short time
		if err := c.waitForConnection(5 * time.Second); err != nil {
			return fmt.Errorf("not connected to admin server: %w", err)
		}
	}

	taskComplete := &worker_pb.TaskComplete{
		TaskId:         taskID,
		WorkerId:       c.workerID,
		Success:        success,
		ErrorMessage:   errorMsg,
		CompletionTime: time.Now().Unix(),
	}

	// Add metadata if provided
	if metadata != nil {
		taskComplete.ResultMetadata = metadata
	}

	msg := &worker_pb.WorkerMessage{
		WorkerId:  c.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_TaskComplete{
			TaskComplete: taskComplete,
		},
	}

	select {
	case c.outgoing <- msg:
		return nil
	case <-time.After(time.Second):
		return fmt.Errorf("failed to send task completion: timeout")
	}
}

// UpdateTaskProgress updates task progress to admin server
func (c *GrpcAdminClient) UpdateTaskProgress(taskID string, progress float64) error {
	if !c.IsConnected() {
		// If we're currently reconnecting, don't wait - just skip the progress update
		reconnecting := c.IsReconnecting()
		if reconnecting {
			// Don't treat as an error - reconnection is in progress
			glog.V(2).Infof("Skipping task progress update during reconnection for task %s", taskID)
			return nil
		}

		// Wait for reconnection for a short time
		if err := c.waitForConnection(5 * time.Second); err != nil {
			return fmt.Errorf("not connected to admin server: %w", err)
		}
	}

	msg := &worker_pb.WorkerMessage{
		WorkerId:  c.workerID,
		Timestamp: time.Now().Unix(),
		Message: &worker_pb.WorkerMessage_TaskUpdate{
			TaskUpdate: &worker_pb.TaskUpdate{
				TaskId:   taskID,
				WorkerId: c.workerID,
				Status:   "in_progress",
				Progress: float32(progress),
			},
		},
	}

	select {
	case c.outgoing <- msg:
		return nil
	case <-time.After(time.Second):
		return fmt.Errorf("failed to send task progress: timeout")
	}
}

// waitForConnection waits for the connection to be established or timeout
func (c *GrpcAdminClient) waitForConnection(timeout time.Duration) error {
	deadline := time.Now().Add(timeout)

	for time.Now().Before(deadline) {
		connected := c.IsConnected()
		shouldReconnect := c.ShouldReconnect()

		if connected {
			return nil
		}
		if !shouldReconnect {
			return fmt.Errorf("reconnection is disabled")
		}

		time.Sleep(100 * time.Millisecond)
	}

	return fmt.Errorf("timeout waiting for connection")
}

// GetIncomingChannel returns the incoming message channel for message processing
// This allows the worker to process admin messages directly
func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage {
	return c.incoming
}

// CreateAdminClient creates an admin client with the provided dial option
func CreateAdminClient(adminServer string, workerID string, dialOption grpc.DialOption) (AdminClient, error) {
	return NewGrpcAdminClient(adminServer, workerID, dialOption), nil
}

// getServerFromParams extracts server address from unified sources.
// It returns "" when params is nil or has no sources.
func getServerFromParams(params *worker_pb.TaskParams) string {
	if sources := params.GetSources(); len(sources) > 0 {
		return sources[0].GetNode()
	}
	return ""
}