add remove volumes with version info
This commit is contained in:
@@ -189,7 +189,6 @@ func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolume
|
||||
continue
|
||||
}
|
||||
newVis = append(newVis, vi)
|
||||
t.RegisterVolumeLayout(vi, dn)
|
||||
}
|
||||
for _, v := range deletedVolumes {
|
||||
vi, err := storage.NewVolumeInfoFromShort(v)
|
||||
@@ -198,8 +197,15 @@ func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolume
|
||||
continue
|
||||
}
|
||||
oldVis = append(oldVis, vi)
|
||||
t.UnRegisterVolumeLayout(vi, dn)
|
||||
}
|
||||
dn.DeltaUpdateVolumes(newVis, oldVis)
|
||||
|
||||
for _, vi := range newVis {
|
||||
t.RegisterVolumeLayout(vi, dn)
|
||||
}
|
||||
for _, vi := range oldVis {
|
||||
t.UnRegisterVolumeLayout(vi, dn)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
|
||||
DeletedByteCount: 34524,
|
||||
ReadOnly: false,
|
||||
ReplicaPlacement: uint32(0),
|
||||
Version: uint32(1),
|
||||
Version: uint32(needle.CurrentVersion),
|
||||
Ttl: 0,
|
||||
}
|
||||
volumeMessages = append(volumeMessages, volumeMessage)
|
||||
@@ -66,17 +66,65 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
|
||||
DeletedByteCount: 345240,
|
||||
ReadOnly: false,
|
||||
ReplicaPlacement: uint32(0),
|
||||
Version: uint32(1),
|
||||
Version: uint32(needle.CurrentVersion),
|
||||
Ttl: 0,
|
||||
}
|
||||
volumeMessages = append(volumeMessages, volumeMessage)
|
||||
}
|
||||
topo.SyncDataNodeRegistration(volumeMessages, dn)
|
||||
|
||||
//rp, _ := storage.NewReplicaPlacementFromString("000")
|
||||
//layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
|
||||
//assert(t, "writables", len(layout.writables), volumeCount)
|
||||
|
||||
assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
|
||||
assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
|
||||
}
|
||||
|
||||
{
|
||||
volumeCount := 6
|
||||
newVolumeShortMessage := &master_pb.VolumeShortInformationMessage{
|
||||
Id: uint32(3),
|
||||
Collection: "",
|
||||
ReplicaPlacement: uint32(0),
|
||||
Version: uint32(needle.CurrentVersion),
|
||||
Ttl: 0,
|
||||
}
|
||||
topo.IncrementalSyncDataNodeRegistration(
|
||||
[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
|
||||
nil,
|
||||
dn)
|
||||
rp, _ := storage.NewReplicaPlacementFromString("000")
|
||||
layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
|
||||
assert(t, "writables after repeated add", len(layout.writables), volumeCount)
|
||||
|
||||
assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
|
||||
assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
|
||||
|
||||
topo.IncrementalSyncDataNodeRegistration(
|
||||
nil,
|
||||
[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
|
||||
dn)
|
||||
assert(t, "writables after deletion", len(layout.writables), volumeCount-1)
|
||||
assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount-1)
|
||||
assert(t, "volumeCount", int(topo.volumeCount), volumeCount-1)
|
||||
|
||||
topo.IncrementalSyncDataNodeRegistration(
|
||||
[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
|
||||
nil,
|
||||
dn)
|
||||
|
||||
for vid, _ := range layout.vid2location{
|
||||
println("after add volume id", vid)
|
||||
}
|
||||
for _, vid := range layout.writables{
|
||||
println("after add writable volume id", vid)
|
||||
}
|
||||
|
||||
assert(t, "writables after add back", len(layout.writables), volumeCount)
|
||||
|
||||
}
|
||||
|
||||
topo.UnRegisterDataNode(dn)
|
||||
|
||||
assert(t, "activeVolumeCount2", int(topo.activeVolumeCount), 0)
|
||||
@@ -112,6 +160,7 @@ func TestAddRemoveVolume(t *testing.T) {
|
||||
|
||||
dn.UpdateVolumes([]storage.VolumeInfo{v})
|
||||
topo.RegisterVolumeLayout(v, dn)
|
||||
topo.RegisterVolumeLayout(v, dn)
|
||||
|
||||
if _, hasCollection := topo.FindCollection(v.Collection); !hasCollection {
|
||||
t.Errorf("collection %v should exist", v.Collection)
|
||||
|
||||
@@ -50,7 +50,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
|
||||
vl.accessLock.Lock()
|
||||
defer vl.accessLock.Unlock()
|
||||
|
||||
if _, ok := vl.vid2location[v.Id]; !ok {
|
||||
if _, ok := vl.vid2location[v.Id]; !ok || vl.vid2location[v.Id] == nil {
|
||||
vl.vid2location[v.Id] = NewVolumeLocationList()
|
||||
}
|
||||
vl.vid2location[v.Id].Set(dn)
|
||||
@@ -58,7 +58,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
|
||||
for _, dn := range vl.vid2location[v.Id].list {
|
||||
if vInfo, err := dn.GetVolumesById(v.Id); err == nil {
|
||||
if vInfo.ReadOnly {
|
||||
glog.V(3).Infof("vid %d removed from writable", v.Id)
|
||||
glog.V(1).Infof("vid %d removed from writable", v.Id)
|
||||
vl.removeFromWritable(v.Id)
|
||||
vl.readonlyVolumes[v.Id] = true
|
||||
return
|
||||
@@ -66,7 +66,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
|
||||
delete(vl.readonlyVolumes, v.Id)
|
||||
}
|
||||
} else {
|
||||
glog.V(3).Infof("vid %d removed from writable", v.Id)
|
||||
glog.V(1).Infof("vid %d removed from writable", v.Id)
|
||||
vl.removeFromWritable(v.Id)
|
||||
delete(vl.readonlyVolumes, v.Id)
|
||||
return
|
||||
@@ -93,7 +93,8 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
|
||||
defer vl.accessLock.Unlock()
|
||||
|
||||
vl.removeFromWritable(v.Id)
|
||||
delete(vl.vid2location, v.Id)
|
||||
delete(vl.vid2location, v.Id) // somehow this line does not work as expected
|
||||
// vl.vid2location[v.Id] = nil
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) {
|
||||
|
||||
Reference in New Issue
Block a user