Files
seaweedFS/weed/server/master_server.go
Chris Lu 15f4a97029 fix: improve raft leader election reliability and failover speed (#8692)
* fix: clear raft vote state file on non-resume startup

The seaweedfs/raft library v1.1.7 added a persistent `state` file for
currentTerm and votedFor. When RaftResumeState=false (the default), the
log, conf, and snapshot directories are cleared but this state file was
not. On repeated restarts, different masters accumulate divergent terms,
causing AppendEntries rejections and preventing leader election.

Fixes #8690

* fix: recover TopologyId from snapshot before clearing raft state

When RaftResumeState=false clears log/conf/snapshot, the TopologyId
(used for license validation) was lost. Now extract it from the latest
snapshot before cleanup and restore it on the topology.

Both seaweedfs/raft and hashicorp/raft paths are handled, with a shared
recoverTopologyIdFromState helper in raft_common.go.

* fix: stagger multi-master bootstrap delay by peer index

Previously all masters used a fixed 1500ms delay before the bootstrap
check. Now the delay is proportional to the peer's sorted index with
randomization (matching the hashicorp raft path), giving the designated
bootstrap node (peer 0) a head start while later peers wait for gRPC
servers to be ready.

Also adds diagnostic logging showing why DoJoinCommand was or wasn't
called, making leader election issues easier to diagnose from logs.

* fix: skip unreachable masters during leader reconnection

When a master leader goes down, non-leader masters still redirect
clients to the stale leader address. The masterClient would follow
these redirects, fail, and retry — wasting round-trips each cycle.

Now tryAllMasters tracks which masters failed within a cycle and skips
redirects pointing to them, reducing log spam and connection overhead
during leader failover.

* fix: take snapshot after TopologyId generation for recovery

After generating a new TopologyId on the leader, immediately take a raft
snapshot so the ID can be recovered from the snapshot on future restarts
with RaftResumeState=false. Without this, short-lived clusters would
lose the TopologyId on restart since no automatic snapshot had been
taken yet.

* test: add multi-master raft failover integration tests

Integration test framework and 5 test scenarios for 3-node master
clusters:

- TestLeaderConsistencyAcrossNodes: all nodes agree on leader and
  TopologyId
- TestLeaderDownAndRecoverQuickly: leader stops, new leader elected,
  old leader rejoins as follower
- TestLeaderDownSlowRecover: leader gone for extended period, cluster
  continues with 2/3 quorum
- TestTwoMastersDownAndRestart: quorum lost (2/3 down), recovered
  when both restart
- TestAllMastersDownAndRestart: full cluster restart, leader elected,
  all nodes agree on TopologyId

* fix: address PR review comments

- peerIndex: return -1 (not 0) when self not found, add warning log
- recoverTopologyIdFromSnapshot: defer dir.Close()
- tests: check GetTopologyId errors instead of discarding them

* fix: address review comments on failover tests

- Assert no leader after quorum loss (was only logging)
- Verify follower cs.Leader matches expected leader via
  ServerAddress.ToHttpAddress() comparison
- Check GetTopologyId error in TestTwoMastersDownAndRestart
2026-03-18 23:28:07 -07:00

539 lines
18 KiB
Go

package weed_server
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"os"
"regexp"
"runtime"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster/maintenance"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/telemetry"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/gorilla/mux"
hashicorpRaft "github.com/hashicorp/raft"
"github.com/seaweedfs/raft"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/sequence"
"github.com/seaweedfs/seaweedfs/weed/shell"
"github.com/seaweedfs/seaweedfs/weed/topology"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
const (
SequencerType = "master.sequencer.type"
SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
raftApplyTimeout = 1 * time.Second
)
type MasterOption struct {
Master pb.ServerAddress
MetaFolder string
VolumeSizeLimitMB uint32
VolumePreallocate bool
MaxParallelVacuumPerServer int
// PulseSeconds int
DefaultReplicaPlacement string
GarbageThreshold float64
WhiteList []string
DisableHttp bool
MetricsAddress string
MetricsIntervalSec int
IsFollower bool
TelemetryUrl string
TelemetryEnabled bool
VolumeGrowthDisabled bool
}
type MasterServer struct {
master_pb.UnimplementedSeaweedServer
option *MasterOption
guard *security.Guard
preallocateSize int64
Topo *topology.Topology
vg *topology.VolumeGrowth
volumeGrowthRequestChan chan *topology.VolumeGrowRequest
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.KeepConnectedResponse
grpcDialOption grpc.DialOption
topologyIdGenLock sync.Mutex
MasterClient *wdclient.MasterClient
adminLocks *AdminLocks
Cluster *cluster.Cluster
// telemetry
telemetryCollector *telemetry.Collector
}
func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer {
v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
v.SetDefault("jwt.signing.expires_after_seconds", 10)
expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
readSigningKey := v.GetString("jwt.signing.read.key")
v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
v.SetDefault("master.replication.treat_replication_as_minimums", false)
replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums")
v.SetDefault("master.volume_growth.copy_1", topology.VolumeGrowStrategy.Copy1Count)
v.SetDefault("master.volume_growth.copy_2", topology.VolumeGrowStrategy.Copy2Count)
v.SetDefault("master.volume_growth.copy_3", topology.VolumeGrowStrategy.Copy3Count)
v.SetDefault("master.volume_growth.copy_other", topology.VolumeGrowStrategy.CopyOtherCount)
v.SetDefault("master.volume_growth.threshold", topology.VolumeGrowStrategy.Threshold)
v.SetDefault("master.volume_growth.disable", false)
option.VolumeGrowthDisabled = v.GetBool("master.volume_growth.disable")
topology.VolumeGrowStrategy.Copy1Count = v.GetUint32("master.volume_growth.copy_1")
topology.VolumeGrowStrategy.Copy2Count = v.GetUint32("master.volume_growth.copy_2")
topology.VolumeGrowStrategy.Copy3Count = v.GetUint32("master.volume_growth.copy_3")
topology.VolumeGrowStrategy.CopyOtherCount = v.GetUint32("master.volume_growth.copy_other")
topology.VolumeGrowStrategy.Threshold = v.GetFloat64("master.volume_growth.threshold")
whiteList := util.StringSplit(v.GetString("guard.white_list"), ",")
var preallocateSize int64
if option.VolumePreallocate {
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
grpcDialOption := security.LoadClientTLS(v, "grpc.master")
ms := &MasterServer{
option: option,
preallocateSize: preallocateSize,
volumeGrowthRequestChan: make(chan *topology.VolumeGrowRequest, 1<<6),
clientChans: make(map[string]chan *master_pb.KeepConnectedResponse),
grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", *pb.NewServiceDiscoveryFromMap(peers)),
adminLocks: NewAdminLocks(),
Cluster: cluster.NewCluster(),
}
ms.MasterClient.SetOnPeerUpdateFn(ms.OnPeerUpdate)
seq := ms.createSequencer(option)
if nil == seq {
glog.Fatalf("create sequencer failed.")
}
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, 5, replicationAsMin)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
// Initialize telemetry after topology is created
if option.TelemetryEnabled && option.TelemetryUrl != "" {
telemetryClient := telemetry.NewClient(option.TelemetryUrl, option.TelemetryEnabled)
ms.telemetryCollector = telemetry.NewCollector(telemetryClient, ms.Topo, ms.Cluster)
ms.telemetryCollector.SetMasterServer(ms)
// Set version and OS information
ms.telemetryCollector.SetVersion(version.VERSION_NUMBER)
ms.telemetryCollector.SetOS(runtime.GOOS + "/" + runtime.GOARCH)
// Start periodic telemetry collection (every 24 hours)
ms.telemetryCollector.StartPeriodicCollection(24 * time.Hour)
}
ms.guard = security.NewGuard(append(ms.option.WhiteList, whiteList...), signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources2(r)
r.HandleFunc("/", ms.proxyToLeader(requestIDMiddleware(ms.uiStatusHandler)))
r.HandleFunc("/ui/index.html", requestIDMiddleware(ms.uiStatusHandler))
if !ms.option.DisableHttp {
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.dirAssignHandler))))
r.HandleFunc("/dir/lookup", ms.guard.WhiteList(requestIDMiddleware(ms.dirLookupHandler)))
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.dirStatusHandler))))
r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.collectionDeleteHandler))))
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.volumeGrowHandler))))
r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.volumeStatusHandler))))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.volumeVacuumHandler))))
r.HandleFunc("/submit", ms.guard.WhiteList(requestIDMiddleware(ms.submitFromMasterServerHandler)))
r.HandleFunc("/collection/info", ms.guard.WhiteList(requestIDMiddleware(ms.collectionInfoHandler)))
/*
r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
*/
r.HandleFunc("/{fileId}", requestIDMiddleware(ms.redirectHandler))
}
ms.Topo.SetAdminServerConnectedFunc(ms.isAdminServerConnectedFunc)
ms.Topo.StartRefreshWritableVolumes(
ms.grpcDialOption,
ms.option.GarbageThreshold,
ms.option.MaxParallelVacuumPerServer,
topology.VolumeGrowStrategy.Threshold,
ms.preallocateSize,
)
ms.ProcessGrowRequest()
if !option.IsFollower {
ms.startAdminScripts()
}
return ms
}
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
var raftServerName string
ms.Topo.RaftServerAccessLock.Lock()
if raftServer.raftServer != nil {
ms.Topo.RaftServer = raftServer.raftServer
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
if ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infof("[%s] %s becomes leader.", ms.Topo.RaftServer.Name(), ms.Topo.RaftServer.Leader())
ms.Topo.SetLastLeaderChangeTime(time.Now())
if ms.Topo.RaftServer.Leader() == ms.Topo.RaftServer.Name() {
go ms.ensureTopologyId()
}
}
})
raftServerName = fmt.Sprintf("[%s]", ms.Topo.RaftServer.Name())
} else if raftServer.RaftHashicorp != nil {
ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
raftServerName = ms.Topo.HashicorpRaft.String()
}
ms.Topo.RaftServerAccessLock.Unlock()
if ms.Topo.IsLeader() {
// Seed the warmup timestamp so IsWarmingUp() is active even if the
// leader change event hasn't fired yet (e.g. node is already leader
// on startup). Followers don't need warmup state.
ms.Topo.SetLastLeaderChangeTime(time.Now())
glog.V(0).Infof("%s I am the leader!", raftServerName)
go ms.ensureTopologyId()
} else {
var raftServerLeader string
ms.Topo.RaftServerAccessLock.RLock()
if ms.Topo.RaftServer != nil {
raftServerLeader = ms.Topo.RaftServer.Leader()
} else if ms.Topo.HashicorpRaft != nil {
raftServerName = ms.Topo.HashicorpRaft.String()
raftServerLeaderAddr, _ := ms.Topo.HashicorpRaft.LeaderWithID()
raftServerLeader = string(raftServerLeaderAddr)
}
ms.Topo.RaftServerAccessLock.RUnlock()
glog.V(0).Infof("%s %s - is the leader.", raftServerName, raftServerLeader)
}
}
func (ms *MasterServer) syncRaftForTopologyId(topologyId string) error {
ms.Topo.RaftServerAccessLock.RLock()
defer ms.Topo.RaftServerAccessLock.RUnlock()
if ms.Topo.RaftServer != nil {
_, err := ms.Topo.RaftServer.Do(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId))
return err
} else if ms.Topo.HashicorpRaft != nil {
b, err := json.Marshal(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId))
if err != nil {
return fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %v", err)
}
if future := ms.Topo.HashicorpRaft.Apply(b, raftApplyTimeout); future.Error() != nil {
return future.Error()
}
return nil
}
return fmt.Errorf("no raft server configured")
}
func (ms *MasterServer) ensureTopologyId() {
ms.topologyIdGenLock.Lock()
defer ms.topologyIdGenLock.Unlock()
// Send a no-op command to ensure all previous logs are applied (barrier)
// This handles the case where log replay is still in progress
glog.V(1).Infof("ensureTopologyId: sending barrier command")
for {
if !ms.Topo.IsLeader() {
glog.V(1).Infof("lost leadership while sending barrier command for topologyId")
return
}
if err := ms.syncRaftForTopologyId(ms.Topo.GetTopologyId()); err != nil {
glog.Errorf("failed to sync raft for topologyId: %v, retrying in 1s", err)
time.Sleep(time.Second)
continue
}
break
}
glog.V(1).Infof("ensureTopologyId: barrier command completed")
if !ms.Topo.IsLeader() {
return
}
currentId := ms.Topo.GetTopologyId()
glog.V(1).Infof("ensureTopologyId: current TopologyId after barrier: %s", currentId)
prevId := ms.Topo.GetTopologyId()
EnsureTopologyId(ms.Topo, func() bool {
return ms.Topo.IsLeader()
}, func(topologyId string) error {
return ms.syncRaftForTopologyId(topologyId)
})
// If a new TopologyId was generated, take a snapshot so it survives
// raft state cleanup on future non-resume restarts.
if prevId == "" && ms.Topo.GetTopologyId() != "" {
ms.Topo.RaftServerAccessLock.RLock()
if ms.Topo.RaftServer != nil {
if err := ms.Topo.RaftServer.TakeSnapshot(); err != nil {
glog.Warningf("snapshot after TopologyId generation: %v", err)
} else {
glog.V(0).Infof("snapshot taken to persist TopologyId %s", ms.Topo.GetTopologyId())
}
}
// Hashicorp raft snapshots are handled automatically.
ms.Topo.RaftServerAccessLock.RUnlock()
}
}
func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
f(w, r)
return
}
// get the current raft leader
leaderAddr, _ := ms.Topo.MaybeLeader()
raftServerLeader := leaderAddr.ToHttpAddress()
if raftServerLeader == "" {
f(w, r)
return
}
// determine the scheme based on HTTPS client configuration
scheme := util_http.GetGlobalHttpClient().GetHttpScheme()
targetUrl, err := url.Parse(scheme + "://" + raftServerLeader)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Leader URL %s://%s Parse Error: %v", scheme, raftServerLeader, err))
return
}
// proxy to leader
glog.V(4).Infoln("proxying to leader", raftServerLeader, "using", scheme)
proxy := httputil.NewSingleHostReverseProxy(targetUrl)
proxy.Transport = util_http.GetGlobalHttpClient().GetClientTransport()
proxy.ServeHTTP(w, r)
}
}
func (ms *MasterServer) isAdminServerConnectedFunc() bool {
if ms == nil || ms.adminLocks == nil {
return false
}
_, _, isLocked := ms.adminLocks.isLocked(cluster.AdminServerPresenceLockName)
return isLocked
}
func (ms *MasterServer) startAdminScripts() {
v := util.GetViper()
adminScripts := v.GetString("master.maintenance.scripts")
if adminScripts == "" {
return
}
glog.V(0).Infof("adminScripts: %v", adminScripts)
sleepMinutes := v.GetFloat64("master.maintenance.sleep_minutes")
if sleepMinutes <= 0 {
sleepMinutes = float64(maintenance.DefaultMaintenanceSleepMinutes)
}
scriptLines := strings.Split(adminScripts, "\n")
if !strings.Contains(adminScripts, "lock") {
scriptLines = append(append([]string{}, "lock"), scriptLines...)
scriptLines = append(scriptLines, "unlock")
}
masterAddress := string(ms.option.Master)
var shellOptions shell.ShellOptions
shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
shellOptions.Masters = &masterAddress
shellOptions.Directory = "/"
emptyFilerGroup := ""
shellOptions.FilerGroup = &emptyFilerGroup
commandEnv := shell.NewCommandEnv(&shellOptions)
reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
go commandEnv.MasterClient.KeepConnectedToMaster(context.Background())
go func() {
for {
time.Sleep(time.Duration(sleepMinutes) * time.Minute)
if ms.Topo.IsLeader() && ms.MasterClient.GetMaster(context.Background()) != "" {
if ms.isAdminServerConnectedFunc() {
glog.V(1).Infof("Skipping master maintenance scripts because admin server is connected")
continue
}
shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroupName(*shellOptions.FilerGroup))
if shellOptions.FilerAddress == "" {
continue
}
for _, line := range scriptLines {
for _, c := range strings.Split(line, ";") {
processEachCmd(reg, c, commandEnv)
}
}
}
}
}()
}
func processEachCmd(reg *regexp.Regexp, line string, commandEnv *shell.CommandEnv) {
cmds := reg.FindAllString(line, -1)
if len(cmds) == 0 {
return
}
args := make([]string, len(cmds[1:]))
for i := range args {
args[i] = strings.Trim(string(cmds[1+i]), "\"'")
}
cmd := cmds[0]
for _, c := range shell.Commands {
if c.Name() == cmd {
if c.HasTag(shell.ResourceHeavy) {
glog.Warningf("%s is resource heavy and should not run on master", cmd)
continue
}
glog.V(0).Infof("executing: %s %v", cmd, args)
if err := c.Do(args, commandEnv, os.Stdout); err != nil {
glog.V(0).Infof("error: %v", err)
}
}
}
}
func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
var seq sequence.Sequencer
v := util.GetViper()
seqType := strings.ToLower(v.GetString(SequencerType))
glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
switch strings.ToLower(seqType) {
case "snowflake":
var err error
snowflakeId := v.GetInt(SequencerSnowflakeId)
seq, err = sequence.NewSnowflakeSequencer(string(option.Master), snowflakeId)
if err != nil {
glog.Error(err)
seq = nil
}
case "raft":
fallthrough
default:
seq = sequence.NewMemorySequencer()
}
return seq
}
func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
ms.Topo.RaftServerAccessLock.RLock()
defer ms.Topo.RaftServerAccessLock.RUnlock()
if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
return
}
glog.V(4).Infof("OnPeerUpdate: %+v", update)
peerAddress := pb.ServerAddress(update.Address)
peerName := raftServerID(peerAddress)
if ms.Topo.HashicorpRaft.State() != hashicorpRaft.Leader {
return
}
if update.IsAdd {
raftServerFound := false
for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
if string(server.ID) == peerName {
raftServerFound = true
}
}
if !raftServerFound {
glog.V(0).Infof("adding new raft server: %s", peerName)
ms.Topo.HashicorpRaft.AddVoter(
hashicorpRaft.ServerID(peerName),
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
}
} else {
pb.WithMasterClient(false, peerAddress, ms.grpcDialOption, true, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
defer cancel()
if _, err := client.Ping(ctx, &master_pb.PingRequest{Target: string(peerAddress), TargetType: cluster.MasterType}); err != nil {
glog.V(0).Infof("master %s didn't respond to pings. remove raft server", peerName)
if err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
Id: peerName,
Force: false,
})
return err
}); err != nil {
glog.Warningf("failed removing old raft server: %v", err)
return err
}
} else {
glog.V(0).Infof("master %s successfully responded to ping", peerName)
}
return nil
})
}
}
func (ms *MasterServer) Shutdown() {
if ms.Topo == nil || ms.Topo.HashicorpRaft == nil {
return
}
if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
ms.Topo.HashicorpRaft.LeadershipTransfer()
}
ms.Topo.HashicorpRaft.Shutdown()
}
func (ms *MasterServer) Reload() {
glog.V(0).Infoln("Reload master server...")
util.LoadConfiguration("security", false)
v := util.GetViper()
ms.guard.UpdateWhiteList(append(ms.option.WhiteList,
util.StringSplit(v.GetString("guard.white_list"), ",")...),
)
}