seaweedFS/weed/wdclient/masterclient.go
Chris Lu 5b86d33c3c Fix worker reconnection race condition causing context canceled errors (#7825)
* Fix worker reconnection race condition causing context canceled errors

Fixes #7824

This commit fixes critical connection stability issues between admin server
and workers that manifested as rapid reconnection cycles with 'context canceled'
errors, particularly after 24+ hours of operation in containerized environments.

Root Cause:
-----------
Race condition where TWO goroutines were calling stream.Recv() on the same
gRPC bidirectional stream concurrently:

1. sendRegistrationSync() started a goroutine that calls stream.Recv()
2. handleIncoming() also calls stream.Recv() in a loop

Per the gRPC-Go concurrency rules, it is not safe to call Recv() on the same
stream from different goroutines; only ONE goroutine may read at a time.
Concurrent Recv() calls cause undefined behavior, manifesting as
'context canceled' errors and stream corruption.

The race occurred during worker reconnection:
- Sometimes sendRegistrationSync goroutine read the registration response first (success)
- Sometimes handleIncoming read it first, causing sendRegistrationSync to time out
- This left the stream in an inconsistent state, triggering a 'context canceled' error
- The error triggered rapid reconnection attempts, creating a reconnection storm

Why it happened after 24 hours:
Container orchestration systems (Docker Swarm/Kubernetes) periodically restart
pods. Over time, workers reconnect multiple times. Each reconnection had a chance
of hitting the race condition. Eventually the race manifested and caused the
connection storm.

Changes:
--------

weed/worker/client.go:
- Start handleIncoming and handleOutgoing goroutines BEFORE sending registration
- Use sendRegistration() instead of sendRegistrationSync()
- Ensures only ONE goroutine (handleIncoming) calls stream.Recv()
- Eliminates race condition entirely
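
The single-reader arrangement described above can be sketched as follows. This is a minimal illustration, not the code in weed/worker/client.go: fakeStream, the "registration_ok" message, the channel names, and the one-second timeout are all assumptions made for the example. Only the handleIncoming name and the idea of starting the reader before waiting for the registration response come from the commit.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"time"
)

// fakeStream is a hypothetical stand-in for a gRPC bidirectional stream.
// As with a real stream, Recv must only be called from one goroutine.
type fakeStream struct {
	msgs []string
}

func (s *fakeStream) Recv() (string, error) {
	if len(s.msgs) == 0 {
		return "", io.EOF
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, nil
}

// handleIncoming is the ONLY goroutine that calls Recv. It routes the
// registration response to regCh and every other message to taskCh.
func handleIncoming(s *fakeStream, regCh chan<- string, taskCh chan<- string) {
	defer close(taskCh)
	for {
		m, err := s.Recv()
		if err != nil {
			return // stream ended or failed; stop reading
		}
		if m == "registration_ok" {
			regCh <- m
			continue
		}
		taskCh <- m
	}
}

// connect starts the reader first, then waits for the registration
// response on a channel instead of calling Recv itself.
func connect(s *fakeStream) (string, []string, error) {
	regCh := make(chan string, 1)
	taskCh := make(chan string, 16)
	go handleIncoming(s, regCh, taskCh)

	var reg string
	select {
	case reg = <-regCh:
	case <-time.After(time.Second):
		return "", nil, errors.New("registration timed out")
	}

	var tasks []string
	for t := range taskCh {
		tasks = append(tasks, t)
	}
	return reg, tasks, nil
}

func main() {
	s := &fakeStream{msgs: []string{"registration_ok", "task-1", "task-2"}}
	reg, tasks, err := connect(s)
	fmt.Println(reg, tasks, err)
}
```

Because every message passes through one reader, the registration response cannot be consumed by the wrong goroutine, whatever the scheduling order.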

weed/admin/dash/worker_grpc_server.go:
- Clean up old connection when worker reconnects with same ID
- Cancel old connection context to stop its goroutines
- Prevents resource leaks and stale connection accumulation

Impact:
-------
Before: Random 'context canceled' errors during reconnection, rapid reconnection
        cycles, resource leaks, requires manual restart to recover
After:  Reliable reconnection, single Recv() goroutine, proper cleanup,
        stable operation over 24+ hours

Testing:
--------
Build verified successful with no compilation errors.

How to reproduce the bug:
1. Start admin server and worker
2. Restart admin server (simulates container recreation)
3. Worker reconnects
4. Race condition may manifest, causing a 'context canceled' error
5. Observe rapid reconnection cycles in logs

The fix is backward compatible and requires no configuration changes.

* Add MaxConnectionAge to gRPC server for Docker Swarm DNS handling

- Configure MaxConnectionAge and MaxConnectionAgeGrace for gRPC server
- Expand error detection in shouldInvalidateConnection for better cache invalidation
- Add connection lifecycle logging for debugging

* Add topology validation and nil-safety checks

- Add validation guards in UpdateTopology to prevent invalid updates
- Add nil-safety checks in rebuildIndexes
- Add GetDiskCount method for diagnostic purposes

* Fix worker registration race condition

- Reorder goroutine startup in WorkerStream to prevent race conditions
- Add defensive cleanup in unregisterWorker with panic-safe channel closing

* Add comprehensive topology update logging

- Enhance UpdateTopologyInfo with detailed logging of datacenter/node/disk counts
- Add metrics logging for topology changes

* Add periodic diagnostic status logging

- Implement topologyStatusLoop running every 5 minutes
- Add logTopologyStatus function reporting system metrics
- Run as background goroutine in maintenance manager

* Enhance master client connection logging

- Add connection timing logs in tryConnectToMaster
- Add reconnection attempt counting in KeepConnectedToMaster
- Improve diagnostic visibility for connection issues

* Remove unused sendRegistrationSync function

- Function is no longer called after switching to asynchronous sendRegistration
- Contains the problematic concurrent stream.Recv() pattern that caused race conditions
- Cleanup as suggested in PR review

* Clarify comment for channel closing during disconnection

- Improve comment to explain why channels are closed and their effect
- Make the code more self-documenting as suggested in PR review

* Address code review feedback: refactor and improvements

- Extract topology counting logic to shared helper function
  CountTopologyResources() to eliminate duplication between
  topology_management.go and maintenance_integration.go

- Use gRPC status codes for more robust error detection in
  shouldInvalidateConnection(), falling back to string matching
  for transport-level errors

- Add recover wrapper for channel close consistency in
  cleanupStaleConnections() to match unregisterWorker() pattern

* Update grpc_client_server.go

* Fix data race on lastSeen field access

- Add mutex protection around conn.lastSeen = time.Now() in WorkerStream method
- Ensures thread-safe access consistent with cleanupStaleConnections

* Fix goroutine leaks in worker reconnection logic

- Close streamExit in reconnect() before creating new connection
- Close streamExit in attemptConnection() when sendRegistration fails
- Prevents orphaned handleOutgoing/handleIncoming goroutines from previous connections
- Ensures proper cleanup of goroutines competing for shared outgoing channel

* Minor cleanup improvements for consistency and clarity

- Remove redundant string checks in shouldInvalidateConnection that overlap with gRPC status codes
- Add recover block to Stop() method for consistency with other channel close operations
- Maintains valuable DNS and transport-specific error detection while eliminating redundancy

* Improve topology update error handling

- Return descriptive errors instead of silently preserving topology for invalid updates
- Change nil topologyInfo case to return 'rejected invalid topology update: nil topologyInfo'
- Change empty DataCenterInfos case to return 'rejected invalid topology update: empty DataCenterInfos (had X nodes, Y disks)'
- Keep existing glog.Warningf calls but append error details to logs before returning errors
- Allows callers to distinguish rejected updates and handle them appropriately
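
The validation described above might look roughly like this. The topologyInfo and topology types are simplified, hypothetical stand-ins; only the two error messages are taken from the commit text.

```go
package main

import (
	"errors"
	"fmt"
)

// topologyInfo is a simplified, hypothetical stand-in for the real message.
type topologyInfo struct {
	DataCenterInfos []string
}

// topology holds the last accepted update plus counts used in error text.
type topology struct {
	current *topologyInfo
	nodes   int
	disks   int
}

// update rejects invalid input with a descriptive error and leaves the
// current topology untouched, so callers can tell a rejection from success.
func (t *topology) update(info *topologyInfo) error {
	if info == nil {
		return errors.New("rejected invalid topology update: nil topologyInfo")
	}
	if len(info.DataCenterInfos) == 0 {
		return fmt.Errorf("rejected invalid topology update: empty DataCenterInfos (had %d nodes, %d disks)", t.nodes, t.disks)
	}
	t.current = info
	return nil
}

func main() {
	t := &topology{nodes: 3, disks: 6}
	fmt.Println(t.update(nil))
	fmt.Println(t.update(&topologyInfo{}))
	fmt.Println(t.update(&topologyInfo{DataCenterInfos: []string{"dc1"}}))
}
```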

* Refactor safe channel closing into helper method

- Add safeCloseOutgoingChannel helper method to eliminate code duplication
- Replace repeated recover blocks in Stop, unregisterWorker, and cleanupStaleConnections
- Improves maintainability and ensures consistent error handling across all channel close operations
- Maintains same panic recovery behavior with contextual source identification
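
A helper of this kind can be sketched as below. The name safeClose and its signature are assumptions for illustration and are not the actual safeCloseOutgoingChannel method; the point is only the recover-around-close pattern with a source label for context.

```go
package main

import "fmt"

// safeClose closes ch and reports whether the close succeeded. Closing an
// already-closed (or nil) channel panics, so the deferred recover turns
// that panic into a logged message and a false return value.
func safeClose(ch chan struct{}, source string) (closed bool) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Printf("%s: channel close skipped: %v\n", source, r)
			closed = false
		}
	}()
	close(ch)
	return true
}

func main() {
	ch := make(chan struct{})
	fmt.Println(safeClose(ch, "Stop"))             // first close succeeds
	fmt.Println(safeClose(ch, "unregisterWorker")) // second close is recovered
}
```

An alternative design is to guard the close with sync.Once, which avoids relying on recover; the recover form is shown here because it matches the behavior the commit describes.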

* Make connection invalidation string matching case-insensitive

- Convert error string to lowercase once for all string.Contains checks
- Improves robustness by catching error message variations from different sources
- Eliminates need for separate 'DNS resolution' and 'dns' checks
- Maintains same error detection coverage with better reliability

* Clean up warning logs in UpdateTopology to avoid duplicating error text

- Remove duplicated error phrases from glog.Warningf messages
- Keep concise contextual warnings that don't repeat the fmt.Errorf content
- Maintain same error returns for backward compatibility

* Add robust validation to prevent topology wipeout during master restart

- Reject topology updates with 0 nodes when current topology has nodes
- Prevents transient empty topology from overwriting valid state
- Improves resilience during master restart scenarios
- Maintains backward compatibility for legitimate empty topology updates
2025-12-19 19:02:56 -08:00

package wdclient
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/version"
)
// masterVolumeProvider implements VolumeLocationProvider by querying master
// This is rarely called since master pushes updates proactively via KeepConnected stream
type masterVolumeProvider struct {
masterClient *MasterClient
}
// LookupVolumeIds queries the master for volume locations (fallback when cache misses)
// Returns partial results with aggregated errors for volumes that failed
func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
result := make(map[string][]Location)
var lookupErrors []error
glog.V(2).Infof("Looking up %d volumes from master: %v", len(volumeIds), volumeIds)
// Use a timeout for the master lookup to prevent indefinite blocking
timeoutCtx, cancel := context.WithTimeout(ctx, p.masterClient.grpcTimeout)
defer cancel()
err := pb.WithMasterClient(false, p.masterClient.GetMaster(ctx), p.masterClient.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(timeoutCtx, &master_pb.LookupVolumeRequest{
VolumeOrFileIds: volumeIds,
})
if err != nil {
return fmt.Errorf("master lookup failed: %w", err)
}
for _, vidLoc := range resp.VolumeIdLocations {
// Preserve per-volume errors from master response
// These could indicate misconfiguration, volume deletion, etc.
if vidLoc.Error != "" {
lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: %s", vidLoc.VolumeOrFileId, vidLoc.Error))
glog.V(1).Infof("volume %s lookup error from master: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
continue
}
// Parse volume ID from response
parts := strings.Split(vidLoc.VolumeOrFileId, ",")
vidOnly := parts[0]
vid, err := strconv.ParseUint(vidOnly, 10, 32)
if err != nil {
lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: invalid volume ID format: %w", vidLoc.VolumeOrFileId, err))
glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
continue
}
var locations []Location
for _, masterLoc := range vidLoc.Locations {
loc := Location{
Url: masterLoc.Url,
PublicUrl: masterLoc.PublicUrl,
GrpcPort: int(masterLoc.GrpcPort),
DataCenter: masterLoc.DataCenter,
}
// Update cache with the location
p.masterClient.addLocation(uint32(vid), loc)
locations = append(locations, loc)
}
if len(locations) > 0 {
result[vidOnly] = locations
}
}
return nil
})
if err != nil {
return nil, err
}
// Return partial results with detailed errors
// Callers should check both result map and error
if len(lookupErrors) > 0 {
glog.V(2).Infof("MasterClient: looked up %d volumes, found %d, %d errors", len(volumeIds), len(result), len(lookupErrors))
return result, fmt.Errorf("master volume lookup errors: %w", errors.Join(lookupErrors...))
}
glog.V(3).Infof("MasterClient: looked up %d volumes, found %d", len(volumeIds), len(result))
return result, nil
}
// MasterClient connects to master servers and maintains volume location cache
// It receives real-time updates via KeepConnected streaming and uses vidMapClient for caching
type MasterClient struct {
*vidMapClient // Embedded cache with shared logic
FilerGroup string
clientType string
clientHost pb.ServerAddress
rack string
currentMaster pb.ServerAddress
currentMasterLock sync.RWMutex
masters pb.ServerDiscovery
grpcDialOption grpc.DialOption
grpcTimeout time.Duration // Timeout for gRPC calls to master
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
}
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
mc := &MasterClient{
FilerGroup: filerGroup,
clientType: clientType,
clientHost: clientHost,
rack: rack,
masters: masters,
grpcDialOption: grpcDialOption,
grpcTimeout: 5 * time.Second, // Default: 5 seconds for gRPC calls to master
}
// Create provider that references this MasterClient
provider := &masterVolumeProvider{masterClient: mc}
// Initialize embedded vidMapClient with the provider and default cache size
mc.vidMapClient = newVidMapClient(provider, clientDataCenter, DefaultVidMapCacheSize)
return mc
}
func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) {
mc.OnPeerUpdateLock.Lock()
mc.OnPeerUpdate = onPeerUpdate
mc.OnPeerUpdateLock.Unlock()
}
func (mc *MasterClient) tryAllMasters(ctx context.Context) {
var nextHintedLeader pb.ServerAddress
mc.masters.RefreshBySrvIfAvailable()
for _, master := range mc.masters.GetInstances() {
nextHintedLeader = mc.tryConnectToMaster(ctx, master)
for nextHintedLeader != "" {
select {
case <-ctx.Done():
glog.V(0).Infof("Connection attempt to all masters stopped: %v", ctx.Err())
return
default:
nextHintedLeader = mc.tryConnectToMaster(ctx, nextHintedLeader)
}
}
mc.setCurrentMaster("")
}
}
func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master)
stats.MasterClientConnectCounter.WithLabelValues("total").Inc()
connectStartTime := time.Now()
gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := client.KeepConnected(ctx)
if err != nil {
glog.V(1).Infof("%s.%s masterClient failed to keep connected to %s: %v", mc.FilerGroup, mc.clientType, master, err)
stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToKeepConnected).Inc()
return err
}
glog.V(0).Infof("%s.%s masterClient gRPC stream established to %s in %v", mc.FilerGroup, mc.clientType, master, time.Since(connectStartTime))
if err = stream.Send(&master_pb.KeepConnectedRequest{
FilerGroup: mc.FilerGroup,
DataCenter: mc.GetDataCenter(),
Rack: mc.rack,
ClientType: mc.clientType,
ClientAddress: string(mc.clientHost),
Version: version.Version(),
}); err != nil {
glog.V(0).Infof("%s.%s masterClient failed to send to %s: %v", mc.FilerGroup, mc.clientType, master, err)
stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToSend).Inc()
return err
}
glog.V(1).Infof("%s.%s masterClient Connected to %v", mc.FilerGroup, mc.clientType, master)
resp, err := stream.Recv()
if err != nil {
glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
return err
}
// check if it is the leader to determine whether to reset the vidMap
if resp.VolumeLocation != nil {
if resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader {
glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
return nil
}
mc.resetVidMap()
mc.updateVidMap(resp)
} else {
// First message from master is not VolumeLocation (e.g., ClusterNodeUpdate)
// Still need to reset cache to ensure we don't use stale data from previous master
mc.resetVidMap()
}
mc.setCurrentMaster(master)
for {
resp, err := stream.Recv()
if err != nil {
glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
return err
}
if resp.VolumeLocation != nil {
// Check for leader change during the stream
// If master announces a new leader, reconnect to it
if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader {
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
return nil
}
mc.updateVidMap(resp)
}
if resp.ClusterNodeUpdate != nil {
update := resp.ClusterNodeUpdate
mc.OnPeerUpdateLock.RLock()
if mc.OnPeerUpdate != nil {
if update.FilerGroup == mc.FilerGroup {
if update.IsAdd {
glog.V(0).Infof("+ %s@%s noticed %s.%s %s\n", mc.clientType, mc.clientHost, update.FilerGroup, update.NodeType, update.Address)
} else {
glog.V(0).Infof("- %s@%s noticed %s.%s %s\n", mc.clientType, mc.clientHost, update.FilerGroup, update.NodeType, update.Address)
}
stats.MasterClientConnectCounter.WithLabelValues(stats.OnPeerUpdate).Inc()
mc.OnPeerUpdate(update, time.Now())
}
}
mc.OnPeerUpdateLock.RUnlock()
}
if err := ctx.Err(); err != nil {
glog.V(0).Infof("Connection attempt to master stopped: %v", err)
return err
}
}
})
if gprcErr != nil {
stats.MasterClientConnectCounter.WithLabelValues(stats.Failed).Inc()
glog.V(1).Infof("%s.%s masterClient failed to connect with master %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr)
}
return nextHintedLeader
}
func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
if resp.VolumeLocation.IsEmptyUrl() {
glog.V(0).Infof("updateVidMap ignore short heartbeat: %+v", resp)
return
}
// process new volume location
loc := Location{
Url: resp.VolumeLocation.Url,
PublicUrl: resp.VolumeLocation.PublicUrl,
DataCenter: resp.VolumeLocation.DataCenter,
GrpcPort: int(resp.VolumeLocation.GrpcPort),
}
for _, newVid := range resp.VolumeLocation.NewVids {
glog.V(2).Infof("%s.%s: %s masterClient adds volume %d", mc.FilerGroup, mc.clientType, loc.Url, newVid)
mc.addLocation(newVid, loc)
}
for _, deletedVid := range resp.VolumeLocation.DeletedVids {
glog.V(2).Infof("%s.%s: %s masterClient removes volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedVid)
mc.deleteLocation(deletedVid, loc)
}
for _, newEcVid := range resp.VolumeLocation.NewEcVids {
glog.V(2).Infof("%s.%s: %s masterClient adds ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, newEcVid)
mc.addEcLocation(newEcVid, loc)
}
for _, deletedEcVid := range resp.VolumeLocation.DeletedEcVids {
glog.V(2).Infof("%s.%s: %s masterClient removes ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedEcVid)
mc.deleteEcLocation(deletedEcVid, loc)
}
glog.V(1).Infof("updateVidMap(%s) %s.%s: %s volume add: %d, del: %d, add ec: %d del ec: %d",
resp.VolumeLocation.DataCenter, mc.FilerGroup, mc.clientType, loc.Url,
len(resp.VolumeLocation.NewVids), len(resp.VolumeLocation.DeletedVids),
len(resp.VolumeLocation.NewEcVids), len(resp.VolumeLocation.DeletedEcVids))
}
func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
getMasterF := func() pb.ServerAddress {
return mc.GetMaster(context.Background())
}
return mc.WithClientCustomGetMaster(getMasterF, streamingMode, fn)
}
func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAddress, streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
return util.Retry("master grpc", func() error {
return pb.WithMasterClient(streamingMode, getMasterF(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return fn(client)
})
})
}
func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
mc.currentMasterLock.RLock()
defer mc.currentMasterLock.RUnlock()
return mc.currentMaster
}
func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
mc.currentMasterLock.Lock()
mc.currentMaster = master
mc.currentMasterLock.Unlock()
}
// GetMaster returns the current master address, blocking until connected.
//
// IMPORTANT: This method blocks until KeepConnectedToMaster successfully establishes
// a connection to a master server. If KeepConnectedToMaster hasn't been started in a
// background goroutine, this will block indefinitely (or until ctx is canceled).
//
// Typical initialization pattern:
//
//	mc := wdclient.NewMasterClient(...)
//	go mc.KeepConnectedToMaster(ctx)  // Start connection management
//	// ... later ...
//	master := mc.GetMaster(ctx)       // Will block until connected
//
// If called before KeepConnectedToMaster establishes a connection, this may cause
// unexpected timeouts in LookupVolumeIds and other operations that depend on it.
func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
mc.WaitUntilConnected(ctx)
return mc.getCurrentMaster()
}
// GetMasters returns all configured master addresses, blocking until connected.
// See GetMaster() for important initialization contract details.
func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
mc.WaitUntilConnected(ctx)
return mc.masters.GetInstances()
}
// WaitUntilConnected blocks until a master connection is established or ctx is canceled.
// This does NOT initiate connections - it only waits for KeepConnectedToMaster to succeed.
func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
attempts := 0
for {
select {
case <-ctx.Done():
return
default:
currentMaster := mc.getCurrentMaster()
if currentMaster != "" {
return
}
attempts++
if attempts%100 == 0 { // Log every 100 attempts (roughly every 10 seconds on average, since each sleep is 0-199ms)
glog.V(0).Infof("%s.%s WaitUntilConnected still waiting for master connection (attempt %d)...", mc.FilerGroup, mc.clientType, attempts)
}
// Use select with time.After to respect context cancellation during sleep
sleepDuration := time.Duration(rand.Int31n(200)) * time.Millisecond
select {
case <-ctx.Done():
return
case <-time.After(sleepDuration):
// continue to next iteration
}
}
}
}
func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
glog.V(0).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
reconnectCount := 0
for {
select {
case <-ctx.Done():
glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
return
default:
reconnectStart := time.Now()
if reconnectCount > 0 {
glog.V(0).Infof("%s.%s masterClient reconnection attempt #%d", mc.FilerGroup, mc.clientType, reconnectCount)
}
mc.tryAllMasters(ctx)
reconnectCount++
glog.V(1).Infof("%s.%s masterClient connection cycle completed in %v, sleeping before retry",
mc.FilerGroup, mc.clientType, time.Since(reconnectStart))
time.Sleep(time.Second)
}
}
}
func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
for _, master := range mc.masters.GetInstances() {
if master == myMasterAddress {
continue
}
if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
defer cancel()
resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return err
}
leader = resp.Leader
return nil
}); grpcErr != nil {
glog.V(0).Infof("connect to %s: %v", master, grpcErr)
}
if leader != "" {
glog.V(0).Infof("existing leader is %s", leader)
return
}
}
glog.V(0).Infof("No existing leader found!")
return
}