ahead of time volume assignment
This commit is contained in:
@@ -27,6 +27,7 @@ type volumeState string
|
||||
const (
|
||||
readOnlyState volumeState = "ReadOnly"
|
||||
oversizedState = "Oversized"
|
||||
crowdedState = "Crowded"
|
||||
)
|
||||
|
||||
type stateIndicator func(copyState) bool
|
||||
@@ -106,7 +107,8 @@ type VolumeLayout struct {
|
||||
ttl *needle.TTL
|
||||
diskType types.DiskType
|
||||
vid2location map[needle.VolumeId]*VolumeLocationList
|
||||
writables []needle.VolumeId // transient array of writable volume id
|
||||
writables []needle.VolumeId // transient array of writable volume id
|
||||
crowded map[needle.VolumeId]interface{}
|
||||
readonlyVolumes *volumesBinaryState // readonly volumes
|
||||
oversizedVolumes *volumesBinaryState // oversized volumes
|
||||
volumeSizeLimit uint64
|
||||
@@ -127,6 +129,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType
|
||||
diskType: diskType,
|
||||
vid2location: make(map[needle.VolumeId]*VolumeLocationList),
|
||||
writables: *new([]needle.VolumeId),
|
||||
crowded: make(map[needle.VolumeId]interface{}),
|
||||
readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()),
|
||||
oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()),
|
||||
volumeSizeLimit: volumeSizeLimit,
|
||||
@@ -273,7 +276,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
|
||||
|
||||
lenWriters := len(vl.writables)
|
||||
if lenWriters <= 0 {
|
||||
glog.V(0).Infoln("No more writable volumes!")
|
||||
//glog.V(0).Infoln("No more writable volumes!")
|
||||
return nil, 0, nil, errors.New("No more writable volumes!")
|
||||
}
|
||||
if option.DataCenter == "" {
|
||||
@@ -307,14 +310,13 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
|
||||
return &vid, count, locationList, nil
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
|
||||
func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (active, crowded int) {
|
||||
vl.accessLock.RLock()
|
||||
defer vl.accessLock.RUnlock()
|
||||
|
||||
if option.DataCenter == "" {
|
||||
return len(vl.writables)
|
||||
return len(vl.writables), len(vl.crowded)
|
||||
}
|
||||
counter := 0
|
||||
for _, v := range vl.writables {
|
||||
for _, dn := range vl.vid2location[v].list {
|
||||
if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
|
||||
@@ -324,11 +326,15 @@ func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
|
||||
if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
|
||||
continue
|
||||
}
|
||||
counter++
|
||||
active++
|
||||
info, _ := dn.GetVolumesById(v)
|
||||
if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() {
|
||||
crowded++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return counter
|
||||
return
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool {
|
||||
@@ -342,6 +348,7 @@ func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool {
|
||||
if toDeleteIndex >= 0 {
|
||||
glog.V(0).Infoln("Volume", vid, "becomes unwritable")
|
||||
vl.writables = append(vl.writables[0:toDeleteIndex], vl.writables[toDeleteIndex+1:]...)
|
||||
vl.removeFromCrowded(vid)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
@@ -408,6 +415,32 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
|
||||
return vl.removeFromWritable(vid)
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) removeFromCrowded(vid needle.VolumeId) {
|
||||
delete(vl.crowded, vid)
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) setVolumeCrowded(vid needle.VolumeId) {
|
||||
if _, ok := vl.crowded[vid]; !ok {
|
||||
vl.crowded[vid] = nil
|
||||
glog.V(0).Infoln("Volume", vid, "becomes crowded")
|
||||
}
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) SetVolumeCrowded(vid needle.VolumeId) {
|
||||
// since delete is guarded by accessLock.Lock(),
|
||||
// and is always called in sequential order,
|
||||
// RLock() should be safe enough
|
||||
vl.accessLock.RLock()
|
||||
defer vl.accessLock.RUnlock()
|
||||
|
||||
for _, v := range vl.writables {
|
||||
if v == vid {
|
||||
vl.setVolumeCrowded(vid)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) ToMap() map[string]interface{} {
|
||||
m := make(map[string]interface{})
|
||||
m["replication"] = vl.rp.String()
|
||||
|
||||
Reference in New Issue
Block a user