this can compile now!!!
This commit is contained in:
@@ -2,13 +2,11 @@ package topology
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/types"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage/needle"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||
@@ -16,29 +14,26 @@ import (
|
||||
|
||||
type DataNode struct {
|
||||
NodeImpl
|
||||
volumes map[needle.VolumeId]storage.VolumeInfo
|
||||
Ip string
|
||||
Port int
|
||||
PublicUrl string
|
||||
LastSeen int64 // unix time in seconds
|
||||
ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
|
||||
ecShardsLock sync.RWMutex
|
||||
}
|
||||
|
||||
func NewDataNode(id string) *DataNode {
|
||||
s := &DataNode{}
|
||||
s.id = NodeId(id)
|
||||
s.nodeType = "DataNode"
|
||||
s.volumes = make(map[needle.VolumeId]storage.VolumeInfo)
|
||||
s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
|
||||
s.NodeImpl.value = s
|
||||
return s
|
||||
dn := &DataNode{}
|
||||
dn.id = NodeId(id)
|
||||
dn.nodeType = "DataNode"
|
||||
dn.diskUsages = newDiskUsages()
|
||||
dn.children = make(map[NodeId]Node)
|
||||
dn.NodeImpl.value = dn
|
||||
return dn
|
||||
}
|
||||
|
||||
func (dn *DataNode) String() string {
|
||||
dn.RLock()
|
||||
defer dn.RUnlock()
|
||||
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
|
||||
return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl)
|
||||
}
|
||||
|
||||
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
|
||||
@@ -47,37 +42,23 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO
|
||||
return dn.doAddOrUpdateVolume(v)
|
||||
}
|
||||
|
||||
func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
|
||||
if oldV, ok := dn.volumes[v.Id]; !ok {
|
||||
dn.volumes[v.Id] = v
|
||||
if v.DiskType == storage.SsdType {
|
||||
dn.UpAdjustSsdVolumeCountDelta(1)
|
||||
} else {
|
||||
dn.UpAdjustVolumeCountDelta(1)
|
||||
}
|
||||
if v.IsRemote() {
|
||||
dn.UpAdjustRemoteVolumeCountDelta(1)
|
||||
}
|
||||
if !v.ReadOnly {
|
||||
dn.UpAdjustActiveVolumeCountDelta(1)
|
||||
}
|
||||
dn.UpAdjustMaxVolumeId(v.Id)
|
||||
isNew = true
|
||||
} else {
|
||||
if oldV.IsRemote() != v.IsRemote() {
|
||||
if v.IsRemote() {
|
||||
dn.UpAdjustRemoteVolumeCountDelta(1)
|
||||
}
|
||||
if oldV.IsRemote() {
|
||||
dn.UpAdjustRemoteVolumeCountDelta(-1)
|
||||
}
|
||||
}
|
||||
isChangedRO = dn.volumes[v.Id].ReadOnly != v.ReadOnly
|
||||
dn.volumes[v.Id] = v
|
||||
func (dn *DataNode) getOrCreateDisk(diskType string) *Disk {
|
||||
c, found := dn.children[NodeId(diskType)]
|
||||
if !found {
|
||||
c = NewDisk(diskType)
|
||||
dn.LinkChildNode(c)
|
||||
}
|
||||
return
|
||||
disk := c.(*Disk)
|
||||
return disk
|
||||
}
|
||||
|
||||
func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
|
||||
disk := dn.getOrCreateDisk(v.DiskType)
|
||||
return disk.AddOrUpdateVolume(v)
|
||||
}
|
||||
|
||||
// UpdateVolumes detects new/deleted/changed volumes on a volume server
|
||||
// used in master to notify master clients of these changes.
|
||||
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
|
||||
|
||||
actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
|
||||
@@ -88,22 +69,26 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
|
||||
dn.Lock()
|
||||
defer dn.Unlock()
|
||||
|
||||
for vid, v := range dn.volumes {
|
||||
existingVolumes := dn.getVolumes()
|
||||
|
||||
for _, v := range existingVolumes {
|
||||
vid := v.Id
|
||||
if _, ok := actualVolumeMap[vid]; !ok {
|
||||
glog.V(0).Infoln("Deleting volume id:", vid)
|
||||
delete(dn.volumes, vid)
|
||||
disk := dn.getOrCreateDisk(v.DiskType)
|
||||
delete(disk.volumes, vid)
|
||||
deletedVolumes = append(deletedVolumes, v)
|
||||
if v.DiskType == storage.SsdType {
|
||||
dn.UpAdjustSsdVolumeCountDelta(-1)
|
||||
} else {
|
||||
dn.UpAdjustVolumeCountDelta(-1)
|
||||
}
|
||||
|
||||
deltaDiskUsages := newDiskUsages()
|
||||
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.DiskType(v.DiskType))
|
||||
deltaDiskUsage.volumeCount = -1
|
||||
if v.IsRemote() {
|
||||
dn.UpAdjustRemoteVolumeCountDelta(-1)
|
||||
deltaDiskUsage.remoteVolumeCount = -1
|
||||
}
|
||||
if !v.ReadOnly {
|
||||
dn.UpAdjustActiveVolumeCountDelta(-1)
|
||||
deltaDiskUsage.activeVolumeCount = -1
|
||||
}
|
||||
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
|
||||
}
|
||||
}
|
||||
for _, v := range actualVolumes {
|
||||
@@ -123,18 +108,19 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu
|
||||
defer dn.Unlock()
|
||||
|
||||
for _, v := range deletedVolumes {
|
||||
delete(dn.volumes, v.Id)
|
||||
if v.DiskType == storage.SsdType {
|
||||
dn.UpAdjustSsdVolumeCountDelta(-1)
|
||||
} else {
|
||||
dn.UpAdjustVolumeCountDelta(-1)
|
||||
}
|
||||
disk := dn.getOrCreateDisk(v.DiskType)
|
||||
delete(disk.volumes, v.Id)
|
||||
|
||||
deltaDiskUsages := newDiskUsages()
|
||||
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.DiskType(v.DiskType))
|
||||
deltaDiskUsage.volumeCount = -1
|
||||
if v.IsRemote() {
|
||||
dn.UpAdjustRemoteVolumeCountDelta(-1)
|
||||
deltaDiskUsage.remoteVolumeCount = -1
|
||||
}
|
||||
if !v.ReadOnly {
|
||||
dn.UpAdjustActiveVolumeCountDelta(-1)
|
||||
deltaDiskUsage.activeVolumeCount = -1
|
||||
}
|
||||
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
|
||||
}
|
||||
for _, v := range newVolumes {
|
||||
dn.doAddOrUpdateVolume(v)
|
||||
@@ -144,18 +130,26 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu
|
||||
|
||||
func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
|
||||
dn.RLock()
|
||||
for _, v := range dn.volumes {
|
||||
ret = append(ret, v)
|
||||
for _, c := range dn.children {
|
||||
disk := c.(*Disk)
|
||||
ret = append(ret, disk.GetVolumes()...)
|
||||
}
|
||||
dn.RUnlock()
|
||||
return ret
|
||||
}
|
||||
|
||||
func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) {
|
||||
func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) {
|
||||
dn.RLock()
|
||||
defer dn.RUnlock()
|
||||
vInfo, ok := dn.volumes[id]
|
||||
if ok {
|
||||
found := false
|
||||
for _, c := range dn.children {
|
||||
disk := c.(*Disk)
|
||||
vInfo, found = disk.volumes[id]
|
||||
if found {
|
||||
break
|
||||
}
|
||||
}
|
||||
if found {
|
||||
return vInfo, nil
|
||||
} else {
|
||||
return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
|
||||
@@ -193,31 +187,18 @@ func (dn *DataNode) Url() string {
|
||||
func (dn *DataNode) ToMap() interface{} {
|
||||
ret := make(map[string]interface{})
|
||||
ret["Url"] = dn.Url()
|
||||
ret["Volumes"] = dn.GetVolumeCount() + dn.GetSsdVolumeCount()
|
||||
ret["VolumeIds"] = dn.GetVolumeIds()
|
||||
ret["EcShards"] = dn.GetEcShardCount()
|
||||
ret["Max"] = dn.GetMaxVolumeCount() + dn.GetMaxSsdVolumeCount()
|
||||
ret["Free"] = dn.FreeSpace()
|
||||
ret["PublicUrl"] = dn.PublicUrl
|
||||
ret["Disks"] = dn.diskUsages.ToMap()
|
||||
return ret
|
||||
}
|
||||
|
||||
func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
|
||||
m := &master_pb.DataNodeInfo{
|
||||
Id: string(dn.Id()),
|
||||
VolumeCount: uint64(dn.GetVolumeCount()),
|
||||
MaxVolumeCount: uint64(dn.GetMaxVolumeCount()),
|
||||
MaxSsdVolumeCount: uint64(dn.GetMaxSsdVolumeCount()),
|
||||
SsdVolumeCount: uint64(dn.GetSsdVolumeCount()),
|
||||
FreeVolumeCount: uint64(dn.FreeSpace()),
|
||||
ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()),
|
||||
RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()),
|
||||
}
|
||||
for _, v := range dn.GetVolumes() {
|
||||
m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())
|
||||
}
|
||||
for _, ecv := range dn.GetEcShards() {
|
||||
m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage())
|
||||
for _, c := range dn.Children() {
|
||||
disk := c.(*Disk)
|
||||
m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo()
|
||||
}
|
||||
return m
|
||||
}
|
||||
@@ -226,11 +207,21 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
|
||||
func (dn *DataNode) GetVolumeIds() string {
|
||||
dn.RLock()
|
||||
defer dn.RUnlock()
|
||||
ids := make([]int, 0, len(dn.volumes))
|
||||
existingVolumes := dn.getVolumes()
|
||||
ids := make([]int, 0, len(existingVolumes))
|
||||
|
||||
for k := range dn.volumes {
|
||||
for k := range existingVolumes {
|
||||
ids = append(ids, int(k))
|
||||
}
|
||||
|
||||
return util.HumanReadableIntsMax(100, ids...)
|
||||
}
|
||||
|
||||
func (dn *DataNode) getVolumes() []storage.VolumeInfo {
|
||||
var existingVolumes []storage.VolumeInfo
|
||||
for _, c := range dn.children {
|
||||
disk := c.(*Disk)
|
||||
existingVolumes = append(existingVolumes, disk.GetVolumes()...)
|
||||
}
|
||||
return existingVolumes
|
||||
}
|
||||
Reference in New Issue
Block a user