master: check peers for existing leader before starting a leader election

fix https://github.com/chrislusf/seaweedfs/issues/1509
This commit is contained in:
Chris Lu
2020-10-07 01:25:39 -07:00
parent c543762e23
commit da4edf3651
8 changed files with 216 additions and 192 deletions

View File

@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"net"
"strings"
"time"
@@ -302,3 +303,19 @@ func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.Li
}
return resp, nil
}
func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
// tell the volume servers about the leader
leader, _ := ms.Topo.Leader()
resp := &master_pb.GetMasterConfigurationResponse{
MetricsAddress: ms.option.MetricsAddress,
MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
StorageBackends: backend.ToPbStorageBackends(),
DefaultReplication: ms.option.DefaultReplicaPlacement,
Leader: leader,
}
return resp, nil
}

View File

@@ -3,8 +3,6 @@ package weed_server
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -180,14 +178,3 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku
return resp, nil
}
func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
resp := &master_pb.GetMasterConfigurationResponse{
MetricsAddress: ms.option.MetricsAddress,
MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
StorageBackends: backend.ToPbStorageBackends(),
DefaultReplication: ms.option.DefaultReplicaPlacement,
}
return resp, nil
}

View File

@@ -138,14 +138,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
ms.Topo.RaftServer = raftServer.raftServer
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
glog.V(0).Infof("event: %+v", e)
glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
if ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
}
})
ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) {
glog.V(0).Infof("state change: %+v", e)
})
if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
} else {

View File

@@ -2,10 +2,8 @@ package weed_server
import (
"encoding/json"
"io/ioutil"
"os"
"path"
"reflect"
"sort"
"time"
@@ -80,11 +78,6 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
return nil, err
}
// Clear old cluster configurations if peers are changed
if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
}
stateMachine := StateMachine{topo: topo}
s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
if err != nil {
@@ -107,19 +100,20 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
}
// Remove deleted peers
if raftResumeState {
for existsPeerName, _ := range s.raftServer.Peers() {
exists := false
for _, peer := range s.peers {
if peer == existsPeerName {
exists = true
}
for existsPeerName := range s.raftServer.Peers() {
exists, existingPeer := false, ""
for _, peer := range s.peers {
if pb.ServerToGrpcAddress(peer) == existsPeerName {
exists, existingPeer = true, peer
break
}
if !exists {
if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
glog.V(0).Infoln(err)
return nil, err
}
}
if exists {
if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
glog.V(0).Infoln(err)
return nil, err
} else {
glog.V(0).Infof("removing old peer %s", existingPeer)
}
}
}
@@ -128,17 +122,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
// Initialize the server by joining itself.
glog.V(0).Infoln("Initializing new cluster")
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
})
if err != nil {
glog.V(0).Infoln(err)
return nil, err
}
// s.DoJoinCommand()
}
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
@@ -156,34 +140,6 @@ func (s *RaftServer) Peers() (members []string) {
return
}
func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) {
confPath := path.Join(dir, "conf")
// open conf file
b, err := ioutil.ReadFile(confPath)
if err != nil {
return oldPeers, true
}
conf := &raft.Config{}
if err = json.Unmarshal(b, conf); err != nil {
return oldPeers, true
}
for _, p := range conf.Peers {
oldPeers = append(oldPeers, p.Name)
}
oldPeers = append(oldPeers, self)
if len(peers) == 0 && len(oldPeers) <= 1 {
return oldPeers, false
}
sort.Strings(peers)
sort.Strings(oldPeers)
return oldPeers, !reflect.DeepEqual(peers, oldPeers)
}
func isTheFirstOne(self string, peers []string) bool {
sort.Strings(peers)
if len(peers) <= 0 {
@@ -191,3 +147,16 @@ func isTheFirstOne(self string, peers []string) bool {
}
return self == peers[0]
}
func (s *RaftServer) DoJoinCommand() {
glog.V(0).Infoln("Initializing new cluster")
if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
}); err != nil {
glog.Errorf("fail to send join command: %v", err)
}
}