* fix: clear raft vote state file on non-resume startup

  The seaweedfs/raft library v1.1.7 added a persistent `state` file for
  currentTerm and votedFor. When RaftResumeState=false (the default), the
  log, conf, and snapshot directories are cleared but this state file was
  not. On repeated restarts, different masters accumulate divergent terms,
  causing AppendEntries rejections and preventing leader election.

  Fixes #8690

* fix: recover TopologyId from snapshot before clearing raft state

  When RaftResumeState=false clears log/conf/snapshot, the TopologyId
  (used for license validation) was lost. Now extract it from the latest
  snapshot before cleanup and restore it on the topology. Both
  seaweedfs/raft and hashicorp/raft paths are handled, with a shared
  recoverTopologyIdFromState helper in raft_common.go.

* fix: stagger multi-master bootstrap delay by peer index

  Previously all masters used a fixed 1500ms delay before the bootstrap
  check. Now the delay is proportional to the peer's sorted index with
  randomization (matching the hashicorp raft path), giving the designated
  bootstrap node (peer 0) a head start while later peers wait for gRPC
  servers to be ready. (A sketch of this delay scheme follows below.)
  Also adds diagnostic logging showing why DoJoinCommand was or wasn't
  called, making leader election issues easier to diagnose from logs.

* fix: skip unreachable masters during leader reconnection

  When a master leader goes down, non-leader masters still redirect
  clients to the stale leader address. The masterClient would follow
  these redirects, fail, and retry, wasting round-trips each cycle. Now
  tryAllMasters tracks which masters failed within a cycle and skips
  redirects pointing to them, reducing log spam and connection overhead
  during leader failover.

* fix: take snapshot after TopologyId generation for recovery

  After generating a new TopologyId on the leader, immediately take a
  raft snapshot so the ID can be recovered from the snapshot on future
  restarts with RaftResumeState=false. Without this, short-lived clusters
  would lose the TopologyId on restart since no automatic snapshot had
  been taken yet.

* test: add multi-master raft failover integration tests

  Integration test framework and 5 test scenarios for 3-node master clusters:

  - TestLeaderConsistencyAcrossNodes: all nodes agree on leader and TopologyId
  - TestLeaderDownAndRecoverQuickly: leader stops, new leader elected, old leader rejoins as follower
  - TestLeaderDownSlowRecover: leader gone for extended period, cluster continues with 2/3 quorum
  - TestTwoMastersDownAndRestart: quorum lost (2/3 down), recovered when both restart
  - TestAllMastersDownAndRestart: full cluster restart, leader elected, all nodes agree on TopologyId

* fix: address PR review comments

  - peerIndex: return -1 (not 0) when self not found, add warning log
  - recoverTopologyIdFromSnapshot: defer dir.Close()
  - tests: check GetTopologyId errors instead of discarding them

* fix: address review comments on failover tests

  - Assert no leader after quorum loss (was only logging)
  - Verify follower cs.Leader matches expected leader via ServerAddress.ToHttpAddress() comparison
  - Check GetTopologyId error in TestTwoMastersDownAndRestart
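For illustration, here is a minimal standalone sketch of the staggered bootstrap delay described above. The function names (peerIndexOf, bootstrapDelay) and the exact constants (1500ms base unit, 500ms jitter bound) are assumptions for this sketch, not the names or values in the actual change; only the shape — sorted-index-proportional delay plus randomization, with -1 for a missing self per the review fix — comes from the commit message.

// Sketch only: per-peer bootstrap delay proportional to sorted peer index.
package main

import (
    "fmt"
    "math/rand"
    "sort"
    "time"
)

// peerIndexOf returns this node's position among the sorted peers,
// or -1 when self is not found (per the review fix above).
func peerIndexOf(self string, peers []string) int {
    sorted := append([]string(nil), peers...)
    sort.Strings(sorted)
    for i, p := range sorted {
        if p == self {
            return i
        }
    }
    return -1
}

// bootstrapDelay gives peer 0 (the designated bootstrap node) a head start;
// later peers wait proportionally longer, plus jitter. Constants are illustrative.
func bootstrapDelay(self string, peers []string) time.Duration {
    idx := peerIndexOf(self, peers)
    if idx < 0 {
        idx = 0 // fall back; the real code also logs a warning here
    }
    base := time.Duration(idx) * 1500 * time.Millisecond
    jitter := time.Duration(rand.Int63n(int64(500 * time.Millisecond)))
    return base + jitter
}

func main() {
    peers := []string{"master1:9333", "master2:9333", "master3:9333"}
    for _, p := range peers {
        fmt.Println(p, bootstrapDelay(p, peers))
    }
}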
package weed_server

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "net/http/httputil"
    "net/url"
    "os"
    "regexp"
    "runtime"
    "strings"
    "sync"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/cluster/maintenance"
    "github.com/seaweedfs/seaweedfs/weed/stats"
    "github.com/seaweedfs/seaweedfs/weed/telemetry"

    "github.com/seaweedfs/seaweedfs/weed/cluster"
    "github.com/seaweedfs/seaweedfs/weed/pb"

    "github.com/gorilla/mux"
    hashicorpRaft "github.com/hashicorp/raft"
    "github.com/seaweedfs/raft"
    "google.golang.org/grpc"

    "github.com/seaweedfs/seaweedfs/weed/glog"
    "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
    "github.com/seaweedfs/seaweedfs/weed/security"
    "github.com/seaweedfs/seaweedfs/weed/sequence"
    "github.com/seaweedfs/seaweedfs/weed/shell"
    "github.com/seaweedfs/seaweedfs/weed/topology"
    "github.com/seaweedfs/seaweedfs/weed/util"
    util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
    "github.com/seaweedfs/seaweedfs/weed/util/version"
    "github.com/seaweedfs/seaweedfs/weed/wdclient"
)

const (
    SequencerType        = "master.sequencer.type"
    SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
    raftApplyTimeout     = 1 * time.Second
)

type MasterOption struct {
    Master                     pb.ServerAddress
    MetaFolder                 string
    VolumeSizeLimitMB          uint32
    VolumePreallocate          bool
    MaxParallelVacuumPerServer int
    // PulseSeconds int
    DefaultReplicaPlacement string
    GarbageThreshold        float64
    WhiteList               []string
    DisableHttp             bool
    MetricsAddress          string
    MetricsIntervalSec      int
    IsFollower              bool
    TelemetryUrl            string
    TelemetryEnabled        bool
    VolumeGrowthDisabled    bool
}

type MasterServer struct {
    master_pb.UnimplementedSeaweedServer
    option *MasterOption
    guard  *security.Guard

    preallocateSize int64

    Topo                    *topology.Topology
    vg                      *topology.VolumeGrowth
    volumeGrowthRequestChan chan *topology.VolumeGrowRequest

    // notifying clients
    clientChansLock sync.RWMutex
    clientChans     map[string]chan *master_pb.KeepConnectedResponse

    grpcDialOption grpc.DialOption

    topologyIdGenLock sync.Mutex

    MasterClient *wdclient.MasterClient

    adminLocks *AdminLocks

    Cluster *cluster.Cluster

    // telemetry
    telemetryCollector *telemetry.Collector
}

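// NewMasterServer creates a MasterServer: it loads security and volume-growth
// settings from the configuration, builds the sequencer and topology, registers
// the HTTP handlers, and starts the background volume management loops and,
// for non-followers, the admin maintenance scripts.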
func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer {

    v := util.GetViper()
    signingKey := v.GetString("jwt.signing.key")
    v.SetDefault("jwt.signing.expires_after_seconds", 10)
    expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")

    readSigningKey := v.GetString("jwt.signing.read.key")
    v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
    readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")

    v.SetDefault("master.replication.treat_replication_as_minimums", false)
    replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums")

    v.SetDefault("master.volume_growth.copy_1", topology.VolumeGrowStrategy.Copy1Count)
    v.SetDefault("master.volume_growth.copy_2", topology.VolumeGrowStrategy.Copy2Count)
    v.SetDefault("master.volume_growth.copy_3", topology.VolumeGrowStrategy.Copy3Count)
    v.SetDefault("master.volume_growth.copy_other", topology.VolumeGrowStrategy.CopyOtherCount)
    v.SetDefault("master.volume_growth.threshold", topology.VolumeGrowStrategy.Threshold)
    v.SetDefault("master.volume_growth.disable", false)
    option.VolumeGrowthDisabled = v.GetBool("master.volume_growth.disable")

    topology.VolumeGrowStrategy.Copy1Count = v.GetUint32("master.volume_growth.copy_1")
    topology.VolumeGrowStrategy.Copy2Count = v.GetUint32("master.volume_growth.copy_2")
    topology.VolumeGrowStrategy.Copy3Count = v.GetUint32("master.volume_growth.copy_3")
    topology.VolumeGrowStrategy.CopyOtherCount = v.GetUint32("master.volume_growth.copy_other")
    topology.VolumeGrowStrategy.Threshold = v.GetFloat64("master.volume_growth.threshold")
    whiteList := util.StringSplit(v.GetString("guard.white_list"), ",")

    var preallocateSize int64
    if option.VolumePreallocate {
        preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
    }

    grpcDialOption := security.LoadClientTLS(v, "grpc.master")
    ms := &MasterServer{
        option:                  option,
        preallocateSize:         preallocateSize,
        volumeGrowthRequestChan: make(chan *topology.VolumeGrowRequest, 1<<6),
        clientChans:             make(map[string]chan *master_pb.KeepConnectedResponse),
        grpcDialOption:          grpcDialOption,
        MasterClient:            wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", *pb.NewServiceDiscoveryFromMap(peers)),
        adminLocks:              NewAdminLocks(),
        Cluster:                 cluster.NewCluster(),
    }

    ms.MasterClient.SetOnPeerUpdateFn(ms.OnPeerUpdate)

    seq := ms.createSequencer(option)
    if seq == nil {
        glog.Fatalf("create sequencer failed.")
    }
    ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, 5, replicationAsMin)
    ms.vg = topology.NewDefaultVolumeGrowth()
    glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")

    // Initialize telemetry after topology is created
    if option.TelemetryEnabled && option.TelemetryUrl != "" {
        telemetryClient := telemetry.NewClient(option.TelemetryUrl, option.TelemetryEnabled)
        ms.telemetryCollector = telemetry.NewCollector(telemetryClient, ms.Topo, ms.Cluster)
        ms.telemetryCollector.SetMasterServer(ms)

        // Set version and OS information
        ms.telemetryCollector.SetVersion(version.VERSION_NUMBER)
        ms.telemetryCollector.SetOS(runtime.GOOS + "/" + runtime.GOARCH)

        // Start periodic telemetry collection (every 24 hours)
        ms.telemetryCollector.StartPeriodicCollection(24 * time.Hour)
    }

    ms.guard = security.NewGuard(append(ms.option.WhiteList, whiteList...), signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)

    handleStaticResources2(r)
    r.HandleFunc("/", ms.proxyToLeader(requestIDMiddleware(ms.uiStatusHandler)))
    r.HandleFunc("/ui/index.html", requestIDMiddleware(ms.uiStatusHandler))
    if !ms.option.DisableHttp {
        r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.dirAssignHandler))))
        r.HandleFunc("/dir/lookup", ms.guard.WhiteList(requestIDMiddleware(ms.dirLookupHandler)))
        r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.dirStatusHandler))))
        r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.collectionDeleteHandler))))
        r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.volumeGrowHandler))))
        r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.volumeStatusHandler))))
        r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(requestIDMiddleware(ms.volumeVacuumHandler))))
        r.HandleFunc("/submit", ms.guard.WhiteList(requestIDMiddleware(ms.submitFromMasterServerHandler)))
        r.HandleFunc("/collection/info", ms.guard.WhiteList(requestIDMiddleware(ms.collectionInfoHandler)))
        /*
            r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
            r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
            r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
        */
        r.HandleFunc("/{fileId}", requestIDMiddleware(ms.redirectHandler))
    }

    ms.Topo.SetAdminServerConnectedFunc(ms.isAdminServerConnectedFunc)
    ms.Topo.StartRefreshWritableVolumes(
        ms.grpcDialOption,
        ms.option.GarbageThreshold,
        ms.option.MaxParallelVacuumPerServer,
        topology.VolumeGrowStrategy.Threshold,
        ms.preallocateSize,
    )

    ms.ProcessGrowRequest()

    if !option.IsFollower {
        ms.startAdminScripts()
    }

    return ms
}

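// SetRaftServer wires the topology to either the seaweedfs/raft or
// hashicorp/raft server, registers the leader-change listener, and, if this
// node is already the leader, seeds the warmup timestamp and kicks off
// TopologyId generation.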
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
    var raftServerName string

    ms.Topo.RaftServerAccessLock.Lock()
    if raftServer.raftServer != nil {
        ms.Topo.RaftServer = raftServer.raftServer
        ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
            glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
            stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
            if ms.Topo.RaftServer.Leader() != "" {
                glog.V(0).Infof("[%s] %s becomes leader.", ms.Topo.RaftServer.Name(), ms.Topo.RaftServer.Leader())
                ms.Topo.SetLastLeaderChangeTime(time.Now())
                if ms.Topo.RaftServer.Leader() == ms.Topo.RaftServer.Name() {
                    go ms.ensureTopologyId()
                }
            }
        })
        raftServerName = fmt.Sprintf("[%s]", ms.Topo.RaftServer.Name())
    } else if raftServer.RaftHashicorp != nil {
        ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
        raftServerName = ms.Topo.HashicorpRaft.String()
    }
    ms.Topo.RaftServerAccessLock.Unlock()

    if ms.Topo.IsLeader() {
        // Seed the warmup timestamp so IsWarmingUp() is active even if the
        // leader change event hasn't fired yet (e.g. node is already leader
        // on startup). Followers don't need warmup state.
        ms.Topo.SetLastLeaderChangeTime(time.Now())
        glog.V(0).Infof("%s I am the leader!", raftServerName)
        go ms.ensureTopologyId()
    } else {
        var raftServerLeader string
        ms.Topo.RaftServerAccessLock.RLock()
        if ms.Topo.RaftServer != nil {
            raftServerLeader = ms.Topo.RaftServer.Leader()
        } else if ms.Topo.HashicorpRaft != nil {
            raftServerName = ms.Topo.HashicorpRaft.String()
            raftServerLeaderAddr, _ := ms.Topo.HashicorpRaft.LeaderWithID()
            raftServerLeader = string(raftServerLeaderAddr)
        }
        ms.Topo.RaftServerAccessLock.RUnlock()
        glog.V(0).Infof("%s %s - is the leader.", raftServerName, raftServerLeader)
    }
}

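// syncRaftForTopologyId applies a MaxVolumeIdCommand carrying the given
// topologyId through whichever raft implementation is active, so the id is
// replicated to all masters.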
func (ms *MasterServer) syncRaftForTopologyId(topologyId string) error {
    ms.Topo.RaftServerAccessLock.RLock()
    defer ms.Topo.RaftServerAccessLock.RUnlock()

    if ms.Topo.RaftServer != nil {
        _, err := ms.Topo.RaftServer.Do(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId))
        return err
    } else if ms.Topo.HashicorpRaft != nil {
        b, err := json.Marshal(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId))
        if err != nil {
            return fmt.Errorf("failed to marshal NewMaxVolumeIdCommand: %v", err)
        }
        if future := ms.Topo.HashicorpRaft.Apply(b, raftApplyTimeout); future.Error() != nil {
            return future.Error()
        }
        return nil
    }
    return fmt.Errorf("no raft server configured")
}

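// ensureTopologyId runs on the leader. It first applies a barrier command so
// any in-flight log replay completes, then generates a TopologyId if none
// exists, and finally takes a snapshot so a freshly generated id survives
// raft state cleanup on non-resume restarts.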
func (ms *MasterServer) ensureTopologyId() {
    ms.topologyIdGenLock.Lock()
    defer ms.topologyIdGenLock.Unlock()

    // Send a no-op command to ensure all previous logs are applied (barrier).
    // This handles the case where log replay is still in progress.
    glog.V(1).Infof("ensureTopologyId: sending barrier command")
    for {
        if !ms.Topo.IsLeader() {
            glog.V(1).Infof("lost leadership while sending barrier command for topologyId")
            return
        }
        if err := ms.syncRaftForTopologyId(ms.Topo.GetTopologyId()); err != nil {
            glog.Errorf("failed to sync raft for topologyId: %v, retrying in 1s", err)
            time.Sleep(time.Second)
            continue
        }
        break
    }
    glog.V(1).Infof("ensureTopologyId: barrier command completed")

    if !ms.Topo.IsLeader() {
        return
    }

    prevId := ms.Topo.GetTopologyId()
    glog.V(1).Infof("ensureTopologyId: current TopologyId after barrier: %s", prevId)

    EnsureTopologyId(ms.Topo, func() bool {
        return ms.Topo.IsLeader()
    }, func(topologyId string) error {
        return ms.syncRaftForTopologyId(topologyId)
    })

    // If a new TopologyId was generated, take a snapshot so it survives
    // raft state cleanup on future non-resume restarts.
    if prevId == "" && ms.Topo.GetTopologyId() != "" {
        ms.Topo.RaftServerAccessLock.RLock()
        if ms.Topo.RaftServer != nil {
            if err := ms.Topo.RaftServer.TakeSnapshot(); err != nil {
                glog.Warningf("snapshot after TopologyId generation: %v", err)
            } else {
                glog.V(0).Infof("snapshot taken to persist TopologyId %s", ms.Topo.GetTopologyId())
            }
        }
        // Hashicorp raft snapshots are handled automatically.
        ms.Topo.RaftServerAccessLock.RUnlock()
    }
}

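// proxyToLeader serves the request locally when this node is the leader (or
// when no leader is known); otherwise it reverse-proxies the request to the
// current raft leader.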
func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        if ms.Topo.IsLeader() {
            f(w, r)
            return
        }

        // get the current raft leader
        leaderAddr, _ := ms.Topo.MaybeLeader()
        raftServerLeader := leaderAddr.ToHttpAddress()
        if raftServerLeader == "" {
            f(w, r)
            return
        }

        // determine the scheme based on HTTPS client configuration
        scheme := util_http.GetGlobalHttpClient().GetHttpScheme()

        targetUrl, err := url.Parse(scheme + "://" + raftServerLeader)
        if err != nil {
            writeJsonError(w, r, http.StatusInternalServerError,
                fmt.Errorf("leader URL %s://%s parse error: %v", scheme, raftServerLeader, err))
            return
        }

        // proxy to leader
        glog.V(4).Infoln("proxying to leader", raftServerLeader, "using", scheme)
        proxy := httputil.NewSingleHostReverseProxy(targetUrl)
        proxy.Transport = util_http.GetGlobalHttpClient().GetClientTransport()
        proxy.ServeHTTP(w, r)
    }
}

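// isAdminServerConnectedFunc reports whether an admin server currently holds
// the presence lock, which suppresses the master-side maintenance scripts.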
func (ms *MasterServer) isAdminServerConnectedFunc() bool {
    if ms == nil || ms.adminLocks == nil {
        return false
    }
    _, _, isLocked := ms.adminLocks.isLocked(cluster.AdminServerPresenceLockName)
    return isLocked
}

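// startAdminScripts periodically runs the maintenance scripts configured
// under master.maintenance.scripts on the leader, wrapping them in
// lock/unlock if the script does not manage the lock itself.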
func (ms *MasterServer) startAdminScripts() {
    v := util.GetViper()
    adminScripts := v.GetString("master.maintenance.scripts")
    if adminScripts == "" {
        return
    }
    glog.V(0).Infof("adminScripts: %v", adminScripts)

    sleepMinutes := v.GetFloat64("master.maintenance.sleep_minutes")
    if sleepMinutes <= 0 {
        sleepMinutes = float64(maintenance.DefaultMaintenanceSleepMinutes)
    }

    scriptLines := strings.Split(adminScripts, "\n")
    if !strings.Contains(adminScripts, "lock") {
        scriptLines = append([]string{"lock"}, scriptLines...)
        scriptLines = append(scriptLines, "unlock")
    }

    masterAddress := string(ms.option.Master)

    var shellOptions shell.ShellOptions
    shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
    shellOptions.Masters = &masterAddress

    shellOptions.Directory = "/"
    emptyFilerGroup := ""
    shellOptions.FilerGroup = &emptyFilerGroup

    commandEnv := shell.NewCommandEnv(&shellOptions)

    reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)

    go commandEnv.MasterClient.KeepConnectedToMaster(context.Background())

    go func() {
        for {
            time.Sleep(time.Duration(sleepMinutes) * time.Minute)
            if ms.Topo.IsLeader() && ms.MasterClient.GetMaster(context.Background()) != "" {
                if ms.isAdminServerConnectedFunc() {
                    glog.V(1).Infof("Skipping master maintenance scripts because admin server is connected")
                    continue
                }
                shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroupName(*shellOptions.FilerGroup))
                if shellOptions.FilerAddress == "" {
                    continue
                }
                for _, line := range scriptLines {
                    for _, c := range strings.Split(line, ";") {
                        processEachCmd(reg, c, commandEnv)
                    }
                }
            }
        }
    }()
}

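// processEachCmd parses one script line into a command and its (unquoted)
// arguments and executes the matching shell command, skipping commands
// tagged as resource heavy.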
func processEachCmd(reg *regexp.Regexp, line string, commandEnv *shell.CommandEnv) {
    cmds := reg.FindAllString(line, -1)
    if len(cmds) == 0 {
        return
    }
    args := make([]string, len(cmds[1:]))
    for i := range args {
        args[i] = strings.Trim(cmds[1+i], "\"'")
    }
    cmd := cmds[0]

    for _, c := range shell.Commands {
        if c.Name() == cmd {
            if c.HasTag(shell.ResourceHeavy) {
                glog.Warningf("%s is resource heavy and should not run on master", cmd)
                continue
            }
            glog.V(0).Infof("executing: %s %v", cmd, args)
            if err := c.Do(args, commandEnv, os.Stdout); err != nil {
                glog.V(0).Infof("error: %v", err)
            }
        }
    }
}

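// createSequencer builds the file-id sequencer selected by
// master.sequencer.type: "snowflake" for a SnowflakeSequencer, anything else
// (including "raft") for the in-memory sequencer.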
func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
    var seq sequence.Sequencer
    v := util.GetViper()
    seqType := strings.ToLower(v.GetString(SequencerType))
    glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
    switch seqType {
    case "snowflake":
        var err error
        snowflakeId := v.GetInt(SequencerSnowflakeId)
        seq, err = sequence.NewSnowflakeSequencer(string(option.Master), snowflakeId)
        if err != nil {
            glog.Error(err)
            seq = nil
        }
    case "raft":
        fallthrough
    default:
        seq = sequence.NewMemorySequencer()
    }
    return seq
}

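// OnPeerUpdate reacts to master cluster membership changes when running
// hashicorp raft as the leader: new peers are added as voters, and peers
// that stop responding to pings are removed from the raft configuration.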
func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
    ms.Topo.RaftServerAccessLock.RLock()
    defer ms.Topo.RaftServerAccessLock.RUnlock()

    if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
        return
    }
    glog.V(4).Infof("OnPeerUpdate: %+v", update)

    peerAddress := pb.ServerAddress(update.Address)
    peerName := raftServerID(peerAddress)
    if ms.Topo.HashicorpRaft.State() != hashicorpRaft.Leader {
        return
    }
    if update.IsAdd {
        raftServerFound := false
        for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
            if string(server.ID) == peerName {
                raftServerFound = true
            }
        }
        if !raftServerFound {
            glog.V(0).Infof("adding new raft server: %s", peerName)
            ms.Topo.HashicorpRaft.AddVoter(
                hashicorpRaft.ServerID(peerName),
                hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
        }
    } else {
        pb.WithMasterClient(false, peerAddress, ms.grpcDialOption, true, func(client master_pb.SeaweedClient) error {
            ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
            defer cancel()
            if _, err := client.Ping(ctx, &master_pb.PingRequest{Target: string(peerAddress), TargetType: cluster.MasterType}); err != nil {
                glog.V(0).Infof("master %s didn't respond to pings. remove raft server", peerName)
                if err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
                    _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
                        Id:    peerName,
                        Force: false,
                    })
                    return err
                }); err != nil {
                    glog.Warningf("failed removing old raft server: %v", err)
                    return err
                }
            } else {
                glog.V(0).Infof("master %s successfully responded to ping", peerName)
            }
            return nil
        })
    }
}

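// Shutdown transfers leadership away from this node if it is the hashicorp
// raft leader, then shuts the raft server down.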
func (ms *MasterServer) Shutdown() {
    if ms.Topo == nil || ms.Topo.HashicorpRaft == nil {
        return
    }
    if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
        ms.Topo.HashicorpRaft.LeadershipTransfer()
    }
    ms.Topo.HashicorpRaft.Shutdown()
}

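// Reload re-reads the security configuration and refreshes the guard's
// white list.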
func (ms *MasterServer) Reload() {
    glog.V(0).Infoln("Reload master server...")

    util.LoadConfiguration("security", false)
    v := util.GetViper()
    ms.guard.UpdateWhiteList(append(ms.option.WhiteList,
        util.StringSplit(v.GetString("guard.white_list"), ",")...),
    )
}