Files
seaweedFS/weed/wdclient/masterclient.go
Chris Lu 15f4a97029 fix: improve raft leader election reliability and failover speed (#8692)
* fix: clear raft vote state file on non-resume startup

The seaweedfs/raft library v1.1.7 added a persistent `state` file for
currentTerm and votedFor. When RaftResumeState=false (the default), the
log, conf, and snapshot directories are cleared but this state file was
not. On repeated restarts, different masters accumulate divergent terms,
causing AppendEntries rejections and preventing leader election.

Fixes #8690

* fix: recover TopologyId from snapshot before clearing raft state

When RaftResumeState=false clears log/conf/snapshot, the TopologyId
(used for license validation) was lost. Now extract it from the latest
snapshot before cleanup and restore it on the topology.

Both seaweedfs/raft and hashicorp/raft paths are handled, with a shared
recoverTopologyIdFromState helper in raft_common.go.

* fix: stagger multi-master bootstrap delay by peer index

Previously all masters used a fixed 1500ms delay before the bootstrap
check. Now the delay is proportional to the peer's sorted index with
randomization (matching the hashicorp raft path), giving the designated
bootstrap node (peer 0) a head start while later peers wait for gRPC
servers to be ready.

Also adds diagnostic logging showing why DoJoinCommand was or wasn't
called, making leader election issues easier to diagnose from logs.

* fix: skip unreachable masters during leader reconnection

When a master leader goes down, non-leader masters still redirect
clients to the stale leader address. The masterClient would follow
these redirects, fail, and retry — wasting round-trips each cycle.

Now tryAllMasters tracks which masters failed within a cycle and skips
redirects pointing to them, reducing log spam and connection overhead
during leader failover.

* fix: take snapshot after TopologyId generation for recovery

After generating a new TopologyId on the leader, immediately take a raft
snapshot so the ID can be recovered from the snapshot on future restarts
with RaftResumeState=false. Without this, short-lived clusters would
lose the TopologyId on restart since no automatic snapshot had been
taken yet.

* test: add multi-master raft failover integration tests

Integration test framework and 5 test scenarios for 3-node master
clusters:

- TestLeaderConsistencyAcrossNodes: all nodes agree on leader and
  TopologyId
- TestLeaderDownAndRecoverQuickly: leader stops, new leader elected,
  old leader rejoins as follower
- TestLeaderDownSlowRecover: leader gone for extended period, cluster
  continues with 2/3 quorum
- TestTwoMastersDownAndRestart: quorum lost (2/3 down), recovered
  when both restart
- TestAllMastersDownAndRestart: full cluster restart, leader elected,
  all nodes agree on TopologyId

* fix: address PR review comments

- peerIndex: return -1 (not 0) when self not found, add warning log
- recoverTopologyIdFromSnapshot: defer dir.Close()
- tests: check GetTopologyId errors instead of discarding them

* fix: address review comments on failover tests

- Assert no leader after quorum loss (was only logging)
- Verify follower cs.Leader matches expected leader via
  ServerAddress.ToHttpAddress() comparison
- Check GetTopologyId error in TestTwoMastersDownAndRestart
2026-03-18 23:28:07 -07:00

503 lines
18 KiB
Go

package wdclient
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/version"
)
// masterVolumeProvider implements VolumeLocationProvider by querying master
// This is rarely called since master pushes updates proactively via KeepConnected stream
type masterVolumeProvider struct {
masterClient *MasterClient
}
func isCanceledErr(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return true
}
if statusErr, ok := status.FromError(err); ok {
switch statusErr.Code() {
case codes.Canceled, codes.DeadlineExceeded:
return true
}
}
return false
}
// LookupVolumeIds queries the master for volume locations (fallback when cache misses).
// Returns partial results with aggregated errors for volumes that failed.
// Retries on codes.Unavailable (e.g. master warming up after restart) with backoff.
func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
var result map[string][]Location
var lookupErrors []error
glog.V(2).Infof("Looking up %d volumes from master: %v", len(volumeIds), volumeIds)
retryErr := util.RetryWithBackoff(ctx, "lookup", 30*time.Second,
func(err error) bool {
st, ok := status.FromError(err)
return ok && st.Code() == codes.Unavailable
},
func() error {
result = make(map[string][]Location)
lookupErrors = nil
// Per-attempt timeout bounds both master resolution and the RPC
// so a single attempt cannot consume the entire retry budget.
timeoutCtx, cancel := context.WithTimeout(ctx, p.masterClient.grpcTimeout)
defer cancel()
master := p.masterClient.GetMaster(timeoutCtx)
if master == "" {
if ctx.Err() != nil {
return ctx.Err()
}
return status.Errorf(codes.Unavailable, "no master available")
}
return pb.WithMasterClient(false, master, p.masterClient.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(timeoutCtx, &master_pb.LookupVolumeRequest{
VolumeOrFileIds: volumeIds,
})
if err != nil {
return err
}
for _, vidLoc := range resp.VolumeIdLocations {
// Preserve per-volume errors from master response
// These could indicate misconfiguration, volume deletion, etc.
if vidLoc.Error != "" {
lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: %s", vidLoc.VolumeOrFileId, vidLoc.Error))
glog.V(1).Infof("volume %s lookup error from master: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
continue
}
// Parse volume ID from response
parts := strings.Split(vidLoc.VolumeOrFileId, ",")
vidOnly := parts[0]
vid, err := strconv.ParseUint(vidOnly, 10, 32)
if err != nil {
lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: invalid volume ID format: %w", vidLoc.VolumeOrFileId, err))
glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
continue
}
var locations []Location
for _, masterLoc := range vidLoc.Locations {
loc := Location{
Url: masterLoc.Url,
PublicUrl: masterLoc.PublicUrl,
GrpcPort: int(masterLoc.GrpcPort),
DataCenter: masterLoc.DataCenter,
}
// Update cache with the location
p.masterClient.addLocation(uint32(vid), loc)
locations = append(locations, loc)
}
if len(locations) > 0 {
result[vidOnly] = locations
}
}
return nil
})
})
if retryErr != nil {
return nil, retryErr
}
// Return partial results with detailed errors
// Callers should check both result map and error
if len(lookupErrors) > 0 {
glog.V(2).Infof("MasterClient: looked up %d volumes, found %d, %d errors", len(volumeIds), len(result), len(lookupErrors))
return result, fmt.Errorf("master volume lookup errors: %w", errors.Join(lookupErrors...))
}
glog.V(3).Infof("MasterClient: looked up %d volumes, found %d", len(volumeIds), len(result))
return result, nil
}
// MasterClient connects to master servers and maintains volume location cache
// It receives real-time updates via KeepConnected streaming and uses vidMapClient for caching
type MasterClient struct {
*vidMapClient // Embedded cache with shared logic
FilerGroup string
clientType string
clientHost pb.ServerAddress
rack string
currentMaster pb.ServerAddress
currentMasterLock sync.RWMutex
masters pb.ServerDiscovery
grpcDialOption grpc.DialOption
grpcTimeout time.Duration // Timeout for gRPC calls to master
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
}
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
mc := &MasterClient{
FilerGroup: filerGroup,
clientType: clientType,
clientHost: clientHost,
rack: rack,
masters: masters,
grpcDialOption: grpcDialOption,
grpcTimeout: 5 * time.Second, // Default: 5 seconds for gRPC calls to master
}
// Create provider that references this MasterClient
provider := &masterVolumeProvider{masterClient: mc}
// Initialize embedded vidMapClient with the provider and default cache size
mc.vidMapClient = newVidMapClient(provider, clientDataCenter, DefaultVidMapCacheSize)
return mc
}
func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) {
mc.OnPeerUpdateLock.Lock()
mc.OnPeerUpdate = onPeerUpdate
mc.OnPeerUpdateLock.Unlock()
}
func (mc *MasterClient) tryAllMasters(ctx context.Context) {
var nextHintedLeader pb.ServerAddress
failedMasters := make(map[pb.ServerAddress]struct{})
mc.masters.RefreshBySrvIfAvailable()
for _, master := range mc.masters.GetInstances() {
if _, failed := failedMasters[master]; failed {
continue
}
nextHintedLeader = mc.tryConnectToMaster(ctx, master)
for nextHintedLeader != "" {
if _, failed := failedMasters[nextHintedLeader]; failed {
break // don't follow redirect to a known-unreachable master
}
select {
case <-ctx.Done():
glog.V(0).Infof("Connection attempt to all masters stopped: %v", ctx.Err())
return
default:
target := nextHintedLeader
nextHintedLeader = mc.tryConnectToMaster(ctx, target)
if nextHintedLeader == "" {
// connection to target failed; remember it so we skip
// stale redirects pointing back to it this cycle
failedMasters[target] = struct{}{}
}
}
}
mc.setCurrentMaster("")
}
}
func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master)
stats.MasterClientConnectCounter.WithLabelValues("total").Inc()
connectStartTime := time.Now()
gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := client.KeepConnected(ctx)
if err != nil {
glog.V(1).Infof("%s.%s masterClient failed to keep connected to %s: %v", mc.FilerGroup, mc.clientType, master, err)
stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToKeepConnected).Inc()
return err
}
glog.V(0).Infof("%s.%s masterClient gRPC stream established to %s in %v", mc.FilerGroup, mc.clientType, master, time.Since(connectStartTime))
if err = stream.Send(&master_pb.KeepConnectedRequest{
FilerGroup: mc.FilerGroup,
DataCenter: mc.GetDataCenter(),
Rack: mc.rack,
ClientType: mc.clientType,
ClientAddress: string(mc.clientHost),
Version: version.Version(),
}); err != nil {
glog.V(0).Infof("%s.%s masterClient failed to send to %s: %v", mc.FilerGroup, mc.clientType, master, err)
stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToSend).Inc()
return err
}
glog.V(1).Infof("%s.%s masterClient Connected to %v", mc.FilerGroup, mc.clientType, master)
resp, err := stream.Recv()
if err != nil {
canceled := isCanceledErr(err) || ctx.Err() != nil
if canceled {
glog.V(1).Infof("%s.%s masterClient stream closed from %s: %v", mc.FilerGroup, mc.clientType, master, err)
} else {
glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
}
return err
}
// check if it is the leader to determine whether to reset the vidMap
if resp.VolumeLocation != nil {
if resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader {
glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
return nil
}
mc.resetVidMap()
mc.updateVidMap(resp)
} else {
// First message from master is not VolumeLocation (e.g., ClusterNodeUpdate)
// Still need to reset cache to ensure we don't use stale data from previous master
mc.resetVidMap()
}
mc.setCurrentMaster(master)
for {
resp, err := stream.Recv()
if err != nil {
canceled := isCanceledErr(err) || ctx.Err() != nil
if canceled {
glog.V(1).Infof("%s.%s masterClient stream closed from %s: %v", mc.FilerGroup, mc.clientType, master, err)
} else {
glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
}
return err
}
if resp.VolumeLocation != nil {
// Check for leader change during the stream
// If master announces a new leader, reconnect to it
if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader {
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
return nil
}
mc.updateVidMap(resp)
}
if resp.ClusterNodeUpdate != nil {
update := resp.ClusterNodeUpdate
mc.OnPeerUpdateLock.RLock()
if mc.OnPeerUpdate != nil {
if update.FilerGroup == mc.FilerGroup {
if update.IsAdd {
glog.V(0).Infof("+ %s@%s noticed %s.%s %s\n", mc.clientType, mc.clientHost, update.FilerGroup, update.NodeType, update.Address)
} else {
glog.V(0).Infof("- %s@%s noticed %s.%s %s\n", mc.clientType, mc.clientHost, update.FilerGroup, update.NodeType, update.Address)
}
stats.MasterClientConnectCounter.WithLabelValues(stats.OnPeerUpdate).Inc()
mc.OnPeerUpdate(update, time.Now())
}
}
mc.OnPeerUpdateLock.RUnlock()
}
if err := ctx.Err(); err != nil {
if isCanceledErr(err) {
glog.V(1).Infof("Connection attempt to master stopped: %v", err)
} else {
glog.V(0).Infof("Connection attempt to master stopped: %v", err)
}
return err
}
}
})
if gprcErr != nil {
if isCanceledErr(gprcErr) || ctx.Err() != nil {
glog.V(1).Infof("%s.%s masterClient connection closed to %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr)
return nextHintedLeader
}
stats.MasterClientConnectCounter.WithLabelValues(stats.Failed).Inc()
glog.V(1).Infof("%s.%s masterClient failed to connect with master %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr)
}
return nextHintedLeader
}
func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
if resp.VolumeLocation.IsEmptyUrl() {
glog.V(0).Infof("updateVidMap ignore short heartbeat: %+v", resp)
return
}
// process new volume location
loc := Location{
Url: resp.VolumeLocation.Url,
PublicUrl: resp.VolumeLocation.PublicUrl,
DataCenter: resp.VolumeLocation.DataCenter,
GrpcPort: int(resp.VolumeLocation.GrpcPort),
}
for _, newVid := range resp.VolumeLocation.NewVids {
glog.V(2).Infof("%s.%s: %s masterClient adds volume %d", mc.FilerGroup, mc.clientType, loc.Url, newVid)
mc.addLocation(newVid, loc)
}
for _, deletedVid := range resp.VolumeLocation.DeletedVids {
glog.V(2).Infof("%s.%s: %s masterClient removes volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedVid)
mc.deleteLocation(deletedVid, loc)
}
for _, newEcVid := range resp.VolumeLocation.NewEcVids {
glog.V(2).Infof("%s.%s: %s masterClient adds ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, newEcVid)
mc.addEcLocation(newEcVid, loc)
}
for _, deletedEcVid := range resp.VolumeLocation.DeletedEcVids {
glog.V(2).Infof("%s.%s: %s masterClient removes ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedEcVid)
mc.deleteEcLocation(deletedEcVid, loc)
}
glog.V(1).Infof("updateVidMap(%s) %s.%s: %s volume add: %d, del: %d, add ec: %d del ec: %d",
resp.VolumeLocation.DataCenter, mc.FilerGroup, mc.clientType, loc.Url,
len(resp.VolumeLocation.NewVids), len(resp.VolumeLocation.DeletedVids),
len(resp.VolumeLocation.NewEcVids), len(resp.VolumeLocation.DeletedEcVids))
}
func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
getMasterF := func() pb.ServerAddress {
return mc.GetMaster(context.Background())
}
return mc.WithClientCustomGetMaster(getMasterF, streamingMode, fn)
}
func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAddress, streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
return util.Retry("master grpc", func() error {
return pb.WithMasterClient(streamingMode, getMasterF(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return fn(client)
})
})
}
func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
mc.currentMasterLock.RLock()
defer mc.currentMasterLock.RUnlock()
return mc.currentMaster
}
func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
mc.currentMasterLock.Lock()
mc.currentMaster = master
mc.currentMasterLock.Unlock()
}
// GetMaster returns the current master address, blocking until connected.
//
// IMPORTANT: This method blocks until KeepConnectedToMaster successfully establishes
// a connection to a master server. If KeepConnectedToMaster hasn't been started in a
// background goroutine, this will block indefinitely (or until ctx is canceled).
//
// Typical initialization pattern:
//
// mc := wdclient.NewMasterClient(...)
// go mc.KeepConnectedToMaster(ctx) // Start connection management
// // ... later ...
// master := mc.GetMaster(ctx) // Will block until connected
//
// If called before KeepConnectedToMaster establishes a connection, this may cause
// unexpected timeouts in LookupVolumeIds and other operations that depend on it.
func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
mc.WaitUntilConnected(ctx)
return mc.getCurrentMaster()
}
// GetMasters returns all configured master addresses, blocking until connected.
// See GetMaster() for important initialization contract details.
func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
mc.WaitUntilConnected(ctx)
return mc.masters.GetInstances()
}
// WaitUntilConnected blocks until a master connection is established or ctx is canceled.
// This does NOT initiate connections - it only waits for KeepConnectedToMaster to succeed.
func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
attempts := 0
for {
select {
case <-ctx.Done():
return
default:
currentMaster := mc.getCurrentMaster()
if currentMaster != "" {
return
}
attempts++
if attempts%100 == 0 { // Log every 100 attempts (roughly every 20 seconds)
glog.V(0).Infof("%s.%s WaitUntilConnected still waiting for master connection (attempt %d)...", mc.FilerGroup, mc.clientType, attempts)
}
// Use select with time.After to respect context cancellation during sleep
sleepDuration := time.Duration(rand.Int31n(200)) * time.Millisecond
select {
case <-ctx.Done():
return
case <-time.After(sleepDuration):
// continue to next iteration
}
}
}
}
func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
glog.V(0).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
reconnectCount := 0
for {
select {
case <-ctx.Done():
if isCanceledErr(ctx.Err()) {
glog.V(1).Infof("Connection to masters stopped: %v", ctx.Err())
} else {
glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
}
return
default:
reconnectStart := time.Now()
if reconnectCount > 0 {
glog.V(0).Infof("%s.%s masterClient reconnection attempt #%d", mc.FilerGroup, mc.clientType, reconnectCount)
}
mc.tryAllMasters(ctx)
reconnectCount++
glog.V(1).Infof("%s.%s masterClient connection cycle completed in %v, sleeping before retry",
mc.FilerGroup, mc.clientType, time.Since(reconnectStart))
time.Sleep(time.Second)
}
}
}
func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
for _, master := range mc.masters.GetInstances() {
if master == myMasterAddress {
continue
}
if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
defer cancel()
resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return err
}
leader = resp.Leader
return nil
}); grpcErr != nil {
glog.V(0).Infof("connect to %s: %v", master, grpcErr)
}
if leader != "" {
glog.V(0).Infof("existing leader is %s", leader)
return
}
}
glog.V(0).Infof("No existing leader found!")
return
}