* Fix master leader election startup issue Fixes #error-log-leader-not-selected-yet * Fix master leader election startup issue This change improves server address comparison using the 'Equals' method and handles recursion in topology leader lookup, resolving the 'leader not selected yet' error during master startup. * Merge user improvements: use MaybeLeader for non-blocking checks * not useful test * Address code review: optimize Equals, fix deadlock in IsLeader, safe access in Leader
507 lines
14 KiB
Go
507 lines
14 KiB
Go
package topology
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand/v2"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
|
|
backoff "github.com/cenkalti/backoff/v4"
|
|
|
|
hashicorpRaft "github.com/hashicorp/raft"
|
|
"github.com/seaweedfs/raft"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/sequence"
|
|
"github.com/seaweedfs/seaweedfs/weed/stats"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
)
|
|
|
|
type Topology struct {
|
|
vacuumLockCounter int64
|
|
NodeImpl
|
|
|
|
collectionMap *util.ConcurrentReadMap
|
|
ecShardMap map[needle.VolumeId]*EcShardLocations
|
|
ecShardMapLock sync.RWMutex
|
|
|
|
pulse int64
|
|
|
|
volumeSizeLimit uint64
|
|
replicationAsMin bool
|
|
isDisableVacuum bool
|
|
|
|
Sequence sequence.Sequencer
|
|
|
|
chanFullVolumes chan storage.VolumeInfo
|
|
chanCrowdedVolumes chan storage.VolumeInfo
|
|
|
|
Configuration *Configuration
|
|
|
|
RaftServer raft.Server
|
|
RaftServerAccessLock sync.RWMutex
|
|
HashicorpRaft *hashicorpRaft.Raft
|
|
barrierLock sync.Mutex
|
|
barrierDone bool
|
|
|
|
UuidAccessLock sync.RWMutex
|
|
UuidMap map[string][]string
|
|
|
|
topologyId string
|
|
topologyIdLock sync.RWMutex
|
|
|
|
LastLeaderChangeTime time.Time
|
|
}
|
|
|
|
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
|
|
t := &Topology{}
|
|
t.id = NodeId(id)
|
|
t.nodeType = "Topology"
|
|
t.NodeImpl.value = t
|
|
t.diskUsages = newDiskUsages()
|
|
t.children = make(map[NodeId]Node)
|
|
t.capacityReservations = newCapacityReservations()
|
|
t.collectionMap = util.NewConcurrentReadMap()
|
|
t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations)
|
|
t.pulse = int64(pulse)
|
|
t.volumeSizeLimit = volumeSizeLimit
|
|
t.replicationAsMin = replicationAsMin
|
|
|
|
t.Sequence = seq
|
|
|
|
t.chanFullVolumes = make(chan storage.VolumeInfo)
|
|
t.chanCrowdedVolumes = make(chan storage.VolumeInfo)
|
|
|
|
t.Configuration = &Configuration{}
|
|
|
|
return t
|
|
}
|
|
|
|
func (t *Topology) IsChildLocked() (bool, error) {
|
|
if t.IsLocked() {
|
|
return true, errors.New("topology is locked")
|
|
}
|
|
for _, dcNode := range t.Children() {
|
|
if dcNode.IsLocked() {
|
|
return true, fmt.Errorf("topology child %s is locked", dcNode.String())
|
|
}
|
|
for _, rackNode := range dcNode.Children() {
|
|
if rackNode.IsLocked() {
|
|
return true, fmt.Errorf("dc %s child %s is locked", dcNode.String(), rackNode.String())
|
|
}
|
|
for _, dataNode := range rackNode.Children() {
|
|
if dataNode.IsLocked() {
|
|
return true, fmt.Errorf("rack %s child %s is locked", rackNode.String(), dataNode.Id())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func (t *Topology) IsLeader() bool {
|
|
t.RaftServerAccessLock.RLock()
|
|
defer t.RaftServerAccessLock.RUnlock()
|
|
|
|
if t.RaftServer != nil {
|
|
if t.RaftServer.State() == raft.Leader {
|
|
return true
|
|
}
|
|
// Directly check leader to avoid re-acquiring lock via MaybeLeader()
|
|
leader := pb.ServerAddress(t.RaftServer.Leader())
|
|
if leader != "" {
|
|
if pb.ServerAddress(t.RaftServer.Name()).Equals(leader) {
|
|
return true
|
|
}
|
|
}
|
|
} else if t.HashicorpRaft != nil {
|
|
if t.HashicorpRaft.State() == hashicorpRaft.Leader {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (t *Topology) IsLeaderAndCanRead() bool {
|
|
if t.RaftServer != nil {
|
|
return t.IsLeader()
|
|
} else if t.HashicorpRaft != nil {
|
|
return t.IsLeader() && t.DoBarrier()
|
|
} else {
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (t *Topology) DoBarrier() bool {
|
|
t.barrierLock.Lock()
|
|
defer t.barrierLock.Unlock()
|
|
if t.barrierDone {
|
|
return true
|
|
}
|
|
|
|
glog.V(0).Infof("raft do barrier")
|
|
barrier := t.HashicorpRaft.Barrier(2 * time.Minute)
|
|
if err := barrier.Error(); err != nil {
|
|
glog.Errorf("failed to wait for barrier, error %s", err)
|
|
return false
|
|
|
|
}
|
|
|
|
t.barrierDone = true
|
|
glog.V(0).Infof("raft do barrier success")
|
|
return true
|
|
}
|
|
|
|
func (t *Topology) BarrierReset() {
|
|
t.barrierLock.Lock()
|
|
defer t.barrierLock.Unlock()
|
|
t.barrierDone = false
|
|
}
|
|
|
|
func (t *Topology) Leader() (l pb.ServerAddress, err error) {
|
|
exponentialBackoff := backoff.NewExponentialBackOff()
|
|
exponentialBackoff.InitialInterval = 100 * time.Millisecond
|
|
exponentialBackoff.MaxElapsedTime = 20 * time.Second
|
|
leaderNotSelected := errors.New("leader not selected yet")
|
|
l, err = backoff.RetryWithData(
|
|
func() (l pb.ServerAddress, err error) {
|
|
l, err = t.MaybeLeader()
|
|
if err == nil && l == "" {
|
|
// Thread-safe check if we are the leader
|
|
t.RaftServerAccessLock.RLock()
|
|
if t.RaftServer != nil && t.RaftServer.State() == raft.Leader {
|
|
l = pb.ServerAddress(t.RaftServer.Name())
|
|
}
|
|
t.RaftServerAccessLock.RUnlock()
|
|
|
|
if l != "" {
|
|
return l, nil
|
|
}
|
|
err = leaderNotSelected
|
|
}
|
|
return l, err
|
|
},
|
|
exponentialBackoff)
|
|
if err == leaderNotSelected {
|
|
l = ""
|
|
}
|
|
return l, err
|
|
}
|
|
|
|
func (t *Topology) MaybeLeader() (l pb.ServerAddress, err error) {
|
|
t.RaftServerAccessLock.RLock()
|
|
defer t.RaftServerAccessLock.RUnlock()
|
|
|
|
if t.RaftServer != nil {
|
|
l = pb.ServerAddress(t.RaftServer.Leader())
|
|
} else if t.HashicorpRaft != nil {
|
|
l = pb.ServerAddress(t.HashicorpRaft.Leader())
|
|
} else {
|
|
err = errors.New("Raft Server not ready yet!")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) {
|
|
// maybe an issue if lots of collections?
|
|
if collection == "" {
|
|
for _, c := range t.collectionMap.Items() {
|
|
if list := c.(*Collection).Lookup(vid); list != nil {
|
|
return list
|
|
}
|
|
}
|
|
} else {
|
|
if c, ok := t.collectionMap.Find(collection); ok {
|
|
return c.(*Collection).Lookup(vid)
|
|
}
|
|
}
|
|
|
|
if locations, found := t.LookupEcShards(vid); found {
|
|
for _, loc := range locations.Locations {
|
|
dataNodes = append(dataNodes, loc...)
|
|
}
|
|
return dataNodes
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
|
|
if !t.IsLeaderAndCanRead() {
|
|
return 0, fmt.Errorf("as leader can not read yet")
|
|
|
|
}
|
|
vid := t.GetMaxVolumeId()
|
|
next := vid.Next()
|
|
|
|
t.RaftServerAccessLock.RLock()
|
|
defer t.RaftServerAccessLock.RUnlock()
|
|
|
|
if t.RaftServer != nil {
|
|
if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next, t.GetTopologyId())); err != nil {
|
|
return 0, err
|
|
}
|
|
} else if t.HashicorpRaft != nil {
|
|
b, err := json.Marshal(NewMaxVolumeIdCommand(next, t.GetTopologyId()))
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err)
|
|
}
|
|
if future := t.HashicorpRaft.Apply(b, time.Second); future.Error() != nil {
|
|
return 0, future.Error()
|
|
}
|
|
}
|
|
return next, nil
|
|
}
|
|
|
|
func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption, volumeLayout *VolumeLayout) (fileId string, count uint64, volumeLocationList *VolumeLocationList, shouldGrow bool, err error) {
|
|
var vid needle.VolumeId
|
|
vid, count, volumeLocationList, shouldGrow, err = volumeLayout.PickForWrite(requestedCount, option)
|
|
if err != nil {
|
|
return "", 0, nil, shouldGrow, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
|
|
}
|
|
if volumeLocationList == nil || volumeLocationList.Length() == 0 {
|
|
return "", 0, nil, shouldGrow, fmt.Errorf("%s available for collection:%s replication:%s ttl:%s", NoWritableVolumes, option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
|
|
}
|
|
nextFileId := t.Sequence.NextFileId(requestedCount)
|
|
fileId = needle.NewFileId(vid, nextFileId, rand.Uint32()).String()
|
|
return fileId, count, volumeLocationList, shouldGrow, nil
|
|
}
|
|
|
|
func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout {
|
|
return t.collectionMap.Get(collectionName, func() interface{} {
|
|
return NewCollection(collectionName, t.volumeSizeLimit, t.replicationAsMin)
|
|
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl, diskType)
|
|
}
|
|
|
|
func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string) {
|
|
found := make(map[string]bool)
|
|
|
|
if includeNormalVolumes {
|
|
t.collectionMap.RLock()
|
|
for _, c := range t.collectionMap.Items() {
|
|
found[c.(*Collection).Name] = true
|
|
}
|
|
t.collectionMap.RUnlock()
|
|
}
|
|
|
|
if includeEcVolumes {
|
|
t.ecShardMapLock.RLock()
|
|
for _, ecVolumeLocation := range t.ecShardMap {
|
|
found[ecVolumeLocation.Collection] = true
|
|
}
|
|
t.ecShardMapLock.RUnlock()
|
|
}
|
|
|
|
for k := range found {
|
|
ret = append(ret, k)
|
|
}
|
|
slices.Sort(ret)
|
|
|
|
return ret
|
|
}
|
|
|
|
func (t *Topology) FindCollection(collectionName string) (*Collection, bool) {
|
|
c, hasCollection := t.collectionMap.Find(collectionName)
|
|
if !hasCollection {
|
|
return nil, false
|
|
}
|
|
return c.(*Collection), hasCollection
|
|
}
|
|
|
|
func (t *Topology) DeleteCollection(collectionName string) {
|
|
t.collectionMap.Delete(collectionName)
|
|
}
|
|
|
|
func (t *Topology) DeleteLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) {
|
|
collection, found := t.FindCollection(collectionName)
|
|
if !found {
|
|
return
|
|
}
|
|
collection.DeleteVolumeLayout(rp, ttl, diskType)
|
|
if len(collection.storageType2VolumeLayout.Items()) == 0 {
|
|
t.DeleteCollection(collectionName)
|
|
}
|
|
}
|
|
|
|
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
|
|
diskType := types.ToDiskType(v.DiskType)
|
|
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
|
|
vl.RegisterVolume(&v, dn)
|
|
vl.EnsureCorrectWritables(&v)
|
|
}
|
|
|
|
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
|
|
glog.Infof("removing volume info: %+v from %v", v, dn.id)
|
|
if v.ReplicaPlacement.GetCopyCount() > 1 {
|
|
stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)
|
|
}
|
|
diskType := types.ToDiskType(v.DiskType)
|
|
volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
|
|
volumeLayout.UnRegisterVolume(&v, dn)
|
|
if volumeLayout.isEmpty() {
|
|
t.DeleteLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
|
|
}
|
|
}
|
|
|
|
func (t *Topology) DataCenterExists(dcName string) bool {
|
|
return dcName == "" || t.GetDataCenter(dcName) != nil
|
|
}
|
|
|
|
func (t *Topology) GetDataCenter(dcName string) (dc *DataCenter) {
|
|
t.RLock()
|
|
defer t.RUnlock()
|
|
for _, c := range t.children {
|
|
dc = c.(*DataCenter)
|
|
if string(dc.Id()) == dcName {
|
|
return dc
|
|
}
|
|
}
|
|
return dc
|
|
}
|
|
|
|
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
for _, c := range t.children {
|
|
dc := c.(*DataCenter)
|
|
if string(dc.Id()) == dcName {
|
|
return dc
|
|
}
|
|
}
|
|
dc := NewDataCenter(dcName)
|
|
t.doLinkChildNode(dc)
|
|
return dc
|
|
}
|
|
|
|
func (t *Topology) ListDataCenters() (dcs []string) {
|
|
t.RLock()
|
|
defer t.RUnlock()
|
|
for _, c := range t.children {
|
|
dcs = append(dcs, string(c.(*DataCenter).Id()))
|
|
}
|
|
return dcs
|
|
}
|
|
|
|
func (t *Topology) ListDCAndRacks() (dcs map[NodeId][]NodeId) {
|
|
t.RLock()
|
|
defer t.RUnlock()
|
|
dcs = make(map[NodeId][]NodeId)
|
|
for _, dcNode := range t.children {
|
|
dcNodeId := dcNode.(*DataCenter).Id()
|
|
for _, rackNode := range dcNode.Children() {
|
|
dcs[dcNodeId] = append(dcs[dcNodeId], rackNode.(*Rack).Id())
|
|
}
|
|
}
|
|
return dcs
|
|
}
|
|
|
|
func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) {
|
|
// convert into in memory struct storage.VolumeInfo
|
|
var volumeInfos []storage.VolumeInfo
|
|
for _, v := range volumes {
|
|
if vi, err := storage.NewVolumeInfo(v); err == nil {
|
|
volumeInfos = append(volumeInfos, vi)
|
|
} else {
|
|
glog.V(0).Infof("Fail to convert joined volume information: %v", err)
|
|
}
|
|
}
|
|
// find out the delta volumes
|
|
var changedVolumes []storage.VolumeInfo
|
|
newVolumes, deletedVolumes, changedVolumes = dn.UpdateVolumes(volumeInfos)
|
|
for _, v := range newVolumes {
|
|
t.RegisterVolumeLayout(v, dn)
|
|
}
|
|
for _, v := range deletedVolumes {
|
|
t.UnRegisterVolumeLayout(v, dn)
|
|
}
|
|
for _, v := range changedVolumes {
|
|
diskType := types.ToDiskType(v.DiskType)
|
|
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
|
|
vl.EnsureCorrectWritables(&v)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolumes []*master_pb.VolumeShortInformationMessage, dn *DataNode) {
|
|
var newVis, oldVis []storage.VolumeInfo
|
|
for _, v := range newVolumes {
|
|
vi, err := storage.NewVolumeInfoFromShort(v)
|
|
if err != nil {
|
|
glog.V(0).Infof("NewVolumeInfoFromShort %v: %v", v, err)
|
|
continue
|
|
}
|
|
newVis = append(newVis, vi)
|
|
}
|
|
for _, v := range deletedVolumes {
|
|
vi, err := storage.NewVolumeInfoFromShort(v)
|
|
if err != nil {
|
|
glog.V(0).Infof("NewVolumeInfoFromShort %v: %v", v, err)
|
|
continue
|
|
}
|
|
oldVis = append(oldVis, vi)
|
|
}
|
|
dn.DeltaUpdateVolumes(newVis, oldVis)
|
|
|
|
for _, vi := range newVis {
|
|
t.RegisterVolumeLayout(vi, dn)
|
|
}
|
|
for _, vi := range oldVis {
|
|
t.UnRegisterVolumeLayout(vi, dn)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (t *Topology) DataNodeRegistration(dcName, rackName string, dn *DataNode) {
|
|
if dn.Parent() != nil {
|
|
return
|
|
}
|
|
// registration to topo
|
|
dc := t.GetOrCreateDataCenter(dcName)
|
|
rack := dc.GetOrCreateRack(rackName)
|
|
rack.LinkChildNode(dn)
|
|
glog.Infof("[%s] reLink To topo ", dn.Id())
|
|
}
|
|
|
|
func (t *Topology) DisableVacuum() {
|
|
glog.V(0).Infof("DisableVacuum")
|
|
t.isDisableVacuum = true
|
|
}
|
|
|
|
func (t *Topology) EnableVacuum() {
|
|
glog.V(0).Infof("EnableVacuum")
|
|
t.isDisableVacuum = false
|
|
}
|
|
|
|
func (t *Topology) GetTopologyId() string {
|
|
t.topologyIdLock.RLock()
|
|
defer t.topologyIdLock.RUnlock()
|
|
return t.topologyId
|
|
}
|
|
|
|
func (t *Topology) SetTopologyId(topologyId string) {
|
|
t.topologyIdLock.Lock()
|
|
defer t.topologyIdLock.Unlock()
|
|
if topologyId == "" {
|
|
return
|
|
}
|
|
if t.topologyId == "" {
|
|
t.topologyId = topologyId
|
|
return
|
|
}
|
|
if t.topologyId != topologyId {
|
|
glog.Fatalf("Split-brain detected! Current TopologyId is %s, but received %s. Stopping to prevent data corruption.", t.topologyId, topologyId)
|
|
}
|
|
}
|