Merge pull request #1505 from ustuzhanin/remove_raft_state
Resume raft state
This commit is contained in:
2
go.mod
2
go.mod
@@ -11,7 +11,7 @@ require (
|
|||||||
github.com/aws/aws-sdk-go v1.33.5
|
github.com/aws/aws-sdk-go v1.33.5
|
||||||
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
|
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
|
||||||
github.com/cespare/xxhash v1.1.0
|
github.com/cespare/xxhash v1.1.0
|
||||||
github.com/chrislusf/raft v1.0.1
|
github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011
|
||||||
github.com/coreos/go-semver v0.3.0 // indirect
|
github.com/coreos/go-semver v0.3.0 // indirect
|
||||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||||
github.com/disintegration/imaging v1.6.2
|
github.com/disintegration/imaging v1.6.2
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -69,6 +69,8 @@ github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
|||||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||||
github.com/chrislusf/raft v1.0.1 h1:Wa4ffkmkysW7cX3T/gMC/Mk3PhnOXhsqOVwQJcMndhw=
|
github.com/chrislusf/raft v1.0.1 h1:Wa4ffkmkysW7cX3T/gMC/Mk3PhnOXhsqOVwQJcMndhw=
|
||||||
github.com/chrislusf/raft v1.0.1/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
|
github.com/chrislusf/raft v1.0.1/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
|
||||||
|
github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011 h1:vN1GvfLgDg8kIPCdhuVKAjlYpxG1B86jiKejB6MC/Q0=
|
||||||
|
github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
|
||||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||||
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y=
|
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y=
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ type MasterOptions struct {
|
|||||||
disableHttp *bool
|
disableHttp *bool
|
||||||
metricsAddress *string
|
metricsAddress *string
|
||||||
metricsIntervalSec *int
|
metricsIntervalSec *int
|
||||||
|
raftResumeState *bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -59,6 +60,7 @@ func init() {
|
|||||||
m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
|
m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
|
||||||
m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>")
|
m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>")
|
||||||
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
|
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
|
||||||
|
m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
|
||||||
}
|
}
|
||||||
|
|
||||||
var cmdMaster = &Command{
|
var cmdMaster = &Command{
|
||||||
@@ -118,10 +120,10 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
|
|||||||
glog.Fatalf("Master startup error: %v", e)
|
glog.Fatalf("Master startup error: %v", e)
|
||||||
}
|
}
|
||||||
// start raftServer
|
// start raftServer
|
||||||
raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
|
raftServer, err := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
|
||||||
peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, 5)
|
peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, 5, *masterOption.raftResumeState)
|
||||||
if raftServer == nil {
|
if raftServer == nil {
|
||||||
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder)
|
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
|
||||||
}
|
}
|
||||||
ms.SetRaftServer(raftServer)
|
ms.SetRaftServer(raftServer)
|
||||||
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
|
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
|
||||||
|
|||||||
@@ -81,6 +81,7 @@ func init() {
|
|||||||
masterOptions.garbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
|
masterOptions.garbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
|
||||||
masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
|
masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
|
||||||
masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
|
masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
|
||||||
|
masterOptions.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
|
||||||
|
|
||||||
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
|
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
|
||||||
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
|
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
|
||||||
|
|||||||
@@ -28,7 +28,31 @@ type RaftServer struct {
|
|||||||
*raft.GrpcServer
|
*raft.GrpcServer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
|
type StateMachine struct {
|
||||||
|
raft.StateMachine
|
||||||
|
topo *topology.Topology
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s StateMachine) Save() ([]byte, error) {
|
||||||
|
state := topology.MaxVolumeIdCommand{
|
||||||
|
MaxVolumeId: s.topo.GetMaxVolumeId(),
|
||||||
|
}
|
||||||
|
glog.V(1).Infof("Save raft state %+v", state)
|
||||||
|
return json.Marshal(state)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s StateMachine) Recovery(data []byte) error {
|
||||||
|
state := topology.MaxVolumeIdCommand{}
|
||||||
|
err := json.Unmarshal(data, &state)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
glog.V(1).Infof("Recovery raft state %+v", state)
|
||||||
|
s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int, raftResumeState bool) (*RaftServer, error) {
|
||||||
s := &RaftServer{
|
s := &RaftServer{
|
||||||
peers: peers,
|
peers: peers,
|
||||||
serverAddr: serverAddr,
|
serverAddr: serverAddr,
|
||||||
@@ -46,26 +70,41 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
|
|||||||
transporter := raft.NewGrpcTransporter(grpcDialOption)
|
transporter := raft.NewGrpcTransporter(grpcDialOption)
|
||||||
glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
|
glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
|
||||||
|
|
||||||
// always clear previous metadata
|
if !raftResumeState {
|
||||||
os.RemoveAll(path.Join(s.dataDir, "conf"))
|
// always clear previous metadata
|
||||||
os.RemoveAll(path.Join(s.dataDir, "log"))
|
os.RemoveAll(path.Join(s.dataDir, "conf"))
|
||||||
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
|
os.RemoveAll(path.Join(s.dataDir, "log"))
|
||||||
|
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
|
||||||
|
}
|
||||||
|
if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
// Clear old cluster configurations if peers are changed
|
// Clear old cluster configurations if peers are changed
|
||||||
if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
|
if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
|
||||||
glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
|
glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "")
|
stateMachine := StateMachine{topo: topo}
|
||||||
|
s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
return nil
|
return nil, err
|
||||||
}
|
}
|
||||||
s.raftServer.SetHeartbeatInterval(500 * time.Millisecond)
|
s.raftServer.SetHeartbeatInterval(500 * time.Millisecond)
|
||||||
s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond)
|
s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond)
|
||||||
s.raftServer.Start()
|
if err := s.raftServer.LoadSnapshot(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := s.raftServer.Start(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
for _, peer := range s.peers {
|
for _, peer := range s.peers {
|
||||||
s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer))
|
if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
|
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
|
||||||
@@ -81,13 +120,13 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
|
|||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infoln(err)
|
glog.V(0).Infoln(err)
|
||||||
return nil
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
|
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
|
||||||
|
|
||||||
return s
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *RaftServer) Peers() (members []string) {
|
func (s *RaftServer) Peers() (members []string) {
|
||||||
|
|||||||
Reference in New Issue
Block a user