grow volumes on volume servers with slots freed by cloud storage

This commit is contained in:
Chris Lu
2019-12-03 21:36:42 -08:00
parent 7ae8b1cc86
commit e426bd541e
11 changed files with 226 additions and 145 deletions

View File

@@ -2,14 +2,13 @@ package topology
import (
"fmt"
"strconv"
"sync"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
)
@@ -44,15 +43,26 @@ func (dn *DataNode) String() string {
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
dn.Lock()
defer dn.Unlock()
if _, ok := dn.volumes[v.Id]; !ok {
if oldV, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1)
if v.IsRemote() {
dn.UpAdjustRemoteVolumeCountDelta(1)
}
if !v.ReadOnly {
dn.UpAdjustActiveVolumeCountDelta(1)
}
dn.UpAdjustMaxVolumeId(v.Id)
isNew = true
} else {
if oldV.IsRemote() != v.IsRemote() {
if v.IsRemote() {
dn.UpAdjustRemoteVolumeCountDelta(1)
}
if oldV.IsRemote() {
dn.UpAdjustRemoteVolumeCountDelta(-1)
}
}
dn.volumes[v.Id] = v
}
return
@@ -70,7 +80,12 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
delete(dn.volumes, vid)
deletedVolumes = append(deletedVolumes, v)
dn.UpAdjustVolumeCountDelta(-1)
dn.UpAdjustActiveVolumeCountDelta(-1)
if v.IsRemote() {
dn.UpAdjustRemoteVolumeCountDelta(-1)
}
if !v.ReadOnly {
dn.UpAdjustActiveVolumeCountDelta(-1)
}
}
}
dn.Unlock()
@@ -88,7 +103,12 @@ func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.Vol
for _, v := range deletedVolumes {
delete(dn.volumes, v.Id)
dn.UpAdjustVolumeCountDelta(-1)
dn.UpAdjustActiveVolumeCountDelta(-1)
if v.IsRemote() {
dn.UpAdjustRemoteVolumeCountDelta(-1)
}
if !v.ReadOnly {
dn.UpAdjustActiveVolumeCountDelta(-1)
}
}
dn.Unlock()
for _, v := range newlVolumes {
@@ -160,6 +180,7 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
MaxVolumeCount: uint64(dn.GetMaxVolumeCount()),
FreeVolumeCount: uint64(dn.FreeSpace()),
ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()),
RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()),
}
for _, v := range dn.GetVolumes() {
m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())