Fix master leader election when grpc ports change (#8272)
* Fix master leader detection when grpc ports change * Canonicalize self peer entry to avoid raft self-alias panic * Normalize and deduplicate master peer addresses
This commit is contained in:
@@ -4,9 +4,11 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -338,8 +340,16 @@ func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
peers = strings.TrimSpace(peers)
|
peers = strings.TrimSpace(peers)
|
||||||
|
seenPeers := make(map[string]struct{})
|
||||||
cleanedPeers = pb.ServerAddresses(peers).ToAddresses()
|
for _, peer := range pb.ServerAddresses(peers).ToAddresses() {
|
||||||
|
normalizedPeer := normalizeMasterPeerAddress(peer, masterAddress)
|
||||||
|
key := string(normalizedPeer)
|
||||||
|
if _, found := seenPeers[key]; found {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seenPeers[key] = struct{}{}
|
||||||
|
cleanedPeers = append(cleanedPeers, normalizedPeer)
|
||||||
|
}
|
||||||
|
|
||||||
hasSelf := false
|
hasSelf := false
|
||||||
for _, peer := range cleanedPeers {
|
for _, peer := range cleanedPeers {
|
||||||
@@ -358,14 +368,31 @@ func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers strin
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func normalizeMasterPeerAddress(peer pb.ServerAddress, self pb.ServerAddress) pb.ServerAddress {
|
||||||
|
if peer.ToHttpAddress() == self.ToHttpAddress() {
|
||||||
|
return self
|
||||||
|
}
|
||||||
|
|
||||||
|
_, grpcPort, err := net.SplitHostPort(peer.ToGrpcAddress())
|
||||||
|
if err != nil {
|
||||||
|
return peer
|
||||||
|
}
|
||||||
|
grpcPortValue, err := strconv.Atoi(grpcPort)
|
||||||
|
if err != nil {
|
||||||
|
return peer
|
||||||
|
}
|
||||||
|
|
||||||
|
return pb.NewServerAddressWithGrpcPort(peer.ToHttpAddress(), grpcPortValue)
|
||||||
|
}
|
||||||
|
|
||||||
func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool {
|
func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool {
|
||||||
slices.SortFunc(peers, func(a, b pb.ServerAddress) int {
|
slices.SortFunc(peers, func(a, b pb.ServerAddress) int {
|
||||||
return strings.Compare(string(a), string(b))
|
return strings.Compare(a.ToHttpAddress(), b.ToHttpAddress())
|
||||||
})
|
})
|
||||||
if len(peers) <= 0 {
|
if len(peers) <= 0 {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return self == peers[0]
|
return self.ToHttpAddress() == peers[0].ToHttpAddress()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption {
|
func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption {
|
||||||
|
|||||||
66
weed/command/master_test.go
Normal file
66
weed/command/master_test.go
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
package command
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestIsTheFirstOneIgnoresGrpcPort(t *testing.T) {
|
||||||
|
self := pb.ServerAddress("127.0.0.1:9000.19000")
|
||||||
|
peers := []pb.ServerAddress{
|
||||||
|
"127.0.0.1:9000",
|
||||||
|
"127.0.0.1:9002.19002",
|
||||||
|
"127.0.0.1:9003.19003",
|
||||||
|
}
|
||||||
|
|
||||||
|
if !isTheFirstOne(self, peers) {
|
||||||
|
t.Fatalf("expected first peer match by HTTP address between %q and %+v", self, peers)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckPeersAddsSelfWhenGrpcPortMismatches(t *testing.T) {
|
||||||
|
self, peers := checkPeers("127.0.0.1", 9000, 19000, "127.0.0.1:9002,127.0.0.1:9003")
|
||||||
|
|
||||||
|
found := false
|
||||||
|
for _, peer := range peers {
|
||||||
|
if peer.ToHttpAddress() == self.ToHttpAddress() {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
t.Fatalf("expected peers %+v to contain self %s by HTTP address", peers, self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckPeersCanonicalizesSelfEntry(t *testing.T) {
|
||||||
|
self, peers := checkPeers("127.0.0.1", 9000, 19000, "127.0.0.1:9000,127.0.0.1:9002,127.0.0.1:9003")
|
||||||
|
|
||||||
|
for _, peer := range peers {
|
||||||
|
if peer.ToHttpAddress() == self.ToHttpAddress() && peer != self {
|
||||||
|
t.Fatalf("expected self peer to be canonicalized to %q, got %q", self, peer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckPeersDeduplicatesAliasPeers(t *testing.T) {
|
||||||
|
_, peers := checkPeers("127.0.0.1", 9000, 19000, "127.0.0.1:9002,127.0.0.1:9002.19002,127.0.0.1:9003")
|
||||||
|
|
||||||
|
if len(peers) != 3 {
|
||||||
|
t.Fatalf("expected 3 unique peers after normalization, got %d: %+v", len(peers), peers)
|
||||||
|
}
|
||||||
|
|
||||||
|
count9002 := 0
|
||||||
|
for _, peer := range peers {
|
||||||
|
if peer.ToHttpAddress() == "127.0.0.1:9002" {
|
||||||
|
count9002++
|
||||||
|
if string(peer) != "127.0.0.1:9002.19002" {
|
||||||
|
t.Fatalf("expected canonical peer 127.0.0.1:9002.19002, got %q", peer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if count9002 != 1 {
|
||||||
|
t.Fatalf("expected one peer for 127.0.0.1:9002, got %d in %+v", count9002, peers)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user