fix allocate reduplicated volumeId to different volume (#5811)

* fix allocate reduplicated volumeId to different volume

* only check barrier when read

---------

Co-authored-by: Yang Wang <yangwang@weride.ai>
This commit is contained in:
wyang
2024-07-27 12:48:36 +08:00
committed by GitHub
parent c1bffca246
commit 4b1f539ab8
4 changed files with 105 additions and 56 deletions

View File

@@ -50,8 +50,11 @@ type Topology struct {
RaftServer raft.Server
RaftServerAccessLock sync.RWMutex
HashicorpRaft *hashicorpRaft.Raft
UuidAccessLock sync.RWMutex
UuidMap map[string][]string
barrierLock sync.Mutex
barrierDone bool
UuidAccessLock sync.RWMutex
UuidMap map[string][]string
}
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -120,6 +123,42 @@ func (t *Topology) IsLeader() bool {
return false
}
func (t *Topology) IsLeaderAndCanRead() bool {
if t.RaftServer != nil {
return t.IsLeader()
} else if t.HashicorpRaft != nil {
return t.IsLeader() && t.DoBarrier()
} else {
return false
}
}
func (t *Topology) DoBarrier() bool {
t.barrierLock.Lock()
defer t.barrierLock.Unlock()
if t.barrierDone {
return true
}
glog.V(0).Infof("raft do barrier")
barrier := t.HashicorpRaft.Barrier(2 * time.Minute)
if err := barrier.Error(); err != nil {
glog.Errorf("failed to wait for barrier, error %s", err)
return false
}
t.barrierDone = true
glog.V(0).Infof("raft do barrier success")
return true
}
func (t *Topology) BarrierReset() {
t.barrierLock.Lock()
defer t.barrierLock.Unlock()
t.barrierDone = false
}
func (t *Topology) Leader() (l pb.ServerAddress, err error) {
exponentialBackoff := backoff.NewExponentialBackOff()
exponentialBackoff.InitialInterval = 100 * time.Millisecond
@@ -180,6 +219,10 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
}
func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
if !t.IsLeaderAndCanRead() {
return 0, fmt.Errorf("as leader can not read yet")
}
vid := t.GetMaxVolumeId()
next := vid.Next()