Fix no more writable volumes by delay judgment (#4548)

* fix nomore writables volumes while disk free space is sufficient by time delay

* reset

---------

Co-authored-by: wang wusong <wangwusong@virtaitech.com>
This commit is contained in:
wusong
2023-06-06 01:17:21 +08:00
committed by GitHub
parent fb4b61036c
commit 26f15d0079
22 changed files with 1277 additions and 1103 deletions

View File

@@ -6,6 +6,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/stats"
@@ -249,15 +250,28 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
dn := c.(*DataNode) //can not cast n to DataNode
dn.RLock()
for _, v := range dn.GetVolumes() {
topo := n.GetTopology()
diskType := types.ToDiskType(v.DiskType)
vl := topo.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
if v.Size >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
n.GetTopology().chanFullVolumes <- v
vl.accessLock.RLock()
vacuumTime, ok := vl.vacuumedVolumes[v.Id]
vl.accessLock.RUnlock()
// If a volume has been vacuumed in the past 20 seconds, we do not check whether it has reached full capacity.
// After 20s(grpc timeout), theoretically all the heartbeats of the volume server have reached the master,
// the volume size should be correct, not the size before the vacuum.
if !ok || time.Now().Add(-20*time.Second).After(vacuumTime) {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
topo.chanFullVolumes <- v
}
} else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
n.GetTopology().chanCrowdedVolumes <- v
topo.chanCrowdedVolumes <- v
}
copyCount := v.ReplicaPlacement.GetCopyCount()
if copyCount > 1 {
if copyCount > len(n.GetTopology().Lookup(v.Collection, v.Id)) {
if copyCount > len(topo.Lookup(v.Collection, v.Id)) {
stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1)
} else {
stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)

View File

@@ -123,14 +123,20 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, vacuumLocationList, locationList *VolumeLocationList) bool {
isCommitSuccess := true
isReadOnly := false
isFullCapacity := false
for _, dn := range vacuumLocationList.list {
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumeId: uint32(vid),
})
if resp != nil && resp.IsReadOnly {
isReadOnly = true
if resp != nil {
if resp.IsReadOnly {
isReadOnly = true
}
if resp.VolumeSize > t.volumeSizeLimit {
isFullCapacity = true
}
}
return err
})
@@ -157,8 +163,13 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
resp, err := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
VolumeId: uint32(vid),
})
if resp != nil && resp.IsReadOnly {
isReadOnly = true
if resp != nil {
if resp.IsReadOnly {
isReadOnly = true
}
if resp.VolumeSize > t.volumeSizeLimit {
isFullCapacity = true
}
}
return err
})
@@ -187,8 +198,13 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
dn.Unlock()
}
//record vacuum time of volume
vl.accessLock.Lock()
vl.vacuumedVolumes[vid] = time.Now()
vl.accessLock.Unlock()
for _, dn := range vacuumLocationList.list {
vl.SetVolumeAvailable(dn, vid, isReadOnly)
vl.SetVolumeAvailable(dn, vid, isReadOnly, isFullCapacity)
}
}
return isCommitSuccess

View File

@@ -3,12 +3,13 @@ package topology
import (
"errors"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
@@ -114,6 +115,7 @@ type VolumeLayout struct {
crowded map[needle.VolumeId]struct{}
readonlyVolumes *volumesBinaryState // readonly volumes
oversizedVolumes *volumesBinaryState // oversized volumes
vacuumedVolumes map[needle.VolumeId]time.Time
volumeSizeLimit uint64
replicationAsMin bool
accessLock sync.RWMutex
@@ -135,6 +137,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType
crowded: make(map[needle.VolumeId]struct{}),
readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()),
oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()),
vacuumedVolumes: make(map[needle.VolumeId]time.Time),
volumeSizeLimit: volumeSizeLimit,
replicationAsMin: replicationAsMin,
}
@@ -436,7 +439,7 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId)
}
return false
}
func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly bool) bool {
func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly, isFullCapacity bool) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
@@ -447,7 +450,7 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, is
vl.vid2location[vid].Set(dn)
if vInfo.ReadOnly || isReadOnly {
if vInfo.ReadOnly || isReadOnly || isFullCapacity {
return false
}