switch to ReadAt() for thread-safe read
fix bugs during volume compaction
This commit is contained in:
@@ -129,8 +129,8 @@ func (t *Topology) DeleteCollection(collectionName string) {
|
||||
delete(t.collectionMap, collectionName)
|
||||
}
|
||||
|
||||
func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
|
||||
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(v, dn)
|
||||
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
|
||||
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn)
|
||||
}
|
||||
|
||||
func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) {
|
||||
@@ -144,7 +144,7 @@ func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo,
|
||||
dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
|
||||
dn.UpdateVolumes(volumeInfos)
|
||||
for _, v := range volumeInfos {
|
||||
t.RegisterVolumeLayout(&v, dn)
|
||||
t.RegisterVolumeLayout(v, dn)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,15 +33,22 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
|
||||
if _, ok := vl.vid2location[v.Id]; !ok {
|
||||
vl.vid2location[v.Id] = NewVolumeLocationList()
|
||||
}
|
||||
if vl.vid2location[v.Id].Add(dn) {
|
||||
if len(vl.vid2location[v.Id].list) == v.ReplicaPlacement.GetCopyCount() {
|
||||
if vl.isWritable(v) {
|
||||
vl.writables = append(vl.writables, v.Id)
|
||||
} else {
|
||||
vl.removeFromWritable(v.Id)
|
||||
}
|
||||
vl.vid2location[v.Id].Set(dn)
|
||||
glog.V(3).Infoln("volume", v.Id, "added to dn", dn, "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
|
||||
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
|
||||
vl.AddToWritable(v.Id)
|
||||
} else {
|
||||
vl.removeFromWritable(v.Id)
|
||||
}
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) {
|
||||
for _, id := range vl.writables {
|
||||
if vid == id {
|
||||
return
|
||||
}
|
||||
}
|
||||
vl.writables = append(vl.writables, vid)
|
||||
}
|
||||
|
||||
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
|
||||
@@ -156,10 +163,9 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b
|
||||
vl.accessLock.Lock()
|
||||
defer vl.accessLock.Unlock()
|
||||
|
||||
if vl.vid2location[vid].Add(dn) {
|
||||
if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
|
||||
return vl.setVolumeWritable(vid)
|
||||
}
|
||||
vl.vid2location[vid].Set(dn)
|
||||
if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
|
||||
return vl.setVolumeWritable(vid)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -18,14 +18,14 @@ func (dnll *VolumeLocationList) Length() int {
|
||||
return len(dnll.list)
|
||||
}
|
||||
|
||||
func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
|
||||
for _, dnl := range dnll.list {
|
||||
if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
|
||||
return false
|
||||
func (dnll *VolumeLocationList) Set(loc *DataNode) {
|
||||
for i := 0; i < len(dnll.list); i++ {
|
||||
if loc.Ip == dnll.list[i].Ip && loc.Port == dnll.list[i].Port {
|
||||
dnll.list[i] = loc
|
||||
return
|
||||
}
|
||||
}
|
||||
dnll.list = append(dnll.list, loc)
|
||||
return true
|
||||
}
|
||||
|
||||
func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
|
||||
|
||||
Reference in New Issue
Block a user