Boostrap persistent state for volume servers. (#7984)
This PR implements logic load/save persistent state information for storages associated with volume servers, and reporting state changes back to masters via heartbeat messages. More work ensues! See https://github.com/seaweedfs/seaweedfs/issues/7977 for details.
This commit is contained in:
@@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/stats"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
@@ -25,6 +26,7 @@ import (
|
||||
|
||||
const (
|
||||
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
|
||||
HEARTBEAT_CHAN_SIZE = 1024
|
||||
)
|
||||
|
||||
type ReadOption struct {
|
||||
@@ -69,6 +71,8 @@ type Store struct {
|
||||
rack string // optional information, overwriting master setting if exists
|
||||
connected bool
|
||||
NeedleMapKind NeedleMapKind
|
||||
State *State
|
||||
StateUpdateChan chan *volume_server_pb.VolumeServerState
|
||||
NewVolumesChan chan master_pb.VolumeShortInformationMessage
|
||||
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
|
||||
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
|
||||
@@ -81,16 +85,31 @@ func (s *Store) String() (str string) {
|
||||
return
|
||||
}
|
||||
|
||||
func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, publicUrl string, id string, dirnames []string, maxVolumeCounts []int32,
|
||||
minFreeSpaces []util.MinFreeSpace, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType, ldbTimeout int64) (s *Store) {
|
||||
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, Id: id, NeedleMapKind: needleMapKind}
|
||||
s.Locations = make([]*DiskLocation, 0)
|
||||
func NewStore(
|
||||
grpcDialOption grpc.DialOption,
|
||||
ip string, port int, grpcPort int, publicUrl string, id string,
|
||||
dirnames []string, maxVolumeCounts []int32, minFreeSpaces []util.MinFreeSpace,
|
||||
idxFolder string,
|
||||
needleMapKind NeedleMapKind,
|
||||
diskTypes []DiskType,
|
||||
ldbTimeout int64,
|
||||
) (s *Store) {
|
||||
s = &Store{
|
||||
grpcDialOption: grpcDialOption,
|
||||
Port: port,
|
||||
Ip: ip,
|
||||
GrpcPort: grpcPort,
|
||||
PublicUrl: publicUrl,
|
||||
Id: id,
|
||||
NeedleMapKind: needleMapKind,
|
||||
Locations: make([]*DiskLocation, 0),
|
||||
|
||||
s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 1024)
|
||||
s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 1024)
|
||||
|
||||
s.NewEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 1024)
|
||||
s.DeletedEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 1024)
|
||||
StateUpdateChan: make(chan *volume_server_pb.VolumeServerState, HEARTBEAT_CHAN_SIZE),
|
||||
NewVolumesChan: make(chan master_pb.VolumeShortInformationMessage, HEARTBEAT_CHAN_SIZE),
|
||||
DeletedVolumesChan: make(chan master_pb.VolumeShortInformationMessage, HEARTBEAT_CHAN_SIZE),
|
||||
NewEcShardsChan: make(chan master_pb.VolumeEcShardInformationMessage, HEARTBEAT_CHAN_SIZE),
|
||||
DeletedEcShardsChan: make(chan master_pb.VolumeEcShardInformationMessage, HEARTBEAT_CHAN_SIZE),
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < len(dirnames); i++ {
|
||||
@@ -130,8 +149,36 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int,
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
var err error
|
||||
s.State, err = NewState(idxFolder)
|
||||
if err != nil {
|
||||
glog.Fatalf("failed to resolve state for volume %s: %v", id, err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) LoadState() error {
|
||||
err := s.State.Load()
|
||||
if s.State.Pb != nil && err == nil {
|
||||
s.StateUpdateChan <- s.State.Pb
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) SaveState() error {
|
||||
if s.State.Pb == nil {
|
||||
glog.Warningf("tried to save empty state for store %s", s.Id)
|
||||
return nil
|
||||
}
|
||||
|
||||
err := s.State.Save()
|
||||
if s.State.Pb != nil && err == nil {
|
||||
s.StateUpdateChan <- s.State.Pb
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, ver needle.Version, MemoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {
|
||||
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
|
||||
if e != nil {
|
||||
@@ -144,6 +191,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap
|
||||
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, ver, MemoryMapMaxSizeMb, diskType, ldbTimeout)
|
||||
return e
|
||||
}
|
||||
|
||||
func (s *Store) DeleteCollection(collection string) (e error) {
|
||||
for _, location := range s.Locations {
|
||||
e = location.DeleteCollectionFromDiskLocation(collection)
|
||||
|
||||
Reference in New Issue
Block a user