Add raft server access mutex to avoid races (#3503)
This commit is contained in:
@@ -3,7 +3,9 @@ package weed_server
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/raft"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/cluster"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
)
|
||||
@@ -11,11 +13,14 @@ import (
|
||||
func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_pb.RaftListClusterServersRequest) (*master_pb.RaftListClusterServersResponse, error) {
|
||||
resp := &master_pb.RaftListClusterServersResponse{}
|
||||
|
||||
ms.Topo.RaftServerAccessLock.RLock()
|
||||
if ms.Topo.HashicorpRaft == nil {
|
||||
ms.Topo.RaftServerAccessLock.RUnlock()
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers
|
||||
ms.Topo.RaftServerAccessLock.RUnlock()
|
||||
|
||||
for _, server := range servers {
|
||||
resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{
|
||||
@@ -30,6 +35,9 @@ func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_
|
||||
func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) {
|
||||
resp := &master_pb.RaftAddServerResponse{}
|
||||
|
||||
ms.Topo.RaftServerAccessLock.RLock()
|
||||
defer ms.Topo.RaftServerAccessLock.RUnlock()
|
||||
|
||||
if ms.Topo.HashicorpRaft == nil {
|
||||
return resp, nil
|
||||
}
|
||||
@@ -54,6 +62,9 @@ func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAd
|
||||
func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) {
|
||||
resp := &master_pb.RaftRemoveServerResponse{}
|
||||
|
||||
ms.Topo.RaftServerAccessLock.RLock()
|
||||
defer ms.Topo.RaftServerAccessLock.RUnlock()
|
||||
|
||||
if ms.Topo.HashicorpRaft == nil {
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
@@ -166,6 +166,8 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
|
||||
|
||||
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
|
||||
var raftServerName string
|
||||
|
||||
ms.Topo.RaftServerAccessLock.Lock()
|
||||
if raftServer.raftServer != nil {
|
||||
ms.Topo.RaftServer = raftServer.raftServer
|
||||
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
|
||||
@@ -193,14 +195,18 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
|
||||
}()
|
||||
raftServerName = ms.Topo.HashicorpRaft.String()
|
||||
}
|
||||
ms.Topo.RaftServerAccessLock.Unlock()
|
||||
|
||||
if ms.Topo.IsLeader() {
|
||||
glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!")
|
||||
} else {
|
||||
ms.Topo.RaftServerAccessLock.RLock()
|
||||
if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
|
||||
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
|
||||
} else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
|
||||
glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.")
|
||||
}
|
||||
ms.Topo.RaftServerAccessLock.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,16 +216,15 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
|
||||
f(w, r)
|
||||
return
|
||||
}
|
||||
var raftServerLeader string
|
||||
if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
|
||||
raftServerLeader = ms.Topo.RaftServer.Leader()
|
||||
} else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
|
||||
raftServerLeader = string(ms.Topo.HashicorpRaft.Leader())
|
||||
}
|
||||
|
||||
// get the current raft leader
|
||||
leaderAddr, _ := ms.Topo.MaybeLeader()
|
||||
raftServerLeader := string(leaderAddr)
|
||||
if raftServerLeader == "" {
|
||||
f(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
ms.boundedLeaderChan <- 1
|
||||
defer func() { <-ms.boundedLeaderChan }()
|
||||
targetUrl, err := url.Parse("http://" + raftServerLeader)
|
||||
@@ -228,6 +233,8 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
|
||||
fmt.Errorf("Leader URL http://%s Parse Error: %v", raftServerLeader, err))
|
||||
return
|
||||
}
|
||||
|
||||
// proxy to leader
|
||||
glog.V(4).Infoln("proxying to leader", raftServerLeader)
|
||||
proxy := httputil.NewSingleHostReverseProxy(targetUrl)
|
||||
director := proxy.Director
|
||||
@@ -336,6 +343,9 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
|
||||
}
|
||||
|
||||
func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
|
||||
ms.Topo.RaftServerAccessLock.RLock()
|
||||
defer ms.Topo.RaftServerAccessLock.RUnlock()
|
||||
|
||||
if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -16,6 +16,10 @@ func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
|
||||
infos := make(map[string]interface{})
|
||||
infos["Up Time"] = time.Now().Sub(startTime).String()
|
||||
infos["Max Volume Id"] = ms.Topo.GetMaxVolumeId()
|
||||
|
||||
ms.Topo.RaftServerAccessLock.RLock()
|
||||
defer ms.Topo.RaftServerAccessLock.RUnlock()
|
||||
|
||||
if ms.Topo.RaftServer != nil {
|
||||
args := struct {
|
||||
Version string
|
||||
|
||||
Reference in New Issue
Block a user