volume layout for each replication level

This commit is contained in:
Chris Lu
2012-09-10 00:18:07 -07:00
parent 8684b0999d
commit 6daf221937
7 changed files with 190 additions and 29 deletions

View File

@@ -1,10 +1,50 @@
package storage
import (
)
import ()
type VolumeInfo struct {
Id VolumeId
Size int64
Id VolumeId
Size int64
ReplicationType ReplicationType
}
type ReplicationType int
const (
Copy00 = ReplicationType(00) // single copy
Copy01 = ReplicationType(01) // 2 copies, each on different racks, same data center
Copy10 = ReplicationType(10) // 2 copies, each on different data center
Copy11 = ReplicationType(11) // 3 copies, 2 on different racks and local data center, 1 on different data center
Copy20 = ReplicationType(20) // 3 copies, each on dffereint data center
LengthRelicationType = 5
)
func GetReplicationLevelIndex(v *VolumeInfo) int {
switch v.ReplicationType {
case Copy00:
return 0
case Copy01:
return 1
case Copy10:
return 2
case Copy11:
return 3
case Copy20:
return 4
}
return -1
}
func GetCopyCount(v *VolumeInfo) int {
switch v.ReplicationType {
case Copy00:
return 1
case Copy01:
return 2
case Copy10:
return 2
case Copy11:
return 3
case Copy20:
return 3
}
return 0
}

View File

@@ -8,6 +8,10 @@ import (
type DataNode struct {
NodeImpl
volumes map[storage.VolumeId]*storage.VolumeInfo
ip string
port int
publicUrl string
lastSeen int64 // unix time in seconds
}
func NewDataNode(id string) *DataNode {
@@ -17,12 +21,21 @@ func NewDataNode(id string) *DataNode {
s.volumes = make(map[storage.VolumeId]*storage.VolumeInfo)
return s
}
func (s *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId {
s.AddVolume(&storage.VolumeInfo{Id: vid, Size: 32 * 1024 * 1024 * 1024})
func (dn *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId {
dn.AddVolume(&storage.VolumeInfo{Id: vid, Size: 32 * 1024 * 1024 * 1024})
return vid
}
func (s *DataNode) AddVolume(v *storage.VolumeInfo) {
s.volumes[v.Id] = v
s.UpAdjustActiveVolumeCountDelta(1)
s.UpAdjustMaxVolumeId(v.Id)
func (dn *DataNode) AddVolume(v *storage.VolumeInfo) {
dn.volumes[v.Id] = v
dn.UpAdjustActiveVolumeCountDelta(1)
dn.UpAdjustMaxVolumeId(v.Id)
dn.GetTopology().RegisterVolume(v,dn)
}
func (dn *DataNode) GetTopology() *Topology {
p := dn.parent
for p.Parent()!=nil{
p = p.Parent()
}
t := p.(*Topology)
return t
}

View File

@@ -20,10 +20,11 @@ type Node interface {
setParent(Node)
LinkChildNode(node Node)
UnlinkChildNode(nodeId NodeId)
CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit uint64) []storage.VolumeId
IsDataNode() bool
Children() map[NodeId]Node
Parent() Node
Parent() Node
}
type NodeImpl struct {
id NodeId
@@ -65,7 +66,7 @@ func (n *NodeImpl) Children() map[NodeId]Node {
return n.children
}
func (n *NodeImpl) Parent() Node {
return n.parent
return n.parent
}
func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) {
ret := false
@@ -146,3 +147,26 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount)
}
}
func (n *NodeImpl) CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit uint64) []storage.VolumeId {
var ret []storage.VolumeId
if n.IsRack() {
for _, c := range n.Children() {
dn := c.(*DataNode) //can not cast n to DataNode
if dn.lastSeen > freshThreshHold {
continue
}
for _, v := range dn.volumes {
if uint64(v.Size) < volumeSizeLimit {
ret = append(ret, v.Id)
}
}
}
} else {
for _, c := range n.Children() {
ret = append(ret, c.CollectWritableVolumes(freshThreshHold, volumeSizeLimit)...)
}
}
return ret
}

View File

@@ -3,37 +3,59 @@ package topology
import (
_ "fmt"
"math/rand"
"pkg/sequence"
"pkg/storage"
)
type Topology struct {
NodeImpl
//transient vid~servers mapping for each replication type
replicaType2VolumeLayout []*VolumeLayout
pulse int64
volumeSizeLimit uint64
sequence sequence.Sequencer
}
func NewTopology(id string) *Topology {
func NewTopology(id string, dirname string, filename string, volumeSizeLimit uint64, pulse int) *Topology {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
t.children = make(map[NodeId]Node)
t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
t.sequence = sequence.NewSequencer(dirname, filename)
return t
}
func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, storage.VolumeId) {
vid := t.NextVolumeId()
ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
return ret, node, vid
return ret, node, vid
}
func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, storage.VolumeId) {
freeSpace := t.FreeSpace()
for _, node := range except {
freeSpace -= node.FreeSpace()
}
vid := t.NextVolumeId()
ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
return ret, node, vid
freeSpace := t.FreeSpace()
for _, node := range except {
freeSpace -= node.FreeSpace()
}
vid := t.NextVolumeId()
ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
return ret, node, vid
}
func (t *Topology) NextVolumeId() storage.VolumeId {
vid := t.GetMaxVolumeId()
return vid.Next()
}
func (t *Topology) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
replicationTypeIndex := storage.GetReplicationLevelIndex(v)
if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(t.volumeSizeLimit, t.pulse)
}
t.replicaType2VolumeLayout[replicationTypeIndex].RegisterVolume(v, dn)
}

View File

@@ -0,0 +1,48 @@
package topology
import (
"errors"
"fmt"
"math/rand"
"pkg/storage"
)
type VolumeLayout struct {
vid2location map[storage.VolumeId]*DataNodeLocationList
writables []storage.VolumeId // transient array of writable volume id
pulse int64
volumeSizeLimit uint64
}
func NewVolumeLayout(volumeSizeLimit uint64, pulse int64) *VolumeLayout {
return &VolumeLayout{
vid2location: make(map[storage.VolumeId]*DataNodeLocationList),
writables: *new([]storage.VolumeId),
pulse: pulse,
volumeSizeLimit: volumeSizeLimit,
}
}
func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewDataNodeLocationList()
}
vl.vid2location[v.Id].Add(dn)
if len(vl.vid2location[v.Id].list) >= storage.GetCopyCount(v) {
vl.writables = append(vl.writables,v.Id)
}
}
func (vl *VolumeLayout) PickForWrite(count int) (int, *DataNodeLocationList, error) {
len_writers := len(vl.writables)
if len_writers <= 0 {
fmt.Println("No more writable volumes!")
return 0, nil, errors.New("No more writable volumes!")
}
vid := vl.writables[rand.Intn(len_writers)]
locationList := vl.vid2location[vid]
if locationList != nil {
return count, locationList, nil
}
return 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
}

View File

@@ -0,0 +1,21 @@
package topology
import (
)
type DataNodeLocationList struct {
list []*DataNode
}
func NewDataNodeLocationList() *DataNodeLocationList {
return &DataNodeLocationList{}
}
func (dnll *DataNodeLocationList) Add(loc *DataNode){
for _, dnl := range dnll.list {
if loc.ip == dnl.ip && loc.port == dnl.port {
break
}
}
dnll.list = append(dnll.list, loc)
}