fix: improve raft leader election reliability and failover speed (#8692)

* fix: clear raft vote state file on non-resume startup

The seaweedfs/raft library v1.1.7 added a persistent `state` file for
currentTerm and votedFor. When RaftResumeState=false (the default), the
log, conf, and snapshot directories are cleared but this state file was
not. On repeated restarts, different masters accumulate divergent terms,
causing AppendEntries rejections and preventing leader election.

Fixes #8690

* fix: recover TopologyId from snapshot before clearing raft state

When RaftResumeState=false clears log/conf/snapshot, the TopologyId
(used for license validation) was lost. Now extract it from the latest
snapshot before cleanup and restore it on the topology.

Both seaweedfs/raft and hashicorp/raft paths are handled, with a shared
recoverTopologyIdFromState helper in raft_common.go.

* fix: stagger multi-master bootstrap delay by peer index

Previously all masters used a fixed 1500ms delay before the bootstrap
check. Now the delay is proportional to the peer's sorted index with
randomization (matching the hashicorp raft path), giving the designated
bootstrap node (peer 0) a head start while later peers wait for gRPC
servers to be ready.

Also adds diagnostic logging showing why DoJoinCommand was or wasn't
called, making leader election issues easier to diagnose from logs.

* fix: skip unreachable masters during leader reconnection

When a master leader goes down, non-leader masters still redirect
clients to the stale leader address. The masterClient would follow
these redirects, fail, and retry — wasting round-trips each cycle.

Now tryAllMasters tracks which masters failed within a cycle and skips
redirects pointing to them, reducing log spam and connection overhead
during leader failover.

* fix: take snapshot after TopologyId generation for recovery

After generating a new TopologyId on the leader, immediately take a raft
snapshot so the ID can be recovered from the snapshot on future restarts
with RaftResumeState=false. Without this, short-lived clusters would
lose the TopologyId on restart since no automatic snapshot had been
taken yet.

* test: add multi-master raft failover integration tests

Integration test framework and 5 test scenarios for 3-node master
clusters:

- TestLeaderConsistencyAcrossNodes: all nodes agree on leader and
  TopologyId
- TestLeaderDownAndRecoverQuickly: leader stops, new leader elected,
  old leader rejoins as follower
- TestLeaderDownSlowRecover: leader gone for extended period, cluster
  continues with 2/3 quorum
- TestTwoMastersDownAndRestart: quorum lost (2/3 down), recovered
  when both restart
- TestAllMastersDownAndRestart: full cluster restart, leader elected,
  all nodes agree on TopologyId

* fix: address PR review comments

- peerIndex: return -1 (not 0) when self not found, add warning log
- recoverTopologyIdFromSnapshot: defer dir.Close()
- tests: check GetTopologyId errors instead of discarding them

* fix: address review comments on failover tests

- Assert no leader after quorum loss (was only logging)
- Verify follower cs.Leader matches expected leader via
  ServerAddress.ToHttpAddress() comparison
- Check GetTopologyId error in TestTwoMastersDownAndRestart
This commit is contained in:
Chris Lu
2026-03-18 23:28:07 -07:00
committed by GitHub
parent c197206897
commit 15f4a97029
9 changed files with 908 additions and 11 deletions

View File

@@ -0,0 +1,437 @@
package multi_master
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
)
const (
waitTimeout = 30 * time.Second
waitTick = 200 * time.Millisecond
)
// masterNode represents a single master process in the cluster.
type masterNode struct {
port int
grpcPort int
dataDir string
cmd *exec.Cmd
logFile string
stopped bool
}
// MasterCluster manages a 3-node master raft cluster for integration tests.
type MasterCluster struct {
t testing.TB
weedBinary string
baseDir string
logsDir string
keepLogs bool
nodes [3]*masterNode
mu sync.Mutex
// peers string shared by all nodes, e.g. "127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
peersStr string
}
// clusterStatus is the JSON returned by /cluster/status.
type clusterStatus struct {
IsLeader bool `json:"IsLeader"`
Leader string `json:"Leader"`
Peers []string `json:"Peers"`
}
// StartMasterCluster boots a 3-node master raft cluster and waits for a leader.
func StartMasterCluster(t testing.TB) *MasterCluster {
t.Helper()
weedBinary, err := findOrBuildWeedBinary()
if err != nil {
t.Fatalf("resolve weed binary: %v", err)
}
keepLogs := os.Getenv("MULTI_MASTER_IT_KEEP_LOGS") == "1"
baseDir, err := os.MkdirTemp("", "seaweedfs_multi_master_it_")
if err != nil {
t.Fatalf("create temp dir: %v", err)
}
logsDir := filepath.Join(baseDir, "logs")
os.MkdirAll(logsDir, 0o755)
// Allocate 3 port pairs (http, grpc) atomically to prevent reuse.
portPairs, err := allocateMultipleMasterPortPairs(3)
if err != nil {
t.Fatalf("allocate ports: %v", err)
}
var nodes [3]*masterNode
var peerParts []string
for i, pp := range portPairs {
dataDir := filepath.Join(baseDir, fmt.Sprintf("m%d", i))
os.MkdirAll(dataDir, 0o755)
nodes[i] = &masterNode{
port: pp[0],
grpcPort: pp[1],
dataDir: dataDir,
logFile: filepath.Join(logsDir, fmt.Sprintf("master%d.log", i)),
}
peerParts = append(peerParts, fmt.Sprintf("127.0.0.1:%d", pp[0]))
}
mc := &MasterCluster{
t: t,
weedBinary: weedBinary,
baseDir: baseDir,
logsDir: logsDir,
keepLogs: keepLogs,
nodes: nodes,
peersStr: strings.Join(peerParts, ","),
}
for i := range 3 {
mc.StartNode(i)
}
if err := mc.WaitForLeader(waitTimeout); err != nil {
mc.DumpLogs()
mc.StopAll()
t.Fatalf("cluster did not elect a leader: %v", err)
}
// Wait for TopologyId to be generated and propagated. This is async
// after leader election, and we need it committed before tests can
// reliably stop/restart nodes.
if err := mc.WaitForTopologyId(waitTimeout); err != nil {
mc.DumpLogs()
mc.StopAll()
t.Fatalf("TopologyId not generated: %v", err)
}
t.Cleanup(func() {
mc.StopAll()
})
return mc
}
// StartNode starts the master process at the given index (02).
func (mc *MasterCluster) StartNode(i int) {
mc.t.Helper()
mc.mu.Lock()
defer mc.mu.Unlock()
n := mc.nodes[i]
if n.cmd != nil && !n.stopped {
return // already running
}
logFile, err := os.OpenFile(n.logFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
mc.t.Fatalf("create log for node %d: %v", i, err)
}
args := []string{
"master",
"-ip=127.0.0.1",
"-port=" + strconv.Itoa(n.port),
"-port.grpc=" + strconv.Itoa(n.grpcPort),
"-mdir=" + n.dataDir,
"-peers=" + mc.peersStr,
"-electionTimeout=3s",
"-volumeSizeLimitMB=32",
"-defaultReplication=000",
}
n.cmd = exec.Command(mc.weedBinary, args...)
n.cmd.Dir = mc.baseDir
n.cmd.Stdout = logFile
n.cmd.Stderr = logFile
n.stopped = false
if err := n.cmd.Start(); err != nil {
mc.t.Fatalf("start node %d: %v", i, err)
}
}
// StopNode gracefully stops the master at the given index.
func (mc *MasterCluster) StopNode(i int) {
mc.mu.Lock()
defer mc.mu.Unlock()
mc.stopNodeLocked(i)
}
func (mc *MasterCluster) stopNodeLocked(i int) {
n := mc.nodes[i]
if n.cmd == nil || n.stopped {
return
}
n.stopped = true
_ = n.cmd.Process.Signal(os.Interrupt)
done := make(chan error, 1)
go func() { done <- n.cmd.Wait() }()
select {
case <-time.After(10 * time.Second):
_ = n.cmd.Process.Kill()
<-done
case <-done:
}
}
// StopAll stops all running master nodes.
func (mc *MasterCluster) StopAll() {
mc.mu.Lock()
defer mc.mu.Unlock()
for i := range 3 {
mc.stopNodeLocked(i)
}
if !mc.keepLogs && !mc.t.Failed() {
os.RemoveAll(mc.baseDir)
} else if mc.baseDir != "" {
mc.t.Logf("multi-master logs kept at %s", mc.baseDir)
}
}
// NodeURL returns the HTTP URL for node i.
func (mc *MasterCluster) NodeURL(i int) string {
return fmt.Sprintf("http://127.0.0.1:%d", mc.nodes[i].port)
}
// NodeAddress returns "127.0.0.1:port" for node i.
func (mc *MasterCluster) NodeAddress(i int) string {
return fmt.Sprintf("127.0.0.1:%d", mc.nodes[i].port)
}
// NodeGRPCAddress returns "127.0.0.1:grpcPort" for node i.
func (mc *MasterCluster) NodeGRPCAddress(i int) string {
return fmt.Sprintf("127.0.0.1:%d", mc.nodes[i].grpcPort)
}
// IsNodeRunning returns true if the node at index i has a live process.
func (mc *MasterCluster) IsNodeRunning(i int) bool {
mc.mu.Lock()
defer mc.mu.Unlock()
n := mc.nodes[i]
return n.cmd != nil && !n.stopped
}
// GetClusterStatus fetches /cluster/status from node i.
func (mc *MasterCluster) GetClusterStatus(i int) (*clusterStatus, error) {
client := &http.Client{Timeout: 2 * time.Second}
resp, err := client.Get(mc.NodeURL(i) + "/cluster/status")
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
var cs clusterStatus
if err := json.Unmarshal(body, &cs); err != nil {
return nil, fmt.Errorf("parse cluster/status: %w (body: %s)", err, string(body))
}
return &cs, nil
}
// GetTopologyId fetches the TopologyId from /dir/status on node i.
func (mc *MasterCluster) GetTopologyId(i int) (string, error) {
client := &http.Client{Timeout: 2 * time.Second}
resp, err := client.Get(mc.NodeURL(i) + "/dir/status")
if err != nil {
return "", err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
var raw map[string]any
if err := json.Unmarshal(body, &raw); err != nil {
return "", err
}
if id, ok := raw["TopologyId"].(string); ok {
return id, nil
}
return "", nil
}
// FindLeader returns the index of the leader node and its address.
// Returns -1 if no leader is found.
func (mc *MasterCluster) FindLeader() (int, string) {
for i := range 3 {
if !mc.IsNodeRunning(i) {
continue
}
cs, err := mc.GetClusterStatus(i)
if err != nil {
continue
}
if cs.IsLeader {
return i, mc.NodeAddress(i)
}
}
return -1, ""
}
// WaitForLeader polls until a leader is elected or timeout.
func (mc *MasterCluster) WaitForLeader(timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if idx, _ := mc.FindLeader(); idx >= 0 {
return nil
}
time.Sleep(waitTick)
}
return fmt.Errorf("no leader elected within %v", timeout)
}
// WaitForNewLeader waits for a leader that is different from the given address.
func (mc *MasterCluster) WaitForNewLeader(oldLeaderAddr string, timeout time.Duration) (int, string, error) {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
idx, addr := mc.FindLeader()
if idx >= 0 && addr != oldLeaderAddr {
return idx, addr, nil
}
time.Sleep(waitTick)
}
return -1, "", fmt.Errorf("no new leader (different from %s) within %v", oldLeaderAddr, timeout)
}
// WaitForTopologyId waits until the leader reports a non-empty TopologyId.
func (mc *MasterCluster) WaitForTopologyId(timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if idx, _ := mc.FindLeader(); idx >= 0 {
if id, err := mc.GetTopologyId(idx); err == nil && id != "" {
return nil
}
}
time.Sleep(waitTick)
}
return fmt.Errorf("TopologyId not available within %v", timeout)
}
// WaitForNodeReady waits for node i to respond to HTTP.
func (mc *MasterCluster) WaitForNodeReady(i int, timeout time.Duration) error {
client := &http.Client{Timeout: 1 * time.Second}
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
resp, err := client.Get(mc.NodeURL(i) + "/cluster/status")
if err == nil {
resp.Body.Close()
return nil
}
time.Sleep(waitTick)
}
return fmt.Errorf("node %d not ready within %v", i, timeout)
}
// DumpLogs prints the tail of all master logs.
func (mc *MasterCluster) DumpLogs() {
for i := range 3 {
mc.t.Logf("=== master%d log tail ===\n%s", i, mc.tailLog(i))
}
}
func (mc *MasterCluster) tailLog(i int) string {
f, err := os.Open(mc.nodes[i].logFile)
if err != nil {
return "(no log)"
}
defer f.Close()
scanner := bufio.NewScanner(f)
lines := make([]string, 0, 50)
for scanner.Scan() {
lines = append(lines, scanner.Text())
if len(lines) > 50 {
lines = lines[1:]
}
}
return strings.Join(lines, "\n")
}
// --- port and binary helpers (adapted from test/volume_server/framework) ---
// allocateMultipleMasterPortPairs finds n non-overlapping (http, grpc) port
// pairs, holding all listeners until all are found, then releasing them
// together to avoid races between consecutive allocations.
func allocateMultipleMasterPortPairs(n int) ([][2]int, error) {
var listeners []net.Listener
var pairs [][2]int
defer func() {
for _, l := range listeners {
l.Close()
}
}()
for masterPort := 10000; masterPort <= 55535 && len(pairs) < n; masterPort++ {
grpcPort := masterPort + 10000
l1, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(masterPort)))
if err != nil {
continue
}
l2, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(grpcPort)))
if err != nil {
l1.Close()
continue
}
listeners = append(listeners, l1, l2)
pairs = append(pairs, [2]int{masterPort, grpcPort})
}
if len(pairs) < n {
return nil, fmt.Errorf("could only allocate %d of %d master port pairs", len(pairs), n)
}
return pairs, nil
}
func findOrBuildWeedBinary() (string, error) {
if fromEnv := os.Getenv("WEED_BINARY"); fromEnv != "" {
if isExecutableFile(fromEnv) {
return fromEnv, nil
}
return "", fmt.Errorf("WEED_BINARY not executable: %s", fromEnv)
}
repoRoot := ""
if _, file, _, ok := runtime.Caller(0); ok {
repoRoot = filepath.Clean(filepath.Join(filepath.Dir(file), "..", ".."))
}
if repoRoot == "" {
return "", fmt.Errorf("unable to detect repository root")
}
// Check if already built
binDir := filepath.Join(os.TempDir(), "seaweedfs_multi_master_it_bin")
os.MkdirAll(binDir, 0o755)
binPath := filepath.Join(binDir, "weed")
if isExecutableFile(binPath) {
return binPath, nil
}
cmd := exec.Command("go", "build", "-o", binPath, ".")
cmd.Dir = filepath.Join(repoRoot, "weed")
var out bytes.Buffer
cmd.Stdout = &out
cmd.Stderr = &out
if err := cmd.Run(); err != nil {
return "", fmt.Errorf("build weed binary: %w\n%s", err, out.String())
}
return binPath, nil
}
func isExecutableFile(path string) bool {
info, err := os.Stat(path)
if err != nil || info.IsDir() {
return false
}
return info.Mode().Perm()&0o111 != 0
}